schematic.utils.general

General utils

  1"""General utils"""
  2
  3# pylint: disable=logging-fstring-interpolation
  4
  5import logging
  6import os
  7import pstats
  8from pathlib import Path
  9import tempfile
 10from cProfile import Profile
 11from datetime import datetime, timedelta
 12from functools import wraps
 13from typing import Any, Callable, Optional, Sequence, TypeVar, Union
 14
 15from synapseclient import Synapse  # type: ignore
 16from synapseclient.core import cache  # type: ignore
 17from synapseclient.core.exceptions import SynapseHTTPError  # type: ignore
 18from synapseclient.entity import File, Folder, Project  # type: ignore
 19from synapseclient.table import EntityViewSchema  # type: ignore
 20
 21from schematic.store.synapse_tracker import SynapseEntityTracker
 22
# Module-level logger named after this module.
 23logger = logging.getLogger(__name__)
 24
# Generic type variable used in the signature of find_duplicates.
 25T = TypeVar("T")
 26
# Pattern matching one or more "syn<digits>" tokens, each optionally followed by a comma.
 27SYN_ID_REGEX = r"(syn\d+\,?)+"
 28
 29
 30def find_duplicates(_list: list[T]) -> set[T]:
 31    """Find duplicate items in a list"""
 32    return {x for x in _list if _list.count(x) > 1}
 33
 34
 35def dict2list(item: Any) -> Optional[Union[dict, list]]:
 36    """Puts a dictionary into a list
 37
 38    Args:
 39        item (Any): Any type of input
 40
 41    Returns:
 42        Optional[Union[dict, list]]:
 43          If input is a list, return it
 44          If input is a dict, return it in a list
 45          Return None for anything else
 46    """
 47    if isinstance(item, list):
 48        return item
 49    if isinstance(item, dict):
 50        return [item]
 51    return None
 52
 53
 54def str2list(item: Any) -> Optional[list]:
 55    """Puts a string into a list
 56
 57    Args:
 58        item (Any): Any type of input
 59
 60    Returns:
 61        Optional[list]:
 62          If input is a list, return it
 63          If input is a string, return it in a list
 64          Return None for anything else
 65    """
 66    if isinstance(item, str):
 67        return [item]
 68    if isinstance(item, list):
 69        return item
 70    return None
 71
 72
# Generic type variable used in the signature of unlist.
 73X = TypeVar("X")
 74
 75
 76def unlist(seq: Sequence[X]) -> Union[Sequence[X], X]:
 77    """Returns the first item of a sequence
 78
 79    Args:
 80        seq (Sequence[X]): A Sequence of any type
 81
 82    Returns:
 83        Union[Sequence[X], X]:
 84          if sequence is length one, return the first item
 85          otherwise return the sequence
 86    """
 87    if len(seq) == 1:
 88        return seq[0]
 89    return seq
 90
 91
 92def get_dir_size(path: str) -> int:
 93    """
 94    Recursively descend the directory tree rooted at the top and call
 95      .st_size function to calculate size of files in bytes.
 96    Args:
 97        path: path to a folder
 98    return: total size of all the files in a given directory in bytes.
 99    """
100    total = 0
101    # Recursively scan directory to find entries
102    with os.scandir(path) as itr:
103        for entry in itr:
104            if entry.is_file():
105                total += entry.stat().st_size
106            elif entry.is_dir():
107                total += get_dir_size(entry.path)
108    return total
109
110
def calculate_datetime(
    minutes: int, input_date: datetime, before_or_after: str = "before"
) -> datetime:
    """Shift a date time backwards or forwards by a number of minutes

    Args:
        minutes (int): number of minutes to shift by
        input_date (datetime): date time object provided by users
        before_or_after (str): default to "before". if "before", return the date time
         x minutes before input_date. if "after", return the date time x minutes
         after input_date.

    Raises:
        ValueError: before_or_after is neither "before" nor "after"

    Returns:
        datetime: the shifted date time
    """
    if before_or_after not in ("before", "after"):
        raise ValueError("Invalid value. Use either 'before' or 'after'.")

    offset = timedelta(minutes=minutes)
    if before_or_after == "before":
        return input_date - offset
    return input_date + offset
132
133
def check_synapse_cache_size(directory: str = "/root/.synapseCache") -> float:
    """Add up the size in bytes of every file below the .synapseCache directory.

    Args:
        directory (str, optional): .synapseCache directory. Defaults to '/root/.synapseCache'

    Returns:
        float: size of the .synapseCache directory in bytes
    """
    size_in_bytes = 0
    # rglob("*") yields every path below the directory; only files are counted
    for candidate in Path(directory).rglob("*"):
        if candidate.is_file():
            size_in_bytes += candidate.stat().st_size
    return size_in_bytes
147
148
def clear_synapse_cache(synapse_cache: cache.Cache, minutes: int) -> int:
    """clear synapse cache before a certain time

    The cutoff is the current UTC time minus the given number of minutes; it is
    passed to synapse_cache.purge as before_date.

    Args:
        synapse_cache: an object of synapseclient Cache.
        minutes (int): number of minutes before the current UTC time to use as cutoff
    Returns:
        int: the value returned by synapse_cache.purge (number of files that get deleted)
    """
    # Function-scope import so this block does not rely on a file-level import.
    from datetime import timezone  # pylint: disable=import-outside-toplevel

    # datetime.utcnow() is deprecated since Python 3.12. Build the same naive
    # UTC timestamp from an aware "now" so the value handed to purge is unchanged.
    current_date = datetime.now(timezone.utc).replace(tzinfo=None)
    minutes_earlier = calculate_datetime(
        input_date=current_date, minutes=minutes, before_or_after="before"
    )
    return synapse_cache.purge(before_date=minutes_earlier)
164
165
def entity_type_mapping(
    syn: Synapse,
    entity_id: str,
    synapse_entity_tracker: Optional[SynapseEntityTracker] = None,
) -> str:
    """Look up a Synapse entity and describe what kind of entity it is

    Args:
        syn (Synapse): Synapse object
        entity_id (str): id of an entity
        synapse_entity_tracker: Tracker for a pull-through cache of Synapse entities.
            A new SynapseEntityTracker is created when none is supplied.

    Raises:
        SynapseHTTPError: Re-raised SynapseHTTPError when the entity cannot be fetched

    Returns:
        str: "asset view", "folder", "file" or "project" for the known entity
            classes, otherwise the concreteType of the entity
    """
    try:
        tracker = synapse_entity_tracker or SynapseEntityTracker()
        # download_file=False: only the entity itself is needed to tell its type
        entity = tracker.get(synapse_id=entity_id, syn=syn, download_file=False)
    except SynapseHTTPError as exc:
        message = (
            f"cannot get {entity_id} from asset store. "
            f"Please make sure that {entity_id} exists"
        )
        logger.error(message)
        raise SynapseHTTPError(message) from exc

    # Checked in this order; the first matching class wins
    known_types = (
        (EntityViewSchema, "asset view"),
        (Folder, "folder"),
        (File, "file"),
        (Project, "project"),
    )
    for entity_class, label in known_types:
        if isinstance(entity, entity_class):
            return label

    assert entity is not None
    # if there's no matching type, return concreteType
    return entity.concreteType
212
213
def create_temp_folder(path: str, prefix: Optional[str] = None) -> str:
    """Create a uniquely named temporary directory inside the given directory

    The given directory is created first when it does not exist yet.

    Args:
        path(str): a directory path where all the temporary files will live
        prefix(str): a prefix to be added to the temporary directory name
    Returns:
        str: the pathname of the new directory, as returned by tempfile.mkdtemp
    """
    parent_missing = not os.path.exists(path)
    if parent_missing:
        os.makedirs(path, exist_ok=True)

    # mkdtemp picks a unique name below the parent directory
    return tempfile.mkdtemp(prefix=prefix, dir=path)
227
228
def profile(
    output_file: Optional[str] = None,
    sort_by: Any = "cumulative",
    lines_to_print: Optional[int] = None,
    strip_dirs: bool = False,
) -> Callable:
    """
    The function was initially taken from:
    https://towardsdatascience.com/how-to-profile-your-code-in-python-e70c834fad89
    A time profiler decorator.
    Inspired by and modified the profile decorator of Giampaolo Rodola:
    http://code.activestate.com/recipes/577817-profile-decorator/

    When the SECRETS_MANAGER_SECRETS environment variable is set, the raw stats
    are dumped to the output file and the top 30 entries sorted by "cumulative"
    are printed; sort_by, lines_to_print and strip_dirs are not used in that case.
    Otherwise a text report honouring those options is written to the output file.

    Args:
        output_file (Optional[str], optional):
            Path of the output file. If only name of the file is given, it's
            saved in the current directory.
            If it's None, the name of the decorated function plus ".prof" is used.
            Defaults to None.
        sort_by (str, optional):
            str or SortKey enum or tuple/list of str/SortKey enum
            Sorting criteria for the Stats object.
            For a list of valid string and SortKey refer to:
            https://docs.python.org/3/library/profile.html#pstats.Stats.sort_stats
            Defaults to "cumulative".
        lines_to_print (Optional[int], optional):
            Number of lines to print.
            This is useful in reducing the size of the printout, especially
            that sorting by 'cumulative', the time consuming operations
            are printed toward the top of the file.
            Default (None) is for all the lines.
        strip_dirs (bool, optional):
            Whether to remove the leading path info from file names.
            This is also useful in reducing the size of the printout
            Defaults to False.

    Returns:
        Callable: A decorator that profiles the decorated function on every call
    """

    def inner(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            _output_file = output_file or func.__name__ + ".prof"
            profiler = Profile()
            profiler.enable()
            try:
                retval = func(*args, **kwargs)
            finally:
                # Always stop the profiler, even when func raises; otherwise it
                # stays active after the call and can break later profiling.
                profiler.disable()

            # if we are running the functions on AWS:
            if "SECRETS_MANAGER_SECRETS" in os.environ:
                # The raw stats file is only kept in this branch; the branch below
                # writes its text report to the same path and would overwrite it.
                profiler.dump_stats(_output_file)
                p_stats = pstats.Stats(profiler)
                # limit this to 30 line for now otherwise it will be too long for AWS log
                p_stats.sort_stats("cumulative").print_stats(30)
            else:
                with open(_output_file, "w", encoding="utf-8") as fle:
                    p_stats = pstats.Stats(profiler, stream=fle)
                    if strip_dirs:
                        p_stats.strip_dirs()
                    if isinstance(sort_by, (tuple, list)):
                        p_stats.sort_stats(*sort_by)
                    else:
                        p_stats.sort_stats(sort_by)
                    p_stats.print_stats(lines_to_print)  # type: ignore
            return retval

        return wrapper

    return inner
299
300
def normalize_path(path: str, parent_folder: str) -> str:
    """
    Return a normalized version of a path.
    A relative path is first joined onto parent_folder; an absolute path is
    used as given.

    Args:
        path (str): The path to the file to normalize.
        parent_folder (str): The folder the file is in.

    Returns:
        str: The normalized path.
    """
    anchored = path if os.path.isabs(path) else os.path.join(parent_folder, path)
    return os.path.normpath(anchored)
logger = <Logger schematic.utils.general (WARNING)>
SYN_ID_REGEX = '(syn\\d+\\,?)+'
def find_duplicates(_list: list[~T]) -> set[~T]:
def find_duplicates(_list: list[T]) -> set[T]:
    """Find duplicate items in a list

    Args:
        _list (list[T]): The items to inspect

    Returns:
        set[T]: Every item that occurs more than once in _list
    """
    # Function-scope import so this block does not rely on a file-level import.
    from collections import Counter  # pylint: disable=import-outside-toplevel

    try:
        # One pass over the list instead of calling list.count() per item,
        # which rescans the whole list every time (quadratic).
        occurrences = Counter(_list)
    except TypeError:
        # Unhashable items cannot be tallied; fall back to the pairwise scan,
        # which still returns an empty set when nothing is duplicated.
        return {x for x in _list if _list.count(x) > 1}
    return {item for item, count in occurrences.items() if count > 1}

Find duplicate items in a list

def dict2list(item: Any) -> Union[dict, list, NoneType]:
def dict2list(item: Any) -> Optional[Union[dict, list]]:
    """Wrap a dictionary in a list, leaving lists untouched

    Args:
        item (Any): Any type of input

    Returns:
        Optional[Union[dict, list]]:
          A one-element list holding the input when it is a dict
          The input itself when it is already a list
          None for every other type
    """
    if isinstance(item, dict):
        return [item]
    return item if isinstance(item, list) else None

Puts a dictionary into a list

Arguments:
  • item (Any): Any type of input
Returns:

Optional[Union[dict, list]]: If input is a list, return it If input is a dict, return it in a list Return None for anything else

def str2list(item: Any) -> Optional[list]:
def str2list(item: Any) -> Optional[list]:
    """Wrap a string in a list, leaving lists untouched

    Args:
        item (Any): Any type of input

    Returns:
        Optional[list]:
          The input itself when it is already a list
          A one-element list holding the input when it is a string
          None for every other type
    """
    if isinstance(item, list):
        return item
    return [item] if isinstance(item, str) else None

Puts a string into a list

Arguments:
  • item (Any): Any type of input
Returns:

Optional[list]: If input is a list, return it If input is a string, return it in a list Return None for anything else

def unlist(seq: Sequence[~X]) -> Union[Sequence[~X], ~X]:
def unlist(seq: Sequence[X]) -> Union[Sequence[X], X]:
    """Unwrap a sequence that holds exactly one item

    Args:
        seq (Sequence[X]): A Sequence of any type

    Returns:
        Union[Sequence[X], X]:
          the only item when the sequence has length one
          the unchanged sequence for any other length
    """
    holds_single_item = len(seq) == 1
    return seq[0] if holds_single_item else seq

Returns the first item of a sequence

Arguments:
  • seq (Sequence[X]): A Sequence of any type
Returns:

Union[Sequence[X], X]: if sequence is length one, return the first item otherwise return the sequence

def get_dir_size(path: str) -> int:
 93def get_dir_size(path: str) -> int:
 94    """
 95    Recursively descend the directory tree rooted at the top and call
 96      .st_size function to calculate size of files in bytes.
 97    Args:
 98        path: path to a folder
 99    return: total size of all the files in a given directory in bytes.
100    """
101    total = 0
102    # Recursively scan directory to find entries
103    with os.scandir(path) as itr:
104        for entry in itr:
105            if entry.is_file():
106                total += entry.stat().st_size
107            elif entry.is_dir():
108                total += get_dir_size(entry.path)
109    return total

Recursively descend the directory tree rooted at the given path and sum the st_size of every file found, in bytes.

Arguments:
  • path: path to a folder

Returns: total size of all the files in the given directory, in bytes.

def calculate_datetime( minutes: int, input_date: datetime.datetime, before_or_after: str = 'before') -> datetime.datetime:
def calculate_datetime(
    minutes: int, input_date: datetime, before_or_after: str = "before"
) -> datetime:
    """Shift a date time backwards or forwards by a number of minutes

    Args:
        minutes (int): number of minutes to shift by
        input_date (datetime): date time object provided by users
        before_or_after (str): default to "before". if "before", return the date time
         x minutes before input_date. if "after", return the date time x minutes
         after input_date.

    Raises:
        ValueError: before_or_after is neither "before" nor "after"

    Returns:
        datetime: the shifted date time
    """
    if before_or_after not in ("before", "after"):
        raise ValueError("Invalid value. Use either 'before' or 'after'.")

    offset = timedelta(minutes=minutes)
    if before_or_after == "before":
        return input_date - offset
    return input_date + offset

calculate date time

Arguments:
  • input_date (datetime): date time object provided by users
  • minutes (int): number of minutes
  • before_or_after (str): defaults to "before". If "before", calculate x minutes before input_date; if "after", calculate x minutes after input_date.
Returns:

datetime: return result of date time calculation

def check_synapse_cache_size(directory: str = '/root/.synapseCache') -> float:
def check_synapse_cache_size(directory: str = "/root/.synapseCache") -> float:
    """Add up the size in bytes of every file below the .synapseCache directory.

    Args:
        directory (str, optional): .synapseCache directory. Defaults to '/root/.synapseCache'

    Returns:
        float: size of the .synapseCache directory in bytes
    """
    size_in_bytes = 0
    # rglob("*") yields every path below the directory; only files are counted
    for candidate in Path(directory).rglob("*"):
        if candidate.is_file():
            size_in_bytes += candidate.stat().st_size
    return size_in_bytes

Calculate size of .synapseCache directory in bytes using pathlib.

Arguments:
  • directory (str, optional): .synapseCache directory. Defaults to '/root/.synapseCache'
Returns:

float: size of the .synapseCache directory in bytes

def clear_synapse_cache(synapse_cache: synapseclient.core.cache.Cache, minutes: int) -> int:
def clear_synapse_cache(synapse_cache: cache.Cache, minutes: int) -> int:
    """clear synapse cache before a certain time

    The cutoff is the current UTC time minus the given number of minutes; it is
    passed to synapse_cache.purge as before_date.

    Args:
        synapse_cache: an object of synapseclient Cache.
        minutes (int): number of minutes before the current UTC time to use as cutoff
    Returns:
        int: the value returned by synapse_cache.purge (number of files that get deleted)
    """
    # Function-scope import so this block does not rely on a file-level import.
    from datetime import timezone  # pylint: disable=import-outside-toplevel

    # datetime.utcnow() is deprecated since Python 3.12. Build the same naive
    # UTC timestamp from an aware "now" so the value handed to purge is unchanged.
    current_date = datetime.now(timezone.utc).replace(tzinfo=None)
    minutes_earlier = calculate_datetime(
        input_date=current_date, minutes=minutes, before_or_after="before"
    )
    return synapse_cache.purge(before_date=minutes_earlier)

clear synapse cache before a certain time

Arguments:
  • synapse_cache: an object of synapseclient Cache.
  • minutes (int): all files before this minute will be removed
Returns:

int: number of files that get deleted

def entity_type_mapping( syn: synapseclient.client.Synapse, entity_id: str, synapse_entity_tracker: Optional[schematic.store.synapse_tracker.SynapseEntityTracker] = None) -> str:
def entity_type_mapping(
    syn: Synapse,
    entity_id: str,
    synapse_entity_tracker: Optional[SynapseEntityTracker] = None,
) -> str:
    """Look up a Synapse entity and describe what kind of entity it is

    Args:
        syn (Synapse): Synapse object
        entity_id (str): id of an entity
        synapse_entity_tracker: Tracker for a pull-through cache of Synapse entities.
            A new SynapseEntityTracker is created when none is supplied.

    Raises:
        SynapseHTTPError: Re-raised SynapseHTTPError when the entity cannot be fetched

    Returns:
        str: "asset view", "folder", "file" or "project" for the known entity
            classes, otherwise the concreteType of the entity
    """
    try:
        tracker = synapse_entity_tracker or SynapseEntityTracker()
        # download_file=False: only the entity itself is needed to tell its type
        entity = tracker.get(synapse_id=entity_id, syn=syn, download_file=False)
    except SynapseHTTPError as exc:
        message = (
            f"cannot get {entity_id} from asset store. "
            f"Please make sure that {entity_id} exists"
        )
        logger.error(message)
        raise SynapseHTTPError(message) from exc

    # Checked in this order; the first matching class wins
    known_types = (
        (EntityViewSchema, "asset view"),
        (Folder, "folder"),
        (File, "file"),
        (Project, "project"),
    )
    for entity_class, label in known_types:
        if isinstance(entity, entity_class):
            return label

    assert entity is not None
    # if there's no matching type, return concreteType
    return entity.concreteType

Return the entity type of manifest

Arguments:
  • syn (Synapse): Synapse object
  • entity_id (str): id of an entity
  • synapse_entity_tracker: Tracker for a pull-through cache of Synapse entities
Raises:
  • SynapseHTTPError: Re-raised SynapseHTTPError
Returns:

str: type of the manifest being returned

def create_temp_folder(path: str, prefix: Optional[str] = None) -> str:
def create_temp_folder(path: str, prefix: Optional[str] = None) -> str:
    """Create a uniquely named temporary directory inside the given directory

    The given directory is created first when it does not exist yet.

    Args:
        path(str): a directory path where all the temporary files will live
        prefix(str): a prefix to be added to the temporary directory name
    Returns:
        str: the pathname of the new directory, as returned by tempfile.mkdtemp
    """
    parent_missing = not os.path.exists(path)
    if parent_missing:
        os.makedirs(path, exist_ok=True)

    # mkdtemp picks a unique name below the parent directory
    return tempfile.mkdtemp(prefix=prefix, dir=path)

This function creates a temporary directory in the specified directory

Arguments:
  • path(str): a directory path where all the temporary files will live
  • prefix(str): a prefix to be added to the temporary directory name

Returns: returns the absolute pathname of the new directory.

def profile( output_file: Optional[str] = None, sort_by: Any = 'cumulative', lines_to_print: Optional[int] = None, strip_dirs: bool = False) -> Callable:
def profile(
    output_file: Optional[str] = None,
    sort_by: Any = "cumulative",
    lines_to_print: Optional[int] = None,
    strip_dirs: bool = False,
) -> Callable:
    """
    The function was initially taken from:
    https://towardsdatascience.com/how-to-profile-your-code-in-python-e70c834fad89
    A time profiler decorator.
    Inspired by and modified the profile decorator of Giampaolo Rodola:
    http://code.activestate.com/recipes/577817-profile-decorator/

    When the SECRETS_MANAGER_SECRETS environment variable is set, the raw stats
    are dumped to the output file and the top 30 entries sorted by "cumulative"
    are printed; sort_by, lines_to_print and strip_dirs are not used in that case.
    Otherwise a text report honouring those options is written to the output file.

    Args:
        output_file (Optional[str], optional):
            Path of the output file. If only name of the file is given, it's
            saved in the current directory.
            If it's None, the name of the decorated function plus ".prof" is used.
            Defaults to None.
        sort_by (str, optional):
            str or SortKey enum or tuple/list of str/SortKey enum
            Sorting criteria for the Stats object.
            For a list of valid string and SortKey refer to:
            https://docs.python.org/3/library/profile.html#pstats.Stats.sort_stats
            Defaults to "cumulative".
        lines_to_print (Optional[int], optional):
            Number of lines to print.
            This is useful in reducing the size of the printout, especially
            that sorting by 'cumulative', the time consuming operations
            are printed toward the top of the file.
            Default (None) is for all the lines.
        strip_dirs (bool, optional):
            Whether to remove the leading path info from file names.
            This is also useful in reducing the size of the printout
            Defaults to False.

    Returns:
        Callable: A decorator that profiles the decorated function on every call
    """

    def inner(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            _output_file = output_file or func.__name__ + ".prof"
            profiler = Profile()
            profiler.enable()
            try:
                retval = func(*args, **kwargs)
            finally:
                # Always stop the profiler, even when func raises; otherwise it
                # stays active after the call and can break later profiling.
                profiler.disable()

            # if we are running the functions on AWS:
            if "SECRETS_MANAGER_SECRETS" in os.environ:
                # The raw stats file is only kept in this branch; the branch below
                # writes its text report to the same path and would overwrite it.
                profiler.dump_stats(_output_file)
                p_stats = pstats.Stats(profiler)
                # limit this to 30 line for now otherwise it will be too long for AWS log
                p_stats.sort_stats("cumulative").print_stats(30)
            else:
                with open(_output_file, "w", encoding="utf-8") as fle:
                    p_stats = pstats.Stats(profiler, stream=fle)
                    if strip_dirs:
                        p_stats.strip_dirs()
                    if isinstance(sort_by, (tuple, list)):
                        p_stats.sort_stats(*sort_by)
                    else:
                        p_stats.sort_stats(sort_by)
                    p_stats.print_stats(lines_to_print)  # type: ignore
            return retval

        return wrapper

    return inner

The function was initially taken from: https://towardsdatascience.com/how-to-profile-your-code-in-python-e70c834fad89 A time profiler decorator. Inspired by and modified the profile decorator of Giampaolo Rodola: http://code.activestate.com/recipes/577817-profile-decorator/

Arguments:
  • output_file (Optional[str], optional): Path of the output file. If only name of the file is given, it's saved in the current directory. If it's None, the name of the decorated function is used. Defaults to None.
  • sort_by (str, optional): str or SortKey enum or tuple/list of str/SortKey enum Sorting criteria for the Stats object. For a list of valid string and SortKey refer to: https://docs.python.org/3/library/profile.html#pstats.Stats.sort_stats Defaults to "cumulative".
  • lines_to_print (Optional[int], optional): Number of lines to print. This is useful in reducing the size of the printout, especially that sorting by 'cumulative', the time consuming operations are printed toward the top of the file. Default (None) is for all the lines.
  • strip_dirs (bool, optional): Whether to remove the leading path info from file names. This is also useful in reducing the size of the printout Defaults to False.
Returns:

Callable: Profile of the decorated function

def normalize_path(path: str, parent_folder: str) -> str:
def normalize_path(path: str, parent_folder: str) -> str:
    """
    Return a normalized version of a path.
    A relative path is first joined onto parent_folder; an absolute path is
    used as given.

    Args:
        path (str): The path to the file to normalize.
        parent_folder (str): The folder the file is in.

    Returns:
        str: The normalized path.
    """
    anchored = path if os.path.isabs(path) else os.path.join(parent_folder, path)
    return os.path.normpath(anchored)

Normalizes a path. If the path is relative, the parent_folder is added to make it an absolute path.

Arguments:
  • path (str): The path to the file to normalize.
  • parent_folder (str): The folder the file is in.
Returns:

str: The normalized path.