acmc.phen

phenotype.py module

This module provides functionality for managing phenotypes.

   1"""
   2phenotype.py module
   3
   4This module provides functionality for managing phenotypes.
   5"""
   6
   7import argparse
   8import pandas as pd
   9import numpy as np
  10import json
  11import os
  12import sqlite3
  13import sys
  14import shutil
  15import time
  16import git
  17import re
  18import logging
  19import requests
  20import yaml
  21import semver
  22from git import Repo
  23from cerberus import Validator  # type: ignore
  24from deepdiff import DeepDiff
  25from pathlib import Path
  26from urllib.parse import urlparse, urlunparse
  27from typing import Tuple, Set, Any
  28import acmc
  29from acmc import trud, omop, parse, util, logging_config as lc
  30
# set up logging
_logger = lc.setup_logger()

# Disable pandas' chained-assignment (SettingWithCopy) warning. NOTE: this is a
# pandas-wide option, so it applies to every user of pandas in the same process,
# not only to this module.
pd.set_option("mode.chained_assignment", None)

PHEN_DIR = "phen"
"""Default phenotype directory name"""

# relative path, so it is resolved against the current working directory
DEFAULT_PHEN_PATH = Path("./workspace") / PHEN_DIR
"""Default phenotype directory path"""

CONCEPTS_DIR = "concepts"
"""Default concepts directory name"""

MAP_DIR = "map"
"""Default map directory name"""

CONCEPT_SET_DIR = "concept-sets"
"""Default concept set directory name"""

# relative path; presumably joined onto the phenotype directory by callers - TODO confirm
CSV_PATH = Path(CONCEPT_SET_DIR) / "csv"
"""Default CSV concept set directory path"""

# relative path; presumably joined onto the phenotype directory by callers - TODO confirm
OMOP_PATH = Path(CONCEPT_SET_DIR) / "omop"
"""Default OMOP concept set directory path"""

# directories created (with a .gitkeep file) when a phenotype is initialised
DEFAULT_PHEN_DIR_LIST = [CONCEPTS_DIR, MAP_DIR, CONCEPT_SET_DIR]
"""List of default phenotype directories"""

CONFIG_FILE = "config.yml"
"""Default configuration filename"""

VOCAB_VERSION_FILE = "vocab_version.yml"
"""Default vocabulary version filename"""

SEMANTIC_VERSION_TYPES = ["major", "minor", "patch"]
"""List of semantic version increment types"""

DEFAULT_VERSION_INC = "patch"
"""Default semantic version increment type"""

DEFAULT_GIT_BRANCH = "main"
"""Default phenotype repo branch name"""

SPLIT_COL_ACTION = "split_col"
"""Split column preprocessing action type"""

CODES_COL_ACTION = "codes_col"
"""Codes column preprocessing action type"""

DIVIDE_COL_ACTION = "divide_col"
"""Divide column preprocessing action type"""

COL_ACTIONS = [SPLIT_COL_ACTION, CODES_COL_ACTION, DIVIDE_COL_ACTION]
"""List of column preprocessing action types"""

# file suffixes are compared exactly (case-sensitively) against this list
CODE_FILE_TYPES = [".xlsx", ".xls", ".csv"]
"""List of supported source concept coding list file types"""
  89
  90# config.yaml schema
  91CONFIG_SCHEMA = {
  92    "phenotype": {
  93        "type": "dict",
  94        "required": True,
  95        "schema": {
  96            "version": {
  97                "type": "string",
  98                "required": True,
  99                "regex": r"^\d+\.\d+\.\d+$",  # Enforces 'vN.N.N' format
 100            },
 101            "omop": {
 102                "type": "dict",
 103                "required": True,
 104                "schema": {
 105                    "vocabulary_id": {"type": "string", "required": True},
 106                    "vocabulary_name": {"type": "string", "required": True},
 107                    "vocabulary_reference": {
 108                        "type": "string",
 109                        "required": True,
 110                        "regex": r"^https?://.*",  # Ensures it's a URL
 111                    },
 112                },
 113            },
 114            "map": {
 115                "type": "list",
 116                "schema": {
 117                    "type": "string",
 118                    "allowed": list(
 119                        parse.SUPPORTED_CODE_TYPES
 120                    ),  # Ensure only predefined values are allowed
 121                },
 122            },
 123            "concept_sets": {
 124                "type": "list",
 125                "required": True,
 126                "schema": {
 127                    "type": "dict",
 128                    "schema": {
 129                        "name": {"type": "string", "required": True},
 130                        "files": {
 131                            "type": "list",
 132                            "required": True,
 133                            "schema": {
 134                                "type": "dict",
 135                                "schema": {
 136                                    "path": {"type": "string", "required": True},
 137                                    "columns": {"type": "dict", "required": True},
 138                                    "category": {
 139                                        "type": "string"
 140                                    },  # Optional but must be string if present
 141                                    "actions": {
 142                                        "type": "dict",
 143                                        "schema": {
 144                                            "divide_col": {"type": "string"},
 145                                            "split_col": {"type": "string"},
 146                                            "codes_col": {"type": "string"},
 147                                        },
 148                                    },
 149                                },
 150                            },
 151                        },
 152                        "metadata": {"type": "dict", "required": False},
 153                    },
 154                },
 155            },
 156        },
 157    }
 158}
 159"""Phenotype config.yml schema definition"""
 160
 161
 162class PhenValidationException(Exception):
 163    """Custom exception class raised when validation errors in phenotype configuration file"""
 164
 165    def __init__(self, message, validation_errors=None):
 166        super().__init__(message)
 167        self.validation_errors = validation_errors
 168
 169
 170def _construct_git_url(remote_url: str):
 171    """Constructs a git url for github or gitlab including a PAT token environment variable"""
 172    # check the url
 173    parsed_url = urlparse(remote_url)
 174
 175    # if github in the URL otherwise assume it's gitlab, if we want to use others such as codeberg we'd
 176    # need to update this function if the URL scheme is different.
 177    if "github.com" in parsed_url.netloc:
 178        # get GitHub PAT from environment variable
 179        auth = os.getenv("ACMC_GITHUB_PAT")
 180        if not auth:
 181            raise ValueError(
 182                "GitHub PAT not found. Set the ACMC_GITHUB_PAT environment variable."
 183            )
 184    else:
 185        # get GitLab PAT from environment variable
 186        auth = os.getenv("ACMC_GITLAB_PAT")
 187        if not auth:
 188            raise ValueError(
 189                "GitLab PAT not found. Set the ACMC_GITLAB_PAT environment variable."
 190            )
 191        auth = f"oauth2:{auth}"
 192
 193    # Construct the new URL with credentials
 194    new_netloc = f"{auth}@{parsed_url.netloc}"
 195    return urlunparse(
 196        (
 197            parsed_url.scheme,
 198            new_netloc,
 199            parsed_url.path,
 200            parsed_url.params,
 201            parsed_url.query,
 202            parsed_url.fragment,
 203        )
 204    )
 205
 206
 207def _create_empty_git_dir(path: Path):
 208    """Creates a directory with a .gitkeep file so that it's tracked in git"""
 209    path.mkdir(exist_ok=True)
 210    keep_path = path / ".gitkeep"
 211    keep_path.touch(exist_ok=True)
 212
 213
 214def _check_delete_dir(path: Path, msg: str) -> bool:
 215    """Checks on the command line if a user wants to delete a directory
 216
 217    Args:
 218        path (Path): path of the directory to be deleted
 219        msg (str): message to be displayed to the user
 220
 221    Returns:
 222        Boolean: True if deleted
 223    """
 224    deleted = False
 225
 226    user_input = input(f"{msg}").strip().lower()
 227    if user_input in ["yes", "y"]:
 228        shutil.rmtree(path)
 229        deleted = True
 230    else:
 231        _logger.info("Directory was not deleted.")
 232
 233    return deleted
 234
 235
 236def init(phen_dir: str, remote_url: str):
 237    """Initial phenotype directory as git repo with standard structure"""
 238    _logger.info(f"Initialising Phenotype in directory: {phen_dir}")
 239    phen_path = Path(phen_dir)
 240
 241    # check if directory already exists and ask user if they want to recreate it
 242    if (
 243        phen_path.exists() and phen_path.is_dir()
 244    ):  # Check if it exists and is a directory
 245        configure = _check_delete_dir(
 246            phen_path,
 247            f"The phen directory already exists. Do you want to reinitialise? (yes/no): ",
 248        )
 249    else:
 250        configure = True
 251
 252    if not configure:
 253        _logger.info(f"Exiting, phenotype not initiatised")
 254        return
 255
 256    # Initialise repo from local or remote
 257    repo: Repo
 258
 259    # if remote then clone the repo otherwise init a local repo
 260    if remote_url != None:
 261        # add PAT token to the URL
 262        git_url = _construct_git_url(remote_url)
 263
 264        # clone the repo
 265        git_cmd = git.cmd.Git()
 266        git_cmd.clone(git_url, phen_path)
 267
 268        # open repo
 269        repo = Repo(phen_path)
 270        # check if there are any commits (new repo has no commits)
 271        if (
 272            len(repo.branches) == 0 or repo.head.is_detached
 273        ):  # Handle detached HEAD (e.g., after init)
 274            _logger.debug("The phen repository has no commits yet.")
 275            commit_count = 0
 276        else:
 277            # Get the total number of commits in the default branch
 278            commit_count = sum(1 for _ in repo.iter_commits())
 279            _logger.debug(f"Repo has previous commits: {commit_count}")
 280    else:
 281        # local repo, create the directories and init
 282        phen_path.mkdir(parents=True, exist_ok=True)
 283        _logger.debug(f"Phen directory '{phen_path}' has been created.")
 284        repo = git.Repo.init(phen_path)
 285        commit_count = 0
 286
 287    phen_path = phen_path.resolve()
 288    # initialise empty repos
 289    if commit_count == 0:
 290        # create initial commit
 291        initial_file_path = phen_path / "README.md"
 292        with open(initial_file_path, "w") as file:
 293            file.write(
 294                "# Initial commit\nThis is the first commit in the phen repository.\n"
 295            )
 296        repo.index.add([initial_file_path])
 297        repo.index.commit("Initial commit")
 298        commit_count = 1
 299
 300    # Checkout the phens default branch, creating it if it does not exist
 301    if DEFAULT_GIT_BRANCH in repo.branches:
 302        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
 303        main_branch.checkout()
 304    else:
 305        main_branch = repo.create_head(DEFAULT_GIT_BRANCH)
 306        main_branch.checkout()
 307
 308    # if the phen path does not contain the config file then initialise the phen type
 309    config_path = phen_path / CONFIG_FILE
 310    if config_path.exists():
 311        _logger.debug(f"Phenotype configuration files already exist")
 312        return
 313
 314    _logger.info("Creating phen directory structure and config files")
 315    for d in DEFAULT_PHEN_DIR_LIST:
 316        _create_empty_git_dir(phen_path / d)
 317
 318    # create empty phen config file
 319    config = {
 320        "phenotype": {
 321            "version": "0.0.0",
 322            "omop": {
 323                "vocabulary_id": "",
 324                "vocabulary_name": "",
 325                "vocabulary_reference": "",
 326            },
 327            "translate": [],
 328            "concept_sets": [],
 329        }
 330    }
 331
 332    with open(phen_path / CONFIG_FILE, "w") as file:
 333        yaml.dump(
 334            config,
 335            file,
 336            Dumper=util.QuotedDumper,
 337            default_flow_style=False,
 338            sort_keys=False,
 339            default_style='"',
 340        )
 341
 342    # add git ignore
 343    ignore_content = """# Ignore SQLite database files
 344*.db
 345*.sqlite3
 346 
 347# Ignore SQLite journal and metadata files
 348*.db-journal
 349*.sqlite3-journal
 350
 351# python
 352.ipynb_checkpoints
 353 """
 354    ignore_path = phen_path / ".gitignore"
 355    with open(ignore_path, "w") as file:
 356        file.write(ignore_content)
 357
 358    # add to git repo and commit
 359    for d in DEFAULT_PHEN_DIR_LIST:
 360        repo.git.add(phen_path / d)
 361    repo.git.add(all=True)
 362    repo.index.commit("initialised the phen git repo.")
 363
 364    _logger.info(f"Phenotype initialised successfully")
 365
 366
 367def fork(phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str):
 368    """Forks an upstream phenotype in a remote repo at a specific version to a local director, and optionally sets to a new remote origin"
 369
 370    Args:
 371        phen_dir (str): local directory path where the upstream repo is to be cloned
 372        upstream_url (str): url to the upstream repo
 373        upstream_version (str): version in the upstream repo to clone
 374        new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.
 375
 376    Raises:
 377        ValueError: if the specified version is not in the upstream repo
 378        ValueError: if the upstream repo is not a valid phenotype repo
 379        ValueError: if there's any other problems with Git
 380    """
 381    _logger.info(
 382        f"Forking upstream repo {upstream_url} {upstream_version} into directory: {phen_dir}"
 383    )
 384
 385    phen_path = Path(phen_dir)
 386    # check if directory already exists and ask user if they want to recreate it
 387    if (
 388        phen_path.exists() and phen_path.is_dir()
 389    ):  # Check if it exists and is a directory
 390        configure = _check_delete_dir(
 391            phen_path,
 392            f"The phen directory already exists. Do you want to reinitialise? (yes/no): ",
 393        )
 394    else:
 395        configure = True
 396
 397    if not configure:
 398        _logger.info(f"Exiting, phenotype not initiatised")
 399        return
 400
 401    try:
 402        # Clone repo
 403        git_url = _construct_git_url(upstream_url)
 404        repo = git.Repo.clone_from(git_url, phen_path)
 405
 406        # Fetch all branches and tags
 407        repo.remotes.origin.fetch()
 408
 409        # Check if the version exists
 410        available_refs = [ref.name.split("/")[-1] for ref in repo.references]
 411        if upstream_version not in available_refs:
 412            raise ValueError(
 413                f"Version '{upstream_version}' not found in the repository: {upstream_url}."
 414            )
 415
 416        # Checkout the specified version
 417        repo.git.checkout(upstream_version)
 418        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
 419        main_branch.checkout()
 420
 421        # Check if 'config.yml' exists in the root directory
 422        config_path = phen_path / "config.yml"
 423        if not os.path.isfile(config_path):
 424            raise ValueError(
 425                f"The forked repository is not a valid ACMC repo because 'config.yml' is missing in the root directory."
 426            )
 427
 428        # Validate the phenotype is compatible with the acmc tool
 429        validate(str(phen_path.resolve()))
 430
 431        # Delete each tag locally
 432        tags = repo.tags
 433        for tag in tags:
 434            repo.delete_tag(tag)
 435            _logger.debug(f"Deleted tags from forked repo: {tag}")
 436
 437        # Add upstream remote
 438        repo.create_remote("upstream", upstream_url)
 439        remote = repo.remotes["origin"]
 440        repo.delete_remote(remote)  # Remove existing origin
 441
 442        # Optionally set a new origin remote
 443        if new_origin_url:
 444            git_url = _construct_git_url(new_origin_url)
 445            repo.create_remote("origin", git_url)
 446            repo.git.push("--set-upstream", "origin", "main")
 447
 448        _logger.info(f"Repository forked successfully at {phen_path}")
 449        _logger.info(f"Upstream set to {upstream_url}")
 450        if new_origin_url:
 451            _logger.info(f"Origin set to {new_origin_url}")
 452
 453    except Exception as e:
 454        if phen_path.exists():
 455            shutil.rmtree(phen_path)
 456        raise ValueError(f"Error occurred during repository fork: {str(e)}")
 457
 458
 459def validate(phen_dir: str):
 460    """Validates the phenotype directory is a git repo with standard structure"""
 461    _logger.info(f"Validating phenotype: {phen_dir}")
 462    phen_path = Path(phen_dir)
 463    if not phen_path.is_dir():
 464        raise NotADirectoryError(
 465            f"Error: '{str(phen_path.resolve())}' is not a directory"
 466        )
 467
 468    config_path = phen_path / CONFIG_FILE
 469    if not config_path.is_file():
 470        raise FileNotFoundError(
 471            f"Error: phen configuration file '{config_path}' does not exist."
 472        )
 473
 474    concepts_path = phen_path / CONCEPTS_DIR
 475    if not concepts_path.is_dir():
 476        raise FileNotFoundError(
 477            f"Error: source concepts directory {concepts_path} does not exist."
 478        )
 479
 480    # Calidate the directory is a git repo
 481    try:
 482        git.Repo(phen_path)
 483    except (git.exc.InvalidGitRepositoryError, git.exc.NoSuchPathError):
 484        raise Exception(f"Phen directory {phen_path} is not a git repo")
 485
 486    # Load configuration File
 487    if config_path.suffix == ".yml":
 488        try:
 489            with config_path.open("r") as file:
 490                phenotype = yaml.safe_load(file)
 491
 492            validator = Validator(CONFIG_SCHEMA)
 493            if validator.validate(phenotype):
 494                _logger.debug("YAML structure is valid.")
 495            else:
 496                _logger.error(f"YAML structure validation failed: {validator.errors}")
 497                raise Exception(f"YAML structure validation failed: {validator.errors}")
 498        except yaml.YAMLError as e:
 499            _logger.error(f"YAML syntax error: {e}")
 500            raise e
 501    else:
 502        raise Exception(
 503            f"Unsupported configuration filetype: {str(config_path.resolve())}"
 504        )
 505
 506    # initiatise
 507    validation_errors = []
 508    phenotype = phenotype["phenotype"]
 509    code_types = parse.CodeTypeParser().code_types
 510
 511    # check the version number is of the format vn.n.n
 512    match = re.match(r"(\d+\.\d+\.\d+)", phenotype["version"])
 513    if not match:
 514        validation_errors.append(
 515            f"Invalid version format in configuration file: {phenotype['version']}"
 516        )
 517
 518    # create a list of all the concept set names defined in the concept set configuration
 519    concept_set_names = []
 520    for item in phenotype["concept_sets"]:
 521        if item["name"] in concept_set_names:
 522            validation_errors.append(
 523                f"Duplicate concept set defined in concept sets {item['name'] }"
 524            )
 525        else:
 526            concept_set_names.append(item["name"])
 527
 528    # check codes definition
 529    for files in phenotype["concept_sets"]:
 530        for item in files["files"]:
 531            # check concepte code file exists
 532            concept_code_file_path = concepts_path / item["path"]
 533            if not concept_code_file_path.exists():
 534                validation_errors.append(
 535                    f"Coding file {str(concept_code_file_path.resolve())} does not exist"
 536                )
 537
 538            # check concepte code file is not empty
 539            if concept_code_file_path.stat().st_size == 0:
 540                validation_errors.append(
 541                    f"Coding file {str(concept_code_file_path.resolve())} is an empty file"
 542                )
 543
 544            # check code file type is supported
 545            if concept_code_file_path.suffix not in CODE_FILE_TYPES:
 546                raise ValueError(
 547                    f"Unsupported filetype {concept_code_file_path.suffix}, only support csv, xlsx, xls code file types"
 548                )
 549
 550            # check columns specified are a supported medical coding type
 551            for column in item["columns"]:
 552                if column not in code_types:
 553                    validation_errors.append(
 554                        f"Column type {column} for file {concept_code_file_path} is not supported"
 555                    )
 556
 557            # check the actions are supported
 558            if "actions" in item:
 559                for action in item["actions"]:
 560                    if action not in COL_ACTIONS:
 561                        validation_errors.append(f"Action {action} is not supported")
 562
 563    if len(validation_errors) > 0:
 564        _logger.error(validation_errors)
 565        raise PhenValidationException(
 566            f"Configuration file {str(config_path.resolve())} failed validation",
 567            validation_errors,
 568        )
 569
 570    _logger.info(f"Phenotype validated successfully")
 571
 572
 573def _read_table_file(path: Path, excel_sheet: str = ""):
 574    """
 575    Load Code List File
 576    """
 577
 578    path = path.resolve()
 579    if path.suffix == ".csv":
 580        df = pd.read_csv(path, dtype=str)
 581    elif path.suffix == ".xlsx" or path.suffix == ".xls":
 582        if excel_sheet != "":
 583            df = pd.read_excel(path, sheet_name=excel_sheet, dtype=str)
 584        else:
 585            df = pd.read_excel(path, dtype=str)
 586    elif path.suffix == ".dta":
 587        df = pd.read_stata(path)
 588    else:
 589        raise ValueError(
 590            f"Unsupported filetype {path.suffix}, only support{CODE_FILE_TYPES} code file types"
 591        )
 592
 593    return df
 594
 595
 596def _process_actions(df: pd.DataFrame, concept_set: dict) -> pd.DataFrame:
 597    # Perform Structural Changes to file before preprocessing
 598    _logger.debug("Processing file structural actions")
 599    if (
 600        "actions" in concept_set
 601        and "split_col" in concept_set["actions"]
 602        and "codes_col" in concept_set["actions"]
 603    ):
 604        split_col = concept_set["actions"]["split_col"]
 605        codes_col = concept_set["actions"]["codes_col"]
 606        _logger.debug(
 607            "Action: Splitting",
 608            split_col,
 609            "column into:",
 610            df[split_col].unique(),
 611        )
 612        codes = df[codes_col]
 613        oh = pd.get_dummies(df[split_col], dtype=bool)  # one hot encode
 614        oh = oh.where((oh != True), codes, axis=0)  # fill in 1s with codes
 615        oh[oh == False] = np.nan  # replace 0s with None
 616        df = pd.concat([df, oh], axis=1)  # merge in new columns
 617
 618    return df
 619
 620
 621def _preprocess_source_concepts(
 622    df: pd.DataFrame, concept_set: dict, code_file_path: Path
 623) -> Tuple[pd.DataFrame, list]:
 624    """Perform QA Checks on columns individually and append to df"""
 625    out = pd.DataFrame([])  # create output df to append to
 626    code_errors = []  # list of errors from processing
 627
 628    # remove unnamed columns due to extra commas, missing headers, or incorrect parsing
 629    df = df.drop(columns=[col for col in df.columns if "Unnamed" in col])
 630
 631    # Preprocess codes
 632    code_types = parse.CodeTypeParser().code_types
 633    for code_type in concept_set["columns"]:
 634        parser = code_types[code_type]
 635        _logger.info(f"Processing {code_type} codes for {code_file_path}")
 636
 637        # get codes by column name
 638        source_col_name = concept_set["columns"][code_type]
 639        codes = df[source_col_name].dropna()
 640        codes = codes.astype(str)  # convert to string
 641        codes = codes.str.strip()  # remove excess spaces
 642
 643        # process codes, validating them using parser and returning the errors
 644        codes, errors = parser.process(codes, code_file_path)
 645        if len(errors) > 0:
 646            code_errors.extend(errors)
 647            _logger.warning(f"Codes validation failed with {len(errors)} errors")
 648
 649        # add processed codes to df
 650        new_col_name = f"{source_col_name}_SOURCE"
 651        df = df.rename(columns={source_col_name: new_col_name})
 652        process_codes = pd.DataFrame({code_type: codes}).join(df)
 653        out = pd.concat(
 654            [out, process_codes],
 655            ignore_index=True,
 656        )
 657
 658    _logger.debug(out.head())
 659
 660    return out, code_errors
 661
 662
 663# Translate Df with multiple codes into single code type Series
 664def translate_codes(
 665    source_df: pd.DataFrame,
 666    target_code_type: str,
 667    concept_name: str,
 668    not_translate: bool,
 669    do_reverse_translate: bool,
 670) -> pd.DataFrame:
 671    """Translates each source code type the source coding list into a target type and returns all conversions as a concept set"""
 672
 673    # codes = pd.DataFrame([], dtype=str)
 674    codes = pd.DataFrame(
 675        columns=["SOURCE_CONCEPT", "SOURCE_CONCEPT_TYPE", "CONCEPT"], dtype="string"
 676    )
 677    # Convert codes to target type
 678    _logger.info(f"Converting to target code type {target_code_type}")
 679
 680    for source_code_type in source_df.columns:
 681        # if target code type is the same as thet source code type, no translation, just appending source as target
 682        if source_code_type == target_code_type:
 683            copy_df = pd.DataFrame(
 684                {
 685                    "SOURCE_CONCEPT": source_df[source_code_type],
 686                    "SOURCE_CONCEPT_TYPE": source_code_type,
 687                    "CONCEPT": source_df[source_code_type],
 688                }
 689            )
 690            codes = pd.concat([codes, copy_df])
 691            _logger.debug(
 692                f"Target code type {target_code_type} is the same as source code type {len(source_df)}, copying codes rather than translating"
 693            )
 694        elif not not_translate:
 695            # get the translation filename using source to target code types
 696            filename = f"{source_code_type}_to_{target_code_type}.parquet"
 697            map_path = trud.PROCESSED_PATH / filename
 698
 699            filename_reversed = f"{target_code_type}_to_{source_code_type}.parquet"
 700            map_path_reversed = trud.PROCESSED_PATH / filename_reversed
 701
 702            # do the mapping if it exists
 703            if map_path.exists():
 704                codes = _translate_codes(map_path, source_df, source_code_type, codes)
 705            # otherwise do reverse mapping if enabled and it exists
 706            elif do_reverse_translate and map_path_reversed.exists():
 707                codes = _translate_codes(
 708                    map_path_reversed, source_df, source_code_type, codes, reverse=True
 709                )
 710            else:
 711                _logger.warning(
 712                    f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist"
 713                )
 714
 715    codes = codes.dropna()  # delete NaNs
 716
 717    # added concept set type to output if any translations
 718    if len(codes.index) > 0:
 719        codes["CONCEPT_SET"] = concept_name
 720    else:
 721        _logger.debug(f"No codes converted with target code type {target_code_type}")
 722
 723    return codes
 724
 725
 726def _translate_codes(
 727    map_path, source_df, source_code_type, codes, reverse=False
 728) -> pd.DataFrame:
 729    # get mapping
 730    df_map = pd.read_parquet(map_path)
 731
 732    # do mapping
 733    if not (reverse):
 734        translated_df = pd.merge(source_df[source_code_type], df_map, how="left")
 735    else:
 736        translated_df = pd.merge(
 737            source_df[source_code_type], df_map, how="left"
 738        )  # output codes from target as reversed
 739
 740    # normalise the output
 741    translated_df.columns = pd.Index(["SOURCE_CONCEPT", "CONCEPT"])
 742    translated_df["SOURCE_CONCEPT_TYPE"] = source_code_type
 743
 744    # add to list of codes
 745    codes = pd.concat([codes, translated_df])
 746
 747    return codes
 748
 749
 750def _write_code_errors(code_errors: list, code_errors_path: Path):
 751    err_df = pd.DataFrame(
 752        [
 753            {
 754                "CONCEPT": ", ".join(err.codes[~err.mask].tolist()),
 755                "VOCABULARY": err.code_type,
 756                "SOURCE": err.codes_file,
 757                "CAUSE": err.message,
 758            }
 759            for err in code_errors
 760            if err.mask is not None
 761        ]
 762    )
 763
 764    err_df = err_df.drop_duplicates()  # Remove Duplicates from Error file
 765    err_df = err_df.sort_values(by=["SOURCE", "VOCABULARY", "CONCEPT"])
 766    err_df.to_csv(code_errors_path, index=False, mode="w")
 767
 768
 769def write_vocab_version(phen_path: Path):
 770    # write the vocab version files
 771
 772    if not trud.VERSION_PATH.exists():
 773        raise FileNotFoundError(
 774            f"TRUD version path {trud.VERSION_PATH} does not exist, please check TRUD is installed"
 775        )
 776
 777    if not omop.VERSION_PATH.exists():
 778        raise FileNotFoundError(
 779            f"OMOP version path {omop.VERSION_PATH} does not exist, please check OMOP is installed"
 780        )
 781
 782    with trud.VERSION_PATH.open("r") as file:
 783        trud_version = yaml.safe_load(file)
 784
 785    with omop.VERSION_PATH.open("r") as file:
 786        omop_version = yaml.safe_load(file)
 787
 788    # Create the combined YAML structure
 789    version_data = {
 790        "versions": {
 791            "acmc": acmc.__version__,
 792            "trud": trud_version,
 793            "omop": omop_version,
 794        }
 795    }
 796
 797    with open(phen_path / VOCAB_VERSION_FILE, "w") as file:
 798        yaml.dump(
 799            version_data,
 800            file,
 801            Dumper=util.QuotedDumper,
 802            default_flow_style=False,
 803            sort_keys=False,
 804            default_style='"',
 805        )
 806
 807
 808def map(
 809    phen_dir: str,
 810    target_code_type: str,
 811    not_translate: bool,
 812    no_metadata: bool,
 813    do_reverse_translate: bool,
 814):
 815    _logger.info(f"Processing phenotype: {phen_dir}")
 816
 817    # Validate configuration
 818    validate(phen_dir)
 819
 820    # initialise paths
 821    phen_path = Path(phen_dir)
 822    config_path = phen_path / CONFIG_FILE
 823
 824    # load configuration
 825    with config_path.open("r") as file:
 826        config = yaml.safe_load(file)
 827    phenotype = config["phenotype"]
 828
 829    if len(phenotype["map"]) == 0:
 830        raise ValueError(f"No map codes defined in the phenotype configuration")
 831
 832    if target_code_type is not None and target_code_type not in phenotype["map"]:
 833        raise ValueError(
 834            f"Target code type {target_code_type} not in phenotype configuration map {phenotype['map']}"
 835        )
 836
 837    if target_code_type is not None:
 838        _map_target_code_type(
 839            phen_path,
 840            phenotype,
 841            target_code_type,
 842            not_translate,
 843            no_metadata,
 844            do_reverse_translate,
 845        )
 846    else:
 847        for t in phenotype["map"]:
 848            _map_target_code_type(
 849                phen_path,
 850                phenotype,
 851                t,
 852                not_translate,
 853                no_metadata,
 854                do_reverse_translate,
 855            )
 856
 857    _logger.info(f"Phenotype processed successfully")
 858
 859
 860def _map_target_code_type(
 861    phen_path: Path,
 862    phenotype: dict,
 863    target_code_type: str,
 864    not_translate: bool,
 865    no_metadata: bool,
 866    do_reverse_translate: bool,
 867):
 868    _logger.debug(f"Target coding format: {target_code_type}")
 869    concepts_path = phen_path / CONCEPTS_DIR
 870    # Create output dataframe
 871    out = pd.DataFrame([])
 872    code_errors = []
 873
 874    # Process each folder in codes section
 875    for files in phenotype["concept_sets"]:
 876        concept_set_name = files["name"]
 877        if "metadata" in files:
 878            concept_set_metadata = files["metadata"]
 879        else:
 880            concept_set_metadata = {}
 881        for concept_set in files["files"]:
 882            _logger.debug(f"--- {concept_set} ---")
 883
 884            # Load code file
 885            codes_file_path = Path(concepts_path / concept_set["path"])
 886            df = _read_table_file(codes_file_path)
 887
 888            # process structural actions
 889            df = _process_actions(df, concept_set)
 890
 891            # preprocessing and validate of source concepts
 892            _logger.debug("Processing and validating source concept codes")
 893            df, errors = _preprocess_source_concepts(
 894                df,
 895                concept_set,
 896                codes_file_path,
 897            )
 898
 899            # create df with just the source code columns
 900            source_column_names = list(concept_set["columns"].keys())
 901            source_df = df[source_column_names]
 902
 903            _logger.debug(source_df.columns)
 904            _logger.debug(source_df.head())
 905
 906            _logger.debug(
 907                f"Length of errors from _preprocess_source_concepts {len(errors)}"
 908            )
 909            if len(errors) > 0:
 910                code_errors.extend(errors)
 911            _logger.debug(f" Length of code_errors {len(code_errors)}")
 912
 913            # Map source concepts codes to target codes
 914            # if processing a source coding list with categorical data
 915            if (
 916                "actions" in concept_set
 917                and "divide_col" in concept_set["actions"]
 918                and len(df) > 0
 919            ):
 920                divide_col = concept_set["actions"]["divide_col"]
 921                _logger.debug(f"Action: Dividing Table by {divide_col}")
 922                _logger.debug(f"column into: {df[divide_col].unique()}")
 923                df_grp = df.groupby(divide_col)
 924                for cat, grp in df_grp:
 925                    if cat == concept_set["category"]:
 926                        grp = grp.drop(
 927                            columns=[divide_col]
 928                        )  # delete categorical column
 929                        source_df = grp[source_column_names]
 930                        trans_out = translate_codes(
 931                            source_df,
 932                            target_code_type=target_code_type,
 933                            concept_name=concept_set_name,
 934                            not_translate=not_translate,
 935                            do_reverse_translate=do_reverse_translate,
 936                        )
 937                        trans_out = add_metadata(
 938                            codes=trans_out,
 939                            metadata=concept_set_metadata,
 940                            no_metadata=no_metadata,
 941                        )
 942                        out = pd.concat([out, trans_out])
 943            else:
 944                source_df = df[source_column_names]
 945                trans_out = translate_codes(
 946                    source_df,
 947                    target_code_type=target_code_type,
 948                    concept_name=concept_set_name,
 949                    not_translate=not_translate,
 950                    do_reverse_translate=do_reverse_translate,
 951                )
 952                trans_out = add_metadata(
 953                    codes=trans_out,
 954                    metadata=concept_set_metadata,
 955                    no_metadata=no_metadata,
 956                )
 957                out = pd.concat([out, trans_out])
 958
 959    if len(code_errors) > 0:
 960        _logger.error(f"The map processing has {len(code_errors)} errors")
 961        error_path = phen_path / MAP_DIR / "errors"
 962        error_path.mkdir(parents=True, exist_ok=True)
 963        error_filename = f"{target_code_type}-code-errors.csv"
 964        _write_code_errors(code_errors, error_path / error_filename)
 965
 966    # Check there is output from processing
 967    if len(out.index) == 0:
 968        _logger.error(f"No output after map processing")
 969        raise Exception(
 970            f"No output after map processing, check config {str(phen_path.resolve())}"
 971        )
 972
 973    # final processing
 974    out = out.reset_index(drop=True)
 975    out = out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"])
 976    out = out.sort_values(by=["CONCEPT_SET", "CONCEPT"])
 977
 978    # out_count = len(out.index)
 979    # added metadata
 980    # Loop over each source_concept_type and perform the left join on all columns apart from source code columns
 981    # result_list = []
 982    # for files in phenotype["concept_sets"]:
 983    #     concept_set_name = files["name"]
 984    #     for concept_set in files["files"]:
 985    #         source_column_names = list(concept_set["columns"].keys())
 986    #         for source_concept_type in source_column_names:
 987    #             # Filter output based on the current source_concept_type
 988    #             out_filtered_df = out[out["SOURCE_CONCEPT_TYPE"] == source_concept_type]
 989    #             filtered_count = len(out_filtered_df.index)
 990
 991    #             # Remove the source type columns except the current type will leave the metadata and the join
 992    #             remove_types = [
 993    #                 type for type in source_column_names if type != source_concept_type
 994    #             ]
 995    #             metadata_df = df.drop(columns=remove_types)
 996    #             metadata_df = metadata_df.rename(
 997    #                 columns={source_concept_type: "SOURCE_CONCEPT"}
 998    #             )
 999    #             metadata_df_count = len(metadata_df.index)
1000
1001    # Perform the left join with df2 on SOURCE_CONCEPT to add the metadata
1002    # result = pd.merge(out_filtered_df, metadata_df, how="left", on="SOURCE_CONCEPT")
1003    # result_count = len(result.index)
1004
1005    #             _logger.debug(
1006    #                 f"Adding metadata for {source_concept_type}: out_count {out_count}, filtered_count {filtered_count}, metadata_df_count {metadata_df_count}, result_count {result_count}"
1007    #             )
1008
1009    #             # Append the result to the result_list
1010    #             result_list.append(result)
1011
1012    # Concatenate all the results into a single DataFrame
1013    # final_out = pd.concat(result_list, ignore_index=True)
1014    # final_out = final_out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"])
1015    # _logger.debug(
1016    #     f"Check metadata processing counts: before {len(out.index)} : after {len(final_out.index)}"
1017    # )
1018
1019    # Save output to map directory
1020    output_filename = target_code_type + ".csv"
1021    map_path = phen_path / MAP_DIR / output_filename
1022    out.to_csv(map_path, index=False)
1023    _logger.info(f"Saved mapped concepts to {str(map_path.resolve())}")
1024
1025    # save concept sets as separate files
1026    concept_set_path = phen_path / CSV_PATH / target_code_type
1027
1028    # empty the concept-set directory except for hiddle files, e.g. .git
1029    if concept_set_path.exists():
1030        for item in concept_set_path.iterdir():
1031            if not item.name.startswith("."):
1032                item.unlink()
1033    else:
1034        concept_set_path.mkdir(parents=True, exist_ok=True)
1035
1036    # write each concept as a separate file
1037    for name, concept in out.groupby("CONCEPT_SET"):
1038        concept = concept.sort_values(by="CONCEPT")  # sort rows
1039        concept = concept.dropna(how="all", axis=1)  # remove empty cols
1040        concept = concept.reindex(
1041            sorted(concept.columns), axis=1
1042        )  # sort cols alphabetically
1043        filename = f"{name}.csv"
1044        concept_path = concept_set_path / filename
1045        concept.to_csv(concept_path, index=False)
1046
1047    write_vocab_version(phen_path)
1048
1049    _logger.info(f"Phenotype processed target code type {target_code_type}")
1050
1051
1052# Add metadata dict to each row of Df codes
def add_metadata(
    codes: pd.DataFrame,
    metadata: dict,
    no_metadata: bool,
) -> pd.DataFrame:
    """Attach concept set metadata to every row of ``codes``.

    Unless ``no_metadata`` is set, each key of ``metadata`` is assigned as a
    column of ``codes`` holding the corresponding value. The assignment is
    done on the frame that was passed in, and that same frame is returned.
    """
    # Nothing to add when the caller asked for metadata to be left out
    if no_metadata:
        return codes

    for meta_name, meta_value in metadata.items():
        codes[meta_name] = meta_value
        _logger.debug(
            f"Adding metadata for concept set: metadata name {meta_name}, metadata value {meta_value}"
        )

    return codes
1068
1069
def _generate_version_tag(
    repo: git.Repo, increment: str = DEFAULT_VERSION_INC, use_v_prefix: bool = False
) -> str:
    """Return the next semantic version string derived from the tags of ``repo``.

    Tag names are read with one leading "v" stripped, and names that are not
    valid semantic versions are ignored. When no valid tag exists the result
    is ``0.0.1``. Otherwise the highest version is bumped: "major" and "minor"
    bump those parts, and any other ``increment`` value bumps the patch part.
    The result is prefixed with "v" only when ``use_v_prefix`` is true.
    """
    # Collect every tag that parses as a semantic version
    versions = []
    for tag in repo.tags:
        candidate = tag.name[1:] if tag.name.startswith("v") else tag.name
        if semver.Version.is_valid(candidate):
            versions.append(semver.Version.parse(candidate))

    _logger.debug(f"Versions: {versions}")

    # Work out the next version
    if versions:
        latest_version = max(versions)
        bumpers = {
            "major": latest_version.bump_major,
            "minor": latest_version.bump_minor,
        }
        # anything that is not major/minor is treated as a patch increment
        new_version = bumpers.get(increment, latest_version.bump_patch)()
    else:
        new_version = semver.Version(0, 0, 1)

    if use_v_prefix:
        return f"v{new_version}"
    return str(new_version)
1100
1101
def publish(
    phen_dir: str, msg: str, remote_url: str, increment: str = DEFAULT_VERSION_INC
):
    """Publish updates to the phenotype by committing all changes to the repo directory.

    The configuration is validated, the default branch is checked out, the
    next version is generated from the existing tags and written to the
    configuration file, everything is committed and tagged, and the branch
    and tags are pushed when an ``origin`` remote is configured.

    Args:
        phen_dir: Path of the phenotype directory (a git repo).
        msg: Commit message. When empty or ``None`` a default message naming
            the phenotype directory is used.
        remote_url: URL used to create the ``origin`` remote when the repo has
            none yet; may be ``None``.
        increment: Semantic version part to increment.

    Raises:
        AttributeError: If the repo does not contain ``DEFAULT_GIT_BRANCH``.
        Exception: Any error raised while pushing is re-raised after the new
            tag has been deleted and the commit has been soft-reset.
    """
    # Validate config
    validate(phen_dir)
    phen_path = Path(phen_dir)

    # load git repo and set the branch
    repo = git.Repo(phen_path)
    if DEFAULT_GIT_BRANCH in repo.branches:
        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
        main_branch.checkout()
    else:
        raise AttributeError(
            f"Phen repo does not contain the default branch {DEFAULT_GIT_BRANCH}"
        )

    # check if any changes to publish
    if not repo.is_dirty() and not repo.untracked_files:
        if remote_url is not None and "origin" not in repo.remotes:
            _logger.info(f"First publish to remote url {remote_url}")
        else:
            _logger.info("Nothing to publish, no changes to the repo")
            return

    # get next version
    new_version_str = _generate_version_tag(repo, increment)
    _logger.info(f"New version: {new_version_str}")

    # Write version in configuration file
    config_path = phen_path / CONFIG_FILE
    with config_path.open("r") as file:
        config = yaml.safe_load(file)

    config["phenotype"]["version"] = new_version_str
    with open(config_path, "w") as file:
        yaml.dump(
            config,
            file,
            Dumper=util.QuotedDumper,
            default_flow_style=False,
            sort_keys=False,
            default_style='"',
        )

    # Add and commit changes to repo including version updates; honour the
    # caller's message and only fall back to the generic one when none is given
    commit_message = msg if msg else f"Committing updates to phenotype {phen_path}"
    repo.git.add("--all")
    repo.index.commit(commit_message)

    # Add tag to the repo
    repo.create_tag(new_version_str)

    # push to origin if a remote repo
    if remote_url is not None and "origin" not in repo.remotes:
        git_url = _construct_git_url(remote_url)
        repo.create_remote("origin", git_url)

    try:
        if "origin" in repo.remotes:
            _logger.debug(f"Remote 'origin' is set {repo.remotes.origin.url}")
            origin = repo.remotes.origin
            _logger.info("Pushing main branch to remote repo")
            # push the branch that was checked out above
            repo.git.push("--set-upstream", "origin", DEFAULT_GIT_BRANCH)
            _logger.info("Pushing version tags to remote git repo")
            origin.push(tags=True)
            _logger.debug("Changes pushed to 'origin'")
        else:
            _logger.debug("Remote 'origin' is not set")
    except Exception:
        # roll back the tag and the commit so a failed push can be retried
        tag_ref = repo.tags[new_version_str]
        repo.delete_tag(tag_ref)
        repo.git.reset("--soft", "HEAD~1")
        raise

    _logger.info("Phenotype published successfully")
1180
1181
def export(phen_dir: str, version: str):
    """Export the mapped concepts of a phenotype through ``omop.export``.

    The configuration is validated and loaded, the OMOP export directory
    (``OMOP_PATH`` under the phenotype directory) is created when missing, and
    ``omop.export`` is called with the map directory, the export directory and
    the ``version`` and ``omop`` entries of the configuration.

    NOTE: ``version`` is only used in the log message here; the version handed
    to ``omop.export`` is the one stored in the configuration file.
    """
    _logger.info(f"Exporting phenotype {phen_dir} at version {version}")

    # validate configuration
    validate(phen_dir)
    phen_path = Path(phen_dir)

    # load configuration
    with (phen_path / CONFIG_FILE).open("r") as file:
        config = yaml.safe_load(file)

    # a missing map directory is only warned about, the export still runs
    map_path = phen_path / MAP_DIR
    if not map_path.exists():
        _logger.warning(f"Map path does not exist '{map_path}'")

    # create the export directory on first use
    export_path = phen_path / OMOP_PATH
    if not export_path.exists():
        export_path.mkdir(parents=True)
        _logger.debug(f"OMOP export directory '{export_path}' created.")

    # omop export db
    omop.export(
        map_path,
        export_path,
        config["phenotype"]["version"],
        config["phenotype"]["omop"],
    )

    _logger.info(f"Phenotype exported successfully")
1214
1215
def copy(phen_dir: str, target_dir: str, version: str):
    """Copy a phen repo at a specific tagged version into a target directory.

    The repo in ``phen_dir`` is cloned into ``<target_dir>/<version>`` and
    ``version`` is checked out in the clone. When that directory already
    exists, ``_check_delete_dir`` is called with an overwrite prompt and the
    copy is skipped if it returns a false value.

    Raises:
        FileNotFoundError: If ``target_dir`` does not exist.
    """
    # Validate
    validate(phen_dir)
    phen_path = Path(phen_dir)

    # Check target directory exists
    target_path = Path(target_dir)
    if not target_path.exists():
        raise FileNotFoundError(f"The target directory {target_path} does not exist")

    # Set copy directory
    copy_path = target_path / version
    _logger.info(f"Copying repo {phen_path} to {copy_path}")

    # An existing directory is only replaced with the user's consent
    proceed = True
    if copy_path.exists() and copy_path.is_dir():
        proceed = _check_delete_dir(
            copy_path,
            f"The directory {str(copy_path.resolve())} already exists. Do you want to overwrite? (yes/no): ",
        )

    if not proceed:
        _logger.info(f"Not copying the version {version}")
        return

    _logger.debug(f"Cloning repo from {phen_path} into {copy_path}...")
    repo = git.Repo.clone_from(phen_path, copy_path)

    # Check out the specified version (branch, tag or commit hash), otherwise HEAD
    if version:
        _logger.info(f"Checking out version {version}...")
        checkout_target = version
    else:
        _logger.info(f"Checking out the latest commit...")
        checkout_target = "HEAD"
    repo.git.checkout(checkout_target)

    _logger.debug(f"Copied {phen_path} {repo.head.commit.hexsha[:7]} in {copy_path}")

    _logger.info(f"Phenotype copied successfully")
1262
1263
1264# Convert concept_sets list into dictionaries
def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]:
    """Return the concept sets of a configuration as a dictionary plus their names.

    The dictionary maps each concept set name to the list of ``path`` values
    of its files; the set contains the same names.
    """
    concepts_dict = {}
    for concept_set in config_data["phenotype"]["concept_sets"]:
        file_paths = []
        for file_entry in concept_set["files"]:
            file_paths.append(file_entry["path"])
        concepts_dict[concept_set["name"]] = file_paths

    return concepts_dict, set(concepts_dict)
1273
1274
1275def _extract_clean_deepdiff_keys(diff: dict, key_type: str) -> Set[Any]:
1276    """
1277    Extracts clean keys from a DeepDiff dictionary.
1278
1279    :param diff: DeepDiff result dictionary
1280    :param key_type: The type of change to extract (e.g., "dictionary_item_added", "dictionary_item_removed")
1281    :return: A set of clean key names
1282    """
1283    return {key.split("root['")[1].split("']")[0] for key in diff.get(key_type, [])}
1284
1285
def diff_config(old_config: dict, new_config: dict) -> str:
    """Build a markdown report of the concept set changes between two configurations.

    The report lists added, removed and renamed concept sets (a rename is a
    removed and an added name that share the same file list) and, for concept
    sets present in both configurations, the changes to their file paths.
    Names are reported in sorted order so the report is reproducible.

    Args:
        old_config: The original loaded configuration.
        new_config: The changed loaded configuration.

    Returns:
        The report text.
    """

    def _concept_name(diff_path: str) -> str:
        # DeepDiff paths look like root['<name>'][<index>]; keep <name>
        return diff_path.split("root['")[1].split("']")[0]

    report = "\n# Changes to phenotype configuration\n"
    report += "This compares changes in the phenotype configuration including added, removed and renamed concept sets and changes to concept set source concept code file paths\n\n"

    old_concepts, old_names = extract_concepts(old_config)
    new_concepts, new_names = extract_concepts(new_config)

    # Check added and removed concept set names
    added_names = new_names - old_names  # Names that appear in new but not in old
    removed_names = old_names - new_names  # Names that were in old but not in new

    # find file path changes for unchanged names
    unchanged_names = old_names & new_names  # Names that exist in both
    file_diff = DeepDiff(
        {name: old_concepts[name] for name in unchanged_names},
        {name: new_concepts[name] for name in unchanged_names},
    )

    # Find renamed concepts (same file, different name)
    renamed_concepts = [
        (removed, added)
        for removed in sorted(removed_names)
        for added in sorted(added_names)
        if old_concepts[removed] == new_concepts[added]
    ]

    # Remove renamed concepts from added and removed sets
    for old_name, new_name in renamed_concepts:
        added_names.discard(new_name)
        removed_names.discard(old_name)

    # generate config report
    if added_names:
        report += "## Added Concepts\n"
        for name in sorted(added_names):
            report += f"- `{name}` (File: `{new_concepts[name]}`)\n"
        report += "\n"

    if removed_names:
        report += "## Removed Concepts\n"
        for name in sorted(removed_names):
            report += f"- `{name}` (File: `{old_concepts[name]}`)\n"
        report += "\n"

    if renamed_concepts:
        report += "## Renamed Concepts\n"
        for old_name, new_name in renamed_concepts:
            report += (
                f"- `{old_name}` ➝ `{new_name}` (File: `{old_concepts[old_name]}`)\n"
            )
        report += "\n"

    # The compared values are lists of file paths: a replaced entry shows up as
    # "values_changed", while a longer or shorter list shows up as
    # "iterable_item_added" / "iterable_item_removed"
    values_changed = file_diff.get("values_changed", {})
    files_added = file_diff.get("iterable_item_added", {})
    files_removed = file_diff.get("iterable_item_removed", {})

    if values_changed or files_added or files_removed:
        report += "## Updated File Paths\n"
        for name, change in values_changed.items():
            old_file = change["old_value"]
            new_file = change["new_value"]
            report += f"- `{_concept_name(name)}` changed file from `{old_file}` ➝ `{new_file}`\n"
        for name, removed_file in files_removed.items():
            report += f"- `{_concept_name(name)}` removed file `{removed_file}`\n"
        for name, added_file in files_added.items():
            report += f"- `{_concept_name(name)}` added file `{added_file}`\n"
        report += "\n"

    if not (
        added_names
        or removed_names
        or renamed_concepts
        or values_changed
        or files_added
        or files_removed
    ):
        report += "No changes in concept sets.\n"

    return report
1359
1360
def diff_map_files(
    old_map_path: Path,
    new_map_path: Path,
    output_changed_concepts: bool,
    csv_output_path: Path,
) -> str:
    """Build a markdown report comparing the map files of two phenotype versions.

    The report lists the map files that were removed, added or are common to
    both directories (hidden files are ignored) and, for every common file,
    the concept sets and concepts that were added or removed.

    Args:
        old_map_path: Map directory of the original phenotype.
        new_map_path: Map directory of the changed phenotype.
        output_changed_concepts: When true, the changed rows of each common
            file are also written to ``<file stem>_diff.csv``.
        csv_output_path: Directory the ``*_diff.csv`` files are written to.

    Returns:
        The report text.
    """
    old_output_set = {
        file.name
        for file in old_map_path.iterdir()
        if file.is_file() and not file.name.startswith(".")
    }
    new_output_set = {
        file.name
        for file in new_map_path.iterdir()
        if file.is_file() and not file.name.startswith(".")
    }

    # Outputs that are in old_output_set but not in new_output_set (removed files)
    removed_outputs = old_output_set - new_output_set
    # Outputs that are in new_output_set but not in old_output_set (added files)
    added_outputs = new_output_set - old_output_set
    # Outputs that are the intersection of old_output_set and new_output_set
    common_outputs = old_output_set & new_output_set

    report = "\n# Changes to available translations\n"
    report += "This compares the coding translations files available.\n\n"
    report += f"- Removed outputs: {sorted(removed_outputs)}\n"
    report += f"- Added outputs: {sorted(added_outputs)}\n"
    report += f"- Common outputs: {sorted(common_outputs)}\n\n"

    # Compare common outputs between versions
    report += "# Changes to concepts in translation files\n\n"
    report += "This compares the added and removed concepts in each of the coding translation files. Note that this might be different to the config.yaml if the translations have not been run for the current config.\n\n"

    # Read the join keys as text so both versions always have matching dtypes
    # and codes keep any leading zeros
    key_dtypes = {"CONCEPT": str, "CONCEPT_SET": str}

    # sorted so the sections of the report come out in a reproducible order
    for file in sorted(common_outputs):
        old_output = old_map_path / file
        new_output = new_map_path / file

        _logger.debug(f"Old ouptput: {str(old_output.resolve())}")
        _logger.debug(f"New ouptput: {str(new_output.resolve())}")

        df1 = pd.read_csv(old_output, dtype=key_dtypes)
        df1_count = df1[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
        df2 = pd.read_csv(new_output, dtype=key_dtypes)
        df2_count = df2[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()

        # Check for added and removed concepts
        report += f"- File {file}\n"
        sorted_list = sorted(set(df1_count.index) - set(df2_count.index))
        report += f"- Removed concepts {sorted_list}\n"
        sorted_list = sorted(set(df2_count.index) - set(df1_count.index))
        report += f"- Added concepts {sorted_list}\n"

        # Find differences in rows between df1 and df2
        out = df1.merge(
            df2,
            on=["CONCEPT", "CONCEPT_SET"],
            how="outer",
            suffixes=("_old", "_new"),
            indicator=True,
        )
        # only select rows that are different; copy so the edits below act on
        # an independent frame rather than on a slice of the merge result
        out = out[out["_merge"] != "both"].copy()
        out["_merge"] = out["_merge"].cat.rename_categories(
            {"left_only": "Removed", "right_only": "Added"}
        )  # rename categories
        out = out.sort_values(by=["CONCEPT_SET", "_merge"])

        # Count the number of added and removed concepts for each concept set
        report += "- Changed concepts:\n"
        for concept_set_name, grp in out.groupby("CONCEPT_SET"):
            counts = grp["_merge"].value_counts()
            report += "\t - {} +{} -{}\n".format(
                concept_set_name, counts["Added"], counts["Removed"]
            )
        report += "\n"

        # Write the output to a CSV file
        if output_changed_concepts and out.shape[0] > 0:
            csv_filename = f"{Path(file).stem}_diff.csv"
            csv_filepath = csv_output_path / csv_filename
            out.to_csv(csv_filepath, index=False)
            _logger.debug(f"CSV of changes written to {str(csv_filepath.resolve())}")

    return report
1448
1449
def diff_phen(
    new_phen_path: Path,
    new_version: str,
    old_phen_path: Path,
    old_version: str,
    report_path: Path,
    csv_output_path: Path,
    not_check_config: bool,
    output_changed_concepts: bool,
):
    """Compare the differences between two versions of a phenotype.

    Unless ``not_check_config`` is set, both phenotypes are validated and
    their configurations are compared with ``diff_config``. The map files are
    always compared with ``diff_map_files``. The combined markdown report is
    written to ``report_path``.

    Args:
        new_phen_path: Directory of the changed phenotype.
        new_version: Version label of the changed phenotype (reported only).
        old_phen_path: Directory of the original phenotype.
        old_version: Version label of the original phenotype (reported only).
        report_path: File the markdown report is written to.
        csv_output_path: Passed through to ``diff_map_files``.
        not_check_config: Skip validation and configuration comparison.
        output_changed_concepts: Passed through to ``diff_map_files``.
    """
    # write report heading
    report = "# Phenotype Comparison Report\n"

    # Step 1: check differences configuration files
    if not not_check_config:
        # validate phenotypes
        _logger.debug(f"Validating for diff old path: {str(old_phen_path.resolve())}")
        validate(str(old_phen_path.resolve()))
        _logger.debug(f"Validating for diff new path: {str(new_phen_path.resolve())}")
        validate(str(new_phen_path.resolve()))

        # get old and new config
        old_config_path = old_phen_path / CONFIG_FILE
        with old_config_path.open("r") as file:
            old_config = yaml.safe_load(file)
        new_config_path = new_phen_path / CONFIG_FILE
        with new_config_path.open("r") as file:
            new_config = yaml.safe_load(file)

        # write report
        report += "## Original phenotype\n"
        report += f"  - {old_config['phenotype']['omop']['vocabulary_id']}\n"
        report += f"  - {old_version}\n"
        report += f"  - {str(old_phen_path.resolve())}\n"
        report += "## Changed phenotype:\n"
        report += f"  - {new_config['phenotype']['omop']['vocabulary_id']}\n"
        report += f"  - {new_version}\n"
        report += f"  - {str(new_phen_path.resolve())}\n"

        # Compare the concept sets defined in the two configurations
        report += diff_config(old_config, new_config)

    # Step 2: check differences between map files
    old_map_path = old_phen_path / MAP_DIR
    new_map_path = new_phen_path / MAP_DIR
    report += diff_map_files(
        old_map_path, new_map_path, output_changed_concepts, csv_output_path
    )

    # Write the report. The file is closed even if the write fails, and UTF-8
    # is used explicitly because the report can contain non-ASCII characters
    # (the rename arrow emitted by diff_config)
    _logger.debug(f"Writing to report file {str(report_path.resolve())}")
    with open(report_path, "w", encoding="utf-8") as report_file:
        report_file.write(report)

    _logger.info("Phenotypes diff'd successfully")
1509
1510
def _checkout_phen_version(
    label: str, source_dir: str, target_path: Path, version: str
) -> None:
    """Place the phenotype in ``source_dir`` at ``version`` into ``target_path``.

    The version "latest" copies the directory as it currently is; any other
    value clones the repo and checks that version out. ``label`` only appears
    in the log messages.
    """
    if version == "latest":
        _logger.debug(
            f"Copying {label} repo from {source_dir} into {target_path} at version {version}..."
        )
        shutil.copytree(Path(source_dir), target_path, dirs_exist_ok=True)
    else:
        _logger.debug(
            f"Cloning {label} repo from {source_dir} into {target_path} at version {version}..."
        )
        repo = git.Repo.clone_from(source_dir, target_path)
        repo.git.checkout(version)


def diff(
    phen_dir: str,
    version: str,
    old_phen_dir: str,
    old_version: str,
    not_check_config: bool,
    output_changed_concepts: bool,
):
    """Compare two versions of a phenotype and write a markdown report.

    Both versions are materialised under a temporary ``.acmc/diff_<timestamp>``
    directory (relative to the current working directory), compared with
    ``diff_phen`` and the temporary directory is removed afterwards. The
    report ``<version>_<old_version>_diff.md`` and any CSV output are written
    into ``phen_dir``.

    Args:
        phen_dir: Directory of the changed phenotype.
        version: Version of the changed phenotype; "latest" uses the directory
            as it currently is.
        old_phen_dir: Directory of the original phenotype.
        old_version: Version of the original phenotype; "latest" uses the
            directory as it currently is.
        not_check_config: Passed through to ``diff_phen``.
        output_changed_concepts: Passed through to ``diff_phen``.

    Raises:
        ValueError: If either phenotype directory does not exist.
    """
    # make tmp directory .acmc
    timestamp = time.strftime("%Y%m%d_%H%M%S")
    temp_dir = Path(f".acmc/diff_{timestamp}")

    changed_phen_path = Path(phen_dir)
    if not changed_phen_path.exists():
        raise ValueError(
            f"Changed phenotype directory does not exist: {str(changed_phen_path.resolve())}"
        )

    old_phen_path = Path(old_phen_dir)
    if not old_phen_path.exists():
        raise ValueError(
            f"Old phenotype directory does not exist: {str(old_phen_path.resolve())}"
        )

    try:
        # Create the directory
        temp_dir.mkdir(parents=True, exist_ok=True)
        _logger.debug(f"Temporary directory created: {temp_dir}")

        # Create temporary directories
        changed_path = temp_dir / "changed"
        changed_path.mkdir(parents=True, exist_ok=True)
        old_path = temp_dir / "old"
        old_path.mkdir(parents=True, exist_ok=True)

        # checkout changed, then old
        _checkout_phen_version("changed", phen_dir, changed_path, version)
        _checkout_phen_version("old", old_phen_dir, old_path, old_version)

        report_filename = f"{version}_{old_version}_diff.md"
        report_path = changed_phen_path / report_filename
        csv_output_path = changed_phen_path
        # diff old with new
        diff_phen(
            changed_path,
            version,
            old_path,
            old_version,
            report_path,
            csv_output_path,
            not_check_config,
            output_changed_concepts,
        )

    finally:
        # clean up tmp directory
        if temp_dir.exists():
            shutil.rmtree(temp_dir)
PHEN_DIR = 'phen'

Default phenotype directory name

DEFAULT_PHEN_PATH = PosixPath('workspace/phen')

Default phenotype directory path

CONCEPTS_DIR = 'concepts'

Default concepts directory name

MAP_DIR = 'map'

Default map directory name

CONCEPT_SET_DIR = 'concept-sets'

Default concept set directory name

CSV_PATH = PosixPath('concept-sets/csv')

Default CSV concept set directory path

OMOP_PATH = PosixPath('concept-sets/omop')

Default OMOP concept set directory path

DEFAULT_PHEN_DIR_LIST = ['concepts', 'map', 'concept-sets']

List of default phenotype directories

CONFIG_FILE = 'config.yml'

Default configuration filename

VOCAB_VERSION_FILE = 'vocab_version.yml'

Default vocabulary version filename

SEMANTIC_VERSION_TYPES = ['major', 'minor', 'patch']

List of semantic version increment types

DEFAULT_VERSION_INC = 'patch'

Default semantic version increment type

DEFAULT_GIT_BRANCH = 'main'

Default phenotype repo branch name

SPLIT_COL_ACTION = 'split_col'

Split column preprocessing action type

CODES_COL_ACTION = 'codes_col'

Codes column preprocessing action type

DIVIDE_COL_ACTION = 'divide_col'

Divide column preprocessing action type

COL_ACTIONS = ['split_col', 'codes_col', 'divide_col']

List of column preprocessing action types

CODE_FILE_TYPES = ['.xlsx', '.xls', '.csv']

List of supported source concept coding list file types

CONFIG_SCHEMA = {'phenotype': {'type': 'dict', 'required': True, 'schema': {'version': {'type': 'string', 'required': True, 'regex': '^\\d+\\.\\d+\\.\\d+$'}, 'omop': {'type': 'dict', 'required': True, 'schema': {'vocabulary_id': {'type': 'string', 'required': True}, 'vocabulary_name': {'type': 'string', 'required': True}, 'vocabulary_reference': {'type': 'string', 'required': True, 'regex': '^https?://.*'}}}, 'map': {'type': 'list', 'schema': {'type': 'string', 'allowed': ['read3', 'opcs4', 'snomed', 'atc', 'icd10', 'read2']}}, 'concept_sets': {'type': 'list', 'required': True, 'schema': {'type': 'dict', 'schema': {'name': {'type': 'string', 'required': True}, 'files': {'type': 'list', 'required': True, 'schema': {'type': 'dict', 'schema': {'path': {'type': 'string', 'required': True}, 'columns': {'type': 'dict', 'required': True}, 'category': {'type': 'string'}, 'actions': {'type': 'dict', 'schema': {'divide_col': {'type': 'string'}, 'split_col': {'type': 'string'}, 'codes_col': {'type': 'string'}}}}}}, 'metadata': {'type': 'dict', 'required': False}}}}}}}

Phenotype config.yml schema definition

class PhenValidationException(builtins.Exception):
class PhenValidationException(Exception):
    """Custom exception raised when the phenotype configuration file fails validation.

    The validation errors supplied to the constructor are kept on the
    ``validation_errors`` attribute so that callers can report them.
    """

    def __init__(self, message, validation_errors=None):
        """Creates the exception.

        Args:
            message: text passed on to the base ``Exception``
            validation_errors: errors to attach to the exception. Defaults to None.
        """
        # keep the errors on the instance before delegating to the base class
        self.validation_errors = validation_errors
        super().__init__(message)

Custom exception class raised when the phenotype configuration file contains validation errors

PhenValidationException(message, validation_errors=None)
166    def __init__(self, message, validation_errors=None):
167        super().__init__(message)
168        self.validation_errors = validation_errors
validation_errors
def init(phen_dir: str, remote_url: str):
def init(phen_dir: str, remote_url: str):
    """Initialises a phenotype directory as a git repo with the standard structure.

    If the directory already exists, `_check_delete_dir` is called with a
    reinitialise prompt and nothing is done unless it returns a truthy value.
    When a remote url is given the remote repo is cloned, otherwise a local repo
    is initialised. A repo without commits gets an initial README commit, the
    default branch is checked out (created when missing) and, unless a
    configuration file is already present, the default directories, an empty
    configuration file and a .gitignore are created and committed.

    Args:
        phen_dir (str): path of the phenotype directory
        remote_url (str): url of the remote repo to clone, or None to initialise a local repo
    """
    _logger.info(f"Initialising Phenotype in directory: {phen_dir}")
    phen_path = Path(phen_dir)

    # check if directory already exists and ask user if they want to recreate it
    if phen_path.exists() and phen_path.is_dir():
        configure = _check_delete_dir(
            phen_path,
            "The phen directory already exists. Do you want to reinitialise? (yes/no): ",
        )
    else:
        configure = True

    if not configure:
        _logger.info("Exiting, phenotype not initiatised")
        return

    # Initialise repo from local or remote
    repo: Repo

    # if remote then clone the repo otherwise init a local repo
    if remote_url is not None:
        # add PAT token to the URL
        git_url = _construct_git_url(remote_url)

        # clone the repo
        git_cmd = git.cmd.Git()
        git_cmd.clone(git_url, phen_path)

        # open repo
        repo = Repo(phen_path)
        # check if there are any commits (new repo has no commits)
        if len(repo.branches) == 0 or repo.head.is_detached:
            _logger.debug("The phen repository has no commits yet.")
            commit_count = 0
        else:
            # Get the total number of commits in the default branch
            commit_count = sum(1 for _ in repo.iter_commits())
            _logger.debug(f"Repo has previous commits: {commit_count}")
    else:
        # local repo, create the directories and init
        phen_path.mkdir(parents=True, exist_ok=True)
        _logger.debug(f"Phen directory '{phen_path}' has been created.")
        repo = git.Repo.init(phen_path)
        commit_count = 0

    phen_path = phen_path.resolve()
    # an empty repo needs a first commit before a branch can be created
    if commit_count == 0:
        initial_file_path = phen_path / "README.md"
        with open(initial_file_path, "w") as file:
            file.write(
                "# Initial commit\nThis is the first commit in the phen repository.\n"
            )
        repo.index.add([initial_file_path])
        repo.index.commit("Initial commit")

    # Checkout the phens default branch, creating it if it does not exist
    if DEFAULT_GIT_BRANCH in repo.branches:
        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
    else:
        main_branch = repo.create_head(DEFAULT_GIT_BRANCH)
    main_branch.checkout()

    # if the phen path already contains the config file there is nothing left to create
    config_path = phen_path / CONFIG_FILE
    if config_path.exists():
        _logger.debug("Phenotype configuration files already exist")
        return

    _logger.info("Creating phen directory structure and config files")
    for d in DEFAULT_PHEN_DIR_LIST:
        _create_empty_git_dir(phen_path / d)

    # create empty phen config file
    # The list of target code types is stored under "map": that is the key
    # defined in CONFIG_SCHEMA and the key read by map(). The previous
    # "translate" key is not part of the schema.
    config = {
        "phenotype": {
            "version": "0.0.0",
            "omop": {
                "vocabulary_id": "",
                "vocabulary_name": "",
                "vocabulary_reference": "",
            },
            "map": [],
            "concept_sets": [],
        }
    }

    with open(config_path, "w") as file:
        yaml.dump(
            config,
            file,
            Dumper=util.QuotedDumper,
            default_flow_style=False,
            sort_keys=False,
            default_style='"',
        )

    # add git ignore (content spelled out line by line so the whitespace-only
    # lines of the original file content stay visible)
    ignore_content = (
        "# Ignore SQLite database files\n"
        "*.db\n"
        "*.sqlite3\n"
        " \n"
        "# Ignore SQLite journal and metadata files\n"
        "*.db-journal\n"
        "*.sqlite3-journal\n"
        "\n"
        "# python\n"
        ".ipynb_checkpoints\n"
        " "
    )
    ignore_path = phen_path / ".gitignore"
    with open(ignore_path, "w") as file:
        file.write(ignore_content)

    # add to git repo and commit
    for d in DEFAULT_PHEN_DIR_LIST:
        repo.git.add(phen_path / d)
    repo.git.add(all=True)
    repo.index.commit("initialised the phen git repo.")

    _logger.info("Phenotype initialised successfully")

Initialises the phenotype directory as a git repo with the standard structure

def fork( phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str):
def fork(phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str):
    """Forks an upstream phenotype in a remote repo at a specific version to a local directory, and optionally sets a new remote origin

    Args:
        phen_dir (str): local directory path where the upstream repo is to be cloned
        upstream_url (str): url to the upstream repo
        upstream_version (str): version in the upstream repo to clone
        new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.

    Raises:
        ValueError: if the specified version is not in the upstream repo
        ValueError: if the upstream repo is not a valid phenotype repo
        ValueError: if there's any other problems with Git
    """
    _logger.info(
        f"Forking upstream repo {upstream_url} {upstream_version} into directory: {phen_dir}"
    )

    phen_path = Path(phen_dir)
    # check if directory already exists and ask user if they want to recreate it
    if phen_path.exists() and phen_path.is_dir():
        configure = _check_delete_dir(
            phen_path,
            "The phen directory already exists. Do you want to reinitialise? (yes/no): ",
        )
    else:
        configure = True

    if not configure:
        _logger.info("Exiting, phenotype not initiatised")
        return

    try:
        # Clone repo
        git_url = _construct_git_url(upstream_url)
        repo = git.Repo.clone_from(git_url, phen_path)

        # Fetch all branches and tags
        repo.remotes.origin.fetch()

        # Check if the version exists (compare against the last path segment of each ref)
        available_refs = [ref.name.split("/")[-1] for ref in repo.references]
        if upstream_version not in available_refs:
            raise ValueError(
                f"Version '{upstream_version}' not found in the repository: {upstream_url}."
            )

        # Checkout the specified version
        # NOTE(review): the default branch is checked out straight afterwards, so
        # the working tree ends up on the default branch rather than on
        # upstream_version - confirm this is the intended behaviour.
        repo.git.checkout(upstream_version)
        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
        main_branch.checkout()

        # Check if 'config.yml' exists in the root directory
        config_path = phen_path / "config.yml"
        if not config_path.is_file():
            raise ValueError(
                "The forked repository is not a valid ACMC repo because 'config.yml' is missing in the root directory."
            )

        # Validate the phenotype is compatible with the acmc tool
        validate(str(phen_path.resolve()))

        # Delete each tag locally
        for tag in repo.tags:
            repo.delete_tag(tag)
            _logger.debug(f"Deleted tags from forked repo: {tag}")

        # Add upstream remote and remove the existing origin
        repo.create_remote("upstream", upstream_url)
        repo.delete_remote(repo.remotes["origin"])

        # Optionally set a new origin remote
        if new_origin_url:
            git_url = _construct_git_url(new_origin_url)
            repo.create_remote("origin", git_url)
            # push the branch that was checked out above instead of a hard-coded name
            repo.git.push("--set-upstream", "origin", DEFAULT_GIT_BRANCH)

        _logger.info(f"Repository forked successfully at {phen_path}")
        _logger.info(f"Upstream set to {upstream_url}")
        if new_origin_url:
            _logger.info(f"Origin set to {new_origin_url}")

    except Exception as e:
        # remove the partially created clone and report every failure as a
        # ValueError, keeping the original exception as the cause
        if phen_path.exists():
            shutil.rmtree(phen_path)
        raise ValueError(f"Error occurred during repository fork: {str(e)}") from e

Forks an upstream phenotype in a remote repo at a specific version to a local directory, and optionally sets a new remote origin

Arguments:
  • phen_dir (str): local directory path where the upstream repo is to be cloned
  • upstream_url (str): url to the upstream repo
  • upstream_version (str): version in the upstream repo to clone
  • new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.
Raises:
  • ValueError: if the specified version is not in the upstream repo
  • ValueError: if the upstream repo is not a valid phenotype repo
  • ValueError: if there's any other problems with Git
def validate(phen_dir: str):
def validate(phen_dir: str):
    """Validates the phenotype directory is a git repo with standard structure

    Checks that the directory, its configuration file and its concepts
    directory exist, that the directory is a git repo and that the
    configuration file matches CONFIG_SCHEMA. The concept set definitions are
    then checked and all problems found are reported together.

    Args:
        phen_dir (str): path of the phenotype directory

    Raises:
        NotADirectoryError: if phen_dir is not a directory
        FileNotFoundError: if the configuration file or the concepts directory is missing
        yaml.YAMLError: if the configuration file is not valid YAML
        ValueError: if a source coding file has an unsupported file type
        PhenValidationException: if the concept set definitions contain errors
        Exception: if the directory is not a git repo, the configuration file type is
            unsupported or the YAML structure does not match the schema
    """
    _logger.info(f"Validating phenotype: {phen_dir}")
    phen_path = Path(phen_dir)
    if not phen_path.is_dir():
        raise NotADirectoryError(
            f"Error: '{str(phen_path.resolve())}' is not a directory"
        )

    config_path = phen_path / CONFIG_FILE
    if not config_path.is_file():
        raise FileNotFoundError(
            f"Error: phen configuration file '{config_path}' does not exist."
        )

    concepts_path = phen_path / CONCEPTS_DIR
    if not concepts_path.is_dir():
        raise FileNotFoundError(
            f"Error: source concepts directory {concepts_path} does not exist."
        )

    # Validate the directory is a git repo
    try:
        git.Repo(phen_path)
    except (git.exc.InvalidGitRepositoryError, git.exc.NoSuchPathError) as e:
        raise Exception(f"Phen directory {phen_path} is not a git repo") from e

    # Load configuration File
    if config_path.suffix == ".yml":
        try:
            with config_path.open("r") as file:
                phenotype = yaml.safe_load(file)

            validator = Validator(CONFIG_SCHEMA)
            if validator.validate(phenotype):
                _logger.debug("YAML structure is valid.")
            else:
                _logger.error(f"YAML structure validation failed: {validator.errors}")
                raise Exception(f"YAML structure validation failed: {validator.errors}")
        except yaml.YAMLError as e:
            _logger.error(f"YAML syntax error: {e}")
            raise
    else:
        raise Exception(
            f"Unsupported configuration filetype: {str(config_path.resolve())}"
        )

    # initialise
    validation_errors = []
    phenotype = phenotype["phenotype"]
    code_types = parse.CodeTypeParser().code_types

    # check the version number is of the format n.n.n (whole string, as in CONFIG_SCHEMA)
    if not re.fullmatch(r"\d+\.\d+\.\d+", phenotype["version"]):
        validation_errors.append(
            f"Invalid version format in configuration file: {phenotype['version']}"
        )

    # create a list of all the concept set names defined in the concept set configuration
    concept_set_names = []
    for item in phenotype["concept_sets"]:
        if item["name"] in concept_set_names:
            validation_errors.append(
                f"Duplicate concept set defined in concept sets {item['name'] }"
            )
        else:
            concept_set_names.append(item["name"])

    # check codes definition
    for concept_set in phenotype["concept_sets"]:
        for item in concept_set["files"]:
            concept_code_file_path = concepts_path / item["path"]

            # check concept code file exists and is not empty; the size can only
            # be read for an existing file, stat() raises FileNotFoundError otherwise
            if not concept_code_file_path.exists():
                validation_errors.append(
                    f"Coding file {str(concept_code_file_path.resolve())} does not exist"
                )
            elif concept_code_file_path.stat().st_size == 0:
                validation_errors.append(
                    f"Coding file {str(concept_code_file_path.resolve())} is an empty file"
                )

            # check code file type is supported
            if concept_code_file_path.suffix not in CODE_FILE_TYPES:
                raise ValueError(
                    f"Unsupported filetype {concept_code_file_path.suffix}, only support csv, xlsx, xls code file types"
                )

            # check columns specified are a supported medical coding type
            for column in item["columns"]:
                if column not in code_types:
                    validation_errors.append(
                        f"Column type {column} for file {concept_code_file_path} is not supported"
                    )

            # check the actions are supported
            if "actions" in item:
                for action in item["actions"]:
                    if action not in COL_ACTIONS:
                        validation_errors.append(f"Action {action} is not supported")

    if len(validation_errors) > 0:
        _logger.error(validation_errors)
        raise PhenValidationException(
            f"Configuration file {str(config_path.resolve())} failed validation",
            validation_errors,
        )

    _logger.info("Phenotype validated successfully")

Validates the phenotype directory is a git repo with standard structure

def translate_codes( source_df: pandas.core.frame.DataFrame, target_code_type: str, concept_name: str, not_translate: bool, do_reverse_translate: bool) -> pandas.core.frame.DataFrame:
def translate_codes(
    source_df: pd.DataFrame,
    target_code_type: str,
    concept_name: str,
    not_translate: bool,
    do_reverse_translate: bool,
) -> pd.DataFrame:
    """Translates each source code type in the source coding list into a target type and returns all conversions as a concept set

    Args:
        source_df (pd.DataFrame): source codes, one column per source code type
        target_code_type (str): code type to translate the source codes into
        concept_name (str): concept set name written to the CONCEPT_SET column
        not_translate (bool): when True only codes already of the target type are copied
        do_reverse_translate (bool): when True a target-to-source map file is used
            if no source-to-target map file exists

    Returns:
        pd.DataFrame: rows with SOURCE_CONCEPT, SOURCE_CONCEPT_TYPE and CONCEPT columns,
            plus CONCEPT_SET when at least one code was converted
    """
    codes = pd.DataFrame(
        columns=["SOURCE_CONCEPT", "SOURCE_CONCEPT_TYPE", "CONCEPT"], dtype="string"
    )
    # Convert codes to target type
    _logger.info(f"Converting to target code type {target_code_type}")

    for source_code_type in source_df.columns:
        # if target code type is the same as the source code type, no translation, just appending source as target
        if source_code_type == target_code_type:
            copy_df = pd.DataFrame(
                {
                    "SOURCE_CONCEPT": source_df[source_code_type],
                    "SOURCE_CONCEPT_TYPE": source_code_type,
                    "CONCEPT": source_df[source_code_type],
                }
            )
            codes = pd.concat([codes, copy_df])
            # report the source code type (the message previously printed the row count here)
            _logger.debug(
                f"Target code type {target_code_type} is the same as source code type {source_code_type}, copying codes rather than translating"
            )
        elif not not_translate:
            # get the translation filename using source to target code types
            filename = f"{source_code_type}_to_{target_code_type}.parquet"
            map_path = trud.PROCESSED_PATH / filename

            filename_reversed = f"{target_code_type}_to_{source_code_type}.parquet"
            map_path_reversed = trud.PROCESSED_PATH / filename_reversed

            # do the mapping if it exists
            if map_path.exists():
                codes = _translate_codes(map_path, source_df, source_code_type, codes)
            # otherwise do reverse mapping if enabled and it exists
            elif do_reverse_translate and map_path_reversed.exists():
                codes = _translate_codes(
                    map_path_reversed, source_df, source_code_type, codes, reverse=True
                )
            else:
                _logger.warning(
                    f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist"
                )

    codes = codes.dropna()  # delete NaNs

    # added concept set type to output if any translations
    if len(codes.index) > 0:
        codes["CONCEPT_SET"] = concept_name
    else:
        _logger.debug(f"No codes converted with target code type {target_code_type}")

    return codes

Translates each source code type in the source coding list into a target type and returns all conversions as a concept set

def write_vocab_version(phen_path: pathlib.Path):
def write_vocab_version(phen_path: Path):
    """Writes the acmc, TRUD and OMOP versions to the vocabulary version file in the phenotype directory

    Args:
        phen_path (Path): phenotype directory the version file is written to

    Raises:
        FileNotFoundError: if the TRUD or the OMOP version file does not exist
    """
    trud_version_path = trud.VERSION_PATH
    if not trud_version_path.exists():
        raise FileNotFoundError(
            f"TRUD version path {trud.VERSION_PATH} does not exist, please check TRUD is installed"
        )

    omop_version_path = omop.VERSION_PATH
    if not omop_version_path.exists():
        raise FileNotFoundError(
            f"OMOP version path {omop.VERSION_PATH} does not exist, please check OMOP is installed"
        )

    # read both version files before anything is written
    with trud_version_path.open("r") as trud_file:
        trud_version = yaml.safe_load(trud_file)
    with omop_version_path.open("r") as omop_file:
        omop_version = yaml.safe_load(omop_file)

    # combined YAML structure, key order is preserved on output (sort_keys=False)
    versions = {
        "acmc": acmc.__version__,
        "trud": trud_version,
        "omop": omop_version,
    }

    output_path = phen_path / VOCAB_VERSION_FILE
    with open(output_path, "w") as version_file:
        yaml.dump(
            {"versions": versions},
            version_file,
            Dumper=util.QuotedDumper,
            default_flow_style=False,
            sort_keys=False,
            default_style='"',
        )
def map( phen_dir: str, target_code_type: str, not_translate: bool, no_metadata: bool, do_reverse_translate: bool):
def map(
    phen_dir: str,
    target_code_type: str,
    not_translate: bool,
    no_metadata: bool,
    do_reverse_translate: bool,
):
    """Maps the phenotype's source concepts to one or all of the configured target code types

    The phenotype is validated first. When target_code_type is None every
    code type listed under "map" in the configuration is processed, otherwise
    only the requested one.

    Args:
        phen_dir (str): path of the phenotype directory
        target_code_type (str): code type to map to, or None for all configured types
        not_translate (bool): passed through to _map_target_code_type
        no_metadata (bool): passed through to _map_target_code_type
        do_reverse_translate (bool): passed through to _map_target_code_type

    Raises:
        ValueError: if no map code types are configured
        ValueError: if target_code_type is not one of the configured map code types
    """
    _logger.info(f"Processing phenotype: {phen_dir}")

    # Validate configuration
    validate(phen_dir)

    # initialise paths
    phen_path = Path(phen_dir)
    config_path = phen_path / CONFIG_FILE

    # load configuration
    with config_path.open("r") as file:
        config = yaml.safe_load(file)
    phenotype = config["phenotype"]

    # "map" is optional in the configuration schema, so a missing key is
    # treated like an empty list instead of failing with a KeyError
    map_types = phenotype.get("map") or []
    if len(map_types) == 0:
        raise ValueError("No map codes defined in the phenotype configuration")

    if target_code_type is not None and target_code_type not in map_types:
        raise ValueError(
            f"Target code type {target_code_type} not in phenotype configuration map {map_types}"
        )

    # a single requested type, or every configured type when none was requested
    targets = [target_code_type] if target_code_type is not None else map_types
    for target in targets:
        _map_target_code_type(
            phen_path,
            phenotype,
            target,
            not_translate,
            no_metadata,
            do_reverse_translate,
        )

    _logger.info("Phenotype processed successfully")
def add_metadata( codes: pandas.core.frame.DataFrame, metadata: dict, no_metadata: bool) -> pandas.core.frame.DataFrame:
def add_metadata(
    codes: pd.DataFrame,
    metadata: dict,
    no_metadata: bool,
) -> pd.DataFrame:
    """Adds the concept set metadata, given as a dictionary, as columns on every concept row

    Args:
        codes (pd.DataFrame): concept rows, modified in place
        metadata (dict): metadata names mapped to the value written to each row
        no_metadata (bool): when True the codes are returned unchanged

    Returns:
        pd.DataFrame: the same dataframe that was passed in
    """
    # nothing to add when metadata output is switched off
    if no_metadata:
        return codes

    for meta_name in metadata:
        meta_value = metadata[meta_name]
        codes[meta_name] = meta_value
        _logger.debug(
            f"Adding metadata for concept set: metadata name {meta_name}, metadata value {meta_value}"
        )

    return codes

Add concept set metadata, stored as a dictionary, to each concept row

def publish(phen_dir: str, msg: str, remote_url: str, increment: str = 'patch'):
def publish(
    phen_dir: str, msg: str, remote_url: str, increment: str = DEFAULT_VERSION_INC
):
    """Publishes updates to the phenotype by committing all changes to the repo directory

    The phenotype is validated, the next version is generated and written to
    the configuration file, all changes are committed and tagged with the new
    version and, when an origin remote is available, the default branch and the
    tags are pushed. If pushing fails the new tag is deleted and the commit is
    undone with a soft reset before the error is re-raised.

    Args:
        phen_dir (str): path of the phenotype directory
        msg (str): commit message; a default message is used when empty or None
        remote_url (str): url used to create the origin remote when the repo has none, or None
        increment (str, optional): semantic version part to increment. Defaults to DEFAULT_VERSION_INC.

    Raises:
        AttributeError: if the repo does not contain the default branch
    """
    # Validate config
    validate(phen_dir)
    phen_path = Path(phen_dir)

    # load git repo and set the branch
    repo = git.Repo(phen_path)
    if DEFAULT_GIT_BRANCH in repo.branches:
        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
        main_branch.checkout()
    else:
        raise AttributeError(
            f"Phen repo does not contain the default branch {DEFAULT_GIT_BRANCH}"
        )

    # check if any changes to publish
    if not repo.is_dirty() and not repo.untracked_files:
        if remote_url is not None and "origin" not in repo.remotes:
            _logger.info(f"First publish to remote url {remote_url}")
        else:
            _logger.info("Nothing to publish, no changes to the repo")
            return

    # get next version
    new_version_str = _generate_version_tag(repo, increment)
    _logger.info(f"New version: {new_version_str}")

    # Write version in configuration file
    config_path = phen_path / CONFIG_FILE
    with config_path.open("r") as file:
        config = yaml.safe_load(file)

    config["phenotype"]["version"] = new_version_str
    with open(config_path, "w") as file:
        yaml.dump(
            config,
            file,
            Dumper=util.QuotedDumper,
            default_flow_style=False,
            sort_keys=False,
            default_style='"',
        )

    # Add and commit changes to repo including version updates; the caller's
    # message is used when given (it was previously ignored)
    commit_message = msg if msg else f"Committing updates to phenotype {phen_path}"
    repo.git.add("--all")
    repo.index.commit(commit_message)

    # Add tag to the repo
    repo.create_tag(new_version_str)

    # create the origin remote if a remote url was given and none is set yet
    if remote_url is not None and "origin" not in repo.remotes:
        git_url = _construct_git_url(remote_url)
        repo.create_remote("origin", git_url)

    try:
        if "origin" in repo.remotes:
            _logger.debug(f"Remote 'origin' is set {repo.remotes.origin.url}")
            origin = repo.remotes.origin
            _logger.info("Pushing main branch to remote repo")
            # push the branch checked out above instead of a hard-coded name
            repo.git.push("--set-upstream", "origin", DEFAULT_GIT_BRANCH)
            _logger.info("Pushing version tags to remote git repo")
            origin.push(tags=True)
            _logger.debug("Changes pushed to 'origin'")
        else:
            _logger.debug("Remote 'origin' is not set")
    except Exception:
        # roll back the tag and the commit so a failed publish can be retried
        tag_ref = repo.tags[new_version_str]
        repo.delete_tag(tag_ref)
        repo.git.reset("--soft", "HEAD~1")
        raise

    _logger.info("Phenotype published successfully")

Publishes updates to the phenotype by committing all changes to the repo directory

def export(phen_dir: str, version: str):
def export(phen_dir: str, version: str):
    """Exports the phenotype's map directory to the OMOP concept set directory

    Args:
        phen_dir (str): path of the phenotype directory
        version (str): version named in the log message; the version passed to
            the OMOP export is the one read from the configuration file
    """
    _logger.info(f"Exporting phenotype {phen_dir} at version {version}")

    # the phenotype has to be valid before anything is exported
    validate(phen_dir)
    phen_root = Path(phen_dir)

    # read the phenotype configuration
    with (phen_root / CONFIG_FILE).open("r") as config_file:
        config = yaml.safe_load(config_file)

    # a missing map directory is only reported, the export is still attempted
    map_path = phen_root / MAP_DIR
    if not map_path.exists():
        _logger.warning(f"Map path does not exist '{map_path}'")

    # make sure the OMOP export directory is there
    export_path = phen_root / OMOP_PATH
    if not export_path.exists():
        export_path.mkdir(parents=True)
        _logger.debug(f"OMOP export directory '{export_path}' created.")

    # run the OMOP export with the version and vocabulary settings from the configuration
    phenotype = config["phenotype"]
    omop.export(map_path, export_path, phenotype["version"], phenotype["omop"])

    _logger.info("Phenotype exported successfully")

Exports a phen repo at a specific tagged version into a target directory

def copy(phen_dir: str, target_dir: str, version: str):
def copy(phen_dir: str, target_dir: str, version: str):
    """Copies a phen repo at a specific tagged version into a target directory

    The repo is cloned into a sub-directory of target_dir named after the
    version and the version is then checked out in the clone.

    Args:
        phen_dir (str): path of the phenotype directory to copy
        target_dir (str): existing directory the copy is created in
        version (str): version to check out, also used as the copy's directory name

    Raises:
        FileNotFoundError: if the target directory does not exist
    """
    # the source phenotype must be valid
    validate(phen_dir)
    source_path = Path(phen_dir)

    # the target directory must already exist
    target_path = Path(target_dir)
    if not target_path.exists():
        raise FileNotFoundError(f"The target directory {target_path} does not exist")

    # the copy lives in a sub-directory named after the version
    copy_path = target_path / version
    _logger.info(f"Copying repo {source_path} to {copy_path}")

    # an existing copy is only replaced when _check_delete_dir agrees
    proceed = True
    if copy_path.exists() and copy_path.is_dir():
        proceed = _check_delete_dir(
            copy_path,
            f"The directory {str(copy_path.resolve())} already exists. Do you want to overwrite? (yes/no): ",
        )

    if not proceed:
        _logger.info(f"Not copying the version {version}")
        return

    _logger.debug(f"Cloning repo from {source_path} into {copy_path}...")
    cloned_repo = git.Repo.clone_from(source_path, copy_path)

    if not version:
        # no version given, stay on the latest commit (HEAD)
        _logger.info("Checking out the latest commit...")
        cloned_repo.git.checkout("HEAD")
    else:
        # a specific version (e.g., branch, tag, or commit hash)
        _logger.info(f"Checking out version {version}...")
        cloned_repo.git.checkout(version)

    short_sha = cloned_repo.head.commit.hexsha[:7]
    _logger.debug(f"Copied {source_path} {short_sha} in {copy_path}")

    _logger.info("Phenotype copied successfully")

Copies a phen repo at a specific tagged version into a target directory

def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]:
def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]:
    """Builds a {concept set name: [file paths]} dictionary and the set of concept set names.

    Args:
        config_data (dict): phenotype configuration with a "phenotype" -> "concept_sets" list

    Returns:
        Tuple[dict, Set[str]]: the name-to-paths dictionary and the set of its names
    """
    concepts_dict: dict = {}
    for concept_set in config_data["phenotype"]["concept_sets"]:
        paths = []
        for file_entry in concept_set["files"]:
            paths.append(file_entry["path"])
        # a repeated name replaces the earlier entry
        concepts_dict[concept_set["name"]] = paths

    return concepts_dict, set(concepts_dict)

Extracts concepts as {name: file_path} dictionary and a name set.

def diff_config(old_config: dict, new_config: dict) -> str:
def diff_config(old_config: dict, new_config: dict) -> str:
    """Builds a markdown report of the concept set changes between two phenotype configurations

    The report lists added, removed and renamed concept sets (a rename is a
    removed and an added name with equal file paths) and the file path values
    that changed for names present in both configurations.

    Args:
        old_config (dict): previous phenotype configuration
        new_config (dict): new phenotype configuration

    Returns:
        str: the report text
    """
    sections = [
        "\n# Changes to phenotype configuration\n",
        "This compares changes in the phenotype configuration including added, removed and renamed concept sets and changes to concept set source concept code file paths\n\n",
    ]

    old_concepts, old_names = extract_concepts(old_config)
    new_concepts, new_names = extract_concepts(new_config)

    # names only in the new / only in the old configuration
    added_names = new_names - old_names
    removed_names = old_names - new_names

    # file path changes are only looked for on names present in both
    unchanged_names = old_names & new_names
    file_diff = DeepDiff(
        {name: old_concepts[name] for name in unchanged_names},
        {name: new_concepts[name] for name in unchanged_names},
    )

    # renamed concepts: same files, different name
    renamed_concepts = [
        (removed, added)
        for removed in removed_names
        for added in added_names
        if old_concepts[removed] == new_concepts[added]
    ]

    # renamed concepts are reported once, not also as added and removed
    for old_name, new_name in renamed_concepts:
        added_names.discard(new_name)
        removed_names.discard(old_name)

    # generate config report
    if added_names:
        sections.append("## Added Concepts\n")
        sections.extend(
            f"- `{name}` (File: `{new_concepts[name]}`)\n" for name in added_names
        )
        sections.append("\n")

    if removed_names:
        sections.append("## Removed Concepts\n")
        sections.extend(
            f"- `{name}` (File: `{old_concepts[name]}`)\n" for name in removed_names
        )
        sections.append("\n")

    if renamed_concepts:
        sections.append("## Renamed Concepts\n")
        sections.extend(
            f"- `{old_name}` ➝ `{new_name}` (File: `{old_concepts[old_name]}`)\n"
            for old_name, new_name in renamed_concepts
        )
        sections.append("\n")

    if "values_changed" in file_diff:
        sections.append("## Updated File Paths\n")
        for name, change in file_diff["values_changed"].items():
            old_file = change["old_value"]
            new_file = change["new_value"]
            # the diff key starts with root['<concept set name>'], keep the name only
            clean_name = name.split("root['")[1].split("']")[0]
            sections.append(
                f"- `{clean_name}` changed file from `{old_file}` ➝ `{new_file}`\n"
            )
        sections.append("\n")

    anything_changed = (
        added_names
        or removed_names
        or renamed_concepts
        or file_diff.get("values_changed")
    )
    if not anything_changed:
        sections.append("No changes in concept sets.\n")

    return "".join(sections)
def diff_map_files( old_map_path: pathlib.Path, new_map_path: pathlib.Path, output_changed_concepts: bool, csv_output_path: pathlib.Path) -> str:
def diff_map_files(
    old_map_path: Path,
    new_map_path: Path,
    output_changed_concepts: bool,
    csv_output_path: Path,
) -> str:
    """Compare the translation (map) files of two phenotype versions.

    Builds a markdown report that lists which map files were removed, added
    or are present in both directories. For every file present in both, the
    report lists the concept sets that disappeared or appeared and the number
    of added/removed concept rows per concept set.

    Each common file is read with ``pandas.read_csv`` and must provide the
    columns ``CONCEPT`` and ``CONCEPT_SET``.

    Args:
        old_map_path: Directory holding the map files of the old version.
        new_map_path: Directory holding the map files of the new version.
        output_changed_concepts: When True, the differing rows of each common
            file are written to ``<file stem>_diff.csv`` in ``csv_output_path``
            (only for files that actually differ).
        csv_output_path: Directory that receives the diff CSV files.

    Returns:
        The markdown report as a string.
    """
    # Collect the visible (non dot-prefixed) regular files of each directory
    old_output_set = {
        entry.name
        for entry in old_map_path.iterdir()
        if entry.is_file() and not entry.name.startswith(".")
    }
    new_output_set = {
        entry.name
        for entry in new_map_path.iterdir()
        if entry.is_file() and not entry.name.startswith(".")
    }

    # Outputs that are in old_output_set but not in new_output_set (removed files)
    removed_outputs = old_output_set - new_output_set
    # Outputs that are in new_output_set but not in old_output_set (added files)
    added_outputs = new_output_set - old_output_set
    # Outputs that are the intersection of old_output_set and new_output_set
    common_outputs = old_output_set & new_output_set

    report = "\n# Changes to available translations\n"
    report += "This compares the coding translations files available.\n\n"
    report += f"- Removed outputs: {sorted(removed_outputs)}\n"
    report += f"- Added outputs: {sorted(added_outputs)}\n"
    report += f"- Common outputs: {sorted(common_outputs)}\n\n"

    # Compare common outputs between versions
    report += "# Changes to concepts in translation files\n\n"
    report += "This compares the added and removed concepts in each of the coding translation files. Note that this might be different to the config.yaml if the translations have not been run for the current config.\n\n"

    # Iterate in sorted order: set iteration order depends on string hashing,
    # which would make the report layout differ from run to run.
    for file in sorted(common_outputs):
        old_output = old_map_path / file
        new_output = new_map_path / file

        _logger.debug(f"Old ouptput: {str(old_output.resolve())}")
        _logger.debug(f"New ouptput: {str(new_output.resolve())}")

        df1 = pd.read_csv(old_output)
        df1_count = df1[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
        df2 = pd.read_csv(new_output)
        df2_count = df2[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()

        # Concept sets that exist in only one of the two versions
        old_concept_sets = set(df1_count.index)
        new_concept_sets = set(df2_count.index)
        report += f"- File {file}\n"
        report += f"- Removed concepts {sorted(old_concept_sets - new_concept_sets)}\n"
        report += f"- Added concepts {sorted(new_concept_sets - old_concept_sets)}\n"

        # Find differences in rows between df1 and df2; the merge indicator
        # records whether a row exists in the old file, the new file, or both
        out = df1.merge(
            df2,
            on=["CONCEPT", "CONCEPT_SET"],
            how="outer",
            suffixes=("_old", "_new"),
            indicator=True,
        )
        # Only keep rows that differ. Copy so the assignment below targets an
        # independent frame instead of a slice of the merge result.
        out = out[out["_merge"] != "both"].copy()
        out["_merge"] = out["_merge"].cat.rename_categories(
            {"left_only": "Removed", "right_only": "Added"}
        )
        out = out.sort_values(by=["CONCEPT_SET", "_merge"])

        # Count the number of added and removed concepts for each concept set.
        # "_merge" is categorical, so value_counts() reports every category
        # (including zero counts) and both lookups below always succeed.
        report += "- Changed concepts:\n"
        for concept_set_name, grp in out.groupby("CONCEPT_SET"):
            counts = grp["_merge"].value_counts()
            report += "\t - {} +{} -{}\n".format(
                concept_set_name, counts["Added"], counts["Removed"]
            )
        report += "\n"

        # Write the differing rows to a CSV file
        if output_changed_concepts and out.shape[0] > 0:
            csv_filename = f"{Path(file).stem}_diff.csv"
            csv_filepath = csv_output_path / csv_filename
            out.to_csv(csv_filepath, index=False)
            _logger.debug(f"CSV of changes written to {str(csv_filepath.resolve())}")

    return report
def diff_phen( new_phen_path: pathlib.Path, new_version: str, old_phen_path: pathlib.Path, old_version: str, report_path: pathlib.Path, csv_output_path: pathlib.Path, not_check_config: bool, output_changed_concepts: bool):
def diff_phen(
    new_phen_path: Path,
    new_version: str,
    old_phen_path: Path,
    old_version: str,
    report_path: Path,
    csv_output_path: Path,
    not_check_config: bool,
    output_changed_concepts: bool,
):
    """Compare two versions of a phenotype and write a markdown report.

    The report always contains the comparison of the map files of both
    versions. Unless ``not_check_config`` is set, both phenotype directories
    are validated first and the report additionally describes the two
    versions and the differences between their configuration files.

    Args:
        new_phen_path: Directory of the changed (new) phenotype version.
        new_version: Version label of the new phenotype, used in the report.
        old_phen_path: Directory of the original (old) phenotype version.
        old_version: Version label of the old phenotype, used in the report.
        report_path: File the markdown report is written to (overwritten).
        csv_output_path: Directory passed on to the map file comparison for
            the optional CSV files of changed concepts.
        not_check_config: When True, skip validation and the configuration
            comparison.
        output_changed_concepts: When True, changed concepts are written to
            CSV files by the map file comparison.
    """

    # write report heading
    report = "# Phenotype Comparison Report\n"

    # Step 1: check differences configuration files
    if not not_check_config:
        # validate phenotypes
        _logger.debug(f"Validating for diff old path: {str(old_phen_path.resolve())}")
        validate(str(old_phen_path.resolve()))
        _logger.debug(f"Validating for diff new path: {str(new_phen_path.resolve())}")
        validate(str(new_phen_path.resolve()))

        # get old and new config
        old_config_path = old_phen_path / CONFIG_FILE
        with old_config_path.open("r") as file:
            old_config = yaml.safe_load(file)
        new_config_path = new_phen_path / CONFIG_FILE
        with new_config_path.open("r") as file:
            new_config = yaml.safe_load(file)

        # write report
        report += "## Original phenotype\n"
        report += f"  - {old_config['phenotype']['omop']['vocabulary_id']}\n"
        report += f"  - {old_version}\n"
        report += f"  - {str(old_phen_path.resolve())}\n"
        report += "## Changed phenotype:\n"
        report += f"  - {new_config['phenotype']['omop']['vocabulary_id']}\n"
        report += f"  - {new_version}\n"
        report += f"  - {str(new_phen_path.resolve())}\n"

        # append the differences between the two configurations
        report += diff_config(old_config, new_config)

    # Step 2: check differences between map files
    old_map_path = old_phen_path / MAP_DIR
    new_map_path = new_phen_path / MAP_DIR
    report += diff_map_files(
        old_map_path, new_map_path, output_changed_concepts, csv_output_path
    )

    # Write the report file. UTF-8 is requested explicitly because the report
    # can contain non-ASCII characters that a platform default encoding may
    # not be able to represent; the context manager closes the file even if
    # the write fails.
    _logger.debug(f"Writing to report file {str(report_path.resolve())}")
    with report_path.open("w", encoding="utf-8") as report_file:
        report_file.write(report)

    _logger.info("Phenotypes diff'd successfully")

Compare the differences between two versions of a phenotype

def diff( phen_dir: str, version: str, old_phen_dir: str, old_version: str, not_check_config: bool, output_changed_concepts: bool):
def _checkout_phen_version(
    label: str, source_dir: str, target_path: Path, version: str
) -> None:
    """Populate ``target_path`` with the phenotype in ``source_dir`` at ``version``.

    For the version ``"latest"`` the directory is copied as it is on disk;
    for any other version the directory is cloned as a git repository and
    ``version`` is checked out in the clone.

    Args:
        label: Name used in log messages (e.g. ``"changed"`` or ``"old"``).
        source_dir: Phenotype directory to take the files from.
        target_path: Existing directory that receives the files.
        version: ``"latest"`` or a git reference to check out.
    """
    source_path = Path(source_dir)
    if version == "latest":
        _logger.debug(
            f"Copying {label} repo from {source_dir} into {target_path} at version {version}..."
        )
        shutil.copytree(source_path, target_path, dirs_exist_ok=True)
    else:
        _logger.debug(
            f"Cloning {label} repo from {source_dir} into {target_path} at version {version}..."
        )
        repo = git.Repo.clone_from(source_path, target_path)
        repo.git.checkout(version)


def diff(
    phen_dir: str,
    version: str,
    old_phen_dir: str,
    old_version: str,
    not_check_config: bool,
    output_changed_concepts: bool,
):
    """Compare a phenotype version against an old version and write a report.

    Both versions are materialised in a temporary directory below ``.acmc``
    (copied for ``"latest"``, otherwise cloned and checked out) and compared
    with ``diff_phen``. The report ``<version>_<old_version>_diff.md`` and any
    CSV files of changed concepts are written into ``phen_dir``. The temporary
    directory is removed afterwards, also when the comparison fails.

    Args:
        phen_dir: Directory of the changed phenotype.
        version: Version of the changed phenotype, ``"latest"`` or a git
            reference.
        old_phen_dir: Directory of the old phenotype.
        old_version: Version of the old phenotype, ``"latest"`` or a git
            reference.
        not_check_config: When True, the configuration files are not
            validated or compared.
        output_changed_concepts: When True, changed concepts are written to
            CSV files.

    Raises:
        ValueError: If ``phen_dir`` or ``old_phen_dir`` does not exist.
    """
    # make tmp directory .acmc
    timestamp = time.strftime("%Y%m%d_%H%M%S")
    temp_dir = Path(f".acmc/diff_{timestamp}")

    changed_phen_path = Path(phen_dir)
    if not changed_phen_path.exists():
        raise ValueError(
            f"Changed phenotype directory does not exist: {str(changed_phen_path.resolve())}"
        )

    old_phen_path = Path(old_phen_dir)
    if not old_phen_path.exists():
        raise ValueError(
            f"Old phenotype directory does not exist: {str(old_phen_path.resolve())}"
        )

    try:
        # Create the directory
        temp_dir.mkdir(parents=True, exist_ok=True)
        _logger.debug(f"Temporary directory created: {temp_dir}")

        # Create temporary directories
        changed_path = temp_dir / "changed"
        changed_path.mkdir(parents=True, exist_ok=True)
        old_path = temp_dir / "old"
        old_path.mkdir(parents=True, exist_ok=True)

        # checkout changed, then old
        _checkout_phen_version("changed", phen_dir, changed_path, version)
        _checkout_phen_version("old", old_phen_dir, old_path, old_version)

        # the report and the optional CSV files go into the changed phenotype
        report_filename = f"{version}_{old_version}_diff.md"
        report_path = changed_phen_path / report_filename
        csv_output_path = changed_phen_path

        # diff old with new
        diff_phen(
            changed_path,
            version,
            old_path,
            old_version,
            report_path,
            csv_output_path,
            not_check_config,
            output_changed_concepts,
        )

    finally:
        # clean up tmp directory
        if temp_dir.exists():
            shutil.rmtree(temp_dir)