Skip to content

Input to Database

bin.input_to_database

Script to crawl Synapse folder for a center, validate, and update database tables.

Attributes

logger = logging.getLogger(__name__) module-attribute

parser = argparse.ArgumentParser(description='GENIE center ') module-attribute

args = parser.parse_args() module-attribute

Functions

main(process, project_id, center=None, delete_old=False, only_validate=False, oncotree_link=None, genie_annotation_pkg=None, create_new_maf_database=False, debug=False, format_registry=None)

Invoke the GENIE ETL pipeline from data input files to synapse tables

PARAMETER DESCRIPTION
process

main or mutation processing

TYPE: str

project_id

Synapse project id that houses GENIE project

TYPE: str

center

GENIE center. Defaults to None.

TYPE: str DEFAULT: None

delete_old

True to delete all old input/processed files. Defaults to False.

TYPE: bool DEFAULT: False

only_validate

True to only validate files without processing them. Defaults to False.

TYPE: bool DEFAULT: False

oncotree_link

Link to oncotree version. Defaults to None.

TYPE: str DEFAULT: None

genie_annotation_pkg

vcf/maf conversion tools. Defaults to None.

TYPE: str DEFAULT: None

create_new_maf_database

True to create a new MAF table. Defaults to False.

TYPE: bool DEFAULT: False

debug

Debug mode. Defaults to False.

TYPE: bool DEFAULT: False

format_registry

File format registry python package. Defaults to None.

TYPE: str DEFAULT: None

RAISES DESCRIPTION
ValueError

If an invalid center name is specified, or if process is "mutation" and genie_annotation_pkg is not provided

Exception

If processing is already happening.

Source code in bin/input_to_database.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def main(
    process: str,
    project_id: str,
    center=None,
    delete_old=False,
    only_validate=False,
    oncotree_link=None,
    genie_annotation_pkg=None,
    create_new_maf_database=False,
    debug=False,
    format_registry=None,
) -> None:
    """Invoke the GENIE ETL pipeline from data input files to synapse tables

    Args:
        process (str): main or mutation processing
        project_id (str): Synapse project id that houses GENIE project
        center (str, optional): GENIE center. Defaults to None, in which case
                                every center in the GENIE configuration is
                                processed.
        delete_old (bool, optional): True to delete all old input/processed files.
                                     Defaults to False.
        only_validate (bool, optional): True to only validate files.
                                        Defaults to False.
        oncotree_link (str, optional): Link to oncotree version. Defaults to None,
                                       in which case the external URL of the
                                       configured oncotree link entity is used.
        genie_annotation_pkg (str, optional): vcf/maf conversion tools.
                                              Required when process is
                                              "mutation". Defaults to None.
        create_new_maf_database (bool, optional): True to create a new maf table.
                                                  Defaults to False.
        debug (bool, optional): Debug mode. Defaults to False.
        format_registry (optional): File format registry python package(s),
                                    passed to config.collect_format_types.
                                    Defaults to None, in which case the
                                    command line value
                                    (args.format_registry_packages) is used.

    Raises:
        ValueError: If invalid center name is specified, or if process is
                    "mutation" and genie_annotation_pkg is not given
        Exception: If processing is already happening.
    """

    syn = process_functions.synapse_login(debug=debug)

    # Get project GENIE configurations
    genie_config = extract.get_genie_config(syn=syn, project_id=project_id)

    # Filter for specific center
    valid_centers = list(genie_config["center_config"].keys())
    if center is not None:
        if center not in valid_centers:
            raise ValueError(
                "Must specify one of these centers: {}".format(
                    ", ".join(valid_centers)
                )
            )
        centers = [center]
    else:
        # TODO: add in logic to exclude sites from processing
        centers = valid_centers

    # HACK: Modify oncotree link config
    if oncotree_link is None:
        onco_link_ent = syn.get(genie_config["oncotreeLink"])
        oncotree_link = onco_link_ent.externalURL
    genie_config["oncotreeLink"] = oncotree_link
    # Check if you can connect to oncotree link,
    # if not then don't run validation / processing
    process_functions.checkUrl(genie_config["oncotreeLink"])

    # HACK: Add genie annotation package to config
    if process == "mutation" and genie_annotation_pkg is None:
        raise ValueError("Must define genie annotation pkg if mutation processing")
    genie_config["genie_annotation_pkg"] = genie_annotation_pkg

    # HACK: This is essential, because Synapse has concurrency update issues
    # A missing annotation is treated as "True" (i.e. locked).
    center_mapping_ent = syn.get(genie_config["centerMapping"])
    if center_mapping_ent.get("isProcessing", ["True"])[0] == "True":
        raise Exception(
            "Processing/validation is currently happening.  Please change/add the "
            f"'isProcessing' annotation on {genie_config['centerMapping']} "
            "to False to enable processing"
        )
    center_mapping_ent.isProcessing = "True"
    center_mapping_ent = syn.store(center_mapping_ent)

    # From here on this run owns the 'isProcessing' flag, so it must be reset
    # even when processing fails; otherwise every later run is blocked until
    # someone fixes the annotation by hand.
    try:
        # HACK: Create new maf database, should only happen once if its specified
        # Will modify genie configuration
        if create_new_maf_database:
            today = date.today()
            table_name = f"Narrow MAF Database - {today}"
            # filetype = "vcf2maf"
            # syn7208886 is the GENIE staging project to archive maf table
            new_tables = process_functions.create_new_fileformat_table(
                syn, "vcf2maf", table_name, project_id, "syn7208886"
            )
            syn.setPermissions(new_tables["newdb_ent"].id, 3326313, [])
            genie_config["vcf2maf"] = new_tables["newdb_ent"].id

        # Get file format classes. Honor the caller supplied packages and only
        # fall back to the command line arguments when none were passed in.
        registry_packages = (
            format_registry
            if format_registry is not None
            else args.format_registry_packages
        )
        format_registry = config.collect_format_types(registry_packages)

        # Start GENIE processing
        for process_center in centers:
            # Check if the genie genome nexus is up, if not then don't run
            # processing
            process_functions.checkUrl("http://genie.genomenexus.org/")
            input_to_database.center_input_to_database(
                syn=syn,
                project_id=project_id,
                center=process_center,
                process=process,
                only_validate=only_validate,
                delete_old=delete_old,
                format_registry=format_registry,
                genie_config=genie_config,
            )
    finally:
        # HACK: To ensure that this is the new entity
        center_mapping_ent = syn.get(genie_config["centerMapping"])
        center_mapping_ent.isProcessing = "False"
        # No need to return ent variable because it is unused
        syn.store(center_mapping_ent)

    error_tracker_synid = genie_config["errorTracker"]
    # Only write out invalid reasons if the center
    # isnt specified and if only validate
    if center is None and only_validate:
        logger.info("WRITING INVALID REASONS TO CENTER STAGING DIRS")
        write_invalid_reasons.write(
            syn, genie_config["centerMapping"], error_tracker_synid
        )
    logger.info("INPUT TO DATABASE COMPLETE")