Skip to content

load

genie.load

This module contains all the functions that stores data to Synapse

Attributes

__version__ = '17.0.0' module-attribute

logger = logging.getLogger(__name__) module-attribute

Functions

store_file(syn, filepath, parentid, name=None, annotations=None, used=None, version_comment=None)

Stores file into Synapse

PARAMETER DESCRIPTION
syn

Synapse connection

TYPE: Synapse

filepath

Path to file

TYPE: str

parentid

Synapse Id of a folder or project

TYPE: str

name

Name of entity. Defaults to basename of your file path.

TYPE: str DEFAULT: None

annotations

Synapse annotations to the File Entity. Defaults to None.

TYPE: Dict DEFAULT: None

used

Entities used to generate file. Defaults to None.

TYPE: List[str] DEFAULT: None

version_comment

File Entity version comment. Defaults to None.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
File

synapseclient.File: Synapse File entity

Source code in genie/load.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def store_file(
    syn: synapseclient.Synapse,
    filepath: str,
    parentid: str,
    name: Optional[str] = None,
    annotations: Optional[Dict] = None,
    used: Optional[Union[List[str], str]] = None,
    version_comment: Optional[str] = None,
) -> synapseclient.File:
    """Stores file into Synapse

    Args:
        syn (synapseclient.Synapse): Synapse connection
        filepath (str): Path to file
        parentid (str): Synapse Id of a folder or project
        name (str, optional): Name of entity. Defaults to basename of your file path.
        annotations (Dict, optional): Synapse annotations to the File Entity. Defaults to None.
        used (List[str], optional): Entities used to generate file. Defaults to None.
        version_comment (str, optional): File Entity version comment. Defaults to None.

    Returns:
        synapseclient.File: Synapse File entity
    """
    file_ent = synapseclient.File(
        filepath, parentId=parentid, versionComment=version_comment
    )
    if name is not None:
        file_ent.name = name
    if annotations is not None:
        file_ent.annotations = annotations
    file_ent = syn.store(
        file_ent,
        used=used,
        executed=f"https://github.com/Sage-Bionetworks/Genie/tree/v{__version__}",
    )
    return file_ent

store_files(syn, filepaths, parentid)

Stores a list of files

PARAMETER DESCRIPTION
syn

Synapse connection

TYPE: Synapse

filepaths

List of filepaths

TYPE: List[str]

parentid

Synapse Id of a folder or project

TYPE: str

RETURNS DESCRIPTION
List[File]

List[synapseclient.File]: List of Synaps File entities

Source code in genie/load.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def store_files(
    syn: synapseclient.Synapse, filepaths: List[str], parentid: str
) -> List[synapseclient.File]:
    """Stores a list of files

    Args:
        syn (synapseclient.Synapse): Synapse connection
        filepaths (List[str]): List of filepaths
        parentid (str): Synapse Id of a folder or project

    Returns:
        List[synapseclient.File]: List of Synaps File entities
    """
    file_entities = [
        store_file(syn=syn, filepath=path, parentid=parentid) for path in filepaths
    ]
    return file_entities

store_table(syn, filepath, tableid)

Stores a tsv to a Synapse Table. This can append, update, and delete rows in a Synapse table depending on how the tsv file is formatted.

PARAMETER DESCRIPTION
syn

Synapse connection

TYPE: Synapse

filepath

Path to a TSV

TYPE: str

tableid

Synapse Id of a Synapse Table

TYPE: str

Source code in genie/load.py
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def store_table(syn: synapseclient.Synapse, filepath: str, tableid: str):
    """Stores a tsv to a Synapse Table.  This can append, update, and delete
    rows in a Synapse table depending on how the tsv file is formatted.

    Args:
        syn (synapseclient.Synapse): Synapse connection
        filepath (str): Path to a TSV
        tableid (str): Synapse Id of a Synapse Table

    """
    try:
        update_table = synapseclient.Table(tableid, filepath, separator="\t")
        syn.store(update_table)
    except SynapseTimeoutError:
        # This error occurs because of waiting for table to index.
        # Don't worry about this error
        pass

update_process_trackingdf(syn, process_trackerdb_synid, center, process_type, start=True)

Updates the processing tracking database

PARAMETER DESCRIPTION
syn

Synapse connection

TYPE: Synapse

process_trackerdb_synid

Synapse Id of Process Tracking Table

TYPE: str

center

GENIE center (ie. SAGE)

TYPE: str

process_type

processing type (dbToStage or public)

TYPE: str

start

Start or end of processing. Default is True for start

TYPE: bool DEFAULT: True

Source code in genie/load.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def update_process_trackingdf(
    syn: synapseclient.Synapse,
    process_trackerdb_synid: str,
    center: str,
    process_type: str,
    start: bool = True,
):
    """
    Updates the processing tracking database

    Args:
        syn (synapseclient.Synapse): Synapse connection
        process_trackerdb_synid: Synapse Id of Process Tracking Table
        center: GENIE center (ie. SAGE)
        process_type: processing type (dbToStage or public)
        start: Start or end of processing.  Default is True for start
    """
    logger.info("UPDATE PROCESS TRACKING TABLE")
    column = "timeStartProcessing" if start else "timeEndProcessing"
    query_str = (
        f"SELECT {column} FROM {process_trackerdb_synid} where center = '{center}' "
        f"and processingType = '{process_type}'"
    )
    process_trackerdf = extract.get_syntabledf(syn=syn, query_string=query_str)
    process_trackerdf[column].iloc[0] = str(int(time.time() * 1000))
    syn.store(synapseclient.Table(process_trackerdb_synid, process_trackerdf))

update_table(syn, databaseSynId, newData, filterBy, filterByColumn='CENTER', col=None, toDelete=False)

Update Synapse table given a new dataframe

PARAMETER DESCRIPTION
syn

Synapse connection

TYPE: Synapse

databaseSynId

Synapse Id of Synapse Table

TYPE: str

newData

New data in a dataframe

TYPE: DataFrame

filterBy

Value to filter new data by

TYPE: str

filterByColumn

Column to filter values by. Defaults to "CENTER".

TYPE: str DEFAULT: 'CENTER'

col

List of columns to ingest. Defaults to None.

TYPE: List[str] DEFAULT: None

toDelete

Delete rows given the primary key. Defaults to False.

TYPE: bool DEFAULT: False

Source code in genie/load.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
def update_table(
    syn: synapseclient.Synapse,
    databaseSynId: str,
    newData: pd.DataFrame,
    filterBy: str,
    filterByColumn: str = "CENTER",
    col: Optional[List[str]] = None,
    toDelete: bool = False,
):
    """Update Synapse table given a new dataframe

    Args:
        syn (Synapse): Synapse connection
        databaseSynId (str): Synapse Id of Synapse Table
        newData (pd.DataFrame): New data in a dataframe
        filterBy (str): Value to filter new data by
        filterByColumn (str, optional): Column to filter values by. Defaults to "CENTER".
        col (List[str], optional): List of columns to ingest. Defaults to None.
        toDelete (bool, optional): Delete rows given the primary key. Defaults to False.
    """
    databaseEnt = syn.get(databaseSynId)
    database = syn.tableQuery(
        f"SELECT * FROM {databaseSynId} where {filterByColumn} ='{filterBy}'"
    )
    database = database.asDataFrame()
    db_cols = set(database.columns)
    if col is not None:
        new_data_cols = set(col)
        # Make sure columns from file exists in database columns
        use_cols = db_cols.intersection(new_data_cols)
        # No need to fail, because there is bound to be at least one
        # column that will exist in the database
        database = database[list(use_cols)]
    else:
        newData = newData[database.columns]
    _update_table(
        syn=syn,
        database=database,
        new_dataset=newData,
        database_synid=databaseSynId,
        primary_key_cols=databaseEnt.primaryKey,
        to_delete=toDelete,
    )

_update_table(syn, database, new_dataset, database_synid, primary_key_cols, to_delete=False)

A helper function to compare new dataset with existing data, and store any changes that need to be made to the database

Source code in genie/load.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
def _update_table(
    syn: synapseclient.Synapse,
    database: pd.DataFrame,
    new_dataset: pd.DataFrame,
    database_synid: str,
    primary_key_cols: List[str],
    to_delete: bool = False,
):
    """
    A helper function to compare new dataset with existing data,
    and store any changes that need to be made to the database
    """
    changes = check_database_changes(database, new_dataset, primary_key_cols, to_delete)
    store_database(
        syn,
        database_synid,
        changes["col_order"],
        changes["allupdates"],
        changes["to_delete_rows"],
    )

_get_col_order(orig_database_cols)

Get column order

PARAMETER DESCRIPTION
orig_database_cols

A list of column names of the original database

TYPE: Index

RETURNS DESCRIPTION
List[str]

The list of re-ordered column names

Source code in genie/load.py
196
197
198
199
200
201
202
203
204
205
206
207
208
def _get_col_order(orig_database_cols: pd.Index) -> List[str]:
    """
    Get column order

    Args:
        orig_database_cols (pd.Index): A list of column names of the original database

    Returns:
        The list of re-ordered column names
    """
    col_order = ["ROW_ID", "ROW_VERSION"]
    col_order.extend(orig_database_cols.tolist())
    return col_order

_reorder_new_dataset(orig_database_cols, new_dataset)

Reorder new dataset based on the original database

PARAMETER DESCRIPTION
orig_database_cols

A list of column names of the original database

TYPE: Index

new_dataset

New Data

TYPE: DataFrame

RETURNS DESCRIPTION
DataFrame

The re-ordered new dataset

Source code in genie/load.py
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
def _reorder_new_dataset(
    orig_database_cols: pd.Index, new_dataset: pd.DataFrame
) -> pd.DataFrame:
    """
    Reorder new dataset based on the original database

    Args:
        orig_database_cols (pd.Index): A list of column names of the original database
        new_dataset(pd.DataFrame): New Data

    Returns:
        The re-ordered new dataset
    """
    # Columns must be in the same order as the original data
    new_dataset = new_dataset[orig_database_cols]
    return new_dataset

_generate_primary_key(dataset, primary_key_cols, primary_key)

Generate primary key column a dataframe

PARAMETER DESCRIPTION
dataset

A dataframe

TYPE: DataFrame

primary_key_cols

Column(s) that make up the primary key

TYPE: List[str]

primary_key

The column name of the primary_key

TYPE: str

RETURNS DESCRIPTION
DataFrame

pd.DataFrame: The dataframe with primary_key column added

Source code in genie/load.py
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
def _generate_primary_key(
    dataset: pd.DataFrame, primary_key_cols: List[str], primary_key: str
) -> pd.DataFrame:
    """Generate primary key column a dataframe

    Args:
        dataset (pd.DataFrame): A dataframe
        primary_key_cols (List[str]): Column(s) that make up the primary key
        primary_key (str): The column name of the primary_key

    Returns:
        pd.DataFrame: The dataframe with primary_key column added
    """
    # replace NAs with emtpy string
    dataset = dataset.fillna("")
    # generate primary key column for original database
    dataset[primary_key_cols] = dataset[primary_key_cols].applymap(str)
    if dataset.empty:
        dataset[primary_key] = ""
    else:
        dataset[primary_key] = dataset[primary_key_cols].apply(
            lambda x: " ".join(x), axis=1
        )
    return dataset

check_database_changes(database, new_dataset, primary_key_cols, to_delete=False)

Check changes that need to be made, i.e. append/update/delete rows to the database based on its comparison with new data

PARAMETER DESCRIPTION
database

Original Data

TYPE: DataFrame

new_dataset

New Data

TYPE: DataFrame

primary_key_cols

Column(s) that make up the primary key

TYPE: list

to_delete

Delete rows. Defaults to False

TYPE: bool DEFAULT: False

Source code in genie/load.py
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
def check_database_changes(
    database: pd.DataFrame,
    new_dataset: pd.DataFrame,
    primary_key_cols: List[str],
    to_delete: bool = False,
) -> Dict[pd.DataFrame, List[str]]:
    """
    Check changes that need to be made, i.e. append/update/delete rows to the database
    based on its comparison with new data

    Args:
        database (pd.DataFrame): Original Data
        new_dataset (pd.DataFrame): New Data
        primary_key_cols (list): Column(s) that make up the primary key
        to_delete (bool, optional): Delete rows. Defaults to False
    """
    # get a list of column names of the original database
    orig_database_cols = database.columns
    # get the final column order
    col_order = _get_col_order(orig_database_cols)
    # reorder new_dataset
    new_dataset = _reorder_new_dataset(orig_database_cols, new_dataset)
    # set the primary_key name
    primary_key = "UNIQUE_KEY"
    # generate primary_key column for dataset comparison
    ori_data = _generate_primary_key(database, primary_key_cols, primary_key)
    new_data = _generate_primary_key(new_dataset, primary_key_cols, primary_key)
    # output dictionary
    changes = {"col_order": col_order, "allupdates": None, "to_delete_rows": None}
    # get rows to be appened or updated
    allupdates = pd.DataFrame(columns=col_order)
    to_append_rows = process_functions._append_rows(new_data, ori_data, primary_key)
    to_update_rows = process_functions._update_rows(new_data, ori_data, primary_key)
    allupdates = pd.concat([allupdates, to_append_rows, to_update_rows], sort=False)
    changes["allupdates"] = allupdates
    # get rows to be deleted
    if to_delete:
        to_delete_rows = process_functions._delete_rows(new_data, ori_data, primary_key)
    else:
        to_delete_rows = pd.DataFrame()
    changes["to_delete_rows"] = to_delete_rows
    return changes

store_database(syn, database_synid, col_order, all_updates, to_delete_rows)

Store changes to the database

PARAMETER DESCRIPTION
syn

Synapse object

TYPE: Synapse

database_synid

Synapse Id of the Synapse table

TYPE: str

col_order

The ordered column names to be saved

TYPE: List[str]

all_updates

rows to be appended and/or updated

TYPE: DataFrame

to_deleted_rows

rows to be deleted

TYPE: DataFrame

Source code in genie/load.py
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
def store_database(
    syn: synapseclient.Synapse,
    database_synid: str,
    col_order: List[str],
    all_updates: pd.DataFrame,
    to_delete_rows: pd.DataFrame,
) -> None:
    """
    Store changes to the database

    Args:
        syn (synapseclient.Synapse): Synapse object
        database_synid (str): Synapse Id of the Synapse table
        col_order (List[str]): The ordered column names to be saved
        all_updates (pd.DataFrame): rows to be appended and/or updated
        to_deleted_rows (pd.DataFrame): rows to be deleted
    """
    storedatabase = False
    update_all_file = tempfile.NamedTemporaryFile(
        dir=process_functions.SCRIPT_DIR, delete=False
    )
    with open(update_all_file.name, "w") as updatefile:
        # Must write out the headers in case there are no appends or updates
        updatefile.write(",".join(col_order) + "\n")
        if not all_updates.empty:
            """
            This is done because of pandas typing.
            An integer column with one NA/blank value
            will be cast as a double.
            """
            updatefile.write(
                all_updates[col_order]
                .to_csv(index=False, header=None)
                .replace(".0,", ",")
                .replace(".0\n", "\n")
            )
            storedatabase = True
        if not to_delete_rows.empty:
            updatefile.write(
                to_delete_rows.to_csv(index=False, header=None)
                .replace(".0,", ",")
                .replace(".0\n", "\n")
            )
            storedatabase = True
    if storedatabase:
        syn.store(synapseclient.Table(database_synid, update_all_file.name))
    # Delete the update file
    os.unlink(update_all_file.name)

_copyRecursive(syn, entity, destinationId, mapping=None, skipCopyAnnotations=False, **kwargs)

NOTE: This is a copy of the function found here: https://github.com/Sage-Bionetworks/synapsePythonClient/blob/develop/synapseutils/copy_functions.py#L409 This was copied because there is a restriction that doesn't allow for copying entities with access requirements

Recursively copies synapse entites, but does not copy the wikis

PARAMETER DESCRIPTION
syn

A Synapse object with user's login

TYPE: Synapse

entity

A synapse entity ID

TYPE: str

destinationId

Synapse ID of a folder/project that the copied entity is being copied to

TYPE: str

mapping

A mapping of the old entities to the new entities

TYPE: Dict[str, str] DEFAULT: None

skipCopyAnnotations

Skips copying the annotations Default is False

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Dict[str, str]

a mapping between the original and copied entity: {'syn1234':'syn33455'}

Source code in genie/load.py
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
def _copyRecursive(
    syn: synapseclient.Synapse,
    entity: str,
    destinationId: str,
    mapping: Dict[str, str] = None,
    skipCopyAnnotations: bool = False,
    **kwargs,
) -> Dict[str, str]:
    """
    NOTE: This is a copy of the function found here: https://github.com/Sage-Bionetworks/synapsePythonClient/blob/develop/synapseutils/copy_functions.py#L409
    This was copied because there is a restriction that doesn't allow for copying entities with access requirements

    Recursively copies synapse entites, but does not copy the wikis

    Arguments:
        syn: A Synapse object with user's login
        entity: A synapse entity ID
        destinationId: Synapse ID of a folder/project that the copied entity is being copied to
        mapping: A mapping of the old entities to the new entities
        skipCopyAnnotations: Skips copying the annotations
                                Default is False

    Returns:
        a mapping between the original and copied entity: {'syn1234':'syn33455'}
    """

    version = kwargs.get("version", None)
    setProvenance = kwargs.get("setProvenance", "traceback")
    excludeTypes = kwargs.get("excludeTypes", [])
    updateExisting = kwargs.get("updateExisting", False)
    if mapping is None:
        mapping = dict()
    # Check that passed in excludeTypes is file, table, and link
    if not isinstance(excludeTypes, list):
        raise ValueError("Excluded types must be a list")
    elif not all([i in ["file", "link", "table"] for i in excludeTypes]):
        raise ValueError(
            "Excluded types can only be a list of these values: file, table, and link"
        )

    ent = syn.get(entity, downloadFile=False)
    if ent.id == destinationId:
        raise ValueError("destinationId cannot be the same as entity id")

    if (isinstance(ent, Project) or isinstance(ent, Folder)) and version is not None:
        raise ValueError("Cannot specify version when copying a project of folder")

    if not isinstance(ent, (Project, Folder, File, Link, Schema, Entity)):
        raise ValueError("Not able to copy this type of file")

    permissions = syn.restGET("/entity/{}/permissions".format(ent.id))
    # Don't copy entities without DOWNLOAD permissions
    if not permissions["canDownload"]:
        syn.logger.warning(
            "%s not copied - this file lacks download permission" % ent.id
        )
        return mapping

    # HACK: These lines of code were removed to allow for data with access requirements to be copied
    # https://github.com/Sage-Bionetworks/synapsePythonClient/blob/2909fa778e814f62f6fe6ce2d951ce58c0080a4e/synapseutils/copy_functions.py#L464-L470

    copiedId = None

    if isinstance(ent, Project):
        project = syn.get(destinationId)
        if not isinstance(project, Project):
            raise ValueError(
                "You must give a destinationId of a new project to copy projects"
            )
        copiedId = destinationId
        # Projects include Docker repos, and Docker repos cannot be copied
        # with the Synapse rest API. Entity views currently also aren't
        # supported
        entities = syn.getChildren(
            entity, includeTypes=["folder", "file", "table", "link"]
        )
        for i in entities:
            mapping = _copyRecursive(
                syn,
                i["id"],
                destinationId,
                mapping=mapping,
                skipCopyAnnotations=skipCopyAnnotations,
                **kwargs,
            )

        if not skipCopyAnnotations:
            project.annotations = ent.annotations
            syn.store(project)
    elif isinstance(ent, Folder):
        copiedId = synu.copy_functions._copyFolder(
            syn,
            ent.id,
            destinationId,
            mapping=mapping,
            skipCopyAnnotations=skipCopyAnnotations,
            **kwargs,
        )
    elif isinstance(ent, File) and "file" not in excludeTypes:
        copiedId = synu.copy_functions._copyFile(
            syn,
            ent.id,
            destinationId,
            version=version,
            updateExisting=updateExisting,
            setProvenance=setProvenance,
            skipCopyAnnotations=skipCopyAnnotations,
        )
    elif isinstance(ent, Link) and "link" not in excludeTypes:
        copiedId = synu.copy_functions._copyLink(
            syn, ent.id, destinationId, updateExisting=updateExisting
        )
    elif isinstance(ent, Schema) and "table" not in excludeTypes:
        copiedId = synu.copy_functions._copyTable(
            syn, ent.id, destinationId, updateExisting=updateExisting
        )
    # This is currently done because copyLink returns None sometimes
    if copiedId is not None:
        mapping[ent.id] = copiedId
        syn.logger.info("Copied %s to %s" % (ent.id, copiedId))
    else:
        syn.logger.info("%s not copied" % ent.id)
    return mapping