schematic.store.database.synapse_database

SynapseDatabase

  1"""SynapseDatabase"""
  2
  3import pandas as pd
  4import synapseclient as sc  # type: ignore
  5
  6from schematic.store.database.synapse_database_wrapper import Synapse
  7from schematic.store.synapse_tracker import SynapseEntityTracker
  8
  9
 10class SynapseDatabaseMissingTableAnnotationsError(Exception):
 11    """Raised when a table is missing expected annotations"""
 12
 13    def __init__(self, message: str, table_name: str) -> None:
 14        self.message = message
 15        self.table_name = table_name
 16        super().__init__(self.message)
 17
 18    def __str__(self) -> str:
 19        return f"{self.message}; " f"name: {self.table_name};"
 20
 21
 22class InputDataframeMissingColumn(Exception):
 23    """Raised when an input dataframe is missing a needed column(s)"""
 24
 25    def __init__(
 26        self, message: str, table_columns: list[str], missing_columns: list[str]
 27    ) -> None:
 28        self.message = message
 29        self.table_columns = table_columns
 30        self.missing_columns = missing_columns
 31        super().__init__(self.message)
 32
 33    def __str__(self) -> str:
 34        return (
 35            f"{self.message}; "
 36            f"table_columns: {self.table_columns}; "
 37            f"missing_columns: {self.missing_columns}"
 38        )
 39
 40
 41class SynapseDatabase:
 42    """Represents a database stored as Synapse tables"""
 43
 44    def __init__(
 45        self,
 46        auth_token: str,
 47        project_id: str,
 48        synapse_entity_tracker: SynapseEntityTracker = None,
 49        syn: sc.Synapse = None,
 50    ) -> None:
 51        """Init
 52
 53        Args:
 54            auth_token (str): A Synapse auth_token
 55            project_id (str): A Synapse id for a project
 56            synapse_entity_tracker: Tracker for a pull-through cache of Synapse entities
 57        """
 58        self.synapse = Synapse(
 59            auth_token=auth_token,
 60            project_id=project_id,
 61            synapse_entity_tracker=synapse_entity_tracker,
 62            syn=syn,
 63        )
 64
 65    def upsert_table_rows(self, table_name: str, data: pd.DataFrame) -> None:
 66        """Upserts rows into the given table
 67
 68        Args:
 69            table_name (str): The name of the table to be upserted into.
 70            data (pd.DataFrame): The table the rows will come from
 71
 72        Raises:
 73            SynapseDatabaseMissingTableAnnotationsError: Raised when the table has no
 74             primary key annotation.
 75        """
 76        table_id = self.synapse.get_synapse_id_from_table_name(table_name)
 77        annotations = self.synapse.get_entity_annotations(table_id)
 78        if "primary_key" not in annotations:
 79            raise SynapseDatabaseMissingTableAnnotationsError(
 80                "Table has no primary_key annotation", table_name
 81            )
 82        primary_key = annotations["primary_key"][0]
 83        self._upsert_table_rows(table_id, data, primary_key)
 84
 85    def _upsert_table_rows(
 86        self, table_id: str, data: pd.DataFrame, primary_key: str
 87    ) -> None:
 88        """Upserts rows into the given table
 89
 90        Args:
 91            table_id (str): The Synapse id of the table to be upserted into.
 92            data (pd.DataFrame): The table the rows will come from
 93            primary_key (str): The primary key of the table used to identify
 94              which rows to update
 95
 96        Raises:
 97            InputDataframeMissingColumn: Raised when the input dataframe has
 98              no column that matches the primary key argument.
 99        """
100        if primary_key not in list(data.columns):
101            raise InputDataframeMissingColumn(
102                "Input dataframe missing primary key column.",
103                list(data.columns),
104                [primary_key],
105            )
106
107        table = self._create_primary_key_table(table_id, primary_key)
108        merged_table = pd.merge(
109            data, table, how="left", on=primary_key, validate="one_to_one"
110        )
111        self.synapse.upsert_table_rows(table_id, merged_table)
112
113    def _create_primary_key_table(
114        self, table_id: str, primary_key: str
115    ) -> pd.DataFrame:
116        """Creates a dataframe with just the primary key of the table
117
118        Args:
119            table_id (str): The id of the table to query
120            primary_key (str): The name of the primary key
121
122        Returns:
123            pd.DataFrame: The table in pandas.DataFrame form with the primary key, ROW_ID, and
124             ROW_VERSION columns
125
126        Raises:
127            InputDataframeMissingColumn: Raised when the synapse table has no column that
128              matches the primary key argument.
129        """
130        table = self.synapse.query_table(table_id, include_row_data=True)
131        if primary_key not in list(table.columns):
132            raise InputDataframeMissingColumn(
133                "Synapse table missing primary key column",
134                list(table.columns),
135                [primary_key],
136            )
137        table = table[["ROW_ID", "ROW_VERSION", primary_key]]
138        return table
class SynapseDatabaseMissingTableAnnotationsError(builtins.Exception):
class SynapseDatabaseMissingTableAnnotationsError(Exception):
    """Error for a Synapse table that lacks an annotation the code expects"""

    def __init__(self, message: str, table_name: str) -> None:
        """Store the message and the name of the offending table

        Args:
            message (str): Description of the problem; also becomes ``args[0]``
            table_name (str): Name of the table missing the annotation
        """
        super().__init__(message)
        self.table_name = table_name
        self.message = message

    def __str__(self) -> str:
        """Render the message followed by the table name"""
        return f"{self.message}; name: {self.table_name};"

Raised when a table is missing expected annotations

SynapseDatabaseMissingTableAnnotationsError(message: str, table_name: str)
14    def __init__(self, message: str, table_name: str) -> None:
15        self.message = message
16        self.table_name = table_name
17        super().__init__(self.message)
message
table_name
Inherited Members
builtins.BaseException
with_traceback
args
class InputDataframeMissingColumn(builtins.Exception):
class InputDataframeMissingColumn(Exception):
    """Error for a dataframe that lacks one or more required columns"""

    def __init__(
        self, message: str, table_columns: list[str], missing_columns: list[str]
    ) -> None:
        """Store the message and both column lists

        Args:
            message (str): Description of the problem; also becomes ``args[0]``
            table_columns (list[str]): Columns the dataframe actually has
            missing_columns (list[str]): Columns that were required but absent
        """
        super().__init__(message)
        self.missing_columns = missing_columns
        self.table_columns = table_columns
        self.message = message

    def __str__(self) -> str:
        """Render the message followed by both column lists"""
        parts = [
            f"{self.message}",
            f"table_columns: {self.table_columns}",
            f"missing_columns: {self.missing_columns}",
        ]
        return "; ".join(parts)

Raised when an input dataframe is missing one or more needed columns

InputDataframeMissingColumn(message: str, table_columns: list[str], missing_columns: list[str])
26    def __init__(
27        self, message: str, table_columns: list[str], missing_columns: list[str]
28    ) -> None:
29        self.message = message
30        self.table_columns = table_columns
31        self.missing_columns = missing_columns
32        super().__init__(self.message)
message
table_columns
missing_columns
Inherited Members
builtins.BaseException
with_traceback
args
class SynapseDatabase:
 42class SynapseDatabase:
 43    """Represents a database stored as Synapse tables"""
 44
 45    def __init__(
 46        self,
 47        auth_token: str,
 48        project_id: str,
 49        synapse_entity_tracker: SynapseEntityTracker = None,
 50        syn: sc.Synapse = None,
 51    ) -> None:
 52        """Init
 53
 54        Args:
 55            auth_token (str): A Synapse auth_token
 56            project_id (str): A Synapse id for a project
 57            synapse_entity_tracker: Tracker for a pull-through cache of Synapse entities
 58        """
 59        self.synapse = Synapse(
 60            auth_token=auth_token,
 61            project_id=project_id,
 62            synapse_entity_tracker=synapse_entity_tracker,
 63            syn=syn,
 64        )
 65
 66    def upsert_table_rows(self, table_name: str, data: pd.DataFrame) -> None:
 67        """Upserts rows into the given table
 68
 69        Args:
 70            table_name (str): The name of the table to be upserted into.
 71            data (pd.DataFrame): The table the rows will come from
 72
 73        Raises:
 74            SynapseDatabaseMissingTableAnnotationsError: Raised when the table has no
 75             primary key annotation.
 76        """
 77        table_id = self.synapse.get_synapse_id_from_table_name(table_name)
 78        annotations = self.synapse.get_entity_annotations(table_id)
 79        if "primary_key" not in annotations:
 80            raise SynapseDatabaseMissingTableAnnotationsError(
 81                "Table has no primary_key annotation", table_name
 82            )
 83        primary_key = annotations["primary_key"][0]
 84        self._upsert_table_rows(table_id, data, primary_key)
 85
 86    def _upsert_table_rows(
 87        self, table_id: str, data: pd.DataFrame, primary_key: str
 88    ) -> None:
 89        """Upserts rows into the given table
 90
 91        Args:
 92            table_id (str): The Synapse id of the table to be upserted into.
 93            data (pd.DataFrame): The table the rows will come from
 94            primary_key (str): The primary key of the table used to identify
 95              which rows to update
 96
 97        Raises:
 98            InputDataframeMissingColumn: Raised when the input dataframe has
 99              no column that matches the primary key argument.
100        """
101        if primary_key not in list(data.columns):
102            raise InputDataframeMissingColumn(
103                "Input dataframe missing primary key column.",
104                list(data.columns),
105                [primary_key],
106            )
107
108        table = self._create_primary_key_table(table_id, primary_key)
109        merged_table = pd.merge(
110            data, table, how="left", on=primary_key, validate="one_to_one"
111        )
112        self.synapse.upsert_table_rows(table_id, merged_table)
113
114    def _create_primary_key_table(
115        self, table_id: str, primary_key: str
116    ) -> pd.DataFrame:
117        """Creates a dataframe with just the primary key of the table
118
119        Args:
120            table_id (str): The id of the table to query
121            primary_key (str): The name of the primary key
122
123        Returns:
124            pd.DataFrame: The table in pandas.DataFrame form with the primary key, ROW_ID, and
125             ROW_VERSION columns
126
127        Raises:
128            InputDataframeMissingColumn: Raised when the synapse table has no column that
129              matches the primary key argument.
130        """
131        table = self.synapse.query_table(table_id, include_row_data=True)
132        if primary_key not in list(table.columns):
133            raise InputDataframeMissingColumn(
134                "Synapse table missing primary key column",
135                list(table.columns),
136                [primary_key],
137            )
138        table = table[["ROW_ID", "ROW_VERSION", primary_key]]
139        return table

Represents a database stored as Synapse tables

SynapseDatabase( auth_token: str, project_id: str, synapse_entity_tracker: schematic.store.synapse_tracker.SynapseEntityTracker = None, syn: synapseclient.client.Synapse = None)
45    def __init__(
46        self,
47        auth_token: str,
48        project_id: str,
49        synapse_entity_tracker: SynapseEntityTracker = None,
50        syn: sc.Synapse = None,
51    ) -> None:
52        """Init
53
54        Args:
55            auth_token (str): A Synapse auth_token
56            project_id (str): A Synapse id for a project
57            synapse_entity_tracker: Tracker for a pull-through cache of Synapse entities
58        """
59        self.synapse = Synapse(
60            auth_token=auth_token,
61            project_id=project_id,
62            synapse_entity_tracker=synapse_entity_tracker,
63            syn=syn,
64        )

Init

Arguments:
  • auth_token (str): A Synapse auth_token
  • project_id (str): A Synapse id for a project
  • synapse_entity_tracker: Tracker for a pull-through cache of Synapse entities
synapse
def upsert_table_rows(self, table_name: str, data: pandas.core.frame.DataFrame) -> None:
66    def upsert_table_rows(self, table_name: str, data: pd.DataFrame) -> None:
67        """Upserts rows into the given table
68
69        Args:
70            table_name (str): The name of the table to be upserted into.
71            data (pd.DataFrame): The table the rows will come from
72
73        Raises:
74            SynapseDatabaseMissingTableAnnotationsError: Raised when the table has no
75             primary key annotation.
76        """
77        table_id = self.synapse.get_synapse_id_from_table_name(table_name)
78        annotations = self.synapse.get_entity_annotations(table_id)
79        if "primary_key" not in annotations:
80            raise SynapseDatabaseMissingTableAnnotationsError(
81                "Table has no primary_key annotation", table_name
82            )
83        primary_key = annotations["primary_key"][0]
84        self._upsert_table_rows(table_id, data, primary_key)

Upserts rows into the given table

Arguments:
  • table_name (str): The name of the table to be upserted into.
  • data (pd.DataFrame): The table the rows will come from
Raises:
  • SynapseDatabaseMissingTableAnnotationsError: Raised when the table has no primary key annotation.