Skip to content

Workflow

genie_registry.workflow

Attributes

logger = logging.getLogger(__name__) module-attribute

Classes

FileTypeFormat

Source code in genie/example_filetype_format.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
class FileTypeFormat(metaclass=ABCMeta):
    """Base class for reading, validating and processing one type of center file.

    Subclasses customize behavior through the class attributes below and by
    overriding the hooks ``_get_dataframe``, ``_validateFilename``,
    ``_validate``, ``_cross_validate``, ``preprocess`` and ``process_steps``.
    """

    # Keyword arguments that process() requires and forwards to process_steps().
    _process_kwargs = ["newPath", "databaseSynId"]

    # File type label returned by validateFilename(); process() also uses it to
    # decide whether process_steps() receives a dataframe or a file path.
    _fileType = "fileType"

    # Keyword arguments that validate() requires and forwards to _validate().
    _validation_kwargs: List[str] = []

    def __init__(
        self,
        syn: synapseclient.Synapse,
        center: str,
        genie_config: Optional[dict] = None,
        ancillary_files: Optional[List[List[synapseclient.Entity]]] = None,
    ):
        """A validator helper class for a center's files.

        Args:
            syn (synapseclient.Synapse): a synapseclient.Synapse object
            center (str): The participating center name.
            genie_config (dict): The configurations needed for the GENIE codebase.
                GENIE table type/name to Synapse Id. Defaults to None.
            ancillary_files (List[List[synapseclient.Entity]]): all files
                downloaded for validation. Defaults to None.
        """
        self.syn = syn
        self.center = center
        self.genie_config = genie_config
        self.ancillary_files = ancillary_files

    def _get_dataframe(self, filePathList):
        """Read the first file of ``filePathList`` as a tab-separated dataframe.

        By default this assumes ``filePathList`` has a length of 1 and points
        to a tsv file. Could change depending on file type.

        Args:
            filePathList: A list of file paths (Max is 2 for the two
                          clinical files)

        Returns:
            df: Pandas dataframe of file
        """
        filePath = filePathList[0]
        # Lines starting with "#" are treated as comments and skipped
        df = pd.read_csv(filePath, sep="\t", comment="#")
        return df

    def read_file(self, filePathList):
        """
        Each file is to be read in for validation and processing.
        The actual reading is delegated to self._get_dataframe.

        Args:
            filePathList: A list of file paths (Max is 2 for the two
                          clinical files)

        Returns:
            df: Pandas dataframe of file
        """
        df = self._get_dataframe(filePathList)
        return df

    def _validateFilename(self, filePath):
        """
        Function that changes per file type for validating its filename.
        Implementations are expected to raise an AssertionError for a
        file name that does not match.

        Args:
            filePath: Path to file

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    def validateFilename(self, filePath):
        """
        Validation of file name.  The filename is what maps the file
        to its validation and processing.

        Args:
            filePath: Path to file

        Returns:
            str: file type defined by self._fileType
        """
        self._validateFilename(filePath)
        return self._fileType

    def process_steps(self, df, **kwargs):
        """
        This function is modified for every single file.
        It reformats the file and stores the file into database and Synapse.
        The base implementation does nothing and returns None.
        """
        pass

    def preprocess(self, newpath):
        """
        This is for any preprocessing that has to occur to the entity name
        to add to kwargs for processing.  The entity name is included in
        the new path.

        Args:
            newpath: Path to file

        Returns:
            dict: Extra keyword arguments for processing. Empty by default.
        """
        return dict()

    def process(self, filePath, **kwargs):
        """
        This is the main processing function.

        Args:
            filePath: Path to file
            kwargs: The kwargs are determined by self._process_kwargs

        Returns:
            str: file path of processed file

        Raises:
            AssertionError: If a name in self._process_kwargs is missing
                from kwargs after preprocessing.
        """
        # Values returned by preprocess() override caller-supplied kwargs
        preprocess_args = self.preprocess(kwargs.get("newPath"))
        kwargs.update(preprocess_args)
        # Forward only the parameters this file type declares as required
        mykwargs = {}
        for required_parameter in self._process_kwargs:
            assert required_parameter in kwargs, (
                "%s not in parameter list" % required_parameter
            )
            mykwargs[required_parameter] = kwargs[required_parameter]
        logger.info("PROCESSING %s", filePath)
        # This is done because the clinical files are being merged into a list,
        # so filePath is passed to read_file without wrapping it
        if self._fileType == "clinical":
            path_or_df = self.read_file(filePath)
        # If file type is vcf or maf file, processing requires a filepath
        elif self._fileType not in ["vcf", "maf", "mafSP", "md"]:
            path_or_df = self.read_file([filePath])
        else:
            path_or_df = filePath
        path = self.process_steps(path_or_df, **mykwargs)
        return path

    def _validate(self, df: pd.DataFrame, **kwargs) -> tuple:
        """
        This is the base validation function.
        By default, no validation occurs.

        Args:
            df (pd.DataFrame): A dataframe of the file
            kwargs: The kwargs are determined by self._validation_kwargs

        Returns:
            tuple: The errors and warnings from validation.
                   Defaults to blank strings
        """
        errors = ""
        warnings = ""
        logger.info("NO VALIDATION for %s files", self._fileType)
        return errors, warnings

    def _cross_validate(self, df: pd.DataFrame) -> tuple:
        """
        This is the base cross-validation function.
        By default, no cross-validation occurs.

        Args:
            df (pd.DataFrame): A dataframe of the file

        Returns:
            tuple: The errors and warnings from cross-validation.
                   Defaults to blank strings
        """
        errors = ""
        warnings = ""
        logger.info("NO CROSS-VALIDATION for %s files", self._fileType)
        return errors, warnings

    def validate(self, filePathList, **kwargs) -> ValidationResults:
        """
        This is the main validation function.
        Every file type calls self._validate, which is different.

        Args:
            filePathList: A list of file paths.
            kwargs: The kwargs are determined by self._validation_kwargs

        Returns:
            ValidationResults: The errors and warnings from validation.

        Raises:
            AssertionError: If a name in self._validation_kwargs is missing
                from kwargs.
        """
        # Forward only the parameters this file type declares as required
        mykwargs = {}
        for required_parameter in self._validation_kwargs:
            assert required_parameter in kwargs, (
                "%s not in parameter list" % required_parameter
            )
            mykwargs[required_parameter] = kwargs[required_parameter]

        errors = ""
        warnings = ""

        try:
            df = self.read_file(filePathList)
        except Exception as e:
            # An unreadable file is reported as a validation error, not raised
            errors = (
                f"The file(s) ({filePathList}) cannot be read. Original error: {str(e)}"
            )

        if not errors:
            # Take the basename of each path separately: basename of the joined
            # string would keep only the last file's name.
            file_names = ",".join(os.path.basename(path) for path in filePathList)
            logger.info("VALIDATING %s", file_names)
            errors, warnings = self._validate(df, **mykwargs)
            # only cross-validate if validation passes and ancillary files exist
            # Assumes that self.ancillary_files won't be [[]] due to whats returned from
            # extract.get_center_input_files
            if not errors and (
                isinstance(self.ancillary_files, list) and self.ancillary_files
            ):
                logger.info("CROSS-VALIDATING %s", file_names)
                errors_cross_validate, warnings_cross_validate = self._cross_validate(
                    df
                )
                errors += errors_cross_validate
                warnings += warnings_cross_validate

        result_cls = ValidationResults(errors=errors, warnings=warnings)
        return result_cls
Functions
__init__(syn, center, genie_config=None, ancillary_files=None)

A validator helper class for a center's files.

PARAMETER DESCRIPTION
syn

a synapseclient.Synapse object

TYPE: Synapse

center

The participating center name.

TYPE: str

genie_config

The configurations needed for the GENIE codebase. GENIE table type/name to Synapse Id. Defaults to None.

TYPE: dict DEFAULT: None

ancillary_files

all files downloaded for validation. Defaults to None.

TYPE: List[List[Entity]] DEFAULT: None

Source code in genie/example_filetype_format.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def __init__(
    self,
    syn: synapseclient.Synapse,
    center: str,
    genie_config: Optional[dict] = None,
    ancillary_files: Optional[List[List[synapseclient.Entity]]] = None,
):
    """Keep the Synapse client and center context on the instance.

    Args:
        syn (synapseclient.Synapse): a synapseclient.Synapse object
        center (str): The participating center name.
        genie_config (dict): The configurations needed for the GENIE codebase
            (GENIE table type/name to Synapse Id). Defaults to None.
        ancillary_files (List[List[synapseclient.Entity]]): all files
            downloaded for validation. Defaults to None.
    """
    # The arguments are stored as-is; nothing is validated or copied here.
    self.syn, self.center = syn, center
    self.genie_config, self.ancillary_files = genie_config, ancillary_files
read_file(filePathList)

Each file is read in for validation and processing. This method is not meant to be changed by individual file types; it delegates the actual reading to _get_dataframe.

PARAMETER DESCRIPTION
filePathList

A list of file paths (Max is 2 for the two clinical files)

RETURNS DESCRIPTION
df

Pandas dataframe of file

Source code in genie/example_filetype_format.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def read_file(self, filePathList):
    """
    Read the given file(s) in for validation and processing.

    The work is handed straight to self._get_dataframe.

    Args:
        filePathList:  A list of file paths (Max is 2 for the two
                       clinical files)

    Returns:
        Whatever self._get_dataframe returns for filePathList
        (a Pandas dataframe of the file).
    """
    return self._get_dataframe(filePathList)
validateFilename(filePath)

Validation of file name. The filename is what maps the file to its validation and processing.

PARAMETER DESCRIPTION
filePath

Path to file

RETURNS DESCRIPTION
str

file type defined by self._fileType

Source code in genie/example_filetype_format.py
125
126
127
128
129
130
131
132
133
134
135
136
137
def validateFilename(self, filePath):
    """
    Check the file name and report which file type it maps to.

    The name check itself is done by self._validateFilename; any exception
    it raises propagates to the caller.

    Args:
        filePath: Path to file

    Returns:
        str: file type defined by self._fileType
    """
    # Run the file-type-specific check first; only then read the type label.
    self._validateFilename(filePath)
    matched_type = self._fileType
    return matched_type
process_steps(df, **kwargs)

This function is modified for every single file. It reformats the file and stores the file into database and Synapse.

Source code in genie/example_filetype_format.py
139
140
141
142
143
144
def process_steps(self, df, **kwargs):
    """
    Placeholder for the per-file-type processing steps.

    This function is modified for every single file; it reformats the file
    and stores the file into database and Synapse. This default version
    does nothing.
    """
    return None
preprocess(newpath)

This is for any preprocessing that has to occur to the entity name to add to kwargs for processing. entity name is included in the new path

PARAMETER DESCRIPTION
newpath

Path to file

Source code in genie/example_filetype_format.py
146
147
148
149
150
151
152
153
154
155
def preprocess(self, newpath):
    """
    Hook for any preprocessing of the entity name that should add entries
    to the kwargs used for processing. The entity name is included in the
    new path. This default version adds nothing.

    Args:
        newpath: Path to file

    Returns:
        dict: A new, empty dictionary.
    """
    return {}
process(filePath, **kwargs)

This is the main processing function.

PARAMETER DESCRIPTION
filePath

Path to file

kwargs

The kwargs are determined by self._process_kwargs

DEFAULT: {}

RETURNS DESCRIPTION
str

file path of processed file

Source code in genie/example_filetype_format.py
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
def process(self, filePath, **kwargs):
    """
    Main entry point for processing a file.

    Args:
        filePath: Path to file
        kwargs: Must contain every name in self._process_kwargs once the
            output of self.preprocess has been merged in; only those
            entries are forwarded to self.process_steps.

    Returns:
        str: file path of processed file (the return value of
        self.process_steps)
    """
    # Entries produced by preprocess() take precedence over caller kwargs.
    kwargs.update(self.preprocess(kwargs.get("newPath")))

    for name in self._process_kwargs:
        assert name in kwargs, "%s not in parameter list" % name
    forwarded = {name: kwargs[name] for name in self._process_kwargs}

    logger.info("PROCESSING %s" % filePath)

    if self._fileType == "clinical":
        # Clinical files arrive already merged into a list, so pass it as-is.
        payload = self.read_file(filePath)
    elif self._fileType in ["vcf", "maf", "mafSP", "md"]:
        # These file types are processed from the file path itself.
        payload = filePath
    else:
        payload = self.read_file([filePath])

    return self.process_steps(payload, **forwarded)
validate(filePathList, **kwargs)

This is the main validation function. Every file type calls self._validate, which is different.

PARAMETER DESCRIPTION
filePathList

A list of file paths.

kwargs

The kwargs are determined by self._validation_kwargs

DEFAULT: {}

RETURNS DESCRIPTION
ValidationResults

The errors and warnings from validation.

TYPE: ValidationResults

Source code in genie/example_filetype_format.py
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
def validate(self, filePathList, **kwargs) -> ValidationResults:
    """
    This is the main validation function.
    Every file type calls self._validate, which is different.

    Args:
        filePathList: A list of file paths.
        kwargs: Must contain every name listed in self._validation_kwargs;
            only those entries are forwarded to self._validate.

    Returns:
        ValidationResults: The errors and warnings from validation.

    Raises:
        AssertionError: If a name in self._validation_kwargs is missing
            from kwargs.
    """
    # Forward only the parameters this file type declares as required
    mykwargs = {}
    for required_parameter in self._validation_kwargs:
        assert required_parameter in kwargs, (
            "%s not in parameter list" % required_parameter
        )
        mykwargs[required_parameter] = kwargs[required_parameter]

    errors = ""
    warnings = ""

    try:
        df = self.read_file(filePathList)
    except Exception as e:
        # An unreadable file is reported as a validation error, not raised
        errors = (
            f"The file(s) ({filePathList}) cannot be read. Original error: {str(e)}"
        )

    if not errors:
        # Take the basename of each path separately: basename of the joined
        # string would keep only the last file's name.
        file_names = ",".join(os.path.basename(path) for path in filePathList)
        logger.info("VALIDATING %s", file_names)
        errors, warnings = self._validate(df, **mykwargs)
        # only cross-validate if validation passes and ancillary files exist
        # Assumes that self.ancillary_files won't be [[]] due to whats returned from
        # extract.get_center_input_files
        if not errors and (
            isinstance(self.ancillary_files, list) and self.ancillary_files
        ):
            logger.info("CROSS-VALIDATING %s", file_names)
            errors_cross_validate, warnings_cross_validate = self._cross_validate(
                df
            )
            errors += errors_cross_validate
            warnings += warnings_cross_validate

    result_cls = ValidationResults(errors=errors, warnings=warnings)
    return result_cls

workflow

Bases: FileTypeFormat

Source code in genie_registry/workflow.py
11
12
13
14
15
16
17
18
19
20
21
22
23
class workflow(FileTypeFormat):
    """File type for a center's markdown (``.md``) file, labelled "md"."""

    _fileType = "md"

    # Only the Synapse id used as the upload parent is needed for processing.
    _process_kwargs = ["databaseSynId"]

    def _validateFilename(self, filePath):
        """Assert that the first path's file name is ``<center>...md``.

        Args:
            filePath: A sequence of paths; only the first entry is checked.

        Raises:
            AssertionError: If the base name does not start with self.center
                or does not end with ".md".
        """
        file_name = os.path.basename(filePath[0])
        assert file_name.startswith(self.center) and file_name.endswith(".md")

    def process_steps(self, filePath, databaseSynId):
        """Store the file in Synapse under ``databaseSynId`` and return its path.

        Args:
            filePath: Path of the file to store.
            databaseSynId: Synapse id passed as the parent of the stored file.

        Returns:
            The unchanged filePath.
        """
        store = self.syn.store
        store(synapseclient.File(filePath, parent=databaseSynId))
        return filePath