Module synapsemonitor.actions
Expand source code
from abc import ABC, abstractmethod
from typing import Type
from synapseclient import Synapse
from . import monitor
class SynapseAction(ABC):
    """Abstract base class for actions run on modified Synapse entities.

    Subclasses supply the behaviour by implementing ``_action``; callers
    trigger it through ``action``, which first asks
    ``monitor.find_modified_entities`` for the entities to act on.
    """

    def __init__(
        self,
        syn: Synapse,
        syn_id: str,
        value: int = 1,
        unit: str = "day",
        verbose: bool = False,
    ) -> None:
        """Store the settings used later by ``action``.

        Args:
            syn: Synapse client, forwarded to the monitor helpers.
            syn_id: Synapse id forwarded to ``find_modified_entities``.
            value: Forwarded to ``find_modified_entities`` together with
                ``unit`` (presumably the size of the look-back window --
                confirm against the monitor module).
            unit: Forwarded to ``find_modified_entities`` (presumably the
                time unit of ``value``).
            verbose: When true, ``action`` prints the action result.
        """
        self.syn = syn
        self.syn_id = syn_id
        self.value, self.unit = value, unit
        self.verbose = verbose

    @abstractmethod
    def _action(self, modified_entities: list) -> None:
        """Act on ``modified_entities``; must be implemented by subclasses.

        NOTE(review): annotated ``-> None``, yet ``action`` prints and
        returns whatever this method returns.
        """

    def action(self):
        """Find the modified entities and run ``_action`` on them.

        Returns:
            The value returned by ``_action``.
        """
        found = monitor.find_modified_entities(
            syn=self.syn,
            syn_id=self.syn_id,
            value=self.value,
            unit=self.unit,
        )
        outcome = self._action(found)
        if self.verbose:
            print(outcome)
        return outcome
class EmailAction(SynapseAction):
    """Action that sends a message listing the modified entities to users."""

    def __init__(
        self,
        syn: Synapse,
        syn_id: str,
        value: int = 1,
        unit: str = "day",
        verbose: bool = False,
        users: list = None,
        email_subject: str = "New Synapse Files",
    ):
        """Store the recipients and subject, then the base-class settings.

        Args:
            syn: Synapse client used to resolve users and send the message.
            syn_id: Synapse id passed on to the base class.
            value: Passed on to the base class.
            unit: Passed on to the base class.
            verbose: Passed on to the base class.
            users: Users handed to ``monitor._get_user_ids``; ``None`` is
                passed through unchanged.
            email_subject: Subject line of the message.
        """
        super().__init__(
            syn=syn,
            syn_id=syn_id,
            value=value,
            unit=unit,
            verbose=verbose,
        )
        self.email_subject = email_subject
        self.users = users

    def _action(self, modified_entities: list) -> list:
        """Message the configured users when there are modified entities.

        Args:
            modified_entities: Strings that are joined with ", " to form
                the message body.

        Returns:
            ``modified_entities``, unchanged.
        """
        # Recipients are resolved up front, even when nothing will be sent.
        recipients = monitor._get_user_ids(self.syn, self.users)
        if not modified_entities:
            return modified_entities
        # TODO: Add function to beautify email message
        body = ", ".join(modified_entities)
        self.syn.sendMessage(
            recipients,
            self.email_subject,
            body,
            contentType="text/html",
        )
        return modified_entities
def synapse_action(action_cls: "SynapseAction"):
    """Run a Synapse action and return its result.

    Args:
        action_cls: An *instance* of a class that extends SynapseAction.
            Despite the parameter name, a class object will not work:
            ``action`` is called here with no arguments, so it must
            already be bound to an instance.

    Returns:
        Whatever the instance's ``action()`` method returns.
    """
    return action_cls.action()
Functions
def synapse_action(action_cls: Type[SynapseAction])
-
synapse action helper function
Args
action_cls
- An instance of any class that extends SynapseAction; its action() method is called
Returns
User defined return
Expand source code
def synapse_action(action_cls: "SynapseAction"):
    """Run a Synapse action and return its result.

    Args:
        action_cls: An *instance* of a class that extends SynapseAction.
            Despite the parameter name, a class object will not work:
            ``action`` is called here with no arguments, so it must
            already be bound to an instance.

    Returns:
        Whatever the instance's ``action()`` method returns.
    """
    return action_cls.action()
Classes
class EmailAction (syn: synapseclient.client.Synapse, syn_id: str, value: int = 1, unit: str = 'day', verbose: bool = False, users: list = None, email_subject: str = 'New Synapse Files')
-
This action emails specified users with modified entities
Expand source code
class EmailAction(SynapseAction):
    """Action that sends a message listing the modified entities to users."""

    def __init__(
        self,
        syn: Synapse,
        syn_id: str,
        value: int = 1,
        unit: str = "day",
        verbose: bool = False,
        users: list = None,
        email_subject: str = "New Synapse Files",
    ):
        """Store the recipients and subject, then the base-class settings.

        Args:
            syn: Synapse client used to resolve users and send the message.
            syn_id: Synapse id passed on to the base class.
            value: Passed on to the base class.
            unit: Passed on to the base class.
            verbose: Passed on to the base class.
            users: Users handed to ``monitor._get_user_ids``; ``None`` is
                passed through unchanged.
            email_subject: Subject line of the message.
        """
        super().__init__(
            syn=syn,
            syn_id=syn_id,
            value=value,
            unit=unit,
            verbose=verbose,
        )
        self.email_subject = email_subject
        self.users = users

    def _action(self, modified_entities: list) -> list:
        """Message the configured users when there are modified entities.

        Args:
            modified_entities: Strings that are joined with ", " to form
                the message body.

        Returns:
            ``modified_entities``, unchanged.
        """
        # Recipients are resolved up front, even when nothing will be sent.
        recipients = monitor._get_user_ids(self.syn, self.users)
        if not modified_entities:
            return modified_entities
        # TODO: Add function to beautify email message
        body = ", ".join(modified_entities)
        self.syn.sendMessage(
            recipients,
            self.email_subject,
            body,
            contentType="text/html",
        )
        return modified_entities
Ancestors
- SynapseAction
- abc.ABC
Inherited members
class SynapseAction (syn: synapseclient.client.Synapse, syn_id: str, value: int = 1, unit: str = 'day', verbose: bool = False)
-
Base synapse action class
Expand source code
class SynapseAction(ABC):
    """Abstract base class for actions run on modified Synapse entities.

    Subclasses supply the behaviour by implementing ``_action``; callers
    trigger it through ``action``, which first asks
    ``monitor.find_modified_entities`` for the entities to act on.
    """

    def __init__(
        self,
        syn: Synapse,
        syn_id: str,
        value: int = 1,
        unit: str = "day",
        verbose: bool = False,
    ) -> None:
        """Store the settings used later by ``action``.

        Args:
            syn: Synapse client, forwarded to the monitor helpers.
            syn_id: Synapse id forwarded to ``find_modified_entities``.
            value: Forwarded to ``find_modified_entities`` together with
                ``unit`` (presumably the size of the look-back window --
                confirm against the monitor module).
            unit: Forwarded to ``find_modified_entities`` (presumably the
                time unit of ``value``).
            verbose: When true, ``action`` prints the action result.
        """
        self.syn = syn
        self.syn_id = syn_id
        self.value, self.unit = value, unit
        self.verbose = verbose

    @abstractmethod
    def _action(self, modified_entities: list) -> None:
        """Act on ``modified_entities``; must be implemented by subclasses.

        NOTE(review): annotated ``-> None``, yet ``action`` prints and
        returns whatever this method returns.
        """

    def action(self):
        """Find the modified entities and run ``_action`` on them.

        Returns:
            The value returned by ``_action``.
        """
        found = monitor.find_modified_entities(
            syn=self.syn,
            syn_id=self.syn_id,
            value=self.value,
            unit=self.unit,
        )
        outcome = self._action(found)
        if self.verbose:
            print(outcome)
        return outcome
Ancestors
- abc.ABC
Subclasses
Methods
def action(self)
-
Run the action on the list of modified entities
Expand source code
def action(self):
    """Find the modified entities and run ``_action`` on them.

    Returns:
        The value returned by ``_action``.
    """
    found = monitor.find_modified_entities(
        syn=self.syn,
        syn_id=self.syn_id,
        value=self.value,
        unit=self.unit,
    )
    outcome = self._action(found)
    if self.verbose:
        print(outcome)
    return outcome