Source code for challengeutils.submission

"""Functions that interact with submissions"""
import os
import re
import sys
import time
from typing import Union

import pandas as pd
import synapseutils
from synapseclient import (
    AUTHENTICATED_USERS,
    entity,
    Project,
    Synapse,
    SubmissionViewSchema,
)
from synapseclient.annotations import to_submission_status_annotations
from synapseclient.core.exceptions import SynapseHTTPError
from synapseclient.core.utils import id_of

from . import dockertools
from . import permissions
from . import utils
from . import annotations

WORKFLOW_LAST_UPDATED_KEY = (
    "orgSagebionetworksSynapseWorkflowOrchestratorWorkflowLastUpdated"
)
WORKFLOW_START_KEY = "orgSagebionetworksSynapseWorkflowOrchestratorExecutionStarted"
TIME_REMAINING_KEY = "orgSagebionetworksSynapseWorkflowOrchestratorTimeRemaining"


[docs]def append_writeup_to_main_submission(row, syn): """ Helper function that appends the write up synapse id and archived write up synapse id on the main submission Args: row: Dictionary row['team'], row['objectId'], row['archived'], row['entityId'] syn: synapse object """ if pd.isnull(row["archived"]): print("NO WRITEUP: " + row["submitterId"]) else: status = syn.getSubmissionStatus(row["objectId"]) add_writeup_dict = { "writeUp": row["entityId"], "archivedWriteUp": row["archived"], } add_writeup = to_submission_status_annotations( add_writeup_dict, is_private=False ) new_status = utils.update_single_submission_status(status, add_writeup) syn.store(new_status)
[docs]def attach_writeup(syn, writeup_queueid, submission_queueid): """ Attach the write up to the submission queue Args: writeup_queueid: Write up evaluation queue id submission_queueid: Submission queue id """ writeups = list( utils.evaluation_queue_query( syn, "select submitterId, entityId, archived from evaluation_{} " "where writeup_status == 'VALIDATED'".format(writeup_queueid), ) ) submissions = list( utils.evaluation_queue_query( syn, "select objectId, submitterId from evaluation_{} " "where prediction_file_status == 'SCORED'".format(submission_queueid), ) ) writeupsdf = pd.DataFrame(writeups) submissionsdf = pd.DataFrame(submissions) submissions_with_writeupsdf = submissionsdf.merge( writeupsdf, on="submitterId", how="left" ) submissions_with_writeupsdf.apply( lambda row: append_writeup_to_main_submission(row, syn), axis=1 )
[docs]def validate_project(syn, submission, challenge, public=False, admin=None): """ Validate a Project submission. Args: submission - submission ID challenge - Synapse ID of Challenge wiki public - Project should be public (default: False) admin - (optional) Project should be shared with this username/ID Returns: submission_errors (str) - error messages ("" if none found) submission_status (str) - "VALIDATED"/"INVALID" """ writeup = syn.getSubmission(submission) errors = [] type_error = _validate_ent_type(writeup) if type_error: errors.append(type_error) contents_error = _validate_project_id(writeup, challenge) if contents_error: errors.append(contents_error) permissions_error = _check_project_permissions(syn, writeup, public, admin) errors.extend(permissions_error) status = "INVALID" if errors else "VALIDATED" return {"submission_errors": "\n".join(errors), "submission_status": status}
[docs]def archive_project(syn, submission, admin): """ Make a copy (archive) of the Project submission. Args: submission - submission ID admin - user who will own the archived project """ writeup = syn.getSubmission(submission) name = writeup.entity.name.replace("&", "+").replace("'", "") curr_time = int(round(time.time() * 1000)) new_project = Project( f"Archived {name} {curr_time} {writeup.id} " + f"{writeup.entityId}" ) archive = syn.store(new_project) permissions.set_entity_permissions(syn, archive, admin, "admin") archived = synapseutils.copy(syn, writeup.entityId, archive.id) return {"archived": archived.get(writeup.entityId)}
# TODO: move to utils module def _validate_ent_type(submission): """Check entity type of submission.""" try: if not isinstance(submission.entity, entity.Project): ent_type = re.search(r"entity\.(.*?)'", str(type(submission.entity))).group( 1 ) return f"Submission should be a Synapse project, not a {ent_type}." except AttributeError: return "Unknown entity type; please submit a Synapse project." else: return "" def _validate_project_id(proj, challenge): """Check that submission is not the Challenge site.""" return ( "Submission should not be the Challenge site." if proj.entityId == challenge else "" ) def _validate_public_permissions(syn, proj): """Ensure project is shared with the public.""" error = "Your project is not publicly available." try: # Remove error message if the project is accessible by the public. public_perms = syn.getPermissions(proj.entityId) if "READ" in public_perms: error = "" except SynapseHTTPError as e: # Raise exception message if error is not a permissions error. if e.response.status_code != 403: raise e return error def _validate_admin_permissions(syn, proj, admin): """Ensure project is shared with the given admin.""" error = ( "Project is private; please update its sharing settings." f" Writeup should be shared with {admin}." ) try: # Remove error message if admin has read and download permissions # OR if the project is publicly availably. admin_perms = syn.getPermissions(proj.entityId, admin) public_perms = syn.getPermissions(proj.entityId) if "READ" in public_perms or ( "READ" in admin_perms and "DOWNLOAD" in admin_perms ): error = "" except SynapseHTTPError as e: # Raise exception message if error is not a permissions error. if e.response.status_code != 403: raise e return error def _check_project_permissions(syn, submission, public, admin): """Check the submission sharing settings.""" errors = [] if public: public_error = _validate_public_permissions(syn, submission) if public_error: errors.append(public_error) if not public and admin is not None: admin_error = _validate_admin_permissions(syn, submission, admin) if admin_error: errors.append(admin_error) return errors
[docs]def validate_docker_submission(syn, submissionid): """Validates Synapse docker repository + sha digest submission This function requires users to have a synapse config file using synapse username and password Args: syn: Synapse connection submissionid: Submission id Returns: True if valid, False if not """ # Uses synapse config path config = syn.getConfigFile(syn.configPath) authen = dict(config.items("authentication")) if authen.get("username") is None or authen.get("password") is None: raise ValueError("Synapse config file must have username and password") docker_sub = syn.getSubmission(submissionid) docker_repository = docker_sub.get("dockerRepositoryName") docker_digest = docker_sub.get("dockerDigest") if docker_repository is None or docker_digest is None: raise ValueError("Submission is not a Docker submission") docker_repo = docker_repository.replace("docker.synapse.org/", "") valid = dockertools.validate_docker( docker_repo=docker_repo, docker_digest=docker_digest, index_endpoint=dockertools.ENDPOINT_MAPPING["synapse"], username=authen["username"], password=authen["password"], ) return valid
[docs]def get_submitterid_from_submission_id(syn, submissionid, queue, verbose=False): """Gets submitterid from submission id Args: syn: Synapse connection submissionid: Submission id queue: Evaluation queue id verbose: Boolean value to print Returns: Submitter id """ query = "select submitterId from evaluation_{} " "where objectId == '{}'".format( queue, submissionid ) generator = utils.evaluation_queue_query(syn, query) lst = list(generator) if not lst: raise ValueError("submission id {} not in queue".format(submissionid)) submission_dict = lst[0] submitterid = submission_dict["submitterId"] if verbose: print("submitterid: " + submitterid) return submitterid
[docs]def get_submitters_lead_submission( syn, submitterid, queue, cutoff_annotation, verbose=False ): """Gets submitter's lead submission Args: submitterid: Submitter id queue: Evaluation queue id cutoff_annotation: Boolean cutoff annotation key verbose: Boolean value to print Returns: previous_submission.csv or None """ query = ( "select objectId from evaluation_{} where submitterId == '{}' " "and prediction_file_status == 'SCORED' and {} == 'true' " "order by createdOn DESC".format(queue, submitterid, cutoff_annotation) ) generator = utils.evaluation_queue_query(syn, query) lst = list(generator) if lst: sub_dict = lst[0] objectid = sub_dict["objectId"] if verbose: print("Dowloading submissionid: " + objectid) sub = syn.getSubmission(objectid, downloadLocation=".") os.rename(sub.filePath, "previous_submission.csv") return "previous_submission.csv" print("Downloading no file") return None
[docs]def download_current_lead_sub( syn, submissionid, status, cutoff_annotation, verbose=False ): """Download the current leading submission for boot ladder boot method Args: syn: Synapse connection submissionid: Submission id status: Submission status cutoff_annotation: Boolean cutoff annotation key verbose: Boolean value to print Returns: Path to current leading submission or None """ if status == "VALIDATED": current_sub = syn.getSubmission(submissionid, downloadFile=False) queue_num = current_sub["evaluationId"] submitterid = get_submitterid_from_submission_id( syn, submissionid, queue_num, verbose ) path = get_submitters_lead_submission( syn, submitterid, queue_num, cutoff_annotation, verbose ) return path return None
[docs]def stop_submission_over_quota( syn: Synapse, submission_view: Union[str, SubmissionViewSchema], quota: int = sys.maxsize, ): """Stops any submission that has exceeded the run time quota by using submission views. A submission view must first exist. Rerunning submissions will require setting TimeRemaining annotation to a positive integer. Args: syn: Synapse connection submission_view: Submission View or its Synapse Id. quota: Quota in milliseconds. Default is sys.maxsize. One hour is 3600000. """ if not isinstance(quota, int): raise ValueError("quota must be an integer") if quota <= 0: raise ValueError("quota must be larger than 0") try: view_query = syn.tableQuery( f"select {WORKFLOW_LAST_UPDATED_KEY}, {WORKFLOW_START_KEY}, id, " f"status from {id_of(submission_view)} where " "status = 'EVALUATION_IN_PROGRESS'" ) except SynapseHTTPError as http_error: raise ValueError( "Submission view must have columns: " f"{WORKFLOW_LAST_UPDATED_KEY}, {WORKFLOW_START_KEY}, id" ) from http_error view_querydf = view_query.asDataFrame() runtime = view_querydf[WORKFLOW_LAST_UPDATED_KEY] - view_querydf[WORKFLOW_START_KEY] submissions_over_quota_idx = runtime > quota over_quotadf = view_querydf[submissions_over_quota_idx] for index, row in over_quotadf.iterrows(): add_annotations = {TIME_REMAINING_KEY: 0} annotations.annotate_submission( syn, row["id"], add_annotations, is_private=False, force=True )