schematic.utils.general
General utils
"""General utils

Small, mostly stand-alone helpers: list/dict normalisation, directory size
calculation, Synapse cache maintenance, entity type lookup, temporary folder
creation, a cProfile-based profiling decorator and path normalisation.
"""

# pylint: disable=logging-fstring-interpolation

import logging
import os
import pstats
from pathlib import Path
import tempfile
from cProfile import Profile
from datetime import datetime, timedelta
from functools import wraps
from typing import Any, Callable, Optional, Sequence, TypeVar, Union

from synapseclient import Synapse  # type: ignore
from synapseclient.core import cache  # type: ignore
from synapseclient.core.exceptions import SynapseHTTPError  # type: ignore
from synapseclient.entity import File, Folder, Project  # type: ignore
from synapseclient.table import EntityViewSchema  # type: ignore

from schematic.store.synapse_tracker import SynapseEntityTracker

logger = logging.getLogger(__name__)

T = TypeVar("T")

# One or more "syn<digits>" tokens, each optionally followed by a comma.
SYN_ID_REGEX = r"(syn\d+\,?)+"


def find_duplicates(_list: list[T]) -> set[T]:
    """Find duplicate items in a list

    Args:
        _list (list[T]): The list to inspect. Items must be hashable because
            the result is a set.

    Returns:
        set[T]: The items that occur more than once in the list.
    """
    # NOTE(review): list.count() rescans the whole list for every element,
    # so this is quadratic in the length of the list.
    return {x for x in _list if _list.count(x) > 1}


def dict2list(item: Any) -> Optional[Union[dict, list]]:
    """Puts a dictionary into a list

    Args:
        item (Any): Any type of input

    Returns:
        Optional[Union[dict, list]]:
          If input is a list, return it.
          If input is a dict, return it in a list.
          Return None for anything else.
    """
    if isinstance(item, list):
        return item
    if isinstance(item, dict):
        return [item]
    return None


def str2list(item: Any) -> Optional[list]:
    """Puts a string into a list

    Args:
        item (Any): Any type of input

    Returns:
        Optional[list]:
          If input is a list, return it.
          If input is a string, return it in a list.
          Return None for anything else.
    """
    if isinstance(item, str):
        return [item]
    if isinstance(item, list):
        return item
    return None


X = TypeVar("X")


def unlist(seq: Sequence[X]) -> Union[Sequence[X], X]:
    """Returns the only item of a length-one sequence

    Args:
        seq (Sequence[X]): A Sequence of any type

    Returns:
        Union[Sequence[X], X]:
          If the sequence has length one, return its single item;
          otherwise return the sequence itself (including when it is empty).
    """
    if len(seq) == 1:
        return seq[0]
    return seq


def get_dir_size(path: str) -> int:
    """
    Recursively walk the directory tree rooted at path and add up the
    st_size of every file found.

    Args:
        path: path to a folder

    Returns:
        Total size in bytes of all the files in the given directory tree.
    """
    total = 0
    # Recursively scan directory to find entries
    with os.scandir(path) as itr:
        for entry in itr:
            if entry.is_file():
                total += entry.stat().st_size
            elif entry.is_dir():
                # Descend into the sub-directory and add its total.
                total += get_dir_size(entry.path)
    return total


def calculate_datetime(
    minutes: int, input_date: datetime, before_or_after: str = "before"
) -> datetime:
    """Shift a datetime by a number of minutes

    Args:
        input_date (datetime): date time object provided by users
        minutes (int): number of minutes
        before_or_after (str): defaults to "before". If "before", calculate
            the datetime `minutes` minutes before input_date. If "after",
            calculate the datetime `minutes` minutes after input_date.

    Raises:
        ValueError: If before_or_after is neither "before" nor "after".

    Returns:
        datetime: return result of date time calculation
    """
    if before_or_after == "before":
        date_time_result = input_date - timedelta(minutes=minutes)
    elif before_or_after == "after":
        date_time_result = input_date + timedelta(minutes=minutes)
    else:
        raise ValueError("Invalid value. Use either 'before' or 'after'.")
    return date_time_result


def check_synapse_cache_size(directory: str = "/root/.synapseCache") -> float:
    """Calculate size of .synapseCache directory in bytes using pathlib.

    Args:
        directory (str, optional): .synapseCache directory.
            Defaults to '/root/.synapseCache'

    Returns:
        float: size of the .synapseCache directory in bytes (the sum of the
            st_size of every file found recursively under it)
    """
    total_size = sum(
        f.stat().st_size for f in Path(directory).rglob("*") if f.is_file()
    )
    return total_size


def clear_synapse_cache(synapse_cache: cache.Cache, minutes: int) -> int:
    """Purge the synapse cache up to a cutoff `minutes` minutes in the past

    Args:
        synapse_cache: an object of synapseclient Cache.
        minutes (int): the cutoff handed to the cache's purge method is the
            current UTC time minus this many minutes

    Returns:
        int: the value returned by synapse_cache.purge (number of files that
            get deleted)
    """
    # utcnow() yields a naive datetime expressed in UTC.
    current_date = datetime.utcnow()
    minutes_earlier = calculate_datetime(
        input_date=current_date, minutes=minutes, before_or_after="before"
    )
    num_of_deleted_files = synapse_cache.purge(before_date=minutes_earlier)
    return num_of_deleted_files


def entity_type_mapping(
    syn: Synapse,
    entity_id: str,
    synapse_entity_tracker: Optional[SynapseEntityTracker] = None,
) -> str:
    """Return the entity type of manifest

    Args:
        syn (Synapse): Synapse object
        entity_id (str): id of an entity
        synapse_entity_tracker: Tracker for a pull-through cache of Synapse
            entities. A new tracker is created when none (or a falsy one)
            is given.

    Raises:
        SynapseHTTPError: Re-raised SynapseHTTPError (with a message naming
            the entity id) when the entity cannot be fetched

    Returns:
        str: "asset view", "folder", "file" or "project" for the known entity
            classes, otherwise the entity's concreteType
    """
    # check the type of entity
    try:
        if not synapse_entity_tracker:
            synapse_entity_tracker = SynapseEntityTracker()
        # download_file=False: only the entity itself is needed here.
        entity = synapse_entity_tracker.get(
            synapse_id=entity_id, syn=syn, download_file=False
        )
    except SynapseHTTPError as exc:
        logger.error(
            f"cannot get {entity_id} from asset store. Please make sure that {entity_id} exists"
        )
        raise SynapseHTTPError(
            f"cannot get {entity_id} from asset store. Please make sure that {entity_id} exists"
        ) from exc

    if isinstance(entity, EntityViewSchema):
        entity_type = "asset view"
    elif isinstance(entity, Folder):
        entity_type = "folder"
    elif isinstance(entity, File):
        entity_type = "file"
    elif isinstance(entity, Project):
        entity_type = "project"
    else:
        assert entity is not None
        # if there's no matching type, return concreteType
        entity_type = entity.concreteType
    return entity_type


def create_temp_folder(path: str, prefix: Optional[str] = None) -> str:
    """This function creates a temporary directory in the specified directory

    Args:
        path(str): a directory path where all the temporary files will live;
            it is created first when it does not exist
        prefix(str): a prefix to be added to the temporary directory name

    Returns:
        The pathname of the new directory, as returned by tempfile.mkdtemp.
    """
    if not os.path.exists(path):
        os.makedirs(path, exist_ok=True)

    # Create a temporary directory in the specified directory
    path = tempfile.mkdtemp(dir=path, prefix=prefix)
    return path


def profile(
    output_file: Optional[str] = None,
    sort_by: Any = "cumulative",
    lines_to_print: Optional[int] = None,
    strip_dirs: bool = False,
) -> Callable:
    """
    A time profiler decorator.

    The function was initially taken from:
    https://towardsdatascience.com/how-to-profile-your-code-in-python-e70c834fad89
    It is inspired by, and a modification of, the profile decorator of
    Giampaolo Rodola:
    http://code.activestate.com/recipes/577817-profile-decorator/

    Args:
        output_file (Optional[str], optional):
            Path of the output file. If only name of the file is given, it's
            saved in the current directory.
            If it's None, the name of the decorated function is used.
            Defaults to None.
        sort_by (str, optional):
            str or SortKey enum or tuple/list of str/SortKey enum
            Sorting criteria for the Stats object.
            For a list of valid string and SortKey refer to:
            https://docs.python.org/3/library/profile.html#pstats.Stats.sort_stats
            Defaults to "cumulative".
        lines_to_print (Optional[int], optional):
            Number of lines to print.
            This is useful in reducing the size of the printout, especially
            when sorting by 'cumulative', because the time consuming
            operations are then printed toward the top of the file.
            Default (None) is for all the lines.
        strip_dirs (bool, optional):
            Whether to remove the leading path info from file names.
            This is also useful in reducing the size of the printout.
            Defaults to False.

    Returns:
        Callable: Profile of the decorated function
    """

    def inner(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Callable:
            _output_file = output_file or func.__name__ + ".prof"
            profiler = Profile()
            profiler.enable()
            # NOTE(review): if func raises, disable() below is never reached.
            retval = func(*args, **kwargs)
            profiler.disable()
            profiler.dump_stats(_output_file)

            # if we are running the functions on AWS:
            if "SECRETS_MANAGER_SECRETS" in os.environ:
                # sort_by, lines_to_print and strip_dirs are not used here.
                p_stats = pstats.Stats(profiler)
                # limit this to 30 line for now otherwise it will be too long for AWS log
                p_stats.sort_stats("cumulative").print_stats(30)
            else:
                # Opening in "w" mode replaces the dump written above with
                # the text report.
                with open(_output_file, "w", encoding="utf-8") as fle:
                    p_stats = pstats.Stats(profiler, stream=fle)
                    if strip_dirs:
                        p_stats.strip_dirs()
                    if isinstance(sort_by, (tuple, list)):
                        p_stats.sort_stats(*sort_by)
                    else:
                        p_stats.sort_stats(sort_by)
                    p_stats.print_stats(lines_to_print)  # type: ignore
            return retval

        return wrapper

    return inner


def normalize_path(path: str, parent_folder: str) -> str:
    """
    Normalizes a path.
    If the path is relative, it is joined onto parent_folder before being
    normalized.

    Args:
        path (str): The path to the file to normalize.
        parent_folder (str): The folder the file is in.

    Returns:
        str: The normalized path.
    """
    if not os.path.isabs(path):
        path = os.path.join(parent_folder, path)
    return os.path.normpath(path)
def find_duplicates(_list: list[T]) -> set[T]:
    """Find the items that occur more than once in a list.

    Args:
        _list (list[T]): The list to inspect. Items must be hashable because
            the duplicates are collected in a set.

    Returns:
        set[T]: Every item that appears at least twice in ``_list``; an empty
            set when nothing is repeated.
    """
    # One pass with two sets. The previous implementation called
    # ``list.count`` for every element, rescanning the whole list each time.
    seen: set[T] = set()
    duplicates: set[T] = set()
    for item in _list:
        if item in seen:
            duplicates.add(item)
        else:
            seen.add(item)
    return duplicates
Find duplicate items in a list
def dict2list(item: Any) -> Optional[Union[dict, list]]:
    """Wrap a dictionary in a list, passing lists through untouched.

    Args:
        item (Any): Value to normalise.

    Returns:
        Optional[Union[dict, list]]:
            ``item`` itself when it is a list,
            ``[item]`` when it is a dict,
            ``None`` for any other type.
    """
    if isinstance(item, dict):
        return [item]
    return item if isinstance(item, list) else None
Puts a dictionary into a list
Arguments:
- item (Any): Any type of input
Returns:
Optional[Union[dict, list]]: If input is a list, return it. If input is a dict, return it in a list. Return None for anything else.
def str2list(item: Any) -> Optional[list]:
    """Wrap a string in a list, passing lists through untouched.

    Args:
        item (Any): Value to normalise.

    Returns:
        Optional[list]:
            ``item`` itself when it is a list,
            ``[item]`` when it is a string,
            ``None`` for any other type.
    """
    if isinstance(item, list):
        return item
    return [item] if isinstance(item, str) else None
Puts a string into a list
Arguments:
- item (Any): Any type of input
Returns:
Optional[list]: If input is a list, return it. If input is a string, return it in a list. Return None for anything else.
def unlist(seq: Sequence[X]) -> Union[Sequence[X], X]:
    """Collapse a one-item sequence to its only item.

    Args:
        seq (Sequence[X]): A sequence of any type.

    Returns:
        Union[Sequence[X], X]:
            The single item when the sequence has length one;
            otherwise the sequence itself, unchanged.
    """
    return seq if len(seq) != 1 else seq[0]
Returns the only item of a length-one sequence; any other sequence is returned unchanged
Arguments:
- seq (Sequence[X]): A Sequence of any type
Returns:
Union[Sequence[X], X]: If the sequence has length one, return its single item; otherwise return the sequence.
def get_dir_size(path: str) -> int:
    """Add up the size of every file in a directory tree.

    Args:
        path: Path to the folder at the root of the tree.

    Returns:
        Total size in bytes (sum of ``st_size``) of all files found in the
        folder and, recursively, in its sub-folders.
    """
    size_in_bytes = 0
    # Explicit work list of directories still to be scanned.
    pending = [path]
    while pending:
        with os.scandir(pending.pop()) as entries:
            for entry in entries:
                if entry.is_file():
                    size_in_bytes += entry.stat().st_size
                elif entry.is_dir():
                    pending.append(entry.path)
    return size_in_bytes
Recursively descend the directory tree rooted at the top and call .st_size function to calculate size of files in bytes.
Arguments:
- path: path to a folder
Returns: total size of all the files in the given directory tree, in bytes.
def calculate_datetime(
    minutes: int, input_date: datetime, before_or_after: str = "before"
) -> datetime:
    """Shift a datetime by a number of minutes, backwards or forwards.

    Args:
        input_date (datetime): The datetime to start from.
        minutes (int): Number of minutes to shift by.
        before_or_after (str): Direction of the shift. "before" (the default)
            subtracts the minutes from ``input_date``; "after" adds them.

    Raises:
        ValueError: If ``before_or_after`` is neither "before" nor "after".

    Returns:
        datetime: The shifted datetime.
    """
    if before_or_after not in ("before", "after"):
        raise ValueError("Invalid value. Use either 'before' or 'after'.")

    offset = timedelta(minutes=minutes)
    if before_or_after == "before":
        return input_date - offset
    return input_date + offset
Calculate the date time a given number of minutes before or after an input date time
Arguments:
- input_date (datetime): date time object provided by users
- minutes (int): number of minutes
- before_or_after (str): defaults to "before". If "before", calculate x minutes before input_date. If "after", calculate x minutes after input_date. Any other value raises a ValueError.
Returns:
datetime: return result of date time calculation
def check_synapse_cache_size(directory: str = "/root/.synapseCache") -> float:
    """Compute the total size in bytes of a .synapseCache directory.

    Args:
        directory (str, optional): The .synapseCache directory to measure.
            Defaults to '/root/.synapseCache'.

    Returns:
        float: Sum of ``st_size`` over every regular file found recursively
            under ``directory``.
    """
    size_in_bytes = 0
    for candidate in Path(directory).rglob("*"):
        if candidate.is_file():
            size_in_bytes += candidate.stat().st_size
    return size_in_bytes
Calculate size of .synapseCache directory in bytes using pathlib.
Arguments:
- directory (str, optional): .synapseCache directory. Defaults to '/root/.synapseCache'
Returns:
float: size of the .synapseCache directory in bytes
def clear_synapse_cache(synapse_cache: cache.Cache, minutes: int) -> int:
    """Purge the synapse cache up to a cutoff a given number of minutes ago.

    Args:
        synapse_cache: An object of synapseclient Cache.
        minutes (int): The cutoff passed to ``synapse_cache.purge`` is the
            current UTC time minus this many minutes.

    Returns:
        int: The value returned by ``synapse_cache.purge`` (number of files
            that were deleted).
    """
    # Imported locally because the module only imports ``datetime`` and
    # ``timedelta`` from the datetime package.
    from datetime import timezone  # pylint: disable=import-outside-toplevel

    # ``datetime.utcnow()`` is deprecated since Python 3.12. Dropping the
    # tzinfo from an aware UTC "now" yields the same naive-UTC value, so the
    # cutoff handed to ``purge`` is unchanged.
    current_date = datetime.now(timezone.utc).replace(tzinfo=None)
    minutes_earlier = calculate_datetime(
        input_date=current_date, minutes=minutes, before_or_after="before"
    )
    return synapse_cache.purge(before_date=minutes_earlier)
clear synapse cache before a certain time
Arguments:
- synapse_cache: an object of synapseclient Cache.
- minutes (int): all files before this minute will be removed
Returns:
int: number of files that get deleted
def entity_type_mapping(
    syn: Synapse,
    entity_id: str,
    synapse_entity_tracker: Optional[SynapseEntityTracker] = None,
) -> str:
    """Look up a Synapse entity and report what kind of entity it is.

    Args:
        syn (Synapse): Synapse object used to fetch the entity.
        entity_id (str): Id of the entity to look up.
        synapse_entity_tracker: Tracker for a pull-through cache of Synapse
            entities. A fresh tracker is created when none (or a falsy one)
            is supplied.

    Raises:
        SynapseHTTPError: Re-raised, with a message naming ``entity_id``,
            when fetching the entity fails with a SynapseHTTPError.

    Returns:
        str: "asset view", "folder", "file" or "project" for the known entity
            classes; otherwise the entity's ``concreteType``.
    """
    try:
        tracker = (
            synapse_entity_tracker
            if synapse_entity_tracker
            else SynapseEntityTracker()
        )
        # Only the entity itself is needed, not the file behind it.
        entity = tracker.get(synapse_id=entity_id, syn=syn, download_file=False)
    except SynapseHTTPError as exc:
        message = (
            f"cannot get {entity_id} from asset store. Please make sure that {entity_id} exists"
        )
        logger.error(message)
        raise SynapseHTTPError(message) from exc

    # Checked in order; the first matching class decides the label.
    known_types = (
        (EntityViewSchema, "asset view"),
        (Folder, "folder"),
        (File, "file"),
        (Project, "project"),
    )
    for entity_class, label in known_types:
        if isinstance(entity, entity_class):
            return label

    assert entity is not None
    # No known class matched: fall back to the entity's own concreteType.
    return entity.concreteType
Return the entity type of manifest
Arguments:
- syn (Synapse): Synapse object
- entity_id (str): id of an entity
- synapse_entity_tracker: Tracker for a pull-through cache of Synapse entities
Raises:
- SynapseHTTPError: Re-raised SynapseHTTPError
Returns:
str: type of the manifest being returned
def create_temp_folder(path: str, prefix: Optional[str] = None) -> str:
    """Create a uniquely named temporary directory inside ``path``.

    Args:
        path(str): Directory that will contain the temporary directory. It is
            created (including missing parents) when it does not exist yet.
        prefix(str): Optional prefix for the temporary directory's name.

    Returns:
        The pathname of the new directory, as returned by ``tempfile.mkdtemp``.
    """
    parent_missing = not os.path.exists(path)
    if parent_missing:
        os.makedirs(path, exist_ok=True)

    return tempfile.mkdtemp(prefix=prefix, dir=path)
This function creates a temporary directory in the specified directory
Arguments:
- path(str): a directory path where all the temporary files will live
- prefix(str): a prefix to be added to the temporary directory name
Returns: returns the absolute pathname of the new directory.
def profile(
    output_file: Optional[str] = None,
    sort_by: Any = "cumulative",
    lines_to_print: Optional[int] = None,
    strip_dirs: bool = False,
) -> Callable:
    """
    A time profiler decorator built on cProfile.

    The function was initially taken from:
    https://towardsdatascience.com/how-to-profile-your-code-in-python-e70c834fad89
    It is inspired by, and a modification of, the profile decorator of
    Giampaolo Rodola:
    http://code.activestate.com/recipes/577817-profile-decorator/

    When the ``SECRETS_MANAGER_SECRETS`` environment variable is set (running
    on AWS), the raw stats are dumped to the output file and the top 30 lines,
    sorted by "cumulative", are printed to standard output; ``sort_by``,
    ``lines_to_print`` and ``strip_dirs`` are not used in that case.
    Otherwise a text report honouring those options is written to the output
    file.

    Args:
        output_file (Optional[str], optional):
            Path of the output file. If only name of the file is given, it's
            saved in the current directory.
            If it's None, the name of the decorated function plus ".prof"
            is used.
            Defaults to None.
        sort_by (Any, optional):
            str or SortKey enum or tuple/list of str/SortKey enum.
            Sorting criteria for the Stats object.
            For a list of valid string and SortKey refer to:
            https://docs.python.org/3/library/profile.html#pstats.Stats.sort_stats
            Defaults to "cumulative".
        lines_to_print (Optional[int], optional):
            Number of lines to print.
            This is useful in reducing the size of the printout, especially
            when sorting by 'cumulative', because the time consuming
            operations are then printed toward the top of the file.
            Default (None) is for all the lines.
        strip_dirs (bool, optional):
            Whether to remove the leading path info from file names.
            This is also useful in reducing the size of the printout.
            Defaults to False.

    Returns:
        Callable: A decorator. The decorated function returns whatever the
            original function returns and propagates its exceptions.
    """

    def inner(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            _output_file = output_file or func.__name__ + ".prof"
            profiler = Profile()
            profiler.enable()
            try:
                retval = func(*args, **kwargs)
            finally:
                # Always switch the profiler off, even when func raises;
                # otherwise it would stay active after the call.
                profiler.disable()

            # if we are running the functions on AWS:
            if "SECRETS_MANAGER_SECRETS" in os.environ:
                profiler.dump_stats(_output_file)
                p_stats = pstats.Stats(profiler)
                # limit this to 30 line for now otherwise it will be too long for AWS log
                p_stats.sort_stats("cumulative").print_stats(30)
            else:
                # The text report is the only content of the output file in
                # this branch, so no binary dump is written first (it would
                # be overwritten by this open() anyway).
                with open(_output_file, "w", encoding="utf-8") as fle:
                    p_stats = pstats.Stats(profiler, stream=fle)
                    if strip_dirs:
                        p_stats.strip_dirs()
                    if isinstance(sort_by, (tuple, list)):
                        p_stats.sort_stats(*sort_by)
                    else:
                        p_stats.sort_stats(sort_by)
                    if lines_to_print is None:
                        p_stats.print_stats()
                    else:
                        p_stats.print_stats(lines_to_print)
            return retval

        return wrapper

    return inner
A time profiler decorator. The function was initially taken from: https://towardsdatascience.com/how-to-profile-your-code-in-python-e70c834fad89 It is inspired by, and a modification of, the profile decorator of Giampaolo Rodola: http://code.activestate.com/recipes/577817-profile-decorator/
Arguments:
- output_file (Optional[str], optional): Path of the output file. If only name of the file is given, it's saved in the current directory. If it's None, the name of the decorated function is used. Defaults to None.
- sort_by (str, optional): str or SortKey enum or tuple/list of str/SortKey enum Sorting criteria for the Stats object. For a list of valid string and SortKey refer to: https://docs.python.org/3/library/profile.html#pstats.Stats.sort_stats Defaults to "cumulative".
- lines_to_print (Optional[int], optional): Number of lines to print. This is useful in reducing the size of the printout, especially when sorting by 'cumulative', because the time consuming operations are then printed toward the top of the file. Default (None) is for all the lines.
- strip_dirs (bool, optional): Whether to remove the leading path info from file names. This is also useful in reducing the size of the printout. Defaults to False.
Returns:
Callable: Profile of the decorated function
def normalize_path(path: str, parent_folder: str) -> str:
    """Normalize a path, anchoring relative paths at ``parent_folder``.

    Args:
        path (str): The path to the file to normalize.
        parent_folder (str): The folder the file is in; joined in front of
            ``path`` when ``path`` is not absolute.

    Returns:
        str: The normalized path.
    """
    candidate = path if os.path.isabs(path) else os.path.join(parent_folder, path)
    return os.path.normpath(candidate)
Normalizes a path. If the path is relative, the parent_folder is added to make it an absolute path.
Arguments:
- path (str): The path to the file to normalize.
- parent_folder (str): The folder the file is in.
Returns:
str: The normalized path.