schematic.store.database.synapse_database
SynapseDatabase
"""SynapseDatabase"""

import pandas as pd
import synapseclient as sc  # type: ignore

from schematic.store.database.synapse_database_wrapper import Synapse
from schematic.store.synapse_tracker import SynapseEntityTracker


class SynapseDatabaseMissingTableAnnotationsError(Exception):
    """Raised when a table is missing expected annotations

    Attributes:
        message (str): Description of the problem
        table_name (str): Name of the table that lacks the annotations
    """

    def __init__(self, message: str, table_name: str) -> None:
        """Init

        Args:
            message (str): Description of the problem
            table_name (str): Name of the table that lacks the annotations
        """
        self.message = message
        self.table_name = table_name
        super().__init__(self.message)

    def __str__(self) -> str:
        return f"{self.message}; name: {self.table_name};"


class InputDataframeMissingColumn(Exception):
    """Raised when an input dataframe is missing one or more needed columns

    Attributes:
        message (str): Description of the problem
        table_columns (list[str]): The columns that were actually present
        missing_columns (list[str]): The columns that were expected but absent
    """

    def __init__(
        self, message: str, table_columns: list[str], missing_columns: list[str]
    ) -> None:
        """Init

        Args:
            message (str): Description of the problem
            table_columns (list[str]): The columns that were actually present
            missing_columns (list[str]): The columns that were expected but absent
        """
        self.message = message
        self.table_columns = table_columns
        self.missing_columns = missing_columns
        super().__init__(self.message)

    def __str__(self) -> str:
        return (
            f"{self.message}; "
            f"table_columns: {self.table_columns}; "
            f"missing_columns: {self.missing_columns}"
        )


class SynapseDatabase:
    """Represents a database stored as Synapse tables"""

    def __init__(
        self,
        auth_token: str,
        project_id: str,
        synapse_entity_tracker: SynapseEntityTracker = None,
        syn: sc.Synapse = None,
    ) -> None:
        """Init

        Args:
            auth_token (str): A Synapse auth_token
            project_id (str): A Synapse id for a project
            synapse_entity_tracker: Tracker for a pull-through cache of Synapse entities
            syn: An optional synapseclient Synapse object; it is passed through
                unchanged to the Synapse wrapper
        """
        self.synapse = Synapse(
            auth_token=auth_token,
            project_id=project_id,
            synapse_entity_tracker=synapse_entity_tracker,
            syn=syn,
        )

    def upsert_table_rows(self, table_name: str, data: pd.DataFrame) -> None:
        """Upserts rows into the given table

        Args:
            table_name (str): The name of the table to be upserted into.
            data (pd.DataFrame): The table the rows will come from

        Raises:
            SynapseDatabaseMissingTableAnnotationsError: Raised when the table has no
              primary key annotation, or the annotation is empty.
        """
        table_id = self.synapse.get_synapse_id_from_table_name(table_name)
        annotations = self.synapse.get_entity_annotations(table_id)
        # A present-but-empty annotation is as unusable as a missing one; report it
        # with the same error instead of an IndexError from the [0] lookup below.
        if "primary_key" not in annotations or not annotations["primary_key"]:
            raise SynapseDatabaseMissingTableAnnotationsError(
                "Table has no primary_key annotation", table_name
            )
        # Only the first annotation value is used as the primary key.
        primary_key = annotations["primary_key"][0]
        self._upsert_table_rows(table_id, data, primary_key)

    def _upsert_table_rows(
        self, table_id: str, data: pd.DataFrame, primary_key: str
    ) -> None:
        """Upserts rows into the given table

        Args:
            table_id (str): The Synapse id of the table to be upserted into.
            data (pd.DataFrame): The table the rows will come from
            primary_key (str): The primary key of the table used to identify
              which rows to update

        Raises:
            InputDataframeMissingColumn: Raised when the input dataframe has
              no column that matches the primary key argument.
        """
        data_columns = list(data.columns)
        if primary_key not in data_columns:
            raise InputDataframeMissingColumn(
                "Input dataframe missing primary key column.",
                data_columns,
                [primary_key],
            )

        table = self._create_primary_key_table(table_id, primary_key)
        # Left join keeps every input row; rows whose key is not in the existing
        # table get no ROW_ID/ROW_VERSION values. validate="one_to_one" makes
        # pandas raise if the key is duplicated in either frame.
        merged_table = pd.merge(
            data, table, how="left", on=primary_key, validate="one_to_one"
        )
        self.synapse.upsert_table_rows(table_id, merged_table)

    def _create_primary_key_table(
        self, table_id: str, primary_key: str
    ) -> pd.DataFrame:
        """Creates a dataframe with just the primary key of the table

        Args:
            table_id (str): The id of the table to query
            primary_key (str): The name of the primary key

        Returns:
            pd.DataFrame: The table in pandas.DataFrame form with the primary key, ROW_ID, and
              ROW_VERSION columns

        Raises:
            InputDataframeMissingColumn: Raised when the synapse table has no column that
              matches the primary key argument.
        """
        # NOTE(review): include_row_data=True is expected to add the ROW_ID and
        # ROW_VERSION columns selected below - confirm against the wrapper.
        table = self.synapse.query_table(table_id, include_row_data=True)
        table_columns = list(table.columns)
        if primary_key not in table_columns:
            raise InputDataframeMissingColumn(
                "Synapse table missing primary key column",
                table_columns,
                [primary_key],
            )
        return table[["ROW_ID", "ROW_VERSION", primary_key]]
class
SynapseDatabaseMissingTableAnnotationsError(builtins.Exception):
class SynapseDatabaseMissingTableAnnotationsError(Exception):
    """Raised when a table is missing expected annotations

    Attributes:
        message (str): Description of the problem
        table_name (str): Name of the table that lacks the annotations
    """

    def __init__(self, message: str, table_name: str) -> None:
        """Init

        Args:
            message (str): Description of the problem
            table_name (str): Name of the table that lacks the annotations
        """
        self.message = message
        self.table_name = table_name
        super().__init__(self.message)

    def __str__(self) -> str:
        return f"{self.message}; name: {self.table_name};"
Raised when a table is missing expected annotations
Inherited Members
- builtins.BaseException
- with_traceback
- args
class
InputDataframeMissingColumn(builtins.Exception):
class InputDataframeMissingColumn(Exception):
    """Raised when an input dataframe is missing one or more needed columns

    Attributes:
        message (str): Description of the problem
        table_columns (list[str]): The columns that were actually present
        missing_columns (list[str]): The columns that were expected but absent
    """

    def __init__(
        self, message: str, table_columns: list[str], missing_columns: list[str]
    ) -> None:
        """Init

        Args:
            message (str): Description of the problem
            table_columns (list[str]): The columns that were actually present
            missing_columns (list[str]): The columns that were expected but absent
        """
        self.message = message
        self.table_columns = table_columns
        self.missing_columns = missing_columns
        super().__init__(self.message)

    def __str__(self) -> str:
        return (
            f"{self.message}; "
            f"table_columns: {self.table_columns}; "
            f"missing_columns: {self.missing_columns}"
        )
Raised when an input dataframe is missing one or more needed columns
Inherited Members
- builtins.BaseException
- with_traceback
- args
class
SynapseDatabase:
class SynapseDatabase:
    """Represents a database stored as Synapse tables"""

    def __init__(
        self,
        auth_token: str,
        project_id: str,
        synapse_entity_tracker: SynapseEntityTracker = None,
        syn: sc.Synapse = None,
    ) -> None:
        """Init

        Args:
            auth_token (str): A Synapse auth_token
            project_id (str): A Synapse id for a project
            synapse_entity_tracker: Tracker for a pull-through cache of Synapse entities
            syn: An optional synapseclient Synapse object; it is passed through
                unchanged to the Synapse wrapper
        """
        self.synapse = Synapse(
            auth_token=auth_token,
            project_id=project_id,
            synapse_entity_tracker=synapse_entity_tracker,
            syn=syn,
        )

    def upsert_table_rows(self, table_name: str, data: pd.DataFrame) -> None:
        """Upserts rows into the given table

        Args:
            table_name (str): The name of the table to be upserted into.
            data (pd.DataFrame): The table the rows will come from

        Raises:
            SynapseDatabaseMissingTableAnnotationsError: Raised when the table has no
              primary key annotation, or the annotation is empty.
        """
        table_id = self.synapse.get_synapse_id_from_table_name(table_name)
        annotations = self.synapse.get_entity_annotations(table_id)
        # A present-but-empty annotation is as unusable as a missing one; report it
        # with the same error instead of an IndexError from the [0] lookup below.
        if "primary_key" not in annotations or not annotations["primary_key"]:
            raise SynapseDatabaseMissingTableAnnotationsError(
                "Table has no primary_key annotation", table_name
            )
        # Only the first annotation value is used as the primary key.
        primary_key = annotations["primary_key"][0]
        self._upsert_table_rows(table_id, data, primary_key)

    def _upsert_table_rows(
        self, table_id: str, data: pd.DataFrame, primary_key: str
    ) -> None:
        """Upserts rows into the given table

        Args:
            table_id (str): The Synapse id of the table to be upserted into.
            data (pd.DataFrame): The table the rows will come from
            primary_key (str): The primary key of the table used to identify
              which rows to update

        Raises:
            InputDataframeMissingColumn: Raised when the input dataframe has
              no column that matches the primary key argument.
        """
        data_columns = list(data.columns)
        if primary_key not in data_columns:
            raise InputDataframeMissingColumn(
                "Input dataframe missing primary key column.",
                data_columns,
                [primary_key],
            )

        table = self._create_primary_key_table(table_id, primary_key)
        # Left join keeps every input row; rows whose key is not in the existing
        # table get no ROW_ID/ROW_VERSION values. validate="one_to_one" makes
        # pandas raise if the key is duplicated in either frame.
        merged_table = pd.merge(
            data, table, how="left", on=primary_key, validate="one_to_one"
        )
        self.synapse.upsert_table_rows(table_id, merged_table)

    def _create_primary_key_table(
        self, table_id: str, primary_key: str
    ) -> pd.DataFrame:
        """Creates a dataframe with just the primary key of the table

        Args:
            table_id (str): The id of the table to query
            primary_key (str): The name of the primary key

        Returns:
            pd.DataFrame: The table in pandas.DataFrame form with the primary key, ROW_ID, and
              ROW_VERSION columns

        Raises:
            InputDataframeMissingColumn: Raised when the synapse table has no column that
              matches the primary key argument.
        """
        # NOTE(review): include_row_data=True is expected to add the ROW_ID and
        # ROW_VERSION columns selected below - confirm against the wrapper.
        table = self.synapse.query_table(table_id, include_row_data=True)
        table_columns = list(table.columns)
        if primary_key not in table_columns:
            raise InputDataframeMissingColumn(
                "Synapse table missing primary key column",
                table_columns,
                [primary_key],
            )
        return table[["ROW_ID", "ROW_VERSION", primary_key]]
Represents a database stored as Synapse tables
SynapseDatabase( auth_token: str, project_id: str, synapse_entity_tracker: schematic.store.synapse_tracker.SynapseEntityTracker = None, syn: synapseclient.client.Synapse = None)
def __init__(
    self,
    auth_token: str,
    project_id: str,
    synapse_entity_tracker: SynapseEntityTracker = None,
    syn: sc.Synapse = None,
) -> None:
    """Init

    Args:
        auth_token (str): A Synapse auth_token
        project_id (str): A Synapse id for a project
        synapse_entity_tracker: Tracker for a pull-through cache of Synapse entities
        syn: An optional synapseclient Synapse object; it is passed through
            unchanged to the Synapse wrapper
    """
    # All Synapse access goes through this wrapper object.
    self.synapse = Synapse(
        auth_token=auth_token,
        project_id=project_id,
        synapse_entity_tracker=synapse_entity_tracker,
        syn=syn,
    )
Init
Arguments:
- auth_token (str): A Synapse auth_token
- project_id (str): A Synapse id for a project
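- synapse_entity_tracker: Tracker for a pull-through cache of Synapse entities
- syn: An optional synapseclient Synapse object; it is passed through unchanged to the Synapse wrapper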
def
upsert_table_rows(self, table_name: str, data: pandas.core.frame.DataFrame) -> None:
def upsert_table_rows(self, table_name: str, data: pd.DataFrame) -> None:
    """Upserts rows into the given table

    Args:
        table_name (str): The name of the table to be upserted into.
        data (pd.DataFrame): The table the rows will come from

    Raises:
        SynapseDatabaseMissingTableAnnotationsError: Raised when the table has no
          primary key annotation, or the annotation is empty.
    """
    table_id = self.synapse.get_synapse_id_from_table_name(table_name)
    annotations = self.synapse.get_entity_annotations(table_id)
    # A present-but-empty annotation is as unusable as a missing one; report it
    # with the same error instead of an IndexError from the [0] lookup below.
    if "primary_key" not in annotations or not annotations["primary_key"]:
        raise SynapseDatabaseMissingTableAnnotationsError(
            "Table has no primary_key annotation", table_name
        )
    # Only the first annotation value is used as the primary key.
    primary_key = annotations["primary_key"][0]
    self._upsert_table_rows(table_id, data, primary_key)
Upserts rows into the given table
Arguments:
- table_name (str): The name of the table to be upserted into.
- data (pd.DataFrame): The table the rows will come from
Raises:
- SynapseDatabaseMissingTableAnnotationsError: Raised when the table has no primary key annotation.