schematic.schemas.data_model_graph
DataModel Graph
"""DataModel Graph"""

import logging
from typing import Any, Optional, Union

import graphviz  # type: ignore
import networkx as nx  # type: ignore
from opentelemetry import trace

from schematic.schemas.data_model_edges import DataModelEdges
from schematic.schemas.data_model_nodes import DataModelNodes
from schematic.schemas.data_model_relationships import DataModelRelationships
from schematic.utils.general import unlist
from schematic.utils.schema_utils import (
    DisplayLabelType,
    extract_component_validation_rules,
    get_class_label_from_display_name,
    get_property_label_from_display_name,
)
from schematic.utils.validate_utils import rule_in_rule_list
from schematic.utils.viz_utils import visualize

logger = logging.getLogger(__name__)
tracer = trace.get_tracer("Schematic")


class DataModelGraphMeta(type):  # pylint: disable=too-few-public-methods
    """Metaclass that caches one instance per class (singleton pattern).

    The class derives from ``type`` so that it can really be used through
    ``class Foo(metaclass=DataModelGraphMeta)``; a plain class defining
    ``__call__`` cannot act as a metaclass.
    """

    # Maps each class created with this metaclass to its single cached instance.
    _instances: dict = {}

    def __call__(cls, *args: Any, **kwargs: Any) -> Any:
        """Return the cached instance of ``cls``, creating it on first use.

        Possible changes to the value of the `__init__` argument do not affect
        the returned instance.
        """
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


class DataModelGraph:  # pylint: disable=too-few-public-methods
    """
    Generate graph network (networkx) from the attributes and relationships returned
    from the data model parser.

    NOTE(review): the class was meant to be a singleton, but the
    ``__metaclass__`` attribute below is Python 2 syntax and has no effect on
    Python 3, so every call creates a fresh instance. The attribute is kept
    as-is so that existing callers keep their current behaviour.
    """

    __metaclass__ = DataModelGraphMeta

    def __init__(
        self,
        attribute_relationships_dict: dict,
        data_model_labels: DisplayLabelType = "class_label",
    ) -> None:
        """Load parsed data model.

        Args:
            attribute_relationships_dict, dict: generated in data_model_parser
                {Attribute Display Name: {
                    Relationships: {
                        CSV Header: Value}}}
            data_model_labels: str, display_label or class_label.
                display_label, use the display name as a label, if it is valid
                (contains no blacklisted characters) otherwise will default to schema_label.
                class_label, default, use standard class or property label.
        Raises:
            ValueError, attribute_relationships_dict not loaded.
        """
        self.attribute_relationships_dict = attribute_relationships_dict

        # Validate before building any helper so that an empty model always
        # surfaces as the documented ValueError.
        if not self.attribute_relationships_dict:
            raise ValueError(
                (
                    "Something has gone wrong, a data model was not loaded into the DataModelGraph "
                    "Class. Please check that your paths are correct"
                )
            )

        self.dmn = DataModelNodes(self.attribute_relationships_dict)
        self.dme = DataModelEdges()
        self.dmr = DataModelRelationships()
        self.data_model_labels = data_model_labels
        self.graph = self.generate_data_model_graph()

    @tracer.start_as_current_span("DataModelGraph::generate_data_model_graph")
    def generate_data_model_graph(self) -> nx.MultiDiGraph:
        """
        Generate NetworkX Graph from the Relationships/attributes dictionary, the graph is built
        by first adding all nodes to the graph, then connecting nodes by the relationships defined
        in the attributes_relationship dictionary.

        Returns:
            G: nx.MultiDiGraph, networkx graph representation of the data model
        """
        # Get all relationships with edges
        edge_relationships = self.dmr.retreive_rel_headers_dict(edge=True)

        # Find all nodes
        all_nodes = self.dmn.gather_all_nodes_in_model(
            attr_rel_dict=self.attribute_relationships_dict
        )

        # Instantiate NetworkX MultiDigraph
        graph = nx.MultiDiGraph()

        all_node_dict = {}

        ## Fill in MultiDigraph with nodes
        for node in all_nodes:
            # Gather information for each node
            node_dict = self.dmn.generate_node_dict(
                node_display_name=node,
                attr_rel_dict=self.attribute_relationships_dict,
                data_model_labels=self.data_model_labels,
            )

            # Add each node to the all_node_dict to be used for generating edges
            all_node_dict[node] = node_dict

            # Generate node and attach information (attributes) to each node
            graph = self.dmn.generate_node(graph, node_dict)

        edge_list: list[tuple[str, str, dict[str, Union[str, int]]]] = []
        ## Connect nodes via edges
        for node in all_nodes:
            # Generate edges; the running edge list is handed back in on
            # every iteration and the returned list is copied before reuse.
            edge_list_2 = self.dme.generate_edge(
                node,
                all_node_dict,
                self.attribute_relationships_dict,
                edge_relationships,
                edge_list,
            )
            edge_list = edge_list_2.copy()

        # Add edges to the Graph
        for node_1, node_2, edge_dict in edge_list:
            graph.add_edge(
                node_1, node_2, key=edge_dict["key"], weight=edge_dict["weight"]
            )
        return graph


class DataModelGraphExplorer:  # pylint: disable=too-many-public-methods
    """Query helpers over a fully built data model graph."""

    def __init__(
        self,
        graph: nx.MultiDiGraph,
    ):
        """Store the data model graph and load the relationship definitions.

        Args:
            graph: nx.MultiDiGraph, networkx graph representation of the data model
        """
        self.graph = graph  # At this point the graph is expected to be fully formed.
        self.dmr = DataModelRelationships()
        self.rel_dict = self.dmr.relationships_dictionary

    def find_properties(self) -> set[str]:
        """
        Identify all properties, as defined by the first node in a pair, connected with
        'domainIncludes' edge type

        Returns:
            properties, set: All properties defined in the data model, each property name
                is defined by its label.
        """
        domain_edge_key = self.rel_dict["domainIncludes"]["edge_key"]
        return {
            node_1 for node_1, _, rel in self.graph.edges if rel == domain_edge_key
        }

    def find_classes(self) -> set[str]:
        """
        Identify all classes, as defined by all nodes, minus all properties
        (which are explicitly defined)

        Returns:
            classes, set: All classes defined in the data model, each class
                name is defined by its label.
        """
        nodes = self.graph.nodes
        properties = self.find_properties()
        classes = nodes - properties
        return classes

    def find_node_range(
        self, node_label: Optional[str] = None, node_display_name: Optional[str] = None
    ) -> list:
        """Get valid values for the given node (attribute)

        Args:
            node_label, Optional[str]: label of the node for which to retrieve valid values
            node_display_name, Optional[str]: Display Name of the node for which to
                retrieve valid values
        Returns:
            valid_values, list: List of valid values associated with the provided node
                (deduplicated, in no particular order).
        """
        if not node_label:
            assert node_display_name is not None
            node_label = self.get_node_label(node_display_name)

        range_edge_key = self.rel_dict["rangeIncludes"]["edge_key"]
        valid_values = {
            node_2
            for node_1, node_2, rel in self.graph.edges
            if node_1 == node_label and rel == range_edge_key
        }
        return list(valid_values)

    def get_adjacent_nodes_by_relationship(
        self, node_label: str, relationship: str
    ) -> list[str]:
        """Get a list of nodes that is / are adjacent to a given node, based on a relationship type.

        Args:
            node_label: label of the node whose edges we need to look at.
            relationship: the type of link(s) that the above node and its immediate neighbors share.

        Returns:
            List of nodes that are adjacent to the given node.
        """
        nodes = {
            node_2
            for _, node_2, key, _ in self.graph.out_edges(
                node_label, data=True, keys=True
            )
            if key == relationship
        }
        return list(nodes)

    def get_component_node_required(
        self,
        manifest_component: str,
        node_validation_rules: Optional[list[str]] = None,
        node_label: Optional[str] = None,
        node_display_name: Optional[str] = None,
    ) -> bool:
        """Check if a node is required taking into account the manifest component it is defined in
        (requirements can be set in validation rule as well as required column)

        Args:
            manifest_component: str, manifest component display name that the node belongs to.
            node_validation_rules: list[str], validation rules for a given node and component.
            node_label: str, Label of the node being queried.
            node_display_name: str, node display name for the node being queried.
        Returns:
            True, if node is required, False if not
        """
        node_required = False

        if not node_validation_rules:
            # Get node validation rules for a given component
            node_validation_rules = self.get_component_node_validation_rules(
                manifest_component=manifest_component,
                node_label=node_label,
                node_display_name=node_display_name,
            )

        # Check if the validation rule specifies that the node is required for this particular
        # component.
        if rule_in_rule_list("required", node_validation_rules):
            node_required = True
            # To prevent any unintended errors, ensure the Required field for this node is False
            if self.get_node_required(
                node_label=node_label, node_display_name=node_display_name
            ):
                if not node_display_name:
                    assert node_label is not None
                    node_display_name = self.graph.nodes[node_label][
                        self.rel_dict["displayName"]["node_label"]
                    ]
                error_str = " ".join(
                    [
                        f"For component: {manifest_component} and attribute: {node_display_name}",
                        "requirements are being specified in both the Required field and in the",
                        "Validation Rules. If you desire to use validation rules to set component",
                        "specific requirements for this attribute",
                        "then the Required field needs to be set to False, or the validation may",
                        "not work as intended, for other components where the attribute",
                        "that should not be required.",
                    ]
                )

                # The conflict is only logged; the node is still reported as required.
                logger.error(error_str)
        else:
            # If requirements are not being set in the validation rule, then just pull the
            # standard node requirements from the model
            node_required = self.get_node_required(
                node_label=node_label, node_display_name=node_display_name
            )
        return node_required

    def get_component_node_validation_rules(
        self,
        manifest_component: str,
        node_label: Optional[str] = None,
        node_display_name: Optional[str] = None,
    ) -> list:
        """Get validation rules for a given node and component.

        Args:
            manifest_component: str, manifest component display name that the node belongs to.
            node_label: str, Label of the node being queried.
            node_display_name: str, node display name for the node being queried.
        Returns:
            validation_rules: list, validation rules list for a given node and component.
        """
        # get any additional validation rules associated with this node (e.g. can this node
        # be mapped to a list of other nodes)
        node_validation_rules = self.get_node_validation_rules(
            node_label=node_label, node_display_name=node_display_name
        )

        # Parse the validation rules per component if applicable
        if node_validation_rules and isinstance(node_validation_rules, dict):
            node_validation_rules_list = extract_component_validation_rules(
                manifest_component=manifest_component,
                validation_rules_dict=node_validation_rules,  # type: ignore
            )
        else:
            assert isinstance(node_validation_rules, list)
            node_validation_rules_list = node_validation_rules
        return node_validation_rules_list

    def get_component_requirements(
        self,
        source_component: str,
    ) -> list[str]:
        """
        Get all components that are associated with a given source component and are
        required by it.

        Args:
            source_component: source component for which we need to find all required downstream
                components.

        Returns:
            List of nodes that are descendants from the source component and are related to the
                source through a specific component relationship (reverse topological order).
        """
        ordered_descendants = self.get_descendants_by_edge_type(
            source_component,
            self.rel_dict["requiresComponent"]["edge_key"],
            ordered=True,
        )
        return list(reversed(ordered_descendants))

    def get_component_requirements_graph(
        self,
        source_component: str,
    ) -> nx.DiGraph:
        """
        Get all components that are associated with a given source component and are required by it;
        return the components as a dependency graph (i.e. a DAG).

        Args:
            source_component, str: source component for which we need to find all required
                downstream components.

        Returns:
            A subgraph of the schema graph induced on nodes that are descendants from the source
                component and are related to the source through a specific component relationship.
        """
        # get a list of required component nodes
        req_components = self.get_component_requirements(source_component)

        # get the subgraph induced on required component nodes
        req_components_graph = self.get_subgraph_by_edge_type(
            self.rel_dict["requiresComponent"]["edge_key"],
        ).subgraph(req_components)

        return req_components_graph

    def get_descendants_by_edge_type(
        self,
        source_node: str,
        relationship: str,
        connected: bool = True,
        ordered: bool = False,
    ) -> list[str]:
        """
        Get all nodes that are descendants of a given source node, based on a specific
        type of edge / relationship type.

        Args:
            source_node: The node whose descendants need to be retrieved.
            relationship: Edge / link relationship type (an edge key of the graph).
            connected:
                If True, we need to ensure that all descendant nodes are reachable from the source
                    node, i.e., they are part of the same connected component.
                If False, the descendants could be in multiple connected components.
                Default value is True.
            ordered:
                If True, the list of descendants will be topologically ordered.
                If False, the list has no particular order (depends on the order in which the
                    descendants were traversed in the subgraph).

        Returns:
            List of nodes that are descendants from a particular node (sorted / unsorted).
            When the result is non-empty and ``connected`` is True, the source node itself
            is included.
        """
        root_descendants = nx.descendants(self.graph, source_node)

        subgraph_nodes = list(root_descendants)
        subgraph_nodes.append(source_node)
        descendants_subgraph = self.graph.subgraph(subgraph_nodes)

        # prune the descendants subgraph so as to include only those edges that match
        # the relationship type
        rel_edges = [
            (node_1, node_2)
            for node_1, node_2, key, _ in descendants_subgraph.edges(
                data=True, keys=True
            )
            if key == relationship
        ]

        relationship_subgraph = nx.DiGraph()
        relationship_subgraph.add_edges_from(rel_edges)

        descendants = relationship_subgraph.nodes()

        if not descendants:
            # return empty list if there are no nodes that are reachable from the
            # source node based on this relationship type
            return []

        if connected and source_node not in relationship_subgraph:
            # The source has no edge of this type itself, although some of its
            # descendants do: nothing is reachable from it through this
            # relationship, and nx.descendants would raise on the missing node.
            return []

        if connected and ordered:
            # get the set of reachable nodes from the source node
            descendants = nx.descendants(relationship_subgraph, source_node)
            descendants.add(source_node)

            # normally, the descendants from a node are unordered (peculiarity
            # of nx descendants call)
            # form the subgraph on descendants and order it topologically
            # this assumes an acyclic subgraph
            descendants = nx.topological_sort(
                relationship_subgraph.subgraph(descendants)
            )
        elif connected:
            # get the nodes that are reachable from a given source node
            # after the pruning process above some nodes in the
            # root_descendants subgraph might have become disconnected and
            # will be omitted
            descendants = nx.descendants(relationship_subgraph, source_node)
            descendants.add(source_node)
        elif ordered:
            # sort the nodes topologically
            # this requires the graph to be an acyclic graph
            descendants = nx.topological_sort(relationship_subgraph)

        return list(descendants)

    def get_digraph_by_edge_type(self, edge_type: str) -> nx.DiGraph:
        """Get a networkx digraph of the nodes connected via a given edge_type.

        Args:
            edge_type:
                Edge type to search for, possible types are defined by 'edge_key'
                    in relationship class
        Returns:
            Directed graph containing only the edges whose key equals edge_type.
        """
        digraph = nx.DiGraph()
        for node_1, node_2, key, _ in self.graph.edges(data=True, keys=True):
            if key == edge_type:
                digraph.add_edge(node_1, node_2)
        return digraph

    def get_edges_by_relationship(
        self,
        node: str,
        relationship: str,
    ) -> list[tuple[str, str]]:
        """Get a list of out-edges of a node where the edges match a specific type of relationship.

        i.e., the edges connecting a node to its neighbors are of relationship type -- "parentOf"
            (set of edges to children / sub-class nodes).

        Args:
            node: the node whose edges we need to look at.
            relationship: the type of link(s) that the above node and its immediate neighbors share.

        Returns:
            List of edges that are connected to the node.
        """
        return [
            (node_1, node_2)
            for node_1, node_2, key, _ in self.graph.out_edges(
                node, data=True, keys=True
            )
            if key == relationship
        ]

    def get_ordered_entry(self, key: str, source_node_label: str) -> list[str]:
        """
        Order the values associated with a particular node and edge_key to
            match original ordering in schema.

        Args:
            key (str): a key representing an edge relationship in
                DataModelRelationships.relationships_dictionary
            source_node_label (str): node to look for edges of and order

        Raises:
            KeyError: cannot find source node in graph

        Returns:
            list[str]:
                list of sorted nodes, that share the specified relationship with the source node
                For the example data model, for key='rangeIncludes', source_node_label='CancerType'
                    the return would be ['Breast', 'Colorectal', 'Lung', 'Prostate', 'Skin'] in that
                    exact order.
        """
        # Check if node is in the graph, if not throw an error.
        if not self.is_class_in_schema(node_label=source_node_label):
            raise KeyError(
                f"Cannot find node: {source_node_label} in the graph, please check entry."
            )

        edge_key = self.rel_dict[key]["edge_key"]

        # Handle out edges
        if self.rel_dict[key]["jsonld_direction"] == "out":
            # use outedges
            original_edge_weights_dict = {
                attached_node: self.graph[source_node][attached_node][edge_key][
                    "weight"
                ]
                for source_node, attached_node in self.graph.out_edges(
                    source_node_label
                )
                if edge_key in self.graph[source_node][attached_node]
            }
        # Handle in edges
        else:
            # use inedges
            original_edge_weights_dict = {
                attached_node: self.graph[attached_node][source_node][edge_key][
                    "weight"
                ]
                for attached_node, source_node in self.graph.in_edges(source_node_label)
                if edge_key in self.graph[attached_node][source_node]
            }

        # The edge weight drives the ordering; sorted() is stable, so nodes
        # with equal weights keep their insertion order.
        sorted_nodes = sorted(
            original_edge_weights_dict, key=original_edge_weights_dict.__getitem__
        )

        return sorted_nodes

    # Get values associated with a node
    def get_nodes_ancestors(self, subgraph: nx.DiGraph, node_label: str) -> list[str]:
        """Get a list of the ancestors of a node in the given graph.

        Args:
            subgraph (nx.DiGraph): networkx graph object
            node_label (str): label of node to find ancestors for

        Returns:
            list[str]: nodes from which node_label is reachable in subgraph
        """
        all_ancestors = list(nx.ancestors(subgraph, node_label))

        return all_ancestors

    def get_node_comment(
        self, node_display_name: Optional[str] = None, node_label: Optional[str] = None
    ) -> str:
        """Get the node definition, i.e., the "comment" associated with a given node display name.

        Args:
            node_display_name, str: Display name of the node which you want to get the comment for.
            node_label, str: Label of the node you would want to get the comment for.
        Returns:
            Comment associated with node, as a string. Empty string when the display name
            cannot be resolved to a node label.
        """
        if not node_label:
            assert node_display_name is not None
            node_label = self.get_node_label(node_display_name)

        if not node_label:
            return ""

        node_definition = self.graph.nodes[node_label][
            self.rel_dict["comment"]["node_label"]
        ]
        return node_definition

    def get_node_dependencies(
        self,
        source_node: str,
        display_names: bool = True,
        schema_ordered: bool = True,
    ) -> list[str]:
        """Get the immediate dependencies that are related to a given source node.

        Args:
            source_node: The node whose dependencies we need to compute.
            display_names: if True, return list of display names of each of the dependencies.
                           if False, return list of node labels of each of the dependencies.
            schema_ordered:
                if True, return the dependencies of the node following the order of the schema
                    (slower).
                if False, return dependencies from graph without guaranteeing schema order (faster)

        Returns:
            List of nodes that are dependent on the source node.
        """
        if schema_ordered:
            # get dependencies in the same order in which they are defined in the schema.
            # get_ordered_entry expects the relationship name (a key of rel_dict)
            # and resolves the edge key itself.
            required_dependencies = self.get_ordered_entry(
                key="requiresDependency",
                source_node_label=source_node,
            )
        else:
            required_dependencies = self.get_adjacent_nodes_by_relationship(
                node_label=source_node,
                relationship=self.rel_dict["requiresDependency"]["edge_key"],
            )

        if display_names:
            # get display names of dependencies
            display_name_key = self.rel_dict["displayName"]["node_label"]
            return [
                self.graph.nodes[req][display_name_key]
                for req in required_dependencies
            ]

        return required_dependencies

    def get_nodes_descendants(self, node_label: str) -> list[str]:
        """Return a list of nodes reachable from source in graph

        Args:
            node_label, str: any given node
        Return:
            all_descendants, list: nodes reachable from source in graph
        """
        all_descendants = list(nx.descendants(self.graph, node_label))

        return all_descendants

    def get_nodes_display_names(
        self,
        node_list: list[str],
    ) -> list[str]:
        """Get display names associated with the given list of nodes.

        Args:
            node_list: List of nodes whose display names we need to retrieve.

        Returns:
            List of display names.
        """
        display_name_key = self.rel_dict["displayName"]["node_label"]
        return [self.graph.nodes[node][display_name_key] for node in node_list]

    def get_node_label(self, node_display_name: str) -> str:
        """Get the node label for a given display name.

        Args:
            node_display_name: Display name of the node which you want to get the label for.
        Returns:
            Node label associated with given node.
            If display name not part of schema, return an empty string.
        """
        node_class_label = get_class_label_from_display_name(
            display_name=node_display_name
        )
        node_property_label = get_property_label_from_display_name(
            display_name=node_display_name
        )

        # The class label takes precedence over the property label.
        if node_class_label in self.graph.nodes:
            return node_class_label
        if node_property_label in self.graph.nodes:
            return node_property_label
        return ""

    def get_node_range(
        self,
        node_label: Optional[str] = None,
        node_display_name: Optional[str] = None,
        display_names: bool = False,
    ) -> list[str]:
        """
        Get the range, i.e., all the valid values that are associated with a node label.

        Args:
            node_label (Optional[str], optional): Node for which you need to retrieve the range.
                Defaults to None.
            node_display_name (Optional[str], optional): Display name of the node, used to
                look up the label when node_label is not given. Defaults to None.
            display_names (bool, optional): Return display names instead of labels.
                Defaults to False.

        Raises:
            ValueError: If looking up the range raises a KeyError.

        Returns:
            list[str]:
                If display_names=False, a list of valid values (labels) associated with a given node.
                If display_names=True, a list of valid values (display names) associated
                    with a given node
        """
        if not node_label:
            assert node_display_name is not None
            node_label = self.get_node_label(node_display_name)

        try:
            # NOTE(review): find_node_range only scans the edge list, so an
            # unknown node appears to yield an empty list rather than a
            # KeyError - confirm whether this handler is still reachable.
            required_range = self.find_node_range(node_label=node_label)
        except KeyError as exc:
            raise ValueError(
                f"The source node {node_label} does not exist in the graph. "
                "Please use a different node."
            ) from exc

        if display_names:
            # get the display name(s) of all valid values
            return [self.graph.nodes[req]["displayName"] for req in required_range]

        return required_range

    def get_node_required(
        self, node_label: Optional[str] = None, node_display_name: Optional[str] = None
    ) -> bool:
        """Check if a given node is required or not.

        Note: The possible options that a node can be associated with -- "required" / "optional".

        Args:
            node_label: Label of the node for which you need to look up.
            node_display_name: Display name of the node for which you want look up.
        Returns:
            True: If the given node is a "required" node.
            False: If the given node is not a "required" (i.e., an "optional") node.
        """
        if not node_label:
            assert node_display_name is not None
            node_label = self.get_node_label(node_display_name)

        rel_node_label = self.rel_dict["required"]["node_label"]
        node_required = self.graph.nodes[node_label][rel_node_label]
        return node_required

    def get_node_validation_rules(
        self, node_label: Optional[str] = None, node_display_name: Optional[str] = None
    ) -> Union[list, dict[str, str]]:
        """Get validation rules associated with a node,

        Args:
            node_label: Label of the node for which you need to look up.
            node_display_name: Display name of the node which you want to get the label for.
        Returns:
            A set of validation rules associated with node, as a list or a dictionary.
            An empty list when the display name cannot be resolved to a node label.
        Raises:
            ValueError: neither identifier was provided, or the node lookup failed.
        """
        if not node_label:
            if node_display_name is None:
                raise ValueError(
                    "Either node_label or node_display_name must be provided."
                )

            # try search node label using display name
            node_label = self.get_node_label(node_display_name)

        if not node_label:
            return []

        try:
            node_validation_rules = self.graph.nodes[node_label]["validationRules"]
        except KeyError as key_error:
            raise ValueError(
                f"{node_label} is not in the graph, please provide a proper node label"
            ) from key_error

        return node_validation_rules

    def get_subgraph_by_edge_type(self, relationship: str) -> nx.DiGraph:
        """Get a subgraph containing all edges of a given type (aka relationship).

        Args:
            relationship: edge / link relationship type (an edge key of the graph).

        Returns:
            Directed graph on edges of a particular type (aka relationship)
        """
        # prune the metadata model graph so as to include only those edges that
        # match the relationship type
        rel_edges = [
            (node_1, node_2)
            for node_1, node_2, key, _ in self.graph.out_edges(data=True, keys=True)
            if key == relationship
        ]

        relationship_subgraph = nx.DiGraph()
        relationship_subgraph.add_edges_from(rel_edges)

        return relationship_subgraph

    def find_adjacent_child_classes(
        self, node_label: Optional[str] = None, node_display_name: Optional[str] = None
    ) -> list[str]:
        """Find child classes of a given node.

        Args:
            node_display_name: Display name of the node to look up.
            node_label: Label of the node to look up.
        Returns:
            List of nodes that are adjacent to the given node, by SubclassOf relationship.
        """
        if not node_label:
            assert node_display_name is not None
            node_label = self.get_node_label(node_display_name)

        return self.get_adjacent_nodes_by_relationship(
            node_label=node_label, relationship=self.rel_dict["subClassOf"]["edge_key"]
        )

    def find_child_classes(self, schema_class: str) -> list:
        """Find schema classes that inherit from the given class

        Args:
            schema_class: node label for the class from which to look for children.
        Returns:
            list of children to the schema_class.
        """
        child_classes = unlist(list(self.graph.successors(schema_class)))
        assert isinstance(child_classes, list)
        return child_classes

    def find_class_specific_properties(self, schema_class: str) -> list[str]:
        """Find properties specifically associated with a given class

        Args:
            schema_class, str: node/class label, to identify properties for.
        Returns:
            properties, list: List of properties associated with a given schema class.
        Raises:
            KeyError: Key error is raised if the provided schema_class is not in the graph
        """
        if not self.is_class_in_schema(schema_class):
            raise KeyError(
                (
                    f"Schema_class provided: {schema_class} is not in the data model, please check "
                    "that you are providing the proper class/node label"
                )
            )

        # NOTE(review): "domainValue" is presumably the edge key of the
        # 'domainIncludes' relationship - confirm against DataModelRelationships.
        properties = []
        for node1, node2 in self.graph.edges():
            if (
                node2 == schema_class
                and "domainValue" in self.graph[node1][schema_class]
            ):
                properties.append(node1)
        return properties

    def find_parent_classes(self, node_label: str) -> list[list[str]]:
        """Find all parents of the provided node

        Args:
            node_label: label of the node to find parents of
        Returns:
            List of list of Parents to the given node; each inner list is one path from
            the root node down to (but excluding) node_label.
        """
        # Get digraph of nodes with parents
        digraph = self.get_digraph_by_edge_type("parentOf")

        # Get root node
        root_node = list(nx.topological_sort(digraph))[0]

        # Get paths between root_node and the target node.
        paths = nx.all_simple_paths(self.graph, source=root_node, target=node_label)

        return [_path[:-1] for _path in paths]

    def full_schema_graph(self, size: Optional[int] = None) -> graphviz.Digraph:
        """Create a graph of the data model.

        Args:
            size: max height and width of the graph, if one value provided
               it is used for both.
        Returns:
            schema graph viz
        """
        edges = self.graph.edges()
        return visualize(edges, size=size)

    def is_class_in_schema(self, node_label: str) -> bool:
        """Determine if provided node_label is in the schema graph/data model.

        Args:
            node_label: label of node to search for in the graph
        Returns:
            True, if node is in the graph schema
            False, if node is not in graph schema
        """
        return node_label in self.graph.nodes()

    def _parent_path_edges(self, source: str) -> list[tuple[str, str]]:
        """Return the consecutive (parent, child) edges of every path leading to source."""
        edges: list[tuple[str, str]] = []
        for parent_path in self.find_parent_classes(source):
            full_path = [*parent_path, source]
            edges.extend(zip(full_path, full_path[1:]))
        return edges

    def sub_schema_graph(
        self, source: str, direction: str, size: Optional[float] = None
    ) -> Optional[graphviz.Digraph]:
        """Create a sub-schema graph

        Args:
            source, str: source node label to start graph
            direction, str: direction to create the visualization, choose from "up", "down", "both"
            size, float: max height and width of the graph, if one value provided it is used for
                both.
        Returns:
            Sub-schema graph viz, or None when direction is not one of the supported values.
        """
        if direction == "down":
            edges = list(nx.edge_bfs(self.graph, [source]))
            return visualize(edges, size=size)
        if direction == "up":
            return visualize(self._parent_path_edges(source), size=size)
        if direction == "both":
            # Parent paths are resolved first, then appended after the
            # downstream edges.
            parent_edges = self._parent_path_edges(source)
            edges = list(nx.edge_bfs(self.graph, [source]))
            edges.extend(parent_edges)
            return visualize(edges, size=size)
        return None
class DataModelGraphMeta:  # pylint: disable=too-few-public-methods
    """Per-object instance cache intended to back a singleton.

    The cache is a single class-level dict shared by every user of this class.
    Note that this class does not derive from ``type``; it is a plain class
    whose instances are callable.
    """

    # Maps the called object to the instance that was created for it.
    _instances: dict = {}

    def __call__(  # pylint: disable=no-self-argument
        cls, *args: Any, **kwargs: Any
    ) -> Any:
        """Return the cached instance for ``cls``, creating it on first use.

        Arguments are only forwarded on the first call; on later calls they are
        ignored and the instance that is already cached is returned unchanged.
        """
        cache = cls._instances
        if cls in cache:
            return cache[cls]
        cache[cls] = super().__call__(*args, **kwargs)  # type: ignore # pylint: disable=no-member
        return cache[cls]
DataModelGraphMeta
class DataModelGraph:  # pylint: disable=too-few-public-methods
    """
    Generate graph network (networkx) from the attributes and relationships returned
    from the data model parser.

    NOTE: ``__metaclass__`` below is the Python 2 spelling for selecting a metaclass.
    Under Python 3 it is an ordinary class attribute, so every call to
    ``DataModelGraph(...)`` builds a new, independent instance.
    """

    __metaclass__ = DataModelGraphMeta

    def __init__(
        self,
        attribute_relationships_dict: dict,
        data_model_labels: DisplayLabelType = "class_label",
    ) -> None:
        """Load parsed data model and build the graph.

        Args:
            attribute_relationships_dict, dict: generated in data_model_parser
                {Attribute Display Name: {
                        Relationships: {
                                    CSV Header: Value}}}
            data_model_labels: str, display_label or class_label.
                display_label, use the display name as a label, if it is valid
                (contains no blacklisted characters) otherwise will default to schema_label.
                class_label, default, use standard class or property label.
        Raises:
            ValueError, attribute_relationships_dict is empty (no data model loaded).
        """
        # Fail fast: reject an empty model before any helper is built from it, so the
        # documented ValueError is what callers see.
        if not attribute_relationships_dict:
            raise ValueError(
                (
                    "Something has gone wrong, a data model was not loaded into the DataModelGraph "
                    "Class. Please check that your paths are correct"
                )
            )

        self.attribute_relationships_dict = attribute_relationships_dict
        self.dmn = DataModelNodes(self.attribute_relationships_dict)
        self.dme = DataModelEdges()
        self.dmr = DataModelRelationships()
        self.data_model_labels = data_model_labels
        self.graph = self.generate_data_model_graph()

    @tracer.start_as_current_span("DataModelGraph::generate_data_model_graph")
    def generate_data_model_graph(self) -> nx.MultiDiGraph:
        """
        Generate NetworkX Graph from the Relationships/attributes dictionary. The graph is
        built by first adding all nodes to the graph, then connecting nodes by the
        relationships defined in the attribute_relationships_dict.

        Returns:
            graph: nx.MultiDiGraph, networkx graph representation of the data model
        """
        # Get all relationships with edges
        edge_relationships = self.dmr.retreive_rel_headers_dict(edge=True)

        # Find all nodes
        all_nodes = self.dmn.gather_all_nodes_in_model(
            attr_rel_dict=self.attribute_relationships_dict
        )

        # Instantiate NetworkX MultiDigraph
        graph = nx.MultiDiGraph()

        # Display name -> node attribute dict; needed later to generate the edges
        all_node_dict = {}

        ## Fill in MultiDigraph with nodes
        for node in all_nodes:
            # Gather information for each node
            node_dict = self.dmn.generate_node_dict(
                node_display_name=node,
                attr_rel_dict=self.attribute_relationships_dict,
                data_model_labels=self.data_model_labels,
            )
            all_node_dict[node] = node_dict

            # Generate node and attach information (attributes) to each node
            graph = self.dmn.generate_node(graph, node_dict)

        edge_list: list[tuple[str, str, dict[str, Union[str, int]]]] = []
        ## Connect nodes via edges
        for node in all_nodes:
            # generate_edge is handed the edges collected so far and returns the updated
            # list; rebinding is enough, no need to copy the whole list on every node.
            edge_list = self.dme.generate_edge(
                node,
                all_node_dict,
                self.attribute_relationships_dict,
                edge_relationships,
                edge_list,
            )

        # Add edges to the Graph
        for node_1, node_2, edge_dict in edge_list:
            graph.add_edge(
                node_1, node_2, key=edge_dict["key"], weight=edge_dict["weight"]
            )
        return graph
Generate graph network (networkx) from the attributes and relationships returned from the data model parser.
Create a singleton.
def __init__(
    self,
    attribute_relationships_dict: dict,
    data_model_labels: DisplayLabelType = "class_label",
) -> None:
    """Load parsed data model and build the graph.

    Args:
        attribute_relationships_dict, dict: generated in data_model_parser
            {Attribute Display Name: {
                    Relationships: {
                                CSV Header: Value}}}
        data_model_labels: str, display_label or class_label.
            display_label, use the display name as a label, if it is valid
            (contains no blacklisted characters) otherwise will default to schema_label.
            class_label, default, use standard class or property label.
    Raises:
        ValueError, attribute_relationships_dict is empty (no data model loaded).
    """
    # Fail fast: reject an empty model before any helper is built from it, so the
    # documented ValueError is what callers see.
    if not attribute_relationships_dict:
        raise ValueError(
            (
                "Something has gone wrong, a data model was not loaded into the DataModelGraph "
                "Class. Please check that your paths are correct"
            )
        )

    self.attribute_relationships_dict = attribute_relationships_dict
    self.dmn = DataModelNodes(self.attribute_relationships_dict)
    self.dme = DataModelEdges()
    self.dmr = DataModelRelationships()
    self.data_model_labels = data_model_labels
    self.graph = self.generate_data_model_graph()
Load parsed data model.
Arguments:
- attribute_relationships_dict, dict: parsed data model generated in data_model_parser, shaped as {Attribute Display Name: {Relationships: {CSV Header: Value}}}
- data_model_labels: str, display_label or class_label. display_label, use the display name as a label, if it is valid (contains no blacklisted characters) otherwise will default to schema_label. class_label, default, use standard class or property label.
Raises:
- ValueError, if attribute_relationships_dict is empty (no data model was loaded).
@tracer.start_as_current_span("DataModelGraph::generate_data_model_graph")
def generate_data_model_graph(self) -> nx.MultiDiGraph:
    """
    Generate NetworkX Graph from the Relationships/attributes dictionary. The graph is
    built by first adding all nodes to the graph, then connecting nodes by the
    relationships defined in the attribute_relationships_dict.

    Returns:
        graph: nx.MultiDiGraph, networkx graph representation of the data model
    """
    # Get all relationships with edges
    edge_relationships = self.dmr.retreive_rel_headers_dict(edge=True)

    # Find all nodes
    all_nodes = self.dmn.gather_all_nodes_in_model(
        attr_rel_dict=self.attribute_relationships_dict
    )

    # Instantiate NetworkX MultiDigraph
    graph = nx.MultiDiGraph()

    # Display name -> node attribute dict; needed later to generate the edges
    all_node_dict = {}

    ## Fill in MultiDigraph with nodes
    for node in all_nodes:
        # Gather information for each node
        node_dict = self.dmn.generate_node_dict(
            node_display_name=node,
            attr_rel_dict=self.attribute_relationships_dict,
            data_model_labels=self.data_model_labels,
        )
        all_node_dict[node] = node_dict

        # Generate node and attach information (attributes) to each node
        graph = self.dmn.generate_node(graph, node_dict)

    edge_list: list[tuple[str, str, dict[str, Union[str, int]]]] = []
    ## Connect nodes via edges
    for node in all_nodes:
        # generate_edge is handed the edges collected so far and returns the updated
        # list; rebinding is enough, no need to copy the whole list on every node.
        edge_list = self.dme.generate_edge(
            node,
            all_node_dict,
            self.attribute_relationships_dict,
            edge_relationships,
            edge_list,
        )

    # Add edges to the Graph
    for node_1, node_2, edge_dict in edge_list:
        graph.add_edge(
            node_1, node_2, key=edge_dict["key"], weight=edge_dict["weight"]
        )
    return graph
Generate a NetworkX graph from the attribute_relationships_dict dictionary. The graph is built by first adding all nodes to the graph, then connecting the nodes by the relationships defined in that dictionary.
Returns:
graph: nx.MultiDiGraph, networkx graph representation of the data model
class DataModelGraphExplorer:  # pylint: disable=too-many-public-methods
    """Query helpers over an already-built data model graph.

    Every method reads from ``self.graph`` (a networkx MultiDiGraph) and from
    ``self.rel_dict`` (the relationships dictionary of DataModelRelationships).
    """

    def __init__(
        self,
        graph: nx.MultiDiGraph,
    ):
        """Store the data model graph and load the relationship definitions.

        Args:
            graph: nx.MultiDiGraph, networkx graph representation of the data model
        """
        self.graph = graph  # At this point the graph is expected to be fully formed.
        self.dmr = DataModelRelationships()
        self.rel_dict = self.dmr.relationships_dictionary

    def _resolve_node_label(
        self, node_label: Optional[str], node_display_name: Optional[str]
    ) -> str:
        """Return node_label if it is non-empty, else look it up from the display name."""
        if node_label:
            return node_label
        assert node_display_name is not None
        return self.get_node_label(node_display_name)

    def _parent_path_edges(self, source: str) -> list[tuple[str, str]]:
        """Turn every parent path that leads to source into consecutive (from, to) pairs."""
        edges: list[tuple[str, str]] = []
        for parent_path in self.find_parent_classes(source):
            full_path = [*parent_path, source]
            edges.extend(zip(full_path, full_path[1:]))
        return edges

    def find_properties(self) -> set[str]:
        """
        Identify all properties, as defined by the first node in a pair, connected with
        'domainIncludes' edge type

        Returns:
            properties, set: All properties defined in the data model, each property name
              is defined by its label.
        """
        domain_edge_key = self.rel_dict["domainIncludes"]["edge_key"]
        return {
            node_1 for node_1, _, rel in self.graph.edges if rel == domain_edge_key
        }

    def find_classes(self) -> set[str]:
        """
        Identify all classes, as defined by all nodes, minus all properties
        (which are explicitly defined)

        Returns:
            classes, set: All classes defined in the data model, each class
              name is defined by its label.
        """
        return self.graph.nodes - self.find_properties()

    def find_node_range(
        self, node_label: Optional[str] = None, node_display_name: Optional[str] = None
    ) -> list:
        """Get valid values for the given node (attribute)

        Args:
            node_label, str, Optional[str]: label of the node for which to retrieve valid values
            node_display_name, str, Optional[str]: Display Name of the node for which to
              retrieve valid values; only used when node_label is not given
        Returns:
            valid_values, list: List of valid values associated with the provided node,
              de-duplicated and in no particular order.
        """
        node_label = self._resolve_node_label(node_label, node_display_name)
        range_edge_key = self.rel_dict["rangeIncludes"]["edge_key"]

        valid_values = [
            node_2
            for node_1, node_2, rel in self.graph.edges
            if node_1 == node_label and rel == range_edge_key
        ]
        return list(set(valid_values))

    def get_adjacent_nodes_by_relationship(
        self, node_label: str, relationship: str
    ) -> list[str]:
        """Get a list of nodes that is / are adjacent to a given node, based on a relationship type.

        Args:
            node_label: label of the node whose out-edges we need to look at.
            relationship: edge key that the out-edge must carry to be included.

        Returns:
            List of nodes that are adjacent to the given node (de-duplicated, unordered).
        """
        nodes = {
            node_2
            for _, node_2, key, _ in self.graph.out_edges(
                node_label, data=True, keys=True
            )
            if key == relationship
        }
        return list(nodes)

    def get_component_node_required(
        self,
        manifest_component: str,
        node_validation_rules: Optional[list[str]] = None,
        node_label: Optional[str] = None,
        node_display_name: Optional[str] = None,
    ) -> bool:
        """Check if a node is required taking into account the manifest component it is defined in
        (requirements can be set in validation rule as well as required column)

        Args:
            manifest_component: str, manifest component display name that the node belongs to.
            node_validation_rules: list[str], validation rules for a given node and component.
              Looked up from the graph when empty or not given.
            node_label: str, Label of the node being queried.
            node_display_name: str, node display name for the node being queried.
        Returns:
            True, if node is required, False if not
        """
        if not node_validation_rules:
            # Get node validation rules for a given component
            node_validation_rules = self.get_component_node_validation_rules(
                manifest_component=manifest_component,
                node_label=node_label,
                node_display_name=node_display_name,
            )

        if not rule_in_rule_list("required", node_validation_rules):
            # Requirements are not being set in the validation rule, so just pull the
            # standard node requirements from the model
            return self.get_node_required(
                node_label=node_label, node_display_name=node_display_name
            )

        # The validation rule marks the node as required for this component. Log an error
        # if the Required field is set as well, since the two settings then overlap.
        if self.get_node_required(
            node_label=node_label, node_display_name=node_display_name
        ):
            if not node_display_name:
                assert node_label is not None
                node_display_name = self.graph.nodes[node_label][
                    self.rel_dict["displayName"]["node_label"]
                ]
            error_str = " ".join(
                [
                    f"For component: {manifest_component} and attribute: {node_display_name}",
                    "requirements are being specified in both the Required field and in the",
                    "Validation Rules. If you desire to use validation rules to set component",
                    "specific requirements for this attribute",
                    "then the Required field needs to be set to False, or the validation may",
                    "not work as intended, for other components where the attribute",
                    "that should not be required.",
                ]
            )

            logger.error(error_str)
        return True

    def get_component_node_validation_rules(
        self,
        manifest_component: str,
        node_label: Optional[str] = None,
        node_display_name: Optional[str] = None,
    ) -> list:
        """Get validation rules for a given node and component.

        Args:
            manifest_component: str, manifest component display name that the node belongs to.
            node_label: str, Label of the node being queried.
            node_display_name: str, node display name for the node being queried.
        Returns:
            validation_rules: list, validation rules list for a given node and component.
        """
        # get any additional validation rules associated with this node (e.g. can this node
        # be mapped to a list of other nodes)
        node_validation_rules = self.get_node_validation_rules(
            node_label=node_label, node_display_name=node_display_name
        )

        # A non-empty dict holds per-component rules and has to be parsed for this component
        if node_validation_rules and isinstance(node_validation_rules, dict):
            return extract_component_validation_rules(
                manifest_component=manifest_component,
                validation_rules_dict=node_validation_rules,  # type: ignore
            )

        assert isinstance(node_validation_rules, list)
        return node_validation_rules

    def get_component_requirements(
        self,
        source_component: str,
    ) -> list[str]:
        """
        Get all components that are associated with a given source component and are
        required by it.

        Args:
            source_component: source component for which we need to find all required downstream
              components.

        Returns:
            List of nodes that are descendants from the source component and are related to the
              source through the 'requiresComponent' relationship, in reverse topological order.
        """
        ordered_descendants = self.get_descendants_by_edge_type(
            source_component,
            self.rel_dict["requiresComponent"]["edge_key"],
            ordered=True,
        )
        return list(reversed(ordered_descendants))

    def get_component_requirements_graph(
        self,
        source_component: str,
    ) -> nx.DiGraph:
        """
        Get all components that are associated with a given source component and are required by it;
        return the components as a dependency graph (i.e. a DAG).

        Args:
            source_component, str: source component for which we need to find all required
              downstream components.

        Returns:
            A subgraph of the schema graph induced on nodes that are descendants from the source
              component and are related to the source through a specific component relationship.
        """
        # get a list of required component nodes
        req_components = self.get_component_requirements(source_component)

        # get the subgraph induced on required component nodes
        return self.get_subgraph_by_edge_type(
            self.rel_dict["requiresComponent"]["edge_key"],
        ).subgraph(req_components)

    def get_descendants_by_edge_type(
        self,
        source_node: str,
        relationship: str,
        connected: bool = True,
        ordered: bool = False,
    ) -> list[str]:
        """
        Get all nodes that are descendants of a given source node, based on a specific
        type of edge / relationship type.

        Args:
            source_node: The node whose descendants need to be retrieved.
            relationship: Edge key that an edge must carry to be followed.
            connected:
              If True, we need to ensure that all descendant nodes are reachable from the source
                node, i.e., they are part of the same connected component.
              If False, the descendants could be in multiple connected components.
              Default value is True.
            ordered:
              If True, the list of descendants will be topologically ordered.
              If False, the list has no particular order (depends on the order in which the
                descendants were traversed in the subgraph).

        Returns:
            List of nodes that are descendants from a particular node (sorted / unsorted).
            The source node itself is included whenever the list is not empty.
        """
        # Everything reachable from the source through any edge type, plus the source itself
        subgraph_nodes = list(nx.descendants(self.graph, source_node))
        subgraph_nodes.append(source_node)
        descendants_subgraph = self.graph.subgraph(subgraph_nodes)

        # prune the descendants subgraph so as to include only those edges that match
        # the relationship type
        rel_edges = [
            (node_1, node_2)
            for node_1, node_2, key, _ in descendants_subgraph.edges(
                data=True, keys=True
            )
            if key == relationship
        ]

        relationship_subgraph = nx.DiGraph()
        relationship_subgraph.add_edges_from(rel_edges)

        descendants = relationship_subgraph.nodes()

        if not descendants:
            # return empty list if there are no nodes that are reachable from the
            # source node based on this relationship type
            return []

        # NOTE(review): when connected=True and the source node has no edge of this type
        # while some of its descendants do, the source is missing from
        # relationship_subgraph and nx.descendants is expected to raise - confirm whether
        # callers can hit this case.
        if connected and ordered:
            # get the set of reachable nodes from the source node
            descendants = nx.descendants(relationship_subgraph, source_node)
            descendants.add(source_node)

            # normally, the descendants from a node are unordered (peculiarity
            # of nx descendants call)
            # form the subgraph on descendants and order it topologically
            # this assumes an acyclic subgraph
            descendants = nx.topological_sort(
                relationship_subgraph.subgraph(descendants)
            )
        elif connected:
            # get the nodes that are reachable from a given source node
            # after the pruning process above some nodes in the
            # root_descendants subgraph might have become disconnected and
            # will be omitted
            descendants = nx.descendants(relationship_subgraph, source_node)
            descendants.add(source_node)
        elif ordered:
            # sort the nodes topologically
            # this requires the graph to be an acyclic graph
            descendants = nx.topological_sort(relationship_subgraph)

        return list(descendants)

    def get_digraph_by_edge_type(self, edge_type: str) -> nx.DiGraph:
        """Get a networkx digraph of the nodes connected via a given edge_type.

        Args:
            edge_type:
                Edge type to search for, possible types are defined by 'edge_key'
                  in relationship class
        Returns:
            nx.DiGraph holding one edge for every pair of nodes that is connected in the
              data model graph by an edge whose key equals edge_type.
        """
        digraph = nx.DiGraph()
        for node_1, node_2, key, _ in self.graph.edges(data=True, keys=True):
            if key == edge_type:
                digraph.add_edge(node_1, node_2)
        return digraph

    def get_edges_by_relationship(
        self,
        node: str,
        relationship: str,
    ) -> list[tuple[str, str]]:
        """Get a list of out-edges of a node where the edges match a specific type of relationship.

        i.e., the edges connecting a node to its neighbors are of relationship type -- "parentOf"
          (set of edges to children / sub-class nodes).

        Args:
            node: the node whose edges we need to look at.
            relationship: edge key that the out-edge must carry to be included.

        Returns:
            List of (node, neighbor) edges that leave the node with the given edge key.
        """
        return [
            (node_1, node_2)
            for node_1, node_2, key, _ in self.graph.out_edges(
                node, data=True, keys=True
            )
            if key == relationship
        ]

    def get_ordered_entry(self, key: str, source_node_label: str) -> list[str]:
        """
        Order the values associated with a particular node and edge_key to
          match original ordering in schema.

        Args:
            key (str): a key representing an edge relationship in
              DataModelRelationships.relationships_dictionary (the relationship name,
              not its 'edge_key')
            source_node_label (str): node to look for edges of and order

        Raises:
            KeyError: cannot find source node in graph

        Returns:
            list[str]:
              list of sorted nodes, that share the specified relationship with the source node
              For the example data model, for key='rangeIncludes', source_node_label='CancerType'
                the return would be ['Breast', 'Colorectal', 'Lung', 'Prostate', 'Skin'] in that
                exact order.
        """
        # Check if node is in the graph, if not throw an error.
        if not self.is_class_in_schema(node_label=source_node_label):
            raise KeyError(
                f"Cannot find node: {source_node_label} in the graph, please check entry."
            )

        edge_key = self.rel_dict[key]["edge_key"]

        if self.rel_dict[key]["jsonld_direction"] == "out":
            # Out edges: the neighbor is the edge target
            original_edge_weights_dict = {
                attached_node: self.graph[source_node][attached_node][edge_key][
                    "weight"
                ]
                for source_node, attached_node in self.graph.out_edges(
                    source_node_label
                )
                if edge_key in self.graph[source_node][attached_node]
            }
        else:
            # In edges: the neighbor is the edge origin
            original_edge_weights_dict = {
                attached_node: self.graph[attached_node][source_node][edge_key][
                    "weight"
                ]
                for attached_node, source_node in self.graph.in_edges(source_node_label)
                if edge_key in self.graph[attached_node][source_node]
            }

        # The edge weight records the original position; the sort is stable for ties
        by_weight = sorted(
            original_edge_weights_dict.items(), key=lambda item: item[1]
        )
        return [attached_node for attached_node, _ in by_weight]

    # Get values associated with a node
    def get_nodes_ancestors(self, subgraph: nx.DiGraph, node_label: str) -> list[str]:
        """Get a list of all ancestors of a node in the given graph

        Args:
            subgraph (nx.DiGraph): networkx graph object
            node_label (str): label of node to find ancestors for

        Returns:
            list[str]: nodes from which node_label can be reached in subgraph
        """
        return list(nx.ancestors(subgraph, node_label))

    def get_node_comment(
        self, node_display_name: Optional[str] = None, node_label: Optional[str] = None
    ) -> str:
        """Get the node definition, i.e., the "comment" associated with a given node display name.

        Args:
            node_display_name, str: Display name of the node which you want to get the comment for.
            node_label, str: Label of the node you would want to get the comment for.
        Returns:
            Comment associated with node, as a string. Empty string if the display name
              cannot be matched to a node label.
        """
        node_label = self._resolve_node_label(node_label, node_display_name)
        if not node_label:
            return ""

        return self.graph.nodes[node_label][self.rel_dict["comment"]["node_label"]]

    def get_node_dependencies(
        self,
        source_node: str,
        display_names: bool = True,
        schema_ordered: bool = True,
    ) -> list[str]:
        """Get the immediate dependencies that are related to a given source node.

        Args:
            source_node: The node whose dependencies we need to compute.
            display_names: if True, return list of display names of each of the dependencies.
                           if False, return list of node labels of each of the dependencies.
            schema_ordered:
              if True, return the dependencies of the node following the order of the schema
                (slower).
              if False, return dependencies from graph without guaranteeing schema order (faster)

        Returns:
            List of nodes that are dependent on the source node.
        """
        if schema_ordered:
            # get dependencies in the same order in which they are defined in the schema.
            # get_ordered_entry looks the relationship up by its name in rel_dict, so it
            # must be given the relationship name rather than the edge key.
            required_dependencies = self.get_ordered_entry(
                key="requiresDependency",
                source_node_label=source_node,
            )
        else:
            required_dependencies = self.get_adjacent_nodes_by_relationship(
                node_label=source_node,
                relationship=self.rel_dict["requiresDependency"]["edge_key"],
            )

        if not display_names:
            return required_dependencies

        # get display names of dependencies
        display_name_attr = self.rel_dict["displayName"]["node_label"]
        return [self.graph.nodes[req][display_name_attr] for req in required_dependencies]

    def get_nodes_descendants(self, node_label: str) -> list[str]:
        """Return a list of nodes reachable from source in graph

        Args:
            node_label, str: any given node
        Return:
            all_descendants, list: nodes reachable from source in graph
        """
        return list(nx.descendants(self.graph, node_label))

    def get_nodes_display_names(
        self,
        node_list: list[str],
    ) -> list[str]:
        """Get display names associated with the given list of nodes.

        Args:
            node_list: List of nodes whose display names we need to retrieve.

        Returns:
            List of display names, in the same order as node_list.
        """
        return [
            self.graph.nodes[node][self.rel_dict["displayName"]["node_label"]]
            for node in node_list
        ]

    def get_node_label(self, node_display_name: str) -> str:
        """Get the node label for a given display name.

        Args:
            node_display_name: Display name of the node which you want to get the label for.
        Returns:
            Node label associated with given node. The class label is preferred over the
              property label when both are present in the graph.
            If display name not part of schema, return an empty string.
        """
        node_class_label = get_class_label_from_display_name(
            display_name=node_display_name
        )
        node_property_label = get_property_label_from_display_name(
            display_name=node_display_name
        )

        if node_class_label in self.graph.nodes:
            return node_class_label
        if node_property_label in self.graph.nodes:
            return node_property_label
        return ""

    def get_node_range(
        self,
        node_label: Optional[str] = None,
        node_display_name: Optional[str] = None,
        display_names: bool = False,
    ) -> list[str]:
        """
        Get the range, i.e., all the valid values that are associated with a node label.

        Args:
            node_label (Optional[str], optional): Node for which you need to retrieve the range.
                Defaults to None.
            node_display_name (Optional[str], optional): Display name used to look up the
                node label when node_label is not given. Defaults to None.
            display_names (bool, optional): If True, return display names instead of labels.
                Defaults to False.

        Raises:
            ValueError: If looking up the range raises a KeyError.

        Returns:
            list[str]:
                If display_names=False, a list of valid values (labels) associated with a given node.
                If display_names=True, a list of valid values (display names) associated
                  with a given node
        """
        node_label = self._resolve_node_label(node_label, node_display_name)

        try:
            required_range = self.find_node_range(node_label=node_label)
        except KeyError as exc:
            raise ValueError(
                f"The source node {node_label} does not exist in the graph. "
                "Please use a different node."
            ) from exc

        if not display_names:
            return required_range

        # get the display name(s) of all values in the range
        return [self.graph.nodes[req]["displayName"] for req in required_range]

    def get_node_required(
        self, node_label: Optional[str] = None, node_display_name: Optional[str] = None
    ) -> bool:
        """Check if a given node is required or not.

        Note: The possible options that a node can be associated with -- "required" / "optional".

        Args:
            node_label: Label of the node for which you need to look up.
            node_display_name: Display name of the node for which you want look up.
        Returns:
            True: If the given node is a "required" node.
            False: If the given node is not a "required" (i.e., an "optional") node.
        """
        node_label = self._resolve_node_label(node_label, node_display_name)

        rel_node_label = self.rel_dict["required"]["node_label"]
        return self.graph.nodes[node_label][rel_node_label]

    def get_node_validation_rules(
        self, node_label: Optional[str] = None, node_display_name: Optional[str] = None
    ) -> Union[list, dict[str, str]]:
        """Get validation rules associated with a node,

        Args:
            node_label: Label of the node for which you need to look up.
            node_display_name: Display name of the node which you want to get the label for.
        Raises:
            ValueError: If neither argument is provided, or if the node lookup in the
              graph raises a KeyError.
        Returns:
            A set of validation rules associated with node, as a list or a dictionary.
            An empty list if the display name cannot be matched to a node label.
        """
        if not node_label:
            if node_display_name is None:
                raise ValueError(
                    "Either node_label or node_display_name must be provided."
                )

            # try search node label using display name
            node_label = self.get_node_label(node_display_name)

        if not node_label:
            return []

        try:
            return self.graph.nodes[node_label]["validationRules"]
        except KeyError as key_error:
            raise ValueError(
                f"{node_label} is not in the graph, please provide a proper node label"
            ) from key_error

    def get_subgraph_by_edge_type(self, relationship: str) -> nx.DiGraph:
        """Get a subgraph containing all edges of a given type (aka relationship).

        Args:
            relationship: edge key that an edge must carry to be included.

        Returns:
            Directed graph on edges of a particular type (aka relationship)
        """
        # prune the metadata model graph so as to include only those edges that
        # match the relationship type
        rel_edges = [
            (node_1, node_2)
            for node_1, node_2, key, _ in self.graph.out_edges(data=True, keys=True)
            if key == relationship
        ]

        relationship_subgraph = nx.DiGraph()
        relationship_subgraph.add_edges_from(rel_edges)
        return relationship_subgraph

    def find_adjacent_child_classes(
        self, node_label: Optional[str] = None, node_display_name: Optional[str] = None
    ) -> list[str]:
        """Find child classes of a given node.

        Args:
            node_display_name: Display name of the node to look up.
            node_label: Label of the node to look up.
        Returns:
            List of nodes that are adjacent to the given node, by SubclassOf relationship.
        """
        node_label = self._resolve_node_label(node_label, node_display_name)

        return self.get_adjacent_nodes_by_relationship(
            node_label=node_label, relationship=self.rel_dict["subClassOf"]["edge_key"]
        )

    def find_child_classes(self, schema_class: str) -> list:
        """Find schema classes that inherit from the given class

        Args:
            schema_class: node label for the class from which to look for children.
        Returns:
            list of children to the schema_class.
        """
        child_classes = unlist(list(self.graph.successors(schema_class)))
        assert isinstance(child_classes, list)
        return child_classes

    def find_class_specific_properties(self, schema_class: str) -> list[str]:
        """Find properties specifically associated with a given class

        Args:
            schema_class, str: node/class label, to identify properties for.
        Returns:
            properties, list: List of properties associated with a given schema class.
        Raises:
            KeyError: Key error is raised if the provided schema_class is not in the graph
        """
        if not self.is_class_in_schema(schema_class):
            raise KeyError(
                (
                    f"Schema_class provided: {schema_class} is not in the data model, please check "
                    "that you are providing the proper class/node label"
                )
            )

        return [
            node1
            for node1, node2 in self.graph.edges()
            if node2 == schema_class
            and "domainValue" in self.graph[node1][schema_class]
        ]

    def find_parent_classes(self, node_label: str) -> list[list[str]]:
        """Find all parents of the provided node

        Args:
            node_label: label of the node to find parents of
        Returns:
            List of list of Parents to the given node. Each inner list is one path that
              starts at the root node and stops just before node_label.
            Empty list if the graph has no 'parentOf' edges at all.
        """
        # Get digraph of nodes with parents
        digraph = self.get_digraph_by_edge_type("parentOf")

        # Without any 'parentOf' edge there is no root node and no node has a parent
        topological_order = list(nx.topological_sort(digraph))
        if not topological_order:
            return []

        # Get root node (the first node in topological order)
        root_node = topological_order[0]

        # Get paths between root_node and the target node.
        paths = nx.all_simple_paths(self.graph, source=root_node, target=node_label)

        return [_path[:-1] for _path in paths]

    def full_schema_graph(self, size: Optional[int] = None) -> graphviz.Digraph:
        """Create a graph of the data model.

        Args:
            size, int: max height and width of the graph, if one value provided
               it is used for both.
        Returns:
            schema graph viz
        """
        return visualize(self.graph.edges(), size=size)

    def is_class_in_schema(self, node_label: str) -> bool:
        """Determine if provided node_label is in the schema graph/data model.

        Args:
            node_label: label of node to search for in the graph
        Returns:
            True, if node is in the graph schema
            False, if node is not in graph schema
        """
        return node_label in self.graph.nodes()

    def sub_schema_graph(
        self, source: str, direction: str, size: Optional[float] = None
    ) -> Optional[graphviz.Digraph]:
        """Create a sub-schema graph

        Args:
            source, str: source node label to start graph
            direction, str: direction to create the visualization, choose from "up", "down", "both"
            size, float: max height and width of the graph, if one value provided it is used for
              both.
        Returns:
            Sub-schema graph viz, or None if direction is not one of the three options.
        """
        if direction == "down":
            edges = list(nx.edge_bfs(self.graph, [source]))
            return visualize(edges, size=size)
        if direction == "up":
            return visualize(self._parent_path_edges(source), size=size)
        if direction == "both":
            # Parent paths are resolved first, then the downward edges; the drawn edge
            # list keeps downward edges in front of the parent-path edges.
            parent_edges = self._parent_path_edges(source)
            edges = list(nx.edge_bfs(self.graph, [source]))
            edges.extend(parent_edges)
            return visualize(edges, size=size)
        return None
DataModelGraphExplorer
def __init__(
    self,
    graph: nx.MultiDiGraph,
):
    """Wrap an already-built data model graph so it can be queried.

    Args:
        graph: nx.MultiDiGraph, networkx graph representation of the data model
    """
    # The graph is expected to be fully formed by the time it is handed over.
    self.graph = graph

    relationships = DataModelRelationships()
    self.dmr = relationships
    self.rel_dict = relationships.relationships_dictionary
Load data model graph as a singleton.
Arguments:
- graph: nx.MultiDiGraph, networkx graph representation of the data model
def find_properties(self) -> set[str]:
    """
    Collect every property in the data model.

    A property is the first node of any edge whose key equals the
    'domainIncludes' edge key from the relationships dictionary.

    Returns:
        properties, set: Labels of all properties defined in the data model.
    """
    return {
        source
        for source, _, edge_key in self.graph.edges
        if edge_key == self.rel_dict["domainIncludes"]["edge_key"]
    }
Identify all properties, as defined by the first node in a pair, connected with 'domainIncludes' edge type
Returns:
properties, set: All properties defined in the data model, each property name is defined by its label.
def find_classes(self) -> set[str]:
    """
    Collect every class in the data model.

    Classes are all graph nodes that are not properties (properties are
    identified explicitly by find_properties).

    Returns:
        classes, set: Labels of all classes defined in the data model.
    """
    all_nodes = self.graph.nodes
    return all_nodes - self.find_properties()
Identify all classes, as defined by all nodes, minus all properties (which are explicitly defined)
Returns:
classes, set: All classes defined in the data model, each class name is defined by its label.
def find_node_range(
    self, node_label: Optional[str] = None, node_display_name: Optional[str] = None
) -> list:
    """Get valid values for the given node (attribute)

    Args:
        node_label, str, Optional[str]: label of the node for which to retrieve valid values
        node_display_name, str, Optional[str]: Display Name of the node for which to
            retrieve valid values; only used when node_label is not given.
    Returns:
        valid_values, list: De-duplicated list of valid values associated with the
            provided node, in the order their edges are iterated in the graph.
            An empty list is returned when no matching edge exists.
    """
    if not node_label:
        assert node_display_name is not None
        node_label = self.get_node_label(node_display_name)

    range_edge_key = self.rel_dict["rangeIncludes"]["edge_key"]
    matching_targets = (
        target
        for source, target, edge_key in self.graph.edges
        if source == node_label and edge_key == range_edge_key
    )
    # dict.fromkeys removes duplicates while keeping first-seen order; going
    # through set() made the order vary between runs (string hash randomisation).
    return list(dict.fromkeys(matching_targets))
Get valid values for the given node (attribute)
Arguments:
- node_label, str, Optional[str]: label of the node for which to retrieve valid values
- node_display_name, str, Optional[str]: Display Name of the node for which to retrieve valid values
Returns:
valid_values, list: List of valid values associated with the provided node.
def get_adjacent_nodes_by_relationship(
    self, node_label: str, relationship: str
) -> list[str]:
    """Get the nodes adjacent to a given node through a given relationship type.

    Args:
        node_label: label of the node whose out-edges are inspected.
        relationship: edge key that an out-edge must carry for its target
            node to be included.

    Returns:
        List of distinct target nodes of the matching out-edges
        (in no particular order).
    """
    out_edges = self.graph.out_edges(node_label, data=True, keys=True)
    neighbours = {
        target for _, target, edge_key, _ in out_edges if edge_key == relationship
    }
    return list(neighbours)
Get a list of nodes that is / are adjacent to a given node, based on a relationship type.
Arguments:
- node_label: label of the node whose edges we need to look at.
- relationship: the type of link(s) that the above node and its immediate neighbors share.
Returns:
List of nodes that are adjacent to the given node.
checked
def get_component_node_required(
    self,
    manifest_component: str,
    node_validation_rules: Optional[list[str]] = None,
    node_label: Optional[str] = None,
    node_display_name: Optional[str] = None,
) -> bool:
    """Decide whether a node is required within a specific manifest component.

    A requirement can come from a "required" validation rule or from the
    node's Required field.

    Args:
        manifest_component: str, manifest component display name that the node belongs to.
        node_validation_rules: list[str], validation rules for the node and component;
            looked up via get_component_node_validation_rules when empty or None.
        node_label: str, label of the node being queried.
        node_display_name: str, display name of the node being queried.
    Returns:
        True, if node is required, False if not
    """
    rules = node_validation_rules
    if not rules:
        # No rules supplied: fetch the rules that apply to this component.
        rules = self.get_component_node_validation_rules(
            manifest_component=manifest_component,
            node_label=node_label,
            node_display_name=node_display_name,
        )

    if not rule_in_rule_list("required", rules):
        # The rules do not mention "required": fall back to the Required field.
        return self.get_node_required(
            node_label=node_label, node_display_name=node_display_name
        )

    # The rules mark the node as required for this component. If the Required
    # field is also set, log the conflict (the node is still reported as required).
    if self.get_node_required(
        node_label=node_label, node_display_name=node_display_name
    ):
        attribute_name = node_display_name
        if not attribute_name:
            assert node_label is not None
            attribute_name = self.graph.nodes[node_label][
                self.rel_dict["displayName"]["node_label"]
            ]
        message_parts = [
            f"For component: {manifest_component} and attribute: {attribute_name}",
            "requirements are being specified in both the Required field and in the",
            "Validation Rules. If you desire to use validation rules to set component",
            "specific requirements for this attribute",
            "then the Required field needs to be set to False, or the validation may",
            "not work as intended, for other components where the attribute",
            "that should not be required.",
        ]
        logger.error(" ".join(message_parts))

    return True
Check if a node is required taking into account the manifest component it is defined in (requirements can be set in validation rule as well as required column)
Arguments:
- manifest_component: str, manifest component display name that the node belongs to.
- node_validation_rules: list[str], validation rules for a given node and component.
- node_label: str, Label of the node being queried.
- node_display_name: str, node display name for the node being queried.
Returns:
True, if node is required, False if not
def get_component_node_validation_rules(
    self,
    manifest_component: str,
    node_label: Optional[str] = None,
    node_display_name: Optional[str] = None,
) -> list:
    """Get validation rules for a given node and component.

    Args:
        manifest_component: str, manifest component display name that the node belongs to.
        node_label: str, label of the node being queried.
        node_display_name: str, node display name for the node being queried.
    Returns:
        validation_rules: list, validation rules list for a given node and component.
            An empty rules dictionary yields an empty list.
    """
    # Rules stored on the node: either a plain list, or a dictionary that is
    # resolved per component by extract_component_validation_rules.
    node_validation_rules = self.get_node_validation_rules(
        node_label=node_label, node_display_name=node_display_name
    )

    if isinstance(node_validation_rules, dict):
        if not node_validation_rules:
            # An empty dict used to fall through to the list assertion below
            # and fail; there are simply no rules to report.
            return []
        # Parse the validation rules per component
        return extract_component_validation_rules(
            manifest_component=manifest_component,
            validation_rules_dict=node_validation_rules,  # type: ignore
        )

    assert isinstance(node_validation_rules, list)
    return node_validation_rules
Get validation rules for a given node and component.
Arguments:
- manifest_component: str, manifest component display name that the node belongs to.
- node_label: str, Label of the node being queried.
- node_display_name: str, node display name for the node being queried.
Returns:
validation_rules: list, validation rules list for a given node and component.
def get_component_requirements(
    self,
    source_component: str,
) -> list[str]:
    """
    Get all components that a given source component requires.

    Args:
        source_component: source component for which we need to find all required
            downstream components.

    Returns:
        The ordered result of get_descendants_by_edge_type for the
        'requiresComponent' edge key, reversed.
    """
    ordered_descendants = self.get_descendants_by_edge_type(
        source_component,
        self.rel_dict["requiresComponent"]["edge_key"],
        ordered=True,
    )
    return list(reversed(ordered_descendants))
Get all components that are associated with a given source component and are required by it.
Arguments:
- source_component: source component for which we need to find all required downstream components.
Returns:
List of nodes that are descendants from the source component and are related to the source through a specific component relationship.
def get_component_requirements_graph(
    self,
    source_component: str,
) -> nx.DiGraph:
    """
    Get the components required by a given source component as a dependency graph.

    Args:
        source_component, str: source component for which we need to find all required
            downstream components.

    Returns:
        The 'requiresComponent' edge-type subgraph, induced on the nodes returned by
        get_component_requirements for the source component.
    """
    # nodes required by the source component
    required_nodes = self.get_component_requirements(source_component)

    # graph of all 'requiresComponent' edges, restricted to those nodes
    requires_graph = self.get_subgraph_by_edge_type(
        self.rel_dict["requiresComponent"]["edge_key"],
    )
    return requires_graph.subgraph(required_nodes)
Get all components that are associated with a given source component and are required by it; return the components as a dependency graph (i.e. a DAG).
Arguments:
- source_component, str: source component for which we need to find all required downstream components.
Returns:
A subgraph of the schema graph induced on nodes that are descendants from the source component and are related to the source through a specific component relationship.
def get_descendants_by_edge_type(
    self,
    source_node: str,
    relationship: str,
    connected: bool = True,
    ordered: bool = False,
) -> list[str]:
    """
    Get all nodes that are descendants of a given source node, based on a specific
    type of edge / relationship type.

    Args:
        source_node: The node whose descendants need to be retrieved.
        relationship: Edge key that an edge must carry to be followed.
        connected:
            If True, only nodes reachable from the source node through edges of the
            given type are returned (together with the source node itself).
            If False, every node touched by an edge of the given type among the
            source's descendants is returned, even if not reachable from the source
            through that edge type.
            Default value is True.
        ordered:
            If True, the list of descendants will be topologically ordered
            (this requires the relevant subgraph to be acyclic).
            If False, the list has no particular order.

    Returns:
        List of nodes that are descendants from a particular node (sorted / unsorted).
        Empty if no edge of the given type exists among the source's descendants, or
        if `connected` is True and the source node has no edge of the given type.
    """
    # Everything reachable from the source through any edge type, plus the source.
    root_descendants = nx.descendants(self.graph, source_node)
    descendants_subgraph = self.graph.subgraph([*root_descendants, source_node])

    # Keep only the edges that match the relationship type.
    relationship_subgraph = nx.DiGraph()
    relationship_subgraph.add_edges_from(
        (node_1, node_2)
        for node_1, node_2, key, _ in descendants_subgraph.edges(
            data=True, keys=True
        )
        if key == relationship
    )

    if not relationship_subgraph.nodes():
        # no edge of this relationship type among the descendants
        return []

    if connected:
        if source_node not in relationship_subgraph:
            # The source has no edge of this type, so nothing is reachable from it
            # through this relationship. nx.descendants would raise NetworkXError
            # for a node that is absent from the pruned graph.
            return []

        # Pruning may have disconnected some nodes; keep only those still
        # reachable from the source.
        reachable = nx.descendants(relationship_subgraph, source_node)
        reachable.add(source_node)
        if not ordered:
            return list(reachable)
        # nx.descendants returns an unordered set; order it topologically.
        return list(nx.topological_sort(relationship_subgraph.subgraph(reachable)))

    if ordered:
        # sort the nodes topologically (requires an acyclic graph)
        return list(nx.topological_sort(relationship_subgraph))

    return list(relationship_subgraph.nodes())
Get all nodes that are descendants of a given source node, based on a specific type of edge / relationship type.
Arguments:
- source_node: The node whose descendants need to be retrieved.
- relationship: Edge / link relationship type with possible values same as in above docs.
- connected: If True, we need to ensure that all descendant nodes are reachable from the source node, i.e., they are part of the same connected component. If False, the descendants could be in multiple connected components. Default value is True.
- ordered: If True, the list of descendants will be topologically ordered. If False, the list has no particular order (depends on the order in which the descendants were traversed in the subgraph).
Returns:
List of nodes that are descendants from a particular node (sorted / unsorted)
def get_digraph_by_edge_type(self, edge_type: str) -> nx.DiGraph:
    """Get a networkx digraph of the nodes connected via a given edge_type.

    Args:
        edge_type:
            Edge type to search for, possible types are defined by 'edge_key'
            in relationship class
    Returns:
        A new directed graph containing one edge for every edge of the data model
        graph whose key equals edge_type.
    """
    matching_edges = [
        (source, target)
        for source, target, edge_key, _ in self.graph.edges(data=True, keys=True)
        if edge_key == edge_type
    ]
    typed_digraph = nx.DiGraph()
    typed_digraph.add_edges_from(matching_edges)
    return typed_digraph
Get a networkx digraph of the nodes connected via a given edge_type.
Arguments:
- edge_type: Edge type to search for, possible types are defined by 'edge_key' in relationship class
Returns:
def get_edges_by_relationship(
    self,
    node: str,
    relationship: str,
) -> list[tuple[str, str]]:
    """Get the out-edges of a node that carry a specific relationship type.

    For example, with relationship "parentOf" this yields the edges from the
    node to its children / sub-class nodes.

    Args:
        node: the node whose out-edges are inspected.
        relationship: edge key that an out-edge must carry to be included.

    Returns:
        List of (source, target) pairs for the matching out-edges.
    """
    return [
        (source, target)
        for source, target, edge_key, _ in self.graph.out_edges(
            node, data=True, keys=True
        )
        if edge_key == relationship
    ]
Get a list of out-edges of a node where the edges match a specific type of relationship.
i.e., the edges connecting a node to its neighbors are of relationship type -- "parentOf" (set of edges to children / sub-class nodes).
Arguments:
- node: the node whose edges we need to look at.
- relationship: the type of link(s) that the above node and its immediate neighbors share.
Returns:
List of edges that are connected to the node.
def get_ordered_entry(self, key: str, source_node_label: str) -> list[str]:
    """
    Order the nodes attached to a node through a given relationship by edge weight.

    Args:
        key (str): a key representing an edge relationship in
            DataModelRelationships.relationships_dictionary
        source_node_label (str): node whose edges are looked up and ordered

    Raises:
        KeyError: cannot find source node in graph

    Returns:
        list[str]:
            Nodes that share the specified relationship with the source node,
            sorted by ascending "weight" of the connecting edge.
            For the example data model, for key='rangeIncludes',
            source_node_label='CancerType' the return would be
            ['Breast', 'Colorectal', 'Lung', 'Prostate', 'Skin'] in that exact order.
    """
    # Check if node is in the graph, if not throw an error.
    if not self.is_class_in_schema(node_label=source_node_label):
        raise KeyError(
            f"Cannot find node: {source_node_label} in the graph, please check entry."
        )

    edge_key = self.rel_dict[key]["edge_key"]
    weight_by_node = {}

    if self.rel_dict[key]["jsonld_direction"] == "out":
        # relationship follows out-edges: the attached node is the edge target
        for origin, neighbour in self.graph.out_edges(source_node_label):
            parallel_edges = self.graph[origin][neighbour]
            if edge_key in parallel_edges:
                weight_by_node[neighbour] = parallel_edges[edge_key]["weight"]
    else:
        # relationship follows in-edges: the attached node is the edge source
        for neighbour, destination in self.graph.in_edges(source_node_label):
            parallel_edges = self.graph[neighbour][destination]
            if edge_key in parallel_edges:
                weight_by_node[neighbour] = parallel_edges[edge_key]["weight"]

    # stable sort of the node names by their recorded weight
    return sorted(weight_by_node, key=weight_by_node.__getitem__)
Order the values associated with a particular node and edge_key to match original ordering in schema.
Arguments:
- key (str): a key representing an edge relationship in DataModelRelationships.relationships_dictionary
- source_node_label (str): node to look for edges of and order
Raises:
- KeyError: cannot find source node in graph
Returns:
list[str]: list of sorted nodes that share the specified relationship with the source node. For the example data model, for key='rangeIncludes', source_node_label='CancerType' the return would be ['Breast', 'Colorectal', 'Lung', 'Prostate', 'Skin'] in that exact order.
def get_nodes_ancestors(self, subgraph: nx.DiGraph, node_label: str) -> list[str]:
    """Get all ancestors of a node in the given graph.

    Args:
        subgraph (nx.DiGraph): networkx graph object to search in
        node_label (str): label of node to find ancestors for

    Returns:
        list[str]: result of nx.ancestors for the node, i.e. the nodes that have
            a path to node_label in the graph
    """
    return list(nx.ancestors(subgraph, node_label))
Get a list of all ancestors of a node, i.e. the nodes from which the given node is reachable in the graph
Arguments:
- subgraph (nx.DiGraph): networkx graph object
- node_label (str): label of node to find ancestors for
Returns:
list[str]: nodes from which node_label is reachable in the graph
def get_node_comment(
    self, node_display_name: Optional[str] = None, node_label: Optional[str] = None
) -> str:
    """Get the node definition, i.e., the "comment" stored on a node.

    Args:
        node_display_name, str: Display name of the node; used to look up the label
            when node_label is not given.
        node_label, str: Label of the node you would want to get the comment for.
    Returns:
        Comment associated with node, as a string. An empty string is returned when
        the display name does not resolve to a node label.
    """
    resolved_label = node_label
    if not resolved_label:
        assert node_display_name is not None
        resolved_label = self.get_node_label(node_display_name)

    if not resolved_label:
        return ""

    node_attributes = self.graph.nodes[resolved_label]
    return node_attributes[self.rel_dict["comment"]["node_label"]]
Get the node definition, i.e., the "comment" associated with a given node display name.
Arguments:
- node_display_name, str: Display name of the node which you want to get the comment for.
- node_label, str: Label of the node you would want to get the comment for.
Returns:
Comment associated with node, as a string.
def get_node_dependencies(
    self,
    source_node: str,
    display_names: bool = True,
    schema_ordered: bool = True,
) -> list[str]:
    """Get the immediate dependencies of a given source node.

    Args:
        source_node: The node whose dependencies we need to compute.
        display_names: if True, return list of display names of each of the dependencies.
            if False, return list of node labels of each of the dependencies.
        schema_ordered:
            if True, obtain the dependencies through get_ordered_entry (ordered by
            edge weight).
            if False, obtain them through get_adjacent_nodes_by_relationship, which
            gives no ordering guarantee.

    Returns:
        List of dependencies of the source node (display names or labels).
    """
    if schema_ordered:
        # dependencies in the order in which they are defined in the schema
        dependency_labels = self.get_ordered_entry(
            key=self.rel_dict["requiresDependency"]["edge_key"],
            source_node_label=source_node,
        )
    else:
        dependency_labels = self.get_adjacent_nodes_by_relationship(
            node_label=source_node,
            relationship=self.rel_dict["requiresDependency"]["edge_key"],
        )

    if not display_names:
        return dependency_labels

    # translate each dependency label into its display name
    return [
        self.graph.nodes[label][self.rel_dict["displayName"]["node_label"]]
        for label in dependency_labels
    ]
Get the immediate dependencies that are related to a given source node.
Arguments:
- source_node: The node whose dependencies we need to compute.
- display_names: if True, return list of display names of each of the dependencies. if False, return list of node labels of each of the dependencies.
- schema_ordered: if True, return the dependencies of the node following the order of the schema (slower). if False, return dependencies from graph without guaranteeing schema order (faster)
Returns:
List of nodes that are dependent on the source node.
def get_nodes_descendants(self, node_label: str) -> list[str]:
    """Return all nodes reachable from the given node in the data model graph
    Args:
        node_label, str: any given node
    Return:
        all_descendants, list: result of nx.descendants for the node, as a list
    """
    return list(nx.descendants(self.graph, node_label))
Return a list of nodes reachable from source in graph
Arguments:
- node_label, str: any given node
Return:
all_descendants, list: nodes reachable from source in graph
def get_nodes_display_names(
    self,
    node_list: list[str],
) -> list[str]:
    """Get display names associated with the given list of nodes.

    Args:
        node_list: List of node labels whose display names we need to retrieve.

    Returns:
        List of display names, in the same order as node_list.
    """
    display_names = []
    for node in node_list:
        node_attributes = self.graph.nodes[node]
        display_names.append(
            node_attributes[self.rel_dict["displayName"]["node_label"]]
        )
    return display_names
Get display names associated with the given list of nodes.
Arguments:
- node_list: List of nodes whose display names we need to retrieve.
Returns:
List of display names.
def get_node_label(self, node_display_name: str) -> str:
    """Get the node label for a given display name.

    The display name is converted to both a class label and a property label;
    the class label is preferred when both are present in the graph.

    Args:
        node_display_name: Display name of the node which you want to get the label for.
    Returns:
        Node label associated with given node.
        If display name not part of schema, return an empty string.
    """
    class_label = get_class_label_from_display_name(display_name=node_display_name)
    property_label = get_property_label_from_display_name(
        display_name=node_display_name
    )

    graph_nodes = self.graph.nodes
    if class_label in graph_nodes:
        return class_label
    if property_label in graph_nodes:
        return property_label
    return ""
Get the node label for a given display name.
Arguments:
- node_display_name: Display name of the node which you want to get the label for.
Returns:
Node label associated with given node. If display name not part of schema, return an empty string.
def get_node_range(
    self,
    node_label: Optional[str] = None,
    node_display_name: Optional[str] = None,
    display_names: bool = False,
) -> list[str]:
    """
    Get the range, i.e., all the valid values that are associated with a node label.

    Args:
        node_label (Optional[str], optional): Node for which you need to retrieve the range.
            Defaults to None.
        node_display_name (Optional[str], optional): Display name of the node, used to
            look up the label when node_label is not given. Defaults to None.
        display_names (bool, optional): If True, return the display names of the valid
            values instead of their labels. Defaults to False.

    Raises:
        ValueError: If find_node_range raises a KeyError for the node.

    Returns:
        list[str]:
            If display_names=False, a list of valid values (labels) associated with a given node.
            If display_names=True, a list of valid values (display names) associated
                with a given node
    """
    if not node_label:
        assert node_display_name is not None
        node_label = self.get_node_label(node_display_name)

    try:
        valid_values = self.find_node_range(node_label=node_label)
    except KeyError as exc:
        raise ValueError(
            f"The source node {node_label} does not exist in the graph. "
            "Please use a different node."
        ) from exc

    if not display_names:
        return valid_values

    # swap each valid value's label for its display name
    return [self.graph.nodes[value]["displayName"] for value in valid_values]
Get the range, i.e., all the valid values that are associated with a node label.
Arguments:
- node_label (Optional[str], optional): Node for which you need to retrieve the range. Defaults to None.
- node_display_name (Optional[str], optional): Display name of the node, used to look up the label when node_label is not given. Defaults to None.
- display_names (bool, optional): If True, return the display names of the valid values instead of their labels. Defaults to False.
Raises:
- ValueError: If the node cannot be found in the graph.
Returns:
list[str]: If display_names=False, a list of valid values (labels) associated with a given node. If display_names=True, a list of valid values (display names) associated with a given node
def get_node_required(
    self, node_label: Optional[str] = None, node_display_name: Optional[str] = None
) -> bool:
    """Look up the "required" attribute stored on a node.

    Args:
        node_label: Label of the node for which you need to look up.
        node_display_name: Display name of the node; used to look up the label when
            node_label is not given.
    Returns:
        True: If the given node is a "required" node.
        False: If the given node is not a "required" (i.e., an "optional") node.
    """
    target_label = node_label
    if not target_label:
        assert node_display_name is not None
        target_label = self.get_node_label(node_display_name)

    required_attribute = self.rel_dict["required"]["node_label"]
    return self.graph.nodes[target_label][required_attribute]
Check if a given node is required or not.
Note: The possible options that a node can be associated with -- "required" / "optional".
Arguments:
- node_label: Label of the node for which you need to look up.
- node_display_name: Display name of the node for which you want look up.
Returns:
True: If the given node is a "required" node. False: If the given node is not a "required" (i.e., an "optional") node.
def get_node_validation_rules(
    self, node_label: Optional[str] = None, node_display_name: Optional[str] = None
) -> Union[list, dict[str, str]]:
    """Get validation rules associated with a node.

    Args:
        node_label: Label of the node for which you need to look up.
        node_display_name: Display name of the node; used to look up the label when
            node_label is not given.
    Raises:
        ValueError: neither node_label nor node_display_name was provided, or the
            lookup of the node's "validationRules" raised a KeyError.
    Returns:
        The validation rules stored on the node, as a list or a dictionary.
        An empty list when the display name does not resolve to a node label.
    """
    label = node_label
    if not label:
        if node_display_name is None:
            raise ValueError(
                "Either node_label or node_display_name must be provided."
            )
        # fall back to resolving the label from the display name
        label = self.get_node_label(node_display_name)
        if not label:
            return []

    try:
        return self.graph.nodes[label]["validationRules"]
    except KeyError as key_error:
        raise ValueError(
            f"{label} is not in the graph, please provide a proper node label"
        ) from key_error
Get validation rules associated with a node.
Arguments:
- node_label: Label of the node for which you need to look up.
- node_display_name: Display name of the node which you want to get the label for.
Returns:
A set of validation rules associated with node, as a list or a dictionary.
def get_subgraph_by_edge_type(self, relationship: str) -> nx.DiGraph:
    """Get a subgraph containing all edges of a given type (aka relationship).

    Args:
        relationship: Edge key identifying the relationship type to keep.

    Returns:
        Directed graph built from the edges whose key equals relationship.
    """
    # Keep only the (source, target) pairs of edges whose key matches the
    # requested relationship; edge data is not carried over.
    matching_edges = [
        (source, target)
        for source, target, edge_key, _ in self.graph.out_edges(data=True, keys=True)
        if edge_key == relationship
    ]

    subgraph = nx.DiGraph()
    subgraph.add_edges_from(matching_edges)
    return subgraph
Get a subgraph containing all edges of a given type (aka relationship).
Arguments:
- relationship: edge / link relationship type with possible values same as in above docs.
Returns:
Directed graph on edges of a particular type (aka relationship)
def find_adjacent_child_classes(
    self, node_label: Optional[str] = None, node_display_name: Optional[str] = None
) -> list[str]:
    """Find child classes of a given node.

    Args:
        node_label: Label of the node to look up. Used directly when provided.
        node_display_name: Display name of the node to look up. Only used to
            resolve the node label when node_label is not provided.
    Returns:
        List of nodes that are adjacent to the given node, by SubclassOf relationship.
    Raises:
        ValueError: If neither node_label nor node_display_name is provided.
    """
    if not node_label:
        # Raise explicitly rather than assert: assertions are stripped when
        # Python runs with -O, which would let None reach get_node_label.
        if node_display_name is None:
            raise ValueError(
                "Either node_label or node_display_name must be provided."
            )
        node_label = self.get_node_label(node_display_name)

    return self.get_adjacent_nodes_by_relationship(
        node_label=node_label, relationship=self.rel_dict["subClassOf"]["edge_key"]
    )
Find child classes of a given node.
Arguments:
- node_display_name: Display name of the node to look up.
- node_label: Label of the node to look up.
Returns:
List of nodes that are adjacent to the given node, by SubclassOf relationship.
def find_child_classes(self, schema_class: str) -> list:
    """Find the direct successors of the given class in the schema graph.

    Args:
        schema_class: Node label of the class from which to look for children.
    Returns:
        List of successor node labels of schema_class; empty if there are none.
    """
    # Build the list directly so the declared return type always holds.
    # NOTE(review): the previous version passed this list through `unlist`
    # and then asserted the result was a list; `unlist` presumably collapses
    # one-element lists, which would trip that assert for a class with a
    # single child -- confirm against schematic.utils.general.unlist.
    return list(self.graph.successors(schema_class))
Find schema classes that inherit from the given class
Arguments:
- schema_class: node label for the class from which to look for children.
Returns:
list of children to the schema_class.
def find_class_specific_properties(self, schema_class: str) -> list[str]:
    """Find properties specifically associated with a given class.

    Args:
        schema_class: Node/class label to identify properties for.
    Returns:
        List of source nodes that have an edge into schema_class where
        "domainValue" is among the keys stored for that node pair.
    Raises:
        KeyError: If the provided schema_class is not in the graph.
    """
    class_is_known = self.is_class_in_schema(schema_class)
    if not class_is_known:
        raise KeyError(
            (
                f"Schema_class provided: {schema_class} is not in the data model, please check "
                "that you are providing the proper class/node label"
            )
        )

    # Scan every edge and keep the sources that point at the class through a
    # "domainValue" relationship.
    return [
        source
        for source, target in self.graph.edges()
        if target == schema_class
        and "domainValue" in self.graph[source][schema_class]
    ]
Find properties specifically associated with a given class
Arguments:
- schema_class, str: node/class label, to identify properties for.
Returns:
properties, list: List of properties associated with a given schema class.
Raises:
- KeyError: Key error is raised if the provided schema_class is not in the graph
def find_parent_classes(self, node_label: str) -> list[list[str]]:
    """Find all parents of the provided node.

    Args:
        node_label: Label of the node to find parents of.
    Returns:
        List of lists of parents to the given node: one list per simple path
        from the root node to node_label, with node_label itself dropped.
    """
    # Restrict to "parentOf" edges to determine the root of the hierarchy.
    parent_digraph = self.get_digraph_by_edge_type("parentOf")

    # The first node in topological order is taken as the root.
    ordered_nodes = list(nx.topological_sort(parent_digraph))
    root_node = ordered_nodes[0]

    # Walk every simple path from the root to the target in the full graph,
    # trimming the target off the end of each path.
    parent_paths = []
    for path in nx.all_simple_paths(self.graph, source=root_node, target=node_label):
        parent_paths.append(path[:-1])
    return parent_paths
Find all parents of the provided node
Arguments:
- node_label: label of the node to find parents of
Returns:
List of lists of parents to the given node.
def full_schema_graph(self, size: Optional[int] = None) -> graphviz.Digraph:
    """Create a graph visualization of the whole data model.

    Args:
        size: Max height and width of the graph, if one value provided
            it is used for both.
    Returns:
        Schema graph viz built from every edge in the graph.
    """
    return visualize(self.graph.edges(), size=size)
Create a graph of the data model.
Arguments:
- size, float: max height and width of the graph, if one value provided it is used for both.
Returns:
schema graph viz
def is_class_in_schema(self, node_label: str) -> bool:
    """Determine if provided node_label is in the schema graph/data model.

    Args:
        node_label: Label of the node to search for in the schema graph.
    Returns:
        True, if node is in the graph schema
        False, if node is not in graph schema
    """
    known_nodes = self.graph.nodes()
    return node_label in known_nodes
Determine if provided node_label is in the schema graph/data model.
Arguments:
- node_label: label of node to search for in the schema graph
Returns:
True, if node is in the graph schema False, if node is not in graph schema
def sub_schema_graph(
    self, source: str, direction: str, size: Optional[float] = None
) -> Optional[graphviz.Digraph]:
    """Create a sub-schema graph.

    Args:
        source: Source node label to start the graph from.
        direction: Direction to create the visualization, choose from
            "up", "down", "both".
        size: Max height and width of the graph, if one value provided it is
            used for both.
    Returns:
        Sub-schema graph viz, or None when direction is not one of the
        supported values.
    """
    include_parents = direction in ("up", "both")
    include_descendants = direction in ("down", "both")
    if not (include_parents or include_descendants):
        return None

    # Parent paths are looked up before the breadth-first walk, matching the
    # order in which the two lookups run for the "both" direction.
    parent_paths = self.find_parent_classes(source) if include_parents else []
    edges = list(nx.edge_bfs(self.graph, [source])) if include_descendants else []

    for parent_path in parent_paths:
        # Re-attach the source so the last edge of each path reaches it.
        parent_path.append(source)
        edges.extend(zip(parent_path, parent_path[1:]))

    return visualize(edges, size=size)
Create a sub-schema graph
Arguments:
- source, str: source node label to start graph
- direction, str: direction to create the visualization, choose from "up", "down", "both"
- size, float: max height and width of the graph, if one value provided it is used for both.
Returns:
Sub-schema graph viz