Module synapsemonitor.monitor

Monitor Synapse Project

Expand source code
"""Monitor Synapse Project"""
from datetime import datetime, timedelta
from dateutil import tz
import logging
import typing

import pandas as pd
import synapseclient
from synapseclient import EntityViewSchema, EntityViewType, Synapse


def create_file_view(
    syn: Synapse, name: str, project_id: str, scope_ids: typing.List[str]
) -> EntityViewSchema:
    """Creates a file view that will list all the File entities under
    the specified scopes (Synapse Folders or Projects). This will
    allow you to query for the files contained in your specified scopes.
    This will NOT track the other entities currently: PROJECT, TABLE,
    FOLDER, VIEW, DOCKER.

    Args:
        syn: Synapse connection
        name: File view name
        project_id: Synapse project id to store your file view
        scope_ids: List of Folder or Project synapse Ids

    Returns:
        Synapse file view
    """
    view = EntityViewSchema(
        name=name,
        parent=project_id,
        scopes=scope_ids,
        includeEntityTypes=[EntityViewType.FILE],
        add_default_columns=True,
        addAnnotationColumns=False,
    )
    return syn.store(view)


def _render_fileview(
    syn: Synapse, viewdf: pd.DataFrame, tz_name="US/Pacific"
) -> pd.DataFrame:
    """Renders file view values such as changing modifiedOn from
    Epoch time to US/Pacific datetime and Synapse userids to usernames

    Args:
        syn: Synapse connection
        viewdf: File view dataframe
        tz_name: Timezone database name
                 https://en.wikipedia.org/wiki/List_of_tz_database_time_zones

    Returns:
        Rendered File view dataframe

    """
    viewdf["createdOn"] = (
        pd.to_datetime(viewdf["createdOn"], unit="ms")
        .dt.tz_localize("utc")
        .dt.tz_convert(tz_name)
    )
    viewdf["modifiedOn"] = (
        pd.to_datetime(viewdf["modifiedOn"], unit="ms")
        .dt.tz_localize("utc")
        .dt.tz_convert(tz_name)
    )
    users = [syn.getUserProfile(user)["userName"] for user in viewdf["modifiedBy"]]
    viewdf["modifiedBy"] = users
    return viewdf


def _find_modified_entities_fileview(
    syn: Synapse, syn_id: str, value: int = 1, unit: str = "day"
) -> list:
    """Finds entities scoped in a fileview modified in the past {value} {unit}

    Args:
        syn: Synapse connection
        syn_id: Synapse Fileview Id
        value: number of time units
        unit: time unit

    Returns:
        List of synapse ids
    """
    # Update the view
    # _force_update_view(syn, view_id)

    query = (
        f"select id from {syn_id} where "
        f"modifiedOn > unix_timestamp(NOW() - INTERVAL {value} {unit})*1000"
    )
    results = syn.tableQuery(query)
    resultsdf = results.asDataFrame()
    return resultsdf["id"].tolist()


def _find_modified_entities_file(
    syn: Synapse, syn_id: str, value: int = 1, unit: str = "day"
) -> list:
    """Determines if entity was modified in the past {value} {unit}.
    Note: entity modifiedOn returns UTC time

    Args:
        syn: Synapse connection
        syn_id: Synapse File Id
        value: number of time units
        unit: time unit

    Returns:
        List of synapse ids
    """

    valid_units = ["day", "hour"]
    if unit not in valid_units:
        raise ValueError(
            f"'{unit}' is not an accepted time unit. Accepted units: {valid_units}."
        )

    entity = syn.get(syn_id, downloadFile=False)
    utc_mod = datetime.strptime(entity["modifiedOn"], "%Y-%m-%dT%H:%M:%S.%fZ")
    utc_now = datetime.utcnow()

    if unit == "day":
        td = timedelta(days=value)
    elif unit == "hour":
        td = timedelta(hours=value)

    if utc_mod > utc_now - td:
        return [syn_id]
    return []


def _traverse(
    syn: Synapse,
    synid_root: str,
    include_types: typing.List = ["file"],
) -> list:
    """Traverse Synapse entity hierarchy to gather all descendant
    entities of a root entity.
    Args:
        syn: Synapse connection
        synid_root: Synapse ID of root entity.
        include_types: Must be a list of entity types (ie. [“folder”,”file”])
            which can be found here:
            http://docs.synapse.org/rest/org/sagebionetworks/repo/model/EntityType.html
    Returns:
        List of descendant Synapse IDs without root Synapse ID
    """

    synid_desc = []

    # full traverse depends on examining folder entities, even if not requested
    include_types_mod = set(include_types)
    include_types_mod.add("folder")
    include_types_mod = list(include_types_mod)

    synid_children = syn.getChildren(parent=synid_root, includeTypes=include_types_mod)
    for synid_child in synid_children:
        entity_type = synid_child["type"].split(".")[-1].lower().replace("entity", "")
        if entity_type == "folder":
            synid_desc.extend(
                _traverse(
                    syn=syn, synid_root=synid_child["id"], include_types=include_types
                )
            )
        if entity_type in include_types:
            synid_desc.append(synid_child["id"])

    return synid_desc


def _traverse_root(
    syn: Synapse,
    synid_root: str,
    include_types: typing.List = ["file"],
) -> list:
    """Wrapper for call traverse to include root.

    Args:
        syn (Synapse): Synapse connection
        synid_root (str): Synapse ID of root entity.
        include_types (typing.List, optional): Must be a list of entity types (ie. [“folder”,”file”])
            which can be found here:
            http://docs.synapse.org/rest/org/sagebionetworks/repo/model/EntityType.html

    Returns:
        list: List of descendant Synapse IDs with root Synapse ID
    """
    synid_desc = _traverse(syn, synid_root, include_types)
    entity = syn.get(synid_root, downloadFile=False)
    entity_type = entity["concreteType"].split(".")[-1].lower().replace("entity", "")
    if entity_type in include_types:
        synid_desc.append(synid_root)

    return synid_desc


def _find_modified_entities_container(
    syn: Synapse, syn_id: str, value: int = 1, unit: str = "day"
) -> list:
    """Finds entities in a folder or project modified in the past {value} {unit}

    Args:
        syn: Synapse connection
        syn_id: Synapse Folder or Project Id
        value: number of time units
        unit: time unit

    Returns:
        List of synapse ids
    """
    syn_id_mod = []
    syn_id_children = _traverse_root(syn, syn_id)

    for syn_id_child in syn_id_children:
        syn_id_res = _find_modified_entities_file(syn, syn_id_child, value, unit)
        if syn_id_res:
            syn_id_mod.extend(syn_id_res)

    return syn_id_mod


def _force_update_view(syn: Synapse, view_id: str):
    """File views are not indexed unless someone queries them by
    going to the file view on Synapse or querying them via a function
    call.  This will force the update of the file view to ensure the most
    up to date fileview is used.

    Args:
        syn: Synapse connection
        view_id: Synapse ID of fileview to be monitored.
    """
    syn.tableQuery(f"select * from {view_id} limit 1")


def _get_user_ids(syn: Synapse, users: list = None):
    """Get users ids from list of user ids or usernames.  This will also
    confirm that the users specified exist in the system

    Args:
        syn: Synapse connection
        users: List of Synapse user Ids or usernames

    Returns:
        List of Synapse user Ids.
    """
    if users is None:
        user_ids = [syn.getUserProfile()["ownerId"]]
    else:
        user_ids = [syn.getUserProfile(user)["ownerId"] for user in users]
    return user_ids


def find_modified_entities(
    syn: Synapse, syn_id: str, value: int = 1, unit: str = "day"
) -> list:
    """Find modified entities based on the type of the input

    Args:
        syn: Synapse connection
        syn_id: Synapse Entity Id
        value: number of time units
        unit: time unit

    Returns:
        List of synapse ids
    """

    entity = syn.get(syn_id, downloadFile=False)
    if isinstance(entity, synapseclient.EntityViewSchema):
        return _find_modified_entities_fileview(
            syn=syn, syn_id=syn_id, value=value, unit=unit
        )
    elif isinstance(entity, (synapseclient.File, synapseclient.Schema)):
        return _find_modified_entities_file(
            syn=syn, syn_id=syn_id, value=value, unit=unit
        )
    elif isinstance(entity, (synapseclient.Folder, synapseclient.Project)):
        return _find_modified_entities_container(
            syn=syn, syn_id=syn_id, value=value, unit=unit
        )
    else:
        raise ValueError(f"{type(entity)} not supported")


def monitoring(
    syn: Synapse,
    syn_id: str,
    users: list = None,
    email_subject: str = "New Synapse Files",
    value: int = 1,
    unit: str = "day",
) -> pd.DataFrame:
    """Monitor the modifications of an entity scoped by a Fileview.

    Args:
        syn: Synapse connection
        synid: Synapse ID of fileview to be monitored.
        users: User Id or usernames of individual to send report.
               If empty, defaults to current logged in Synapse user.
        email_subject: Sets the subject heading of the email sent out.
                       (default: 'New Synapse Files')
        value: number of time units
        unit: time unit

    Returns:
        Dataframe with files modified within past {value} {unit}
    """

    # get dataframe of files
    modified_entities = find_modified_entities(
        syn=syn, syn_id=syn_id, value=value, unit=unit
    )
    # Filter out projects and folders
    logging.info(f"Total number of entities = {len(modified_entities)}")

    # get user ids
    user_ids = _get_user_ids(syn, users)

    # TODO: Add function to beautify email message

    # Prepare and send Message
    if modified_entities:
        syn.sendMessage(
            user_ids,
            email_subject,
            ", ".join(modified_entities),
            contentType="text/html",
        )
    return modified_entities

Functions

def create_file_view(syn: synapseclient.client.Synapse, name: str, project_id: str, scope_ids: List[str]) ‑> synapseclient.table.EntityViewSchema

Creates a file view that will list all the File entities under the specified scopes (Synapse Folders or Projects). This will allow you to query for the files contained in your specified scopes. This will NOT track the other entities currently: PROJECT, TABLE, FOLDER, VIEW, DOCKER.

Args

syn
Synapse connection
name
File view name
project_id
Synapse project id to store your file view
scope_ids
List of Folder or Project synapse Ids

Returns

Synapse file view

Expand source code
def create_file_view(
    syn: Synapse, name: str, project_id: str, scope_ids: typing.List[str]
) -> EntityViewSchema:
    """Creates a file view that will list all the File entities under
    the specified scopes (Synapse Folders or Projects). This will
    allow you to query for the files contained in your specified scopes.
    This will NOT track the other entities currently: PROJECT, TABLE,
    FOLDER, VIEW, DOCKER.

    Args:
        syn: Synapse connection
        name: File view name
        project_id: Synapse project id to store your file view
        scope_ids: List of Folder or Project synapse Ids

    Returns:
        Synapse file view
    """
    view = EntityViewSchema(
        name=name,
        parent=project_id,
        scopes=scope_ids,
        includeEntityTypes=[EntityViewType.FILE],
        add_default_columns=True,
        addAnnotationColumns=False,
    )
    return syn.store(view)
def find_modified_entities(syn: synapseclient.client.Synapse, syn_id: str, value: int = 1, unit: str = 'day') ‑> list

Find modified entities based on the type of the input

Args

syn
Synapse connection
syn_id
Synapse Entity Id
value
number of time units
unit
time unit

Returns

List of synapse ids

Expand source code
def find_modified_entities(
    syn: Synapse, syn_id: str, value: int = 1, unit: str = "day"
) -> list:
    """Find modified entities based on the type of the input

    Args:
        syn: Synapse connection
        syn_id: Synapse Entity Id
        value: number of time units
        unit: time unit

    Returns:
        List of synapse ids
    """

    entity = syn.get(syn_id, downloadFile=False)
    if isinstance(entity, synapseclient.EntityViewSchema):
        return _find_modified_entities_fileview(
            syn=syn, syn_id=syn_id, value=value, unit=unit
        )
    elif isinstance(entity, (synapseclient.File, synapseclient.Schema)):
        return _find_modified_entities_file(
            syn=syn, syn_id=syn_id, value=value, unit=unit
        )
    elif isinstance(entity, (synapseclient.Folder, synapseclient.Project)):
        return _find_modified_entities_container(
            syn=syn, syn_id=syn_id, value=value, unit=unit
        )
    else:
        raise ValueError(f"{type(entity)} not supported")
def monitoring(syn: synapseclient.client.Synapse, syn_id: str, users: list = None, email_subject: str = 'New Synapse Files', value: int = 1, unit: str = 'day') ‑> pandas.core.frame.DataFrame

Monitor the modifications of an entity scoped by a Fileview.

Args

syn
Synapse connection
synid
Synapse ID of fileview to be monitored.
users
User Id or usernames of individual to send report. If empty, defaults to current logged in Synapse user.
email_subject
Sets the subject heading of the email sent out. (default: 'New Synapse Files')
value
number of time units
unit
time unit

Returns

Dataframe with files modified within past {value} {unit}

Expand source code
def monitoring(
    syn: Synapse,
    syn_id: str,
    users: list = None,
    email_subject: str = "New Synapse Files",
    value: int = 1,
    unit: str = "day",
) -> pd.DataFrame:
    """Monitor the modifications of an entity scoped by a Fileview.

    Args:
        syn: Synapse connection
        synid: Synapse ID of fileview to be monitored.
        users: User Id or usernames of individual to send report.
               If empty, defaults to current logged in Synapse user.
        email_subject: Sets the subject heading of the email sent out.
                       (default: 'New Synapse Files')
        value: number of time units
        unit: time unit

    Returns:
        Dataframe with files modified within past {value} {unit}
    """

    # get dataframe of files
    modified_entities = find_modified_entities(
        syn=syn, syn_id=syn_id, value=value, unit=unit
    )
    # Filter out projects and folders
    logging.info(f"Total number of entities = {len(modified_entities)}")

    # get user ids
    user_ids = _get_user_ids(syn, users)

    # TODO: Add function to beautify email message

    # Prepare and send Message
    if modified_entities:
        syn.sendMessage(
            user_ids,
            email_subject,
            ", ".join(modified_entities),
            contentType="text/html",
        )
    return modified_entities