acmc.phen
phenotype.py module
This module provides functionality for managing phenotypes.
"""
phenotype.py module

This module provides functionality for managing phenotypes.
"""

import argparse
import pandas as pd
import numpy as np
import json
import os
import sqlite3
import sys
import shutil
import time
import git
import re
import logging
import requests
import yaml
import semver
from git import Repo
from cerberus import Validator  # type: ignore
from deepdiff import DeepDiff
from pathlib import Path
from urllib.parse import urlparse, urlunparse
from typing import Tuple, Set, Any
import acmc
from acmc import trud, omop, parse, util, logging_config as lc

# set up the module-level logger through the package's logging configuration
_logger = lc.setup_logger()

# switch off pandas' chained-assignment check for the whole process
pd.set_option("mode.chained_assignment", None)

# --- phenotype directory layout ---------------------------------------------

PHEN_DIR = "phen"
"""Default phenotype directory name"""

DEFAULT_PHEN_PATH = Path("./workspace") / PHEN_DIR
"""Default phenotype directory path"""

CONCEPTS_DIR = "concepts"
"""Default concepts directory name"""

MAP_DIR = "map"
"""Default map directory name"""

CONCEPT_SET_DIR = "concept-sets"
"""Default concept set directory name"""

CSV_PATH = Path(CONCEPT_SET_DIR) / "csv"
"""Default CSV concept set directory path"""

OMOP_PATH = Path(CONCEPT_SET_DIR) / "omop"
"""Default OMOP concept set directory path"""

DEFAULT_PHEN_DIR_LIST = [CONCEPTS_DIR, MAP_DIR, CONCEPT_SET_DIR]
"""List of default phenotype directories"""

# --- configuration and versioning -------------------------------------------

CONFIG_FILE = "config.yml"
"""Default configuration filename"""

VOCAB_VERSION_FILE = "vocab_version.yml"
"""Default vocabulary version filename"""

SEMANTIC_VERSION_TYPES = ["major", "minor", "patch"]
"""List of semantic version increment types"""

DEFAULT_VERSION_INC = "patch"
"""Default semantic version increment type"""

DEFAULT_GIT_BRANCH = "main"
"""Default phenotype repo branch name"""

# --- column preprocessing actions (keys of a file's "actions" mapping) ------

SPLIT_COL_ACTION = "split_col"
"""Split column preprocessing action type"""

CODES_COL_ACTION = "codes_col"
"""Codes column preprocessing action type"""

DIVIDE_COL_ACTION = "divide_col"
"""Divide column preprocessing action type"""

COL_ACTIONS = [SPLIT_COL_ACTION, CODES_COL_ACTION, DIVIDE_COL_ACTION]
"""List of column preprocessing action types"""

CODE_FILE_TYPES = [".xlsx", ".xls", ".csv"]
"""List of supported source concept coding list file types"""

# config.yaml schema
CONFIG_SCHEMA = {
    "phenotype": {
        "type": "dict",
        "required": True,
        "schema": {
            "version": {
                "type": "string",
                "required": True,
                "regex": r"^\d+\.\d+\.\d+$",  # Enforces 'N.N.N' format (no 'v' prefix)
            },
            "omop": {
                "type": "dict",
                "required": True,
                "schema": {
                    "vocabulary_id": {"type": "string", "required": True},
                    "vocabulary_name": {"type": "string", "required": True},
                    "vocabulary_reference": {
                        "type": "string",
                        "required": True,
                        "regex": r"^https?://.*",  # Ensures it's a URL
                    },
                },
            },
            "map": {
                "type": "list",
                "schema": {
                    "type": "string",
                    "allowed": list(
                        parse.SUPPORTED_CODE_TYPES
                    ),  # Ensure only predefined values are allowed
                },
            },
            "concept_sets": {
                "type": "list",
                "required": True,
                "schema": {
                    "type": "dict",
                    "schema": {
                        "name": {"type": "string", "required": True},
                        "files": {
                            "type": "list",
                            "required": True,
                            "schema": {
                                "type": "dict",
                                "schema": {
                                    "path": {"type": "string", "required": True},
                                    "columns": {"type": "dict", "required": True},
                                    "category": {
                                        "type": "string"
                                    },  # Optional but must be string if present
                                    "actions": {
                                        "type": "dict",
                                        "schema": {
                                            "divide_col": {"type": "string"},
                                            "split_col": {"type": "string"},
                                            "codes_col": {"type": "string"},
                                        },
                                    },
                                },
                            },
                        },
                        "metadata": {"type": "dict", "required": False},
                    },
                },
            },
        },
    }
}
"""Phenotype config.yml schema definition"""


class PhenValidationException(Exception):
    """Raised when the phenotype configuration file fails validation.

    Attributes:
        validation_errors: the list of validation messages passed by the
            raiser, or None when none were supplied.
    """

    def __init__(self, message, validation_errors=None):
        super().__init__(message)
        self.validation_errors = validation_errors


def _construct_git_url(remote_url: str) -> str:
    """Constructs a git url for github or gitlab including a PAT token environment variable

    Args:
        remote_url (str): url of the remote repository

    Returns:
        str: the url with the credentials inserted in front of the host

    Raises:
        ValueError: if the required PAT environment variable is not set
    """
    parsed_url = urlparse(remote_url)

    # if github in the URL otherwise assume it's gitlab, if we want to use others such as codeberg we'd
    # need to update this function if the URL scheme is different.
    if "github.com" in parsed_url.netloc:
        # get GitHub PAT from environment variable
        auth = os.getenv("ACMC_GITHUB_PAT")
        if not auth:
            raise ValueError(
                "GitHub PAT not found. Set the ACMC_GITHUB_PAT environment variable."
            )
    else:
        # get GitLab PAT from environment variable
        auth = os.getenv("ACMC_GITLAB_PAT")
        if not auth:
            raise ValueError(
                "GitLab PAT not found. Set the ACMC_GITLAB_PAT environment variable."
            )
        auth = f"oauth2:{auth}"

    # Construct the new URL with credentials
    new_netloc = f"{auth}@{parsed_url.netloc}"
    return urlunparse(
        (
            parsed_url.scheme,
            new_netloc,
            parsed_url.path,
            parsed_url.params,
            parsed_url.query,
            parsed_url.fragment,
        )
    )


def _create_empty_git_dir(path: Path):
    """Creates a directory with a .gitkeep file so that it's tracked in git"""
    path.mkdir(exist_ok=True)
    keep_path = path / ".gitkeep"
    keep_path.touch(exist_ok=True)


def _check_delete_dir(path: Path, msg: str) -> bool:
    """Checks on the command line if a user wants to delete a directory

    Args:
        path (Path): path of the directory to be deleted
        msg (str): message to be displayed to the user

    Returns:
        Boolean: True if deleted
    """
    deleted = False

    user_input = input(f"{msg}").strip().lower()
    if user_input in ["yes", "y"]:
        shutil.rmtree(path)
        deleted = True
    else:
        _logger.info("Directory was not deleted.")

    return deleted


def init(phen_dir: str, remote_url: str):
    """Initial phenotype directory as git repo with standard structure

    Args:
        phen_dir (str): directory in which the phenotype is created
        remote_url (str): url of a remote repo to clone, or None to create a
            purely local repo

    Raises:
        ValueError: if a remote url is given and the PAT environment variable
            is not set
    """
    _logger.info(f"Initialising Phenotype in directory: {phen_dir}")
    phen_path = Path(phen_dir)

    # check if directory already exists and ask user if they want to recreate it
    if phen_path.exists() and phen_path.is_dir():
        configure = _check_delete_dir(
            phen_path,
            "The phen directory already exists. Do you want to reinitialise? (yes/no): ",
        )
    else:
        configure = True

    if not configure:
        _logger.info("Exiting, phenotype not initiatised")
        return

    # Initialise repo from local or remote
    repo: Repo

    # if remote then clone the repo otherwise init a local repo
    if remote_url is not None:
        # add PAT token to the URL
        git_url = _construct_git_url(remote_url)

        # clone the repo
        git_cmd = git.cmd.Git()
        git_cmd.clone(git_url, phen_path)

        # open repo
        repo = Repo(phen_path)
        # check if there are any commits (new repo has no commits)
        if (
            len(repo.branches) == 0 or repo.head.is_detached
        ):  # Handle detached HEAD (e.g., after init)
            _logger.debug("The phen repository has no commits yet.")
            commit_count = 0
        else:
            # Get the total number of commits in the default branch
            commit_count = sum(1 for _ in repo.iter_commits())
            _logger.debug(f"Repo has previous commits: {commit_count}")
    else:
        # local repo, create the directories and init
        phen_path.mkdir(parents=True, exist_ok=True)
        _logger.debug(f"Phen directory '{phen_path}' has been created.")
        repo = git.Repo.init(phen_path)
        commit_count = 0

    phen_path = phen_path.resolve()
    # initialise empty repos
    if commit_count == 0:
        # create initial commit
        initial_file_path = phen_path / "README.md"
        with open(initial_file_path, "w") as file:
            file.write(
                "# Initial commit\nThis is the first commit in the phen repository.\n"
            )
        repo.index.add([initial_file_path])
        repo.index.commit("Initial commit")
        commit_count = 1

    # Checkout the phens default branch, creating it if it does not exist
    if DEFAULT_GIT_BRANCH in repo.branches:
        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
    else:
        main_branch = repo.create_head(DEFAULT_GIT_BRANCH)
    main_branch.checkout()

    # if the phen path does not contain the config file then initialise the phen type
    config_path = phen_path / CONFIG_FILE
    if config_path.exists():
        _logger.debug("Phenotype configuration files already exist")
        return

    _logger.info("Creating phen directory structure and config files")
    for d in DEFAULT_PHEN_DIR_LIST:
        _create_empty_git_dir(phen_path / d)

    # create empty phen config file; the keys follow CONFIG_SCHEMA, so the list
    # of target code types is stored under "map" (the key read by map())
    config = {
        "phenotype": {
            "version": "0.0.0",
            "omop": {
                "vocabulary_id": "",
                "vocabulary_name": "",
                "vocabulary_reference": "",
            },
            "map": [],
            "concept_sets": [],
        }
    }

    with open(phen_path / CONFIG_FILE, "w") as file:
        yaml.dump(
            config,
            file,
            Dumper=util.QuotedDumper,
            default_flow_style=False,
            sort_keys=False,
            default_style='"',
        )

    # add git ignore
    ignore_content = """# Ignore SQLite database files
*.db
*.sqlite3

# Ignore SQLite journal and metadata files
*.db-journal
*.sqlite3-journal

# python
.ipynb_checkpoints
"""
    ignore_path = phen_path / ".gitignore"
    with open(ignore_path, "w") as file:
        file.write(ignore_content)

    # add to git repo and commit
    for d in DEFAULT_PHEN_DIR_LIST:
        repo.git.add(phen_path / d)
    repo.git.add(all=True)
    repo.index.commit("initialised the phen git repo.")

    _logger.info("Phenotype initialised successfully")


def fork(phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str):
    """Forks an upstream phenotype in a remote repo at a specific version to a local directory, and optionally sets to a new remote origin

    Args:
        phen_dir (str): local directory path where the upstream repo is to be cloned
        upstream_url (str): url to the upstream repo
        upstream_version (str): version in the upstream repo to clone
        new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.

    Raises:
        ValueError: if the specified version is not in the upstream repo
        ValueError: if the upstream repo is not a valid phenotype repo
        ValueError: if there's any other problems with Git
    """
    _logger.info(
        f"Forking upstream repo {upstream_url} {upstream_version} into directory: {phen_dir}"
    )

    phen_path = Path(phen_dir)
    # check if directory already exists and ask user if they want to recreate it
    if phen_path.exists() and phen_path.is_dir():
        configure = _check_delete_dir(
            phen_path,
            "The phen directory already exists. Do you want to reinitialise? (yes/no): ",
        )
    else:
        configure = True

    if not configure:
        _logger.info("Exiting, phenotype not initiatised")
        return

    try:
        # Clone repo
        git_url = _construct_git_url(upstream_url)
        repo = git.Repo.clone_from(git_url, phen_path)

        # Fetch all branches and tags
        repo.remotes.origin.fetch()

        # Check if the version exists
        available_refs = [ref.name.split("/")[-1] for ref in repo.references]
        if upstream_version not in available_refs:
            raise ValueError(
                f"Version '{upstream_version}' not found in the repository: {upstream_url}."
            )

        # Checkout the specified version
        # NOTE(review): the default branch is checked out straight afterwards,
        # which moves HEAD off the requested version again - confirm intent.
        repo.git.checkout(upstream_version)
        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
        main_branch.checkout()

        # Check if 'config.yml' exists in the root directory
        config_path = phen_path / CONFIG_FILE
        if not os.path.isfile(config_path):
            raise ValueError(
                "The forked repository is not a valid ACMC repo because 'config.yml' is missing in the root directory."
            )

        # Validate the phenotype is compatible with the acmc tool
        validate(str(phen_path.resolve()))

        # Delete each tag locally
        tags = repo.tags
        for tag in tags:
            repo.delete_tag(tag)
            _logger.debug(f"Deleted tags from forked repo: {tag}")

        # Add upstream remote
        repo.create_remote("upstream", upstream_url)
        remote = repo.remotes["origin"]
        repo.delete_remote(remote)  # Remove existing origin

        # Optionally set a new origin remote
        if new_origin_url:
            git_url = _construct_git_url(new_origin_url)
            repo.create_remote("origin", git_url)
            repo.git.push("--set-upstream", "origin", DEFAULT_GIT_BRANCH)

        _logger.info(f"Repository forked successfully at {phen_path}")
        _logger.info(f"Upstream set to {upstream_url}")
        if new_origin_url:
            _logger.info(f"Origin set to {new_origin_url}")

    except Exception as e:
        # remove the partially created clone, then report the failure with the
        # original exception attached as the cause
        if phen_path.exists():
            shutil.rmtree(phen_path)
        raise ValueError(f"Error occurred during repository fork: {str(e)}") from e


def validate(phen_dir: str):
    """Validates the phenotype directory is a git repo with standard structure

    Args:
        phen_dir (str): path of the phenotype directory

    Raises:
        NotADirectoryError: if phen_dir is not a directory
        FileNotFoundError: if the config file or the concepts directory is missing
        ValueError: if a code file has an unsupported file type
        PhenValidationException: if the configuration content fails validation
        Exception: if the directory is not a git repo, the config fails the
            schema check, or the config file type is unsupported
    """
    _logger.info(f"Validating phenotype: {phen_dir}")
    phen_path = Path(phen_dir)
    if not phen_path.is_dir():
        raise NotADirectoryError(
            f"Error: '{str(phen_path.resolve())}' is not a directory"
        )

    config_path = phen_path / CONFIG_FILE
    if not config_path.is_file():
        raise FileNotFoundError(
            f"Error: phen configuration file '{config_path}' does not exist."
        )

    concepts_path = phen_path / CONCEPTS_DIR
    if not concepts_path.is_dir():
        raise FileNotFoundError(
            f"Error: source concepts directory {concepts_path} does not exist."
        )

    # Validate the directory is a git repo
    try:
        git.Repo(phen_path)
    except (git.exc.InvalidGitRepositoryError, git.exc.NoSuchPathError):
        raise Exception(f"Phen directory {phen_path} is not a git repo")

    # Load configuration File
    if config_path.suffix == ".yml":
        try:
            with config_path.open("r") as file:
                phenotype = yaml.safe_load(file)

            validator = Validator(CONFIG_SCHEMA)
            if validator.validate(phenotype):
                _logger.debug("YAML structure is valid.")
            else:
                _logger.error(f"YAML structure validation failed: {validator.errors}")
                raise Exception(f"YAML structure validation failed: {validator.errors}")
        except yaml.YAMLError as e:
            _logger.error(f"YAML syntax error: {e}")
            raise e
    else:
        raise Exception(
            f"Unsupported configuration filetype: {str(config_path.resolve())}"
        )

    # initialise
    validation_errors = []
    phenotype = phenotype["phenotype"]
    code_types = parse.CodeTypeParser().code_types

    # check the version number is of the format n.n.n (whole string, as in the schema)
    if not re.fullmatch(r"\d+\.\d+\.\d+", phenotype["version"]):
        validation_errors.append(
            f"Invalid version format in configuration file: {phenotype['version']}"
        )

    # create a list of all the concept set names defined in the concept set configuration
    concept_set_names = []
    for item in phenotype["concept_sets"]:
        if item["name"] in concept_set_names:
            validation_errors.append(
                f"Duplicate concept set defined in concept sets {item['name'] }"
            )
        else:
            concept_set_names.append(item["name"])

    # check codes definition
    for files in phenotype["concept_sets"]:
        for item in files["files"]:
            concept_code_file_path = concepts_path / item["path"]

            # check concept code file exists and, only if it does, that it is
            # not empty (stat() on a missing file would raise and hide every
            # other validation error collected so far)
            if not concept_code_file_path.exists():
                validation_errors.append(
                    f"Coding file {str(concept_code_file_path.resolve())} does not exist"
                )
            elif concept_code_file_path.stat().st_size == 0:
                validation_errors.append(
                    f"Coding file {str(concept_code_file_path.resolve())} is an empty file"
                )

            # check code file type is supported
            if concept_code_file_path.suffix not in CODE_FILE_TYPES:
                raise ValueError(
                    f"Unsupported filetype {concept_code_file_path.suffix}, only support csv, xlsx, xls code file types"
                )

            # check columns specified are a supported medical coding type
            for column in item["columns"]:
                if column not in code_types:
                    validation_errors.append(
                        f"Column type {column} for file {concept_code_file_path} is not supported"
                    )

            # check the actions are supported
            if "actions" in item:
                for action in item["actions"]:
                    if action not in COL_ACTIONS:
                        validation_errors.append(f"Action {action} is not supported")

    if validation_errors:
        _logger.error(validation_errors)
        raise PhenValidationException(
            f"Configuration file {str(config_path.resolve())} failed validation",
            validation_errors,
        )

    _logger.info("Phenotype validated successfully")


def _read_table_file(path: Path, excel_sheet: str = ""):
    """Loads a code list file into a dataframe

    Args:
        path (Path): path of the .csv, .xlsx, .xls or .dta file
        excel_sheet (str, optional): sheet name to read from an Excel file;
            an empty string uses the pandas default sheet

    Returns:
        pd.DataFrame: the file content (read as strings except for .dta)

    Raises:
        ValueError: if the file suffix is not supported
    """
    path = path.resolve()
    if path.suffix == ".csv":
        df = pd.read_csv(path, dtype=str)
    elif path.suffix == ".xlsx" or path.suffix == ".xls":
        if excel_sheet != "":
            df = pd.read_excel(path, sheet_name=excel_sheet, dtype=str)
        else:
            df = pd.read_excel(path, dtype=str)
    elif path.suffix == ".dta":
        df = pd.read_stata(path)
    else:
        raise ValueError(
            f"Unsupported filetype {path.suffix}, only support{CODE_FILE_TYPES} code file types"
        )

    return df


def _process_actions(df: pd.DataFrame, concept_set: dict) -> pd.DataFrame:
    """Applies the structural "split_col"/"codes_col" action to a code list

    When both actions are configured, each distinct value of the split column
    becomes a new column holding the code from the codes column on the rows
    with that value and NaN elsewhere.

    Args:
        df (pd.DataFrame): the loaded code list
        concept_set (dict): the file entry from the configuration

    Returns:
        pd.DataFrame: df with the new columns appended, or df unchanged
    """
    # Perform Structural Changes to file before preprocessing
    _logger.debug("Processing file structural actions")
    if (
        "actions" in concept_set
        and "split_col" in concept_set["actions"]
        and "codes_col" in concept_set["actions"]
    ):
        split_col = concept_set["actions"]["split_col"]
        codes_col = concept_set["actions"]["codes_col"]
        # a single formatted message: logging treats extra positional
        # arguments as %-format arguments, not as print-style parts
        _logger.debug(
            f"Action: Splitting {split_col} column into: {df[split_col].unique()}"
        )
        codes = df[codes_col]
        oh = pd.get_dummies(df[split_col], dtype=bool)  # one hot encode
        oh = oh.where((oh != True), codes, axis=0)  # fill in 1s with codes
        oh[oh == False] = np.nan  # replace 0s with None
        df = pd.concat([df, oh], axis=1)  # merge in new columns

    return df


def _preprocess_source_concepts(
    df: pd.DataFrame, concept_set: dict, code_file_path: Path
) -> Tuple[pd.DataFrame, list]:
    """Perform QA Checks on columns individually and append to df

    Args:
        df (pd.DataFrame): the loaded code list
        concept_set (dict): the file entry from the configuration; its
            "columns" mapping gives code type -> source column name
        code_file_path (Path): path of the code list, passed to the parsers

    Returns:
        Tuple[pd.DataFrame, list]: the processed codes joined to the source
        rows, and the list of errors reported by the code type parsers
    """
    out = pd.DataFrame([])  # create output df to append to
    code_errors = []  # list of errors from processing

    # remove unnamed columns due to extra commas, missing headers, or incorrect parsing
    df = df.drop(columns=[col for col in df.columns if "Unnamed" in col])

    # Preprocess codes
    code_types = parse.CodeTypeParser().code_types
    for code_type in concept_set["columns"]:
        parser = code_types[code_type]
        _logger.info(f"Processing {code_type} codes for {code_file_path}")

        # get codes by column name
        source_col_name = concept_set["columns"][code_type]
        codes = df[source_col_name].dropna()
        codes = codes.astype(str)  # convert to string
        codes = codes.str.strip()  # remove excess spaces

        # process codes, validating them using parser and returning the errors
        codes, errors = parser.process(codes, code_file_path)
        if len(errors) > 0:
            code_errors.extend(errors)
            _logger.warning(f"Codes validation failed with {len(errors)} errors")

        # add processed codes to df
        new_col_name = f"{source_col_name}_SOURCE"
        df = df.rename(columns={source_col_name: new_col_name})
        process_codes = pd.DataFrame({code_type: codes}).join(df)
        out = pd.concat(
            [out, process_codes],
            ignore_index=True,
        )

    _logger.debug(out.head())

    return out, code_errors


# Translate Df with multiple codes into single code type Series
def translate_codes(
    source_df: pd.DataFrame,
    target_code_type: str,
    concept_name: str,
    not_translate: bool,
    do_reverse_translate: bool,
) -> pd.DataFrame:
    """Translates each source code type the source coding list into a target type and returns all conversions as a concept set

    Args:
        source_df (pd.DataFrame): one column per source code type
        target_code_type (str): code type to translate into
        concept_name (str): concept set name written to the CONCEPT_SET column
        not_translate (bool): if True only codes already in the target type are copied
        do_reverse_translate (bool): if True a target-to-source map file is
            used when no source-to-target map file exists

    Returns:
        pd.DataFrame: rows of SOURCE_CONCEPT, SOURCE_CONCEPT_TYPE and CONCEPT,
        plus CONCEPT_SET when any codes were produced
    """
    codes = pd.DataFrame(
        columns=["SOURCE_CONCEPT", "SOURCE_CONCEPT_TYPE", "CONCEPT"], dtype="string"
    )
    # Convert codes to target type
    _logger.info(f"Converting to target code type {target_code_type}")

    for source_code_type in source_df.columns:
        # if target code type is the same as the source code type, no translation, just appending source as target
        if source_code_type == target_code_type:
            copy_df = pd.DataFrame(
                {
                    "SOURCE_CONCEPT": source_df[source_code_type],
                    "SOURCE_CONCEPT_TYPE": source_code_type,
                    "CONCEPT": source_df[source_code_type],
                }
            )
            codes = pd.concat([codes, copy_df])
            _logger.debug(
                f"Target code type {target_code_type} is the same as source code type {len(source_df)}, copying codes rather than translating"
            )
        elif not not_translate:
            # get the translation filename using source to target code types
            filename = f"{source_code_type}_to_{target_code_type}.parquet"
            map_path = trud.PROCESSED_PATH / filename

            filename_reversed = f"{target_code_type}_to_{source_code_type}.parquet"
            map_path_reversed = trud.PROCESSED_PATH / filename_reversed

            # do the mapping if it exists
            if map_path.exists():
                codes = _translate_codes(map_path, source_df, source_code_type, codes)
            # otherwise do reverse mapping if enabled and it exists
            elif do_reverse_translate and map_path_reversed.exists():
                codes = _translate_codes(
                    map_path_reversed, source_df, source_code_type, codes, reverse=True
                )
            else:
                _logger.warning(
                    f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist"
                )

    codes = codes.dropna()  # delete NaNs

    # added concept set type to output if any translations
    if len(codes.index) > 0:
        codes["CONCEPT_SET"] = concept_name
    else:
        _logger.debug(f"No codes converted with target code type {target_code_type}")

    return codes


def _translate_codes(
    map_path, source_df, source_code_type, codes, reverse=False
) -> pd.DataFrame:
    """Left-joins one source code column onto a mapping file and appends the result

    Args:
        map_path: path of the parquet mapping file
        source_df: dataframe holding the source code column
        source_code_type: name of the source code column
        codes: dataframe of translations collected so far
        reverse: kept for interface compatibility; the join is identical in
            both directions because it is made on the shared column name

    Returns:
        pd.DataFrame: codes with the new translations appended
    """
    # get mapping
    df_map = pd.read_parquet(map_path)

    # do mapping; with no "on" argument pandas joins on the column names the
    # two inputs share. NOTE(review): assumes the map file has exactly the
    # source and target code type columns - confirm against trud output.
    translated_df = pd.merge(source_df[source_code_type], df_map, how="left")

    # normalise the output
    translated_df.columns = pd.Index(["SOURCE_CONCEPT", "CONCEPT"])
    translated_df["SOURCE_CONCEPT_TYPE"] = source_code_type

    # add to list of codes
    codes = pd.concat([codes, translated_df])

    return codes


def _write_code_errors(code_errors: list, code_errors_path: Path):
    """Writes the invalid codes reported by the parsers to a CSV file

    Args:
        code_errors (list): error objects exposing codes, mask, code_type,
            codes_file and message; errors whose mask is None are skipped
        code_errors_path (Path): path of the CSV file to write
    """
    err_df = pd.DataFrame(
        [
            {
                "CONCEPT": ", ".join(err.codes[~err.mask].tolist()),
                "VOCABULARY": err.code_type,
                "SOURCE": err.codes_file,
                "CAUSE": err.message,
            }
            for err in code_errors
            if err.mask is not None
        ],
        # fixed columns keep the sort below valid when no error has a mask
        columns=["CONCEPT", "VOCABULARY", "SOURCE", "CAUSE"],
    )

    err_df = err_df.drop_duplicates()  # Remove Duplicates from Error file
    err_df = err_df.sort_values(by=["SOURCE", "VOCABULARY", "CONCEPT"])
    err_df.to_csv(code_errors_path, index=False, mode="w")


def write_vocab_version(phen_path: Path):
    """Writes the acmc, TRUD and OMOP versions to the vocabulary version file

    Args:
        phen_path (Path): phenotype directory in which the file is written

    Raises:
        FileNotFoundError: if the TRUD or OMOP version file does not exist
    """
    if not trud.VERSION_PATH.exists():
        raise FileNotFoundError(
            f"TRUD version path {trud.VERSION_PATH} does not exist, please check TRUD is installed"
        )

    if not omop.VERSION_PATH.exists():
        raise FileNotFoundError(
            f"OMOP version path {omop.VERSION_PATH} does not exist, please check OMOP is installed"
        )

    with trud.VERSION_PATH.open("r") as file:
        trud_version = yaml.safe_load(file)

    with omop.VERSION_PATH.open("r") as file:
        omop_version = yaml.safe_load(file)

    # Create the combined YAML structure
    version_data = {
        "versions": {
            "acmc": acmc.__version__,
            "trud": trud_version,
            "omop": omop_version,
        }
    }

    with open(phen_path / VOCAB_VERSION_FILE, "w") as file:
        yaml.dump(
            version_data,
            file,
            Dumper=util.QuotedDumper,
            default_flow_style=False,
            sort_keys=False,
            default_style='"',
        )


def map(
    phen_dir: str,
    target_code_type: str,
    not_translate: bool,
    no_metadata: bool,
    do_reverse_translate: bool,
):
    """Maps the phenotype's source concepts to one or all configured target code types

    Args:
        phen_dir (str): path of the phenotype directory
        target_code_type (str): code type to map to, or None to map to every
            code type listed under "map" in the configuration
        not_translate (bool): if True codes are copied but not translated
        no_metadata (bool): if True concept set metadata is not added
        do_reverse_translate (bool): if True reverse map files may be used

    Raises:
        ValueError: if no map code types are configured, or target_code_type
            is not one of them
    """
    _logger.info(f"Processing phenotype: {phen_dir}")

    # Validate configuration
    validate(phen_dir)

    # initialise paths
    phen_path = Path(phen_dir)
    config_path = phen_path / CONFIG_FILE

    # load configuration
    with config_path.open("r") as file:
        config = yaml.safe_load(file)
    phenotype = config["phenotype"]

    # "map" is optional in CONFIG_SCHEMA, so a missing key is treated the same
    # as an empty list
    configured_types = phenotype.get("map") or []
    if not configured_types:
        raise ValueError("No map codes defined in the phenotype configuration")

    if target_code_type is not None and target_code_type not in configured_types:
        raise ValueError(
            f"Target code type {target_code_type} not in phenotype configuration map {configured_types}"
        )

    selected_types = (
        [target_code_type] if target_code_type is not None else configured_types
    )
    for selected_type in selected_types:
        _map_target_code_type(
            phen_path,
            phenotype,
            selected_type,
            not_translate,
            no_metadata,
            do_reverse_translate,
        )

    _logger.info("Phenotype processed successfully")


def _map_target_code_type(
    phen_path: Path,
    phenotype: dict,
    target_code_type: str,
    not_translate: bool,
    no_metadata: bool,
    do_reverse_translate: bool,
):
    """Maps every concept set of the phenotype to a single target code type

    Writes the combined mapping to the map directory, one CSV per concept set
    to the CSV concept set directory, any code errors to the map errors
    directory, and finally the vocabulary version file.

    Args:
        phen_path (Path): path of the phenotype directory
        phenotype (dict): the "phenotype" section of the configuration
        target_code_type (str): code type to map to
        not_translate (bool): if True codes are copied but not translated
        no_metadata (bool): if True concept set metadata is not added
        do_reverse_translate (bool): if True reverse map files may be used

    Raises:
        Exception: if the processing produced no output rows
    """
    _logger.debug(f"Target coding format: {target_code_type}")
    concepts_path = phen_path / CONCEPTS_DIR
    # Create output dataframe
    out = pd.DataFrame([])
    code_errors = []

    # Process each folder in codes section
    for files in phenotype["concept_sets"]:
        concept_set_name = files["name"]
        concept_set_metadata = files["metadata"] if "metadata" in files else {}
        for concept_set in files["files"]:
            _logger.debug(f"--- {concept_set} ---")

            # Load code file
            codes_file_path = Path(concepts_path / concept_set["path"])
            df = _read_table_file(codes_file_path)

            # process structural actions
            df = _process_actions(df, concept_set)

            # preprocessing and validate of source concepts
            _logger.debug("Processing and validating source concept codes")
            df, errors = _preprocess_source_concepts(
                df,
                concept_set,
                codes_file_path,
            )

            # create df with just the source code columns
            source_column_names = list(concept_set["columns"].keys())
            source_df = df[source_column_names]

            _logger.debug(source_df.columns)
            _logger.debug(source_df.head())

            _logger.debug(
                f"Length of errors from _preprocess_source_concepts {len(errors)}"
            )
            if len(errors) > 0:
                code_errors.extend(errors)
            _logger.debug(f" Length of code_errors {len(code_errors)}")

            # Select the rows to translate: for a source coding list with
            # categorical data only the group matching the configured category
            # is used, otherwise the whole table
            if (
                "actions" in concept_set
                and "divide_col" in concept_set["actions"]
                and len(df) > 0
            ):
                divide_col = concept_set["actions"]["divide_col"]
                _logger.debug(f"Action: Dividing Table by {divide_col}")
                _logger.debug(f"column into: {df[divide_col].unique()}")
                source_frames = []
                for cat, grp in df.groupby(divide_col):
                    if cat == concept_set["category"]:
                        grp = grp.drop(
                            columns=[divide_col]
                        )  # delete categorical column
                        source_frames.append(grp[source_column_names])
            else:
                source_frames = [source_df]

            # Map source concepts codes to target codes
            for source_frame in source_frames:
                trans_out = translate_codes(
                    source_frame,
                    target_code_type=target_code_type,
                    concept_name=concept_set_name,
                    not_translate=not_translate,
                    do_reverse_translate=do_reverse_translate,
                )
                trans_out = add_metadata(
                    codes=trans_out,
                    metadata=concept_set_metadata,
                    no_metadata=no_metadata,
                )
                out = pd.concat([out, trans_out])

    if len(code_errors) > 0:
        _logger.error(f"The map processing has {len(code_errors)} errors")
        error_path = phen_path / MAP_DIR / "errors"
        error_path.mkdir(parents=True, exist_ok=True)
        error_filename = f"{target_code_type}-code-errors.csv"
        _write_code_errors(code_errors, error_path / error_filename)

    # Check there is output from processing
    if len(out.index) == 0:
        _logger.error("No output after map processing")
        raise Exception(
            f"No output after map processing, check config {str(phen_path.resolve())}"
        )

    # final processing
    out = out.reset_index(drop=True)
    out = out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"])
    out = out.sort_values(by=["CONCEPT_SET", "CONCEPT"])

    # Save output to map directory
    output_filename = target_code_type + ".csv"
    map_path = phen_path / MAP_DIR / output_filename
    map_path.parent.mkdir(parents=True, exist_ok=True)
    out.to_csv(map_path, index=False)
    _logger.info(f"Saved mapped concepts to {str(map_path.resolve())}")

    # save concept sets as separate files
    concept_set_path = phen_path / CSV_PATH / target_code_type

    # empty the concept-set directory except for hidden files, e.g. .git
    if concept_set_path.exists():
        for item in concept_set_path.iterdir():
            if not item.name.startswith("."):
                item.unlink()
    else:
        concept_set_path.mkdir(parents=True, exist_ok=True)

    # write each concept as a separate file
    for name, concept in out.groupby("CONCEPT_SET"):
        concept = concept.sort_values(by="CONCEPT")  # sort rows
        concept = concept.dropna(how="all", axis=1)  # remove empty cols
        concept = concept.reindex(
            sorted(concept.columns), axis=1
        )  # sort cols alphabetically
        filename = f"{name}.csv"
        concept_path = concept_set_path / filename
        concept.to_csv(concept_path, index=False)

    write_vocab_version(phen_path)

    _logger.info(f"Phenotype processed target code type {target_code_type}")


# Add metadata dict to each row of Df codes
def add_metadata(
    codes: pd.DataFrame,
    metadata: dict,
    no_metadata: bool,
) -> pd.DataFrame:
    """Add concept set metadata, stored as a dictionary, to each concept row

    Args:
        codes (pd.DataFrame): translated codes; modified in place
        metadata (dict): metadata name -> value, each added as a column
        no_metadata (bool): if True nothing is added

    Returns:
        pd.DataFrame: the same dataframe that was passed in
    """
    if not no_metadata:
        for meta_name, meta_value in metadata.items():
            codes[meta_name] = meta_value
            _logger.debug(
                f"Adding metadata for concept set: metadata name {meta_name}, metadata value {meta_value}"
            )

    return codes


def _generate_version_tag(
    repo: git.Repo, increment: str = DEFAULT_VERSION_INC, use_v_prefix: bool = False
) -> str:
    """Computes the next semantic version from the repo's existing tags

    Args:
        repo (git.Repo): repo whose tags are inspected
        increment (str): "major" or "minor"; any other value bumps the patch
        use_v_prefix (bool): if True the result is prefixed with "v"

    Returns:
        str: "0.0.1" when no valid version tag exists, otherwise the highest
        existing version bumped by the requested increment
    """
    # Get all valid semantic version tags
    versions = []
    for tag in repo.tags:
        if tag.name.startswith("v"):
            tag_name = tag.name[1:]  # Remove the first character
        else:
            tag_name = tag.name
        if semver.Version.is_valid(tag_name):
            versions.append(semver.Version.parse(tag_name))

    _logger.debug(f"Versions: {versions}")
    # Determine the next version
    if not versions:
        new_version = semver.Version(0, 0, 1)
    else:
        latest_version = max(versions)
        if increment == "major":
            new_version = latest_version.bump_major()
        elif increment == "minor":
            new_version = latest_version.bump_minor()
        else:
            new_version = latest_version.bump_patch()

    # Create the new tag
    new_version_str = f"v{new_version}" if use_v_prefix else str(new_version)

    return new_version_str


def publish(
    phen_dir: str, msg: str, remote_url: str, increment: str = DEFAULT_VERSION_INC
):
    """Publishes updates to the phenotype by commiting all changes to the repo directory

    Args:
        phen_dir (str): path of the phenotype directory
        msg (str): commit message; a default message is used when empty or None
        remote_url (str): url of the remote repo to push to, or None
        increment (str): semantic version part to bump for the new tag

    Raises:
        AttributeError: if the repo has no default branch
    """
    # Validate config
    validate(phen_dir)
    phen_path = Path(phen_dir)

    # load git repo and set the branch
    repo = git.Repo(phen_path)
    if DEFAULT_GIT_BRANCH in repo.branches:
        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
        main_branch.checkout()
    else:
        raise AttributeError(
            f"Phen repo does not contain the default branch {DEFAULT_GIT_BRANCH}"
        )

    # check if any changes to publish
    if not repo.is_dirty() and not repo.untracked_files:
        if remote_url is not None and "origin" not in repo.remotes:
            _logger.info(f"First publish to remote url {remote_url}")
        else:
            _logger.info("Nothing to publish, no changes to the repo")
            return

    # get next version
    new_version_str = _generate_version_tag(repo, increment)
    _logger.info(f"New version: {new_version_str}")

    # Write version in configuration file
    config_path = phen_path / CONFIG_FILE
    with config_path.open("r") as file:
        config = yaml.safe_load(file)

    config["phenotype"]["version"] = new_version_str
    with open(config_path, "w") as file:
        yaml.dump(
            config,
            file,
            Dumper=util.QuotedDumper,
            default_flow_style=False,
            sort_keys=False,
            default_style='"',
        )

    # Add and commit changes to repo including version updates; honour the
    # caller's message, falling back to the generic one
    commit_message = msg if msg else f"Committing updates to phenotype {phen_path}"
    repo.git.add("--all")
    repo.index.commit(commit_message)

    # Add tag to the repo
    repo.create_tag(new_version_str)

    # push to origin if a remote repo
    if remote_url is not None and "origin" not in repo.remotes:
        git_url = _construct_git_url(remote_url)
        repo.create_remote("origin", git_url)

    try:
        if "origin" in repo.remotes:
            _logger.debug(f"Remote 'origin' is set {repo.remotes.origin.url}")
            origin = repo.remotes.origin
            _logger.info("Pushing main branch to remote repo")
            repo.git.push("--set-upstream", "origin", DEFAULT_GIT_BRANCH)
            _logger.info("Pushing version tags to remote git repo")
            origin.push(tags=True)
            _logger.debug("Changes pushed to 'origin'")
        else:
            _logger.debug("Remote 'origin' is not set")
    except Exception:
        # undo the tag and the commit (keeping the working tree) so a failed
        # push can be retried, then re-raise with the original traceback
        tag_ref = repo.tags[new_version_str]
        repo.delete_tag(tag_ref)
        repo.git.reset("--soft", "HEAD~1")
        raise

    _logger.info("Phenotype published successfully")


def export(phen_dir: str, version: str):
    """Exports a phen repo at a specific tagged version into a target directory"""
    _logger.info(f"Exporting phenotype {phen_dir} at version {version}")

    # validate configuration
    validate(phen_dir)
    phen_path = Path(phen_dir)

    # load configuration
    config_path = phen_path / CONFIG_FILE
    with config_path.open("r") as file:
        config = yaml.safe_load(file)

    map_path = phen_path / MAP_DIR
    if not map_path.exists():
        _logger.warning(f"Map path does not exist '{map_path}'")

    export_path = phen_path / OMOP_PATH
    # check export directory exists and if not create it
    if not export_path.exists():
        export_path.mkdir(parents=True)
        # NOTE(review): the reviewed chunk ends here, part-way through the
        # statement `_logger.debug(f"OMOP export ...`; the remainder of this
        # function lies beyond the visible source and has not been reviewed.
directory '{export_path}' created.") 1204 1205 # omop export db 1206 export_db_path = omop.export( 1207 map_path, 1208 export_path, 1209 config["phenotype"]["version"], 1210 config["phenotype"]["omop"], 1211 ) 1212 1213 _logger.info(f"Phenotype exported successfully") 1214 1215 1216def copy(phen_dir: str, target_dir: str, version: str): 1217 """Copys a phen repo at a specific tagged version into a target directory""" 1218 1219 # Validate 1220 validate(phen_dir) 1221 phen_path = Path(phen_dir) 1222 1223 # Check target directory exists 1224 target_path = Path(target_dir) 1225 if not target_path.exists(): 1226 raise FileNotFoundError(f"The target directory {target_path} does not exist") 1227 1228 # Set copy directory 1229 copy_path = target_path / version 1230 _logger.info(f"Copying repo {phen_path} to {copy_path}") 1231 1232 if ( 1233 copy_path.exists() and copy_path.is_dir() 1234 ): # Check if it exists and is a directory 1235 copy = _check_delete_dir( 1236 copy_path, 1237 f"The directory {str(copy_path.resolve())} already exists. Do you want to overwrite? 
(yes/no): ", 1238 ) 1239 else: 1240 copy = True 1241 1242 if not copy: 1243 _logger.info(f"Not copying the version {version}") 1244 return 1245 1246 _logger.debug(f"Cloning repo from {phen_path} into {copy_path}...") 1247 repo = git.Repo.clone_from(phen_path, copy_path) 1248 1249 # Check out the latest commit or specified version 1250 if version: 1251 # Checkout a specific version (e.g., branch, tag, or commit hash) 1252 _logger.info(f"Checking out version {version}...") 1253 repo.git.checkout(version) 1254 else: 1255 # Checkout the latest commit (HEAD) 1256 _logger.info(f"Checking out the latest commit...") 1257 repo.git.checkout("HEAD") 1258 1259 _logger.debug(f"Copied {phen_path} {repo.head.commit.hexsha[:7]} in {copy_path}") 1260 1261 _logger.info(f"Phenotype copied successfully") 1262 1263 1264# Convert concept_sets list into dictionaries 1265def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]: 1266 """Extracts concepts as {name: file_path} dictionary and a name set.""" 1267 concepts_dict = { 1268 item["name"]: [file["path"] for file in item["files"]] 1269 for item in config_data["phenotype"]["concept_sets"] 1270 } 1271 name_set = set(concepts_dict.keys()) 1272 return concepts_dict, name_set 1273 1274 1275def _extract_clean_deepdiff_keys(diff: dict, key_type: str) -> Set[Any]: 1276 """ 1277 Extracts clean keys from a DeepDiff dictionary. 
1278 1279 :param diff: DeepDiff result dictionary 1280 :param key_type: The type of change to extract (e.g., "dictionary_item_added", "dictionary_item_removed") 1281 :return: A set of clean key names 1282 """ 1283 return {key.split("root['")[1].split("']")[0] for key in diff.get(key_type, [])} 1284 1285 1286def diff_config(old_config: dict, new_config: dict) -> str: 1287 report = f"\n# Changes to phenotype configuration\n" 1288 report += f"This compares changes in the phenotype configuration including added, removed and renamed concept sets and changes to concept set source concept code file paths\n\n" 1289 1290 old_concepts, old_names = extract_concepts(old_config) 1291 new_concepts, new_names = extract_concepts(new_config) 1292 1293 # Check added and removed concept set names 1294 added_names = new_names - old_names # Names that appear in new but not in old 1295 removed_names = old_names - new_names # Names that were in old but not in new 1296 1297 # find file path changes for unchanged names 1298 unchanged_names = old_names & new_names # Names that exist in both 1299 file_diff = DeepDiff( 1300 {name: old_concepts[name] for name in unchanged_names}, 1301 {name: new_concepts[name] for name in unchanged_names}, 1302 ) 1303 1304 # Find renamed concepts (same file, different name) 1305 renamed_concepts = [] 1306 for removed in removed_names: 1307 old_path = old_concepts[removed] 1308 for added in added_names: 1309 new_path = new_concepts[added] 1310 if old_path == new_path: 1311 renamed_concepts.append((removed, added)) 1312 1313 # Remove renamed concepts from added and removed sets 1314 for old_name, new_name in renamed_concepts: 1315 added_names.discard(new_name) 1316 removed_names.discard(old_name) 1317 1318 # generate config report 1319 if added_names: 1320 report += "## Added Concepts\n" 1321 for name in added_names: 1322 report += f"- `{name}` (File: `{new_concepts[name]}`)\n" 1323 report += "\n" 1324 1325 if removed_names: 1326 report += "## Removed 
Concepts\n" 1327 for name in removed_names: 1328 report += f"- `{name}` (File: `{old_concepts[name]}`)\n" 1329 report += "\n" 1330 1331 if renamed_concepts: 1332 report += "## Renamed Concepts\n" 1333 for old_name, new_name in renamed_concepts: 1334 report += ( 1335 f"- `{old_name}` ➝ `{new_name}` (File: `{old_concepts[old_name]}`)\n" 1336 ) 1337 report += "\n" 1338 1339 if "values_changed" in file_diff: 1340 report += "## Updated File Paths\n" 1341 for name, change in file_diff["values_changed"].items(): 1342 old_file = change["old_value"] 1343 new_file = change["new_value"] 1344 clean_name = name.split("root['")[1].split("']")[0] 1345 report += ( 1346 f"- `{clean_name}` changed file from `{old_file}` ➝ `{new_file}`\n" 1347 ) 1348 report += "\n" 1349 1350 if not ( 1351 added_names 1352 or removed_names 1353 or renamed_concepts 1354 or file_diff.get("values_changed") 1355 ): 1356 report += "No changes in concept sets.\n" 1357 1358 return report 1359 1360 1361def diff_map_files( 1362 old_map_path: Path, 1363 new_map_path: Path, 1364 output_changed_concepts: bool, 1365 csv_output_path: Path, 1366) -> str: 1367 old_output_files = [ 1368 file.name 1369 for file in old_map_path.iterdir() 1370 if file.is_file() and not file.name.startswith(".") 1371 ] 1372 new_output_files = [ 1373 file.name 1374 for file in new_map_path.iterdir() 1375 if file.is_file() and not file.name.startswith(".") 1376 ] 1377 1378 # Convert the lists to sets for easy comparison 1379 old_output_set = set(old_output_files) 1380 new_output_set = set(new_output_files) 1381 1382 # Outputs that are in old_output_set but not in new_output_set (removed files) 1383 removed_outputs = old_output_set - new_output_set 1384 # Outputs that are in new_output_set but not in old_output_set (added files) 1385 added_outputs = new_output_set - old_output_set 1386 # Outputs that are the intersection of old_output_set and new_output_set 1387 common_outputs = old_output_set & new_output_set 1388 1389 report = f"\n# 
Changes to available translations\n" 1390 report += f"This compares the coding translations files available.\n\n" 1391 report += f"- Removed outputs: {sorted(list(removed_outputs))}\n" 1392 report += f"- Added outputs: {sorted(list(added_outputs))}\n" 1393 report += f"- Common outputs: {sorted(list(common_outputs))}\n\n" 1394 1395 # Step N: Compare common outputs between versions 1396 report += f"# Changes to concepts in translation files\n\n" 1397 report += f"This compares the added and removed concepts in each of the coding translation files. Note that this might be different to the config.yaml if the translations have not been run for the current config.\n\n" 1398 for file in common_outputs: 1399 old_output = old_map_path / file 1400 new_output = new_map_path / file 1401 1402 _logger.debug(f"Old ouptput: {str(old_output.resolve())}") 1403 _logger.debug(f"New ouptput: {str(new_output.resolve())}") 1404 1405 df1 = pd.read_csv(old_output) 1406 df1_count = df1[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count() 1407 df2 = pd.read_csv(new_output) 1408 df2_count = df2[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count() 1409 1410 # Check for added and removed concepts 1411 report += f"- File {file}\n" 1412 sorted_list = sorted(list(set(df1_count.index) - set(df2_count.index))) 1413 report += f"- Removed concepts {sorted_list}\n" 1414 sorted_list = sorted(list(set(df2_count.index) - set(df1_count.index))) 1415 report += f"- Added concepts {sorted_list}\n" 1416 1417 # Find differences in rows between df1 and df2 1418 out = df1.merge( 1419 df2, 1420 on=["CONCEPT", "CONCEPT_SET"], 1421 how="outer", 1422 suffixes=["_old", "_new"], 1423 indicator=True, 1424 ) 1425 out = out[out["_merge"] != "both"] # only select rows that are different 1426 out["_merge"] = out["_merge"].cat.rename_categories( 1427 {"left_only": "Removed", "right_only": "Added"} 1428 ) # rename categories 1429 out.sort_values(by=["CONCEPT_SET", "_merge"], inplace=True) 1430 1431 # Count the 
number of added and removed concepts for each concept set 1432 report += f"- Changed concepts:\n" 1433 for concept_set_name, grp in out.groupby("CONCEPT_SET"): 1434 counts = grp["_merge"].value_counts() 1435 report += "\t - {} +{} -{}\n".format( 1436 concept_set_name, counts["Added"], counts["Removed"] 1437 ) 1438 report += "\n" 1439 1440 # Write the output to a CSV file 1441 if output_changed_concepts and out.shape[0] > 0: 1442 csv_filename = f"{Path(file).stem}_diff.csv" 1443 csv_filepath = csv_output_path / csv_filename 1444 out.to_csv(csv_filepath, index=False) 1445 _logger.debug(f"CSV of changes written to {str(csv_filepath.resolve())}") 1446 1447 return report 1448 1449 1450def diff_phen( 1451 new_phen_path: Path, 1452 new_version: str, 1453 old_phen_path: Path, 1454 old_version: str, 1455 report_path: Path, 1456 csv_output_path: Path, 1457 not_check_config: bool, 1458 output_changed_concepts: bool, 1459): 1460 """Compare the differences between two versions of a phenotype""" 1461 1462 # write report heading 1463 report = f"# Phenotype Comparison Report\n" 1464 1465 # Step 1: check differences configuration files 1466 if not not_check_config: 1467 # validate phenotypes 1468 _logger.debug(f"Validating for diff old path: {str(old_phen_path.resolve())}") 1469 validate(str(old_phen_path.resolve())) 1470 _logger.debug(f"Validating for diff new path: {str(new_phen_path.resolve())}") 1471 validate(str(new_phen_path.resolve())) 1472 1473 # get old and new config 1474 old_config_path = old_phen_path / CONFIG_FILE 1475 with old_config_path.open("r") as file: 1476 old_config = yaml.safe_load(file) 1477 new_config_path = new_phen_path / CONFIG_FILE 1478 with new_config_path.open("r") as file: 1479 new_config = yaml.safe_load(file) 1480 1481 # write report 1482 report += f"## Original phenotype\n" 1483 report += f" - {old_config['phenotype']['omop']['vocabulary_id']}\n" 1484 report += f" - {old_version}\n" 1485 report += f" - {str(old_phen_path.resolve())}\n" 1486 report 
+= f"## Changed phenotype:\n" 1487 report += f" - {new_config['phenotype']['omop']['vocabulary_id']}\n" 1488 report += f" - {new_version}\n" 1489 report += f" - {str(new_phen_path.resolve())}\n" 1490 1491 # Convert list of dicts into a dict: {name: file} 1492 report += diff_config(old_config, new_config) 1493 1494 # Step 2: check differences between map files 1495 # List files from output directories 1496 old_map_path = old_phen_path / MAP_DIR 1497 new_map_path = new_phen_path / MAP_DIR 1498 report += diff_map_files( 1499 old_map_path, new_map_path, output_changed_concepts, csv_output_path 1500 ) 1501 1502 # initialise report file 1503 _logger.debug(f"Writing to report file {str(report_path.resolve())}") 1504 report_file = open(report_path, "w") 1505 report_file.write(report) 1506 report_file.close() 1507 1508 _logger.info(f"Phenotypes diff'd successfully") 1509 1510 1511def diff( 1512 phen_dir: str, 1513 version: str, 1514 old_phen_dir: str, 1515 old_version: str, 1516 not_check_config: bool, 1517 output_changed_concepts: bool, 1518): 1519 # make tmp directory .acmc 1520 timestamp = time.strftime("%Y%m%d_%H%M%S") 1521 temp_dir = Path(f".acmc/diff_{timestamp}") 1522 1523 changed_phen_path = Path(phen_dir) 1524 if not changed_phen_path.exists(): 1525 raise ValueError( 1526 f"Changed phenotype directory does not exist: {str(changed_phen_path.resolve())}" 1527 ) 1528 1529 old_phen_path = Path(old_phen_dir) 1530 if not old_phen_path.exists(): 1531 raise ValueError( 1532 f"Old phenotype directory does not exist: {str(old_phen_path.resolve())}" 1533 ) 1534 1535 try: 1536 # Create the directory 1537 temp_dir.mkdir(parents=True, exist_ok=True) 1538 _logger.debug(f"Temporary directory created: {temp_dir}") 1539 1540 # Create temporary directories 1541 changed_path = temp_dir / "changed" 1542 changed_path.mkdir(parents=True, exist_ok=True) 1543 old_path = temp_dir / "old" 1544 old_path.mkdir(parents=True, exist_ok=True) 1545 1546 # checkout changed 1547 if version == 
"latest": 1548 _logger.debug( 1549 f"Copying changed repo from {phen_dir} into {changed_path} at version {version}..." 1550 ) 1551 shutil.copytree(changed_phen_path, changed_path, dirs_exist_ok=True) 1552 else: 1553 _logger.debug( 1554 f"Cloning changed repo from {phen_dir} into {changed_path} at version {version}..." 1555 ) 1556 changed_repo = git.Repo.clone_from(changed_phen_path, changed_path) 1557 changed_repo.git.checkout(version) 1558 1559 # checkout old 1560 if old_version == "latest": 1561 _logger.debug( 1562 f"Copying old repo from {old_phen_dir} into {old_path} at version {old_version}..." 1563 ) 1564 shutil.copytree(old_phen_path, old_path, dirs_exist_ok=True) 1565 else: 1566 _logger.debug( 1567 f"Cloning old repo from {old_phen_dir} into {old_path} at version {old_version}..." 1568 ) 1569 old_repo = git.Repo.clone_from(old_phen_dir, old_path) 1570 old_repo.git.checkout(old_version) 1571 1572 report_filename = f"{version}_{old_version}_diff.md" 1573 report_path = changed_phen_path / report_filename 1574 csv_output_path = changed_phen_path 1575 # diff old with new 1576 diff_phen( 1577 changed_path, 1578 version, 1579 old_path, 1580 old_version, 1581 report_path, 1582 csv_output_path, 1583 not_check_config, 1584 output_changed_concepts, 1585 ) 1586 1587 finally: 1588 # clean up tmp directory 1589 if temp_dir.exists(): 1590 shutil.rmtree(temp_dir)
Default phenotype directory name
Default phenotype directory path
Default concepts directory name
Default map directory name
Default concept set directory name
Default CSV concept set directory path
Default OMOP concept set directory path
List of default phenotype directories
Default configuration filename
Default vocabulary version filename
List of semantic version increment types
Default semantic version increment type
Default phenotype repo branch name
Split column preprocessing action type
Codes column preprocessing action type
Divide column preprocessing action type
List of column preprocessing action types
List of supported source concept coding list file types
Phenotype config.yml schema definition
class PhenValidationException(Exception):
    """Raised when a phenotype configuration file fails validation.

    The individual problems are kept on ``validation_errors`` so callers can
    report each one; the attribute is ``None`` when no error list was supplied.
    """

    def __init__(self, message, validation_errors=None):
        # Keep the detailed errors on the instance, then let the base class
        # record the human-readable message as usual.
        self.validation_errors = validation_errors
        Exception.__init__(self, message)
Custom exception class raised when the phenotype configuration file contains validation errors
def init(phen_dir: str, remote_url: str):
    """Initialise a phenotype directory as a git repo with the standard structure.

    If the directory already exists the user is asked (via ``_check_delete_dir``)
    whether to reinitialise it. The repo is cloned from ``remote_url`` when one
    is given, otherwise a new local repo is created. An initial commit is made
    when the repo has none, the default branch is checked out (created if
    missing), and, unless a config file is already present, the default
    directories, an empty config file and a ``.gitignore`` are created and
    committed.

    Args:
        phen_dir (str): path of the phenotype directory to initialise
        remote_url (str): url of a remote repo to clone, or None for a local repo
    """
    _logger.info(f"Initialising Phenotype in directory: {phen_dir}")
    phen_path = Path(phen_dir)

    # check if directory already exists and ask user if they want to recreate it
    if phen_path.exists() and phen_path.is_dir():
        configure = _check_delete_dir(
            phen_path,
            "The phen directory already exists. Do you want to reinitialise? (yes/no): ",
        )
    else:
        configure = True

    if not configure:
        _logger.info("Exiting, phenotype not initiatised")
        return

    # Initialise repo from local or remote
    repo: Repo

    # if remote then clone the repo otherwise init a local repo
    if remote_url is not None:
        # add PAT token to the URL
        git_url = _construct_git_url(remote_url)

        # clone the repo
        git_cmd = git.cmd.Git()
        git_cmd.clone(git_url, phen_path)

        # open repo
        repo = Repo(phen_path)
        # check if there are any commits (new repo has no commits)
        if len(repo.branches) == 0 or repo.head.is_detached:
            _logger.debug("The phen repository has no commits yet.")
            commit_count = 0
        else:
            # Get the total number of commits in the default branch
            commit_count = sum(1 for _ in repo.iter_commits())
            _logger.debug(f"Repo has previous commits: {commit_count}")
    else:
        # local repo, create the directories and init
        phen_path.mkdir(parents=True, exist_ok=True)
        _logger.debug(f"Phen directory '{phen_path}' has been created.")
        repo = git.Repo.init(phen_path)
        commit_count = 0

    phen_path = phen_path.resolve()
    # initialise empty repos
    if commit_count == 0:
        # create initial commit so that a branch can be created and checked out
        initial_file_path = phen_path / "README.md"
        with open(initial_file_path, "w") as file:
            file.write(
                "# Initial commit\nThis is the first commit in the phen repository.\n"
            )
        repo.index.add([initial_file_path])
        repo.index.commit("Initial commit")

    # Checkout the phens default branch, creating it if it does not exist
    if DEFAULT_GIT_BRANCH in repo.branches:
        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
    else:
        main_branch = repo.create_head(DEFAULT_GIT_BRANCH)
    main_branch.checkout()

    # if the phen path does not contain the config file then initialise the phen type
    config_path = phen_path / CONFIG_FILE
    if config_path.exists():
        _logger.debug("Phenotype configuration files already exist")
        return

    _logger.info("Creating phen directory structure and config files")
    for d in DEFAULT_PHEN_DIR_LIST:
        _create_empty_git_dir(phen_path / d)

    # create empty phen config file
    # NOTE(review): this template writes the key "translate", whereas map()
    # reads phenotype["map"] - confirm which key CONFIG_SCHEMA expects.
    config = {
        "phenotype": {
            "version": "0.0.0",
            "omop": {
                "vocabulary_id": "",
                "vocabulary_name": "",
                "vocabulary_reference": "",
            },
            "translate": [],
            "concept_sets": [],
        }
    }

    with open(config_path, "w") as file:
        yaml.dump(
            config,
            file,
            Dumper=util.QuotedDumper,
            default_flow_style=False,
            sort_keys=False,
            default_style='"',
        )

    # add git ignore
    ignore_content = """# Ignore SQLite database files
*.db
*.sqlite3

# Ignore SQLite journal and metadata files
*.db-journal
*.sqlite3-journal

# python
.ipynb_checkpoints
 """
    ignore_path = phen_path / ".gitignore"
    with open(ignore_path, "w") as file:
        file.write(ignore_content)

    # add to git repo and commit; staging everything covers the default
    # directories, the config file and the .gitignore in one step
    repo.git.add(all=True)
    repo.index.commit("initialised the phen git repo.")

    _logger.info("Phenotype initialised successfully")
Initialises a phenotype directory as a git repo with the standard structure
def fork(phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str):
    """Forks an upstream phenotype in a remote repo at a specific version to a local directory, and optionally sets a new remote origin

    The upstream repo is cloned, checked for the requested version and validated
    as a phenotype. All local tags are then deleted, the clone's ``origin`` is
    replaced by an ``upstream`` remote, and, when ``new_origin_url`` is given, a
    new ``origin`` is created and the default branch pushed to it. On any
    failure the local directory is removed.

    Args:
        phen_dir (str): local directory path where the upstream repo is to be cloned
        upstream_url (str): url to the upstream repo
        upstream_version (str): version in the upstream repo to clone
        new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.

    Raises:
        ValueError: if the specified version is not in the upstream repo
        ValueError: if the upstream repo is not a valid phenotype repo
        ValueError: if there's any other problems with Git
    """
    _logger.info(
        f"Forking upstream repo {upstream_url} {upstream_version} into directory: {phen_dir}"
    )

    phen_path = Path(phen_dir)
    # check if directory already exists and ask user if they want to recreate it
    if phen_path.exists() and phen_path.is_dir():
        configure = _check_delete_dir(
            phen_path,
            "The phen directory already exists. Do you want to reinitialise? (yes/no): ",
        )
    else:
        configure = True

    if not configure:
        _logger.info("Exiting, phenotype not initiatised")
        return

    try:
        # Clone repo
        git_url = _construct_git_url(upstream_url)
        repo = git.Repo.clone_from(git_url, phen_path)

        # Fetch all branches and tags
        repo.remotes.origin.fetch()

        # Check if the version exists (compared on the last path component of each ref)
        available_refs = [ref.name.split("/")[-1] for ref in repo.references]
        if upstream_version not in available_refs:
            raise ValueError(
                f"Version '{upstream_version}' not found in the repository: {upstream_url}."
            )

        # Checkout the specified version
        # NOTE(review): the default branch is checked out straight afterwards,
        # so the working tree ends up at the tip of that branch rather than at
        # upstream_version - confirm this is the intended behaviour.
        repo.git.checkout(upstream_version)
        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
        main_branch.checkout()

        # Check the configuration file exists in the root directory
        config_path = phen_path / CONFIG_FILE
        if not config_path.is_file():
            raise ValueError(
                "The forked repository is not a valid ACMC repo because 'config.yml' is missing in the root directory."
            )

        # Validate the phenotype is compatible with the acmc tool
        validate(str(phen_path.resolve()))

        # Delete each tag locally
        for tag in repo.tags:
            repo.delete_tag(tag)
            _logger.debug(f"Deleted tags from forked repo: {tag}")

        # Add upstream remote
        repo.create_remote("upstream", upstream_url)
        remote = repo.remotes["origin"]
        repo.delete_remote(remote)  # Remove existing origin

        # Optionally set a new origin remote
        if new_origin_url:
            git_url = _construct_git_url(new_origin_url)
            repo.create_remote("origin", git_url)
            repo.git.push("--set-upstream", "origin", DEFAULT_GIT_BRANCH)

        _logger.info(f"Repository forked successfully at {phen_path}")
        _logger.info(f"Upstream set to {upstream_url}")
        if new_origin_url:
            _logger.info(f"Origin set to {new_origin_url}")

    except Exception as e:
        # remove the partially forked directory so a failed fork leaves nothing behind
        if phen_path.exists():
            shutil.rmtree(phen_path)
        # chain the original error so its traceback is not lost
        raise ValueError(f"Error occurred during repository fork: {str(e)}") from e
Forks an upstream phenotype in a remote repo at a specific version to a local directory, and optionally sets a new remote origin
Arguments:
- phen_dir (str): local directory path where the upstream repo is to be cloned
- upstream_url (str): url to the upstream repo
- upstream_version (str): version in the upstream repo to clone
- new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.
Raises:
- ValueError: if the specified version is not in the upstream repo
- ValueError: if the upstream repo is not a valid phenotype repo
- ValueError: if there's any other problems with Git
def validate(phen_dir: str):
    """Validates the phenotype directory is a git repo with standard structure

    Checks that the directory, its configuration file and its concepts
    directory exist, that the directory is a git repo, and that the
    configuration passes the ``CONFIG_SCHEMA`` check. The version, concept set
    names, code files, column types and actions are then checked, collecting
    every problem found before reporting them together.

    Args:
        phen_dir (str): path of the phenotype directory to validate

    Raises:
        NotADirectoryError: if ``phen_dir`` is not a directory
        FileNotFoundError: if the config file or concepts directory is missing
        yaml.YAMLError: if the configuration file is not valid YAML
        ValueError: if a code file has an unsupported file type
        PhenValidationException: if the configuration content fails validation
        Exception: if the directory is not a git repo, the YAML structure does
            not match the schema, or the config file type is unsupported
    """
    _logger.info(f"Validating phenotype: {phen_dir}")
    phen_path = Path(phen_dir)
    if not phen_path.is_dir():
        raise NotADirectoryError(
            f"Error: '{str(phen_path.resolve())}' is not a directory"
        )

    config_path = phen_path / CONFIG_FILE
    if not config_path.is_file():
        raise FileNotFoundError(
            f"Error: phen configuration file '{config_path}' does not exist."
        )

    concepts_path = phen_path / CONCEPTS_DIR
    if not concepts_path.is_dir():
        raise FileNotFoundError(
            f"Error: source concepts directory {concepts_path} does not exist."
        )

    # Validate the directory is a git repo
    try:
        git.Repo(phen_path)
    except (git.exc.InvalidGitRepositoryError, git.exc.NoSuchPathError) as err:
        raise Exception(f"Phen directory {phen_path} is not a git repo") from err

    # Load configuration file
    if config_path.suffix != ".yml":
        raise Exception(
            f"Unsupported configuration filetype: {str(config_path.resolve())}"
        )

    try:
        with config_path.open("r") as file:
            phenotype = yaml.safe_load(file)
    except yaml.YAMLError as e:
        _logger.error(f"YAML syntax error: {e}")
        raise

    validator = Validator(CONFIG_SCHEMA)
    if validator.validate(phenotype):
        _logger.debug("YAML structure is valid.")
    else:
        _logger.error(f"YAML structure validation failed: {validator.errors}")
        raise Exception(f"YAML structure validation failed: {validator.errors}")

    # initialise
    validation_errors = []
    phenotype = phenotype["phenotype"]
    code_types = parse.CodeTypeParser().code_types

    # check the version number starts with the format n.n.n
    if not re.match(r"(\d+\.\d+\.\d+)", phenotype["version"]):
        validation_errors.append(
            f"Invalid version format in configuration file: {phenotype['version']}"
        )

    # record every concept set name, reporting any that are defined more than once
    concept_set_names = set()
    for item in phenotype["concept_sets"]:
        if item["name"] in concept_set_names:
            validation_errors.append(
                f"Duplicate concept set defined in concept sets {item['name']}"
            )
        else:
            concept_set_names.add(item["name"])

    # check codes definition
    for files in phenotype["concept_sets"]:
        for item in files["files"]:
            concept_code_file_path = concepts_path / item["path"]

            # check concept code file exists; only inspect its size when it
            # does, so a missing file is reported as a validation error instead
            # of stat() raising FileNotFoundError and aborting the validation
            if not concept_code_file_path.exists():
                validation_errors.append(
                    f"Coding file {str(concept_code_file_path.resolve())} does not exist"
                )
            elif concept_code_file_path.stat().st_size == 0:
                validation_errors.append(
                    f"Coding file {str(concept_code_file_path.resolve())} is an empty file"
                )

            # check code file type is supported
            if concept_code_file_path.suffix not in CODE_FILE_TYPES:
                raise ValueError(
                    f"Unsupported filetype {concept_code_file_path.suffix}, only support csv, xlsx, xls code file types"
                )

            # check columns specified are a supported medical coding type
            for column in item["columns"]:
                if column not in code_types:
                    validation_errors.append(
                        f"Column type {column} for file {concept_code_file_path} is not supported"
                    )

            # check the actions are supported
            if "actions" in item:
                for action in item["actions"]:
                    if action not in COL_ACTIONS:
                        validation_errors.append(f"Action {action} is not supported")

    if validation_errors:
        _logger.error(validation_errors)
        raise PhenValidationException(
            f"Configuration file {str(config_path.resolve())} failed validation",
            validation_errors,
        )

    _logger.info("Phenotype validated successfully")
Validates the phenotype directory is a git repo with standard structure
def translate_codes(
    source_df: pd.DataFrame,
    target_code_type: str,
    concept_name: str,
    not_translate: bool,
    do_reverse_translate: bool,
) -> pd.DataFrame:
    """Translates each source code type in the source coding list into a target type and returns all conversions as a concept set

    Each column of ``source_df`` is treated as one source code type. A column
    whose type equals the target is copied unchanged. Any other column is
    translated with the ``<source>_to_<target>.parquet`` map under
    ``trud.PROCESSED_PATH``, or with the reversed map when
    ``do_reverse_translate`` is set and only that file exists.

    Args:
        source_df (pd.DataFrame): source codes, one column per source code type
        target_code_type (str): code type to translate to
        concept_name (str): concept set name written to the CONCEPT_SET column
        not_translate (bool): if True, only copy codes already of the target type
        do_reverse_translate (bool): if True, allow use of the reversed map file

    Returns:
        pd.DataFrame: rows of SOURCE_CONCEPT, SOURCE_CONCEPT_TYPE and CONCEPT with
        NaN rows dropped, plus a CONCEPT_SET column when any rows remain
    """
    codes = pd.DataFrame(
        columns=["SOURCE_CONCEPT", "SOURCE_CONCEPT_TYPE", "CONCEPT"], dtype="string"
    )
    # Convert codes to target type
    _logger.info(f"Converting to target code type {target_code_type}")

    for source_code_type in source_df.columns:
        # if target code type is the same as the source code type, no translation, just appending source as target
        if source_code_type == target_code_type:
            copy_df = pd.DataFrame(
                {
                    "SOURCE_CONCEPT": source_df[source_code_type],
                    "SOURCE_CONCEPT_TYPE": source_code_type,
                    "CONCEPT": source_df[source_code_type],
                }
            )
            codes = pd.concat([codes, copy_df])
            _logger.debug(
                f"Target code type {target_code_type} is the same as source code type {source_code_type}, copying codes rather than translating"
            )
            continue

        if not_translate:
            # translation disabled: columns of other code types are skipped
            continue

        # get the translation filename using source to target code types
        filename = f"{source_code_type}_to_{target_code_type}.parquet"
        map_path = trud.PROCESSED_PATH / filename

        filename_reversed = f"{target_code_type}_to_{source_code_type}.parquet"
        map_path_reversed = trud.PROCESSED_PATH / filename_reversed

        if map_path.exists():
            # do the mapping if it exists
            codes = _translate_codes(map_path, source_df, source_code_type, codes)
        elif do_reverse_translate and map_path_reversed.exists():
            # otherwise do reverse mapping if enabled and it exists
            codes = _translate_codes(
                map_path_reversed, source_df, source_code_type, codes, reverse=True
            )
        else:
            _logger.warning(
                f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist"
            )

    codes = codes.dropna()  # delete NaNs

    # added concept set type to output if any translations
    if len(codes.index) > 0:
        codes["CONCEPT_SET"] = concept_name
    else:
        _logger.debug(f"No codes converted with target code type {target_code_type}")

    return codes
Translates each source code type in the source coding list into a target type and returns all conversions as a concept set
def write_vocab_version(phen_path: Path):
    """Write the vocabulary version file into a phenotype directory

    Loads the TRUD and OMOP version YAML files and writes them, together with
    the acmc package version, under a single "versions" key to
    VOCAB_VERSION_FILE inside phen_path.

    Args:
        phen_path: Phenotype directory the version file is written to.

    Raises:
        FileNotFoundError: If the TRUD or the OMOP version file does not exist.
    """
    trud_version_path = trud.VERSION_PATH
    if not trud_version_path.exists():
        raise FileNotFoundError(
            f"TRUD version path {trud.VERSION_PATH} does not exist, please check TRUD is installed"
        )

    omop_version_path = omop.VERSION_PATH
    if not omop_version_path.exists():
        raise FileNotFoundError(
            f"OMOP version path {omop.VERSION_PATH} does not exist, please check OMOP is installed"
        )

    with trud_version_path.open("r") as trud_file:
        trud_version = yaml.safe_load(trud_file)

    with omop_version_path.open("r") as omop_file:
        omop_version = yaml.safe_load(omop_file)

    # combined structure: package version plus both vocabulary versions
    versions = {
        "acmc": acmc.__version__,
        "trud": trud_version,
        "omop": omop_version,
    }

    output_path = phen_path / VOCAB_VERSION_FILE
    with open(output_path, "w") as out_file:
        yaml.dump(
            {"versions": versions},
            out_file,
            Dumper=util.QuotedDumper,
            default_flow_style=False,
            sort_keys=False,
            default_style='"',
        )
def map(
    phen_dir: str,
    target_code_type: str,
    not_translate: bool,
    no_metadata: bool,
    do_reverse_translate: bool,
):
    """Map a phenotype to one target code type, or to every configured one

    Validates the phenotype directory, loads its configuration and calls
    _map_target_code_type for the requested target code type, or for each
    entry of the configured map when target_code_type is None.

    Args:
        phen_dir: Phenotype directory.
        target_code_type: Code type to map to, or None to map to all configured types.
        not_translate: Passed through to _map_target_code_type.
        no_metadata: Passed through to _map_target_code_type.
        do_reverse_translate: Passed through to _map_target_code_type.

    Raises:
        ValueError: If the configured map is empty or does not contain target_code_type.
    """
    _logger.info(f"Processing phenotype: {phen_dir}")

    # the directory must validate before the configuration is trusted
    validate(phen_dir)

    phen_path = Path(phen_dir)
    with (phen_path / CONFIG_FILE).open("r") as config_file:
        config = yaml.safe_load(config_file)
    phenotype = config["phenotype"]

    if len(phenotype["map"]) == 0:
        raise ValueError("No map codes defined in the phenotype configuration")

    # decide which target code types to process
    if target_code_type is None:
        targets = phenotype["map"]
    elif target_code_type in phenotype["map"]:
        targets = [target_code_type]
    else:
        raise ValueError(
            f"Target code type {target_code_type} not in phenotype configuration map {phenotype['map']}"
        )

    for target in targets:
        _map_target_code_type(
            phen_path,
            phenotype,
            target,
            not_translate,
            no_metadata,
            do_reverse_translate,
        )

    _logger.info("Phenotype processed successfully")
def add_metadata(
    codes: pd.DataFrame,
    metadata: dict,
    no_metadata: bool,
) -> pd.DataFrame:
    """Attach concept set metadata to every concept row

    Each metadata key becomes a column of codes holding the metadata value in
    every row. The frame is modified in place and also returned.

    Args:
        codes: Concept rows to annotate.
        metadata: Mapping of metadata name to metadata value.
        no_metadata: If True, codes is returned untouched.

    Returns:
        The same codes DataFrame that was passed in.
    """
    # metadata switched off: nothing to add
    if no_metadata:
        return codes

    for meta_name, meta_value in metadata.items():
        codes[meta_name] = meta_value
        _logger.debug(
            f"Adding metadata for concept set: metadata name {meta_name}, metadata value {meta_value}"
        )

    return codes
Add concept set metadata, stored as a dictionary, to each concept row
def publish(
    phen_dir: str, msg: str, remote_url: str, increment: str = DEFAULT_VERSION_INC
):
    """Publishes updates to the phenotype by committing all changes to the repo directory

    Checks out the default branch, generates the next version tag, writes the
    new version into the configuration file, commits everything, tags the
    commit and, when a remote 'origin' exists (or can be created from
    remote_url), pushes the branch and tags.

    Args:
        phen_dir: Phenotype directory containing the git repo.
        msg: Commit message; a default message is used when it is None or empty.
        remote_url: URL used to create the 'origin' remote when the repo has none.
            May be None for a local-only publish.
        increment: Version increment type passed to _generate_version_tag.

    Raises:
        AttributeError: If the repo has no DEFAULT_GIT_BRANCH branch.
        Exception: Any error raised while creating the remote or pushing is
            re-raised after the new tag and commit have been rolled back.
    """
    # Validate config
    validate(phen_dir)
    phen_path = Path(phen_dir)

    # load git repo and set the branch
    repo = git.Repo(phen_path)
    if DEFAULT_GIT_BRANCH not in repo.branches:
        raise AttributeError(
            f"Phen repo does not contain the default branch {DEFAULT_GIT_BRANCH}"
        )
    repo.heads[DEFAULT_GIT_BRANCH].checkout()

    # check if any changes to publish
    if not repo.is_dirty() and not repo.untracked_files:
        if remote_url is not None and "origin" not in repo.remotes:
            # clean repo but no remote yet: still publish so it reaches the remote
            _logger.info(f"First publish to remote url {remote_url}")
        else:
            _logger.info("Nothing to publish, no changes to the repo")
            return

    # get next version
    new_version_str = _generate_version_tag(repo, increment)
    _logger.info(f"New version: {new_version_str}")

    # Write version in configuration file
    config_path = phen_path / CONFIG_FILE
    with config_path.open("r") as file:
        config = yaml.safe_load(file)

    config["phenotype"]["version"] = new_version_str
    with open(config_path, "w") as file:
        yaml.dump(
            config,
            file,
            Dumper=util.QuotedDumper,
            default_flow_style=False,
            sort_keys=False,
            default_style='"',
        )

    # Add and commit changes to repo including version updates;
    # use the caller's message when one was supplied
    commit_message = msg if msg else f"Committing updates to phenotype {phen_path}"
    repo.git.add("--all")
    repo.index.commit(commit_message)

    # Add tag to the repo
    repo.create_tag(new_version_str)

    # push to origin if a remote repo
    if remote_url is not None and "origin" not in repo.remotes:
        git_url = _construct_git_url(remote_url)
        repo.create_remote("origin", git_url)

    try:
        if "origin" in repo.remotes:
            _logger.debug(f"Remote 'origin' is set {repo.remotes.origin.url}")
            origin = repo.remotes.origin
            _logger.info("Pushing main branch to remote repo")
            # push the branch that was checked out above rather than a hard-coded name
            repo.git.push("--set-upstream", "origin", DEFAULT_GIT_BRANCH)
            _logger.info("Pushing version tags to remote git repo")
            origin.push(tags=True)
            _logger.debug("Changes pushed to 'origin'")
        else:
            _logger.debug("Remote 'origin' is not set")
    except Exception:
        # roll back the tag and the commit; the soft reset leaves the changes staged
        tag_ref = repo.tags[new_version_str]
        repo.delete_tag(tag_ref)
        repo.git.reset("--soft", "HEAD~1")
        raise

    _logger.info("Phenotype published successfully")
Publishes updates to the phenotype by committing all changes to the repo directory
def export(phen_dir: str, version: str):
    """Exports the phenotype map directory through omop.export

    Validates the phenotype, loads its configuration, makes sure the OMOP
    export directory exists and then calls omop.export with the map
    directory, the export directory and the version and omop settings read
    from the configuration.

    Args:
        phen_dir: Phenotype directory.
        version: Version label. In this function it is only used in the log
            message; the version handed to omop.export comes from the
            configuration file.
    """
    _logger.info(f"Exporting phenotype {phen_dir} at version {version}")

    # validate configuration
    validate(phen_dir)
    phen_path = Path(phen_dir)

    # load configuration
    with (phen_path / CONFIG_FILE).open("r") as config_file:
        config = yaml.safe_load(config_file)

    # a missing map directory is only reported, the export is still attempted
    map_path = phen_path / MAP_DIR
    if not map_path.exists():
        _logger.warning(f"Map path does not exist '{map_path}'")

    # create the export directory on first use
    export_path = phen_path / OMOP_PATH
    if not export_path.exists():
        export_path.mkdir(parents=True)
        _logger.debug(f"OMOP export directory '{export_path}' created.")

    phenotype_config = config["phenotype"]
    omop.export(
        map_path,
        export_path,
        phenotype_config["version"],
        phenotype_config["omop"],
    )

    _logger.info("Phenotype exported successfully")
Exports a phen repo at a specific tagged version into a target directory
def copy(phen_dir: str, target_dir: str, version: str):
    """Copies a phen repo at a specific tagged version into a target directory

    The repo is cloned into <target_dir>/<version> and the requested version
    is checked out. If that directory already exists, _check_delete_dir is
    asked whether to overwrite it and the copy is skipped when it declines.

    Args:
        phen_dir: Phenotype directory containing the git repo.
        target_dir: Existing directory that receives the copy.
        version: Branch, tag or commit to check out; HEAD is checked out when empty.

    Raises:
        FileNotFoundError: If target_dir does not exist.
    """
    # Validate
    validate(phen_dir)
    phen_path = Path(phen_dir)

    # the target directory must already exist
    target_path = Path(target_dir)
    if not target_path.exists():
        raise FileNotFoundError(f"The target directory {target_path} does not exist")

    # each version is copied into its own sub-directory
    copy_path = target_path / version
    _logger.info(f"Copying repo {phen_path} to {copy_path}")

    # an existing copy is only replaced after confirmation
    if copy_path.exists() and copy_path.is_dir():
        overwrite = _check_delete_dir(
            copy_path,
            f"The directory {str(copy_path.resolve())} already exists. Do you want to overwrite? (yes/no): ",
        )
        if not overwrite:
            _logger.info(f"Not copying the version {version}")
            return

    _logger.debug(f"Cloning repo from {phen_path} into {copy_path}...")
    repo = git.Repo.clone_from(phen_path, copy_path)

    if not version:
        # no version given: stay on the latest commit (HEAD)
        _logger.info("Checking out the latest commit...")
        repo.git.checkout("HEAD")
    else:
        # specific version (e.g., branch, tag, or commit hash)
        _logger.info(f"Checking out version {version}...")
        repo.git.checkout(version)

    _logger.debug(f"Copied {phen_path} {repo.head.commit.hexsha[:7]} in {copy_path}")

    _logger.info("Phenotype copied successfully")
Copies a phen repo at a specific tagged version into a target directory
def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]:
    """Extracts concept sets as a {name: [file paths]} dictionary and a name set.

    Args:
        config_data: Loaded phenotype configuration; reads
            config_data["phenotype"]["concept_sets"], where each entry has a
            "name" and a list of "files" with a "path" each.

    Returns:
        A tuple of the {name: list of file paths} dictionary and the set of
        concept set names. A later entry with a repeated name replaces the earlier one.
    """
    concepts_dict = {}
    for concept_set in config_data["phenotype"]["concept_sets"]:
        concept_name = concept_set["name"]
        file_paths = []
        for file_entry in concept_set["files"]:
            file_paths.append(file_entry["path"])
        concepts_dict[concept_name] = file_paths

    return concepts_dict, set(concepts_dict)
Extracts concepts as a {name: [file paths]} dictionary and a name set.
def _diff_path_concept_name(diff_path: str) -> str:
    """Returns the concept set name from a DeepDiff path such as root['name'][0]"""
    match = re.match(r"root\[['\"](.+?)['\"]\]", diff_path)
    # fall back to the raw path rather than failing on an unexpected format
    return match.group(1) if match else diff_path


def diff_config(old_config: dict, new_config: dict) -> str:
    """Builds a markdown report of the differences between two phenotype configurations

    Reports concept sets that were added, removed or renamed (same file list
    under a different name) and, for concept sets present in both
    configurations, the source file paths that were changed, added or removed.

    Args:
        old_config: Loaded configuration of the old phenotype version.
        new_config: Loaded configuration of the new phenotype version.

    Returns:
        The report as a markdown string.
    """
    report = "\n# Changes to phenotype configuration\n"
    report += "This compares changes in the phenotype configuration including added, removed and renamed concept sets and changes to concept set source concept code file paths\n\n"

    old_concepts, old_names = extract_concepts(old_config)
    new_concepts, new_names = extract_concepts(new_config)

    added_names = new_names - old_names  # Names that appear in new but not in old
    removed_names = old_names - new_names  # Names that were in old but not in new

    # find file path changes for names that exist in both configurations
    unchanged_names = old_names & new_names
    file_diff = DeepDiff(
        {name: old_concepts[name] for name in unchanged_names},
        {name: new_concepts[name] for name in unchanged_names},
    )
    # DeepDiff reports replaced list items under "values_changed" but items
    # appended to or dropped from a list under "iterable_item_added/removed"
    changed_files = file_diff.get("values_changed", {})
    added_files = file_diff.get("iterable_item_added", {})
    removed_files = file_diff.get("iterable_item_removed", {})

    # Find renamed concepts (same files, different name); sorted so the
    # report does not depend on set iteration order
    renamed_concepts = []
    for removed in sorted(removed_names, key=str):
        for added in sorted(added_names, key=str):
            if old_concepts[removed] == new_concepts[added]:
                renamed_concepts.append((removed, added))

    # Renamed concepts are reported once, not also as added and removed
    for old_name, new_name in renamed_concepts:
        added_names.discard(new_name)
        removed_names.discard(old_name)

    # generate config report
    if added_names:
        report += "## Added Concepts\n"
        for name in sorted(added_names, key=str):
            report += f"- `{name}` (File: `{new_concepts[name]}`)\n"
        report += "\n"

    if removed_names:
        report += "## Removed Concepts\n"
        for name in sorted(removed_names, key=str):
            report += f"- `{name}` (File: `{old_concepts[name]}`)\n"
        report += "\n"

    if renamed_concepts:
        report += "## Renamed Concepts\n"
        for old_name, new_name in renamed_concepts:
            report += (
                f"- `{old_name}` ➝ `{new_name}` (File: `{old_concepts[old_name]}`)\n"
            )
        report += "\n"

    if changed_files or added_files or removed_files:
        report += "## Updated File Paths\n"
        for diff_path, change in changed_files.items():
            old_file = change["old_value"]
            new_file = change["new_value"]
            clean_name = _diff_path_concept_name(diff_path)
            report += (
                f"- `{clean_name}` changed file from `{old_file}` ➝ `{new_file}`\n"
            )
        for diff_path, new_file in added_files.items():
            clean_name = _diff_path_concept_name(diff_path)
            report += f"- `{clean_name}` added file `{new_file}`\n"
        for diff_path, old_file in removed_files.items():
            clean_name = _diff_path_concept_name(diff_path)
            report += f"- `{clean_name}` removed file `{old_file}`\n"
        report += "\n"

    if not (
        added_names
        or removed_names
        or renamed_concepts
        or changed_files
        or added_files
        or removed_files
    ):
        report += "No changes in concept sets.\n"

    return report
def diff_map_files(
    old_map_path: Path,
    new_map_path: Path,
    output_changed_concepts: bool,
    csv_output_path: Path,
) -> str:
    """Builds a markdown report of the differences between two map directories

    Lists the translation files removed, added and common to both
    directories and, for each common file, the concept sets and concepts that
    were added or removed.

    Args:
        old_map_path: Map directory of the old phenotype version.
        new_map_path: Map directory of the new phenotype version.
        output_changed_concepts: If True, the changed rows of each common file
            are written to <file stem>_diff.csv in csv_output_path.
        csv_output_path: Directory that receives the diff CSV files.

    Returns:
        The report as a markdown string.
    """
    # hidden files and sub-directories are not translation outputs
    old_output_set = {
        file.name
        for file in old_map_path.iterdir()
        if file.is_file() and not file.name.startswith(".")
    }
    new_output_set = {
        file.name
        for file in new_map_path.iterdir()
        if file.is_file() and not file.name.startswith(".")
    }

    removed_outputs = old_output_set - new_output_set
    added_outputs = new_output_set - old_output_set
    common_outputs = old_output_set & new_output_set

    report = "\n# Changes to available translations\n"
    report += "This compares the coding translations files available.\n\n"
    report += f"- Removed outputs: {sorted(removed_outputs)}\n"
    report += f"- Added outputs: {sorted(added_outputs)}\n"
    report += f"- Common outputs: {sorted(common_outputs)}\n\n"

    # Compare common outputs between versions
    report += "# Changes to concepts in translation files\n\n"
    report += "This compares the added and removed concepts in each of the coding translation files. Note that this might be different to the config.yaml if the translations have not been run for the current config.\n\n"
    # sorted so the report sections come out in the same order on every run
    for file in sorted(common_outputs):
        old_output = old_map_path / file
        new_output = new_map_path / file

        _logger.debug(f"Old ouptput: {str(old_output.resolve())}")
        _logger.debug(f"New ouptput: {str(new_output.resolve())}")

        # read everything as text: codes are identifiers, and inferred dtypes
        # can differ between the two files, which makes the merge below fail
        df1 = pd.read_csv(old_output, dtype=str)
        df1_count = df1[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
        df2 = pd.read_csv(new_output, dtype=str)
        df2_count = df2[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()

        # Check for added and removed concept sets
        report += f"- File {file}\n"
        sorted_list = sorted(set(df1_count.index) - set(df2_count.index))
        report += f"- Removed concepts {sorted_list}\n"
        sorted_list = sorted(set(df2_count.index) - set(df1_count.index))
        report += f"- Added concepts {sorted_list}\n"

        # Find differences in rows between df1 and df2
        out = df1.merge(
            df2,
            on=["CONCEPT", "CONCEPT_SET"],
            how="outer",
            suffixes=("_old", "_new"),
            indicator=True,
        )
        # only keep rows that are different; copy so the frame can be modified safely
        out = out[out["_merge"] != "both"].copy()
        out["_merge"] = out["_merge"].cat.rename_categories(
            {"left_only": "Removed", "right_only": "Added"}
        )
        out.sort_values(by=["CONCEPT_SET", "_merge"], inplace=True)

        # Count the number of added and removed concepts for each concept set
        report += "- Changed concepts:\n"
        for concept_set_name, grp in out.groupby("CONCEPT_SET"):
            counts = grp["_merge"].value_counts()
            report += "\t - {} +{} -{}\n".format(
                concept_set_name, counts.get("Added", 0), counts.get("Removed", 0)
            )
        report += "\n"

        # Write the changed rows to a CSV file
        if output_changed_concepts and out.shape[0] > 0:
            csv_filename = f"{Path(file).stem}_diff.csv"
            csv_filepath = csv_output_path / csv_filename
            out.to_csv(csv_filepath, index=False)
            _logger.debug(f"CSV of changes written to {str(csv_filepath.resolve())}")

    return report
def diff_phen(
    new_phen_path: Path,
    new_version: str,
    old_phen_path: Path,
    old_version: str,
    report_path: Path,
    csv_output_path: Path,
    not_check_config: bool,
    output_changed_concepts: bool,
):
    """Compare the differences between two versions of a phenotype

    Builds a markdown report from the configuration differences (unless
    not_check_config is set) and the map file differences, and writes it to
    report_path.

    Args:
        new_phen_path: Directory of the changed phenotype.
        new_version: Version label of the changed phenotype, used in the report.
        old_phen_path: Directory of the old phenotype.
        old_version: Version label of the old phenotype, used in the report.
        report_path: File the markdown report is written to.
        csv_output_path: Directory passed to diff_map_files for the diff CSV files.
        not_check_config: If True, both phenotypes are neither validated nor
            are their configurations compared.
        output_changed_concepts: Passed to diff_map_files to enable the diff CSV files.
    """
    # write report heading
    report = "# Phenotype Comparison Report\n"

    # Step 1: check differences between configuration files
    if not not_check_config:
        # validate phenotypes
        _logger.debug(f"Validating for diff old path: {str(old_phen_path.resolve())}")
        validate(str(old_phen_path.resolve()))
        _logger.debug(f"Validating for diff new path: {str(new_phen_path.resolve())}")
        validate(str(new_phen_path.resolve()))

        # get old and new config
        old_config_path = old_phen_path / CONFIG_FILE
        with old_config_path.open("r") as file:
            old_config = yaml.safe_load(file)
        new_config_path = new_phen_path / CONFIG_FILE
        with new_config_path.open("r") as file:
            new_config = yaml.safe_load(file)

        # write report
        report += "## Original phenotype\n"
        report += f"  - {old_config['phenotype']['omop']['vocabulary_id']}\n"
        report += f"  - {old_version}\n"
        report += f"  - {str(old_phen_path.resolve())}\n"
        report += "## Changed phenotype:\n"
        report += f"  - {new_config['phenotype']['omop']['vocabulary_id']}\n"
        report += f"  - {new_version}\n"
        report += f"  - {str(new_phen_path.resolve())}\n"

        # compare the concept sets defined in the two configurations
        report += diff_config(old_config, new_config)

    # Step 2: check differences between map files
    old_map_path = old_phen_path / MAP_DIR
    new_map_path = new_phen_path / MAP_DIR
    report += diff_map_files(
        old_map_path, new_map_path, output_changed_concepts, csv_output_path
    )

    # write the report; UTF-8 is set explicitly because the configuration
    # report contains non-ASCII characters, and the context manager closes
    # the file even if the write fails
    _logger.debug(f"Writing to report file {str(report_path.resolve())}")
    with open(report_path, "w", encoding="utf-8") as report_file:
        report_file.write(report)

    _logger.info("Phenotypes diff'd successfully")
Compare the differences between two versions of a phenotype
def diff(
    phen_dir: str,
    version: str,
    old_phen_dir: str,
    old_version: str,
    not_check_config: bool,
    output_changed_concepts: bool,
):
    """Diff two phenotype versions using temporary copies of both repos

    Each phenotype is placed in a timestamped temporary directory under
    .acmc: copied as-is when its version is "latest", otherwise cloned and
    checked out at the given version. diff_phen then compares the two copies,
    writing the report <version>_<old_version>_diff.md (and any diff CSV
    files) into phen_dir. The temporary directory is always removed.

    Args:
        phen_dir: Directory of the changed phenotype.
        version: Version of the changed phenotype, or "latest" for the directory as it is.
        old_phen_dir: Directory of the old phenotype.
        old_version: Version of the old phenotype, or "latest" for the directory as it is.
        not_check_config: Passed through to diff_phen.
        output_changed_concepts: Passed through to diff_phen.

    Raises:
        ValueError: If phen_dir or old_phen_dir does not exist.
    """
    # temporary working area, unique per run
    timestamp = time.strftime("%Y%m%d_%H%M%S")
    temp_dir = Path(f".acmc/diff_{timestamp}")

    changed_phen_path = Path(phen_dir)
    if not changed_phen_path.exists():
        raise ValueError(
            f"Changed phenotype directory does not exist: {str(changed_phen_path.resolve())}"
        )

    old_phen_path = Path(old_phen_dir)
    if not old_phen_path.exists():
        raise ValueError(
            f"Old phenotype directory does not exist: {str(old_phen_path.resolve())}"
        )

    try:
        temp_dir.mkdir(parents=True, exist_ok=True)
        _logger.debug(f"Temporary directory created: {temp_dir}")

        # one sub-directory per side of the comparison
        changed_path = temp_dir / "changed"
        old_path = temp_dir / "old"
        for work_dir in (changed_path, old_path):
            work_dir.mkdir(parents=True, exist_ok=True)

        # changed phenotype: clone at a version, or copy the directory for "latest"
        if version != "latest":
            _logger.debug(
                f"Cloning changed repo from {phen_dir} into {changed_path} at version {version}..."
            )
            changed_repo = git.Repo.clone_from(changed_phen_path, changed_path)
            changed_repo.git.checkout(version)
        else:
            _logger.debug(
                f"Copying changed repo from {phen_dir} into {changed_path} at version {version}..."
            )
            shutil.copytree(changed_phen_path, changed_path, dirs_exist_ok=True)

        # old phenotype: same approach
        if old_version != "latest":
            _logger.debug(
                f"Cloning old repo from {old_phen_dir} into {old_path} at version {old_version}..."
            )
            old_repo = git.Repo.clone_from(old_phen_dir, old_path)
            old_repo.git.checkout(old_version)
        else:
            _logger.debug(
                f"Copying old repo from {old_phen_dir} into {old_path} at version {old_version}..."
            )
            shutil.copytree(old_phen_path, old_path, dirs_exist_ok=True)

        # the report and diff CSV files go into the changed phenotype directory
        diff_phen(
            changed_path,
            version,
            old_path,
            old_version,
            changed_phen_path / f"{version}_{old_version}_diff.md",
            changed_phen_path,
            not_check_config,
            output_changed_concepts,
        )

    finally:
        # clean up tmp directory
        if temp_dir.exists():
            shutil.rmtree(temp_dir)