schematic.models.GE_Helpers
import logging
import os
import uuid

# allows specifying explicit variable types
from typing import Dict, List

import numpy as np
from great_expectations.core import ExpectationSuite
from great_expectations.core.expectation_configuration import ExpectationConfiguration
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import (
    AnonymizedUsageStatisticsConfig,
    DataContextConfig,
    DatasourceConfig,
    FilesystemStoreBackendDefaults,
)
from great_expectations.data_context.types.resource_identifiers import (
    ExpectationSuiteIdentifier,
)
from great_expectations.exceptions.exceptions import GreatExpectationsError
from opentelemetry import trace

import great_expectations as ge
from schematic.models.validate_attribute import GenerateError
from schematic.schemas.data_model_graph import DataModelGraphExplorer
from schematic.utils.schema_utils import extract_component_validation_rules
from schematic.utils.validate_utils import (
    iterable_to_str_list,
    np_array_to_str_list,
    required_is_only_rule,
    rule_in_rule_list,
)

logger = logging.getLogger(__name__)
tracer = trace.get_tracer("Schematic")

# List of modifiers that users can add to a rule, that aren't rules themselves.
# As additional modifiers are added, this list will need to be updated.

RULE_MODIFIERS = ["error", "warning", "strict", "like", "set", "value"]
VALIDATION_EXPECTATION = {
    "int": "expect_column_values_to_be_in_type_list",
    "float": "expect_column_values_to_be_in_type_list",
    "str": "expect_column_values_to_be_of_type",
    "num": "expect_column_values_to_be_in_type_list",
    "date": "expect_column_values_to_be_dateutil_parseable",
    "recommended": "expect_column_values_to_not_be_null",
    "protectAges": "expect_column_values_to_be_between",
    "unique": "expect_column_values_to_be_unique",
    "inRange": "expect_column_values_to_be_between",
    "IsNA": "expect_column_values_to_match_regex_list",
    # To be implemented rules with possible expectations
    # "list": "expect_column_values_to_not_match_regex_list",
    # "regex": "expect_column_values_to_match_regex",
    # "url": "expect_column_values_to_be_valid_urls",
    # "matchAtLeastOne": "expect_foreign_keys_in_column_a_to_exist_in_column_b",
    # "matchExactlyOne": "expect_foreign_keys_in_column_a_to_exist_in_column_b",
    # "matchNone": "expect_compound_columns_to_be_unique",
}


class GreatExpectationsHelpers(object):
    """
    Great Expectations helper class

    Provides basic utilities to:
        1) Create GE workflow specific to manifest according to validation rules
        2) Parse results dict to generate appropriate errors
    """

    def __init__(self, dmge, unimplemented_expectations, manifest, manifestPath):
        """
        Purpose:
            Instantiate a great expectations helpers object
        Args:
            dmge:
                DataModelGraphExplorer Object
            unimplemented_expectations:
                dictionary of validation rules that currently do not have expectations developed
            manifest:
                manifest being validated
            manifestPath:
                path to manifest being validated
        """
        self.unimplemented_expectations = unimplemented_expectations
        self.dmge = dmge
        self.manifest = manifest
        self.manifestPath = manifestPath

    @tracer.start_as_current_span("GreatExpectationsHelpers::build_context")
    def build_context(self):
        """
        Purpose:
            Create a dataContext and datasource and add to object
        Returns:
            saves dataContext and datasource to self
        """
        self.context = ge.get_context()

        # create datasource configuration
        datasource_config = {
            "name": "example_datasource",
            "class_name": "Datasource",
            "module_name": "great_expectations.datasource",
            "execution_engine": {
                "module_name": "great_expectations.execution_engine",
                "class_name": "PandasExecutionEngine",
            },
            "data_connectors": {
                "default_runtime_data_connector_name": {
                    "class_name": "RuntimeDataConnector",
                    "batch_identifiers": ["default_identifier_name"],
                },
            },
        }

        # Setting this to False prevents extra data from leaving schematic
        anonymous_usage_statistics = AnonymizedUsageStatisticsConfig(enabled=False)

        # create data context configuration
        data_context_config = DataContextConfig(
            datasources={
                "pandas": DatasourceConfig(
                    class_name="Datasource",
                    execution_engine={"class_name": "PandasExecutionEngine"},
                    data_connectors={
                        "default_runtime_data_connector_name": {
                            "class_name": "RuntimeDataConnector",
                            "batch_identifiers": ["default_identifier_name"],
                        }
                    },
                )
            },
            store_backend_defaults=FilesystemStoreBackendDefaults(
                root_directory=os.path.join(os.getcwd(), "great_expectations")
            ),
            anonymous_usage_statistics=anonymous_usage_statistics,
        )

        # build context and add data source
        self.context = BaseDataContext(project_config=data_context_config)
        # self.context.test_yaml_config(yaml.dump(datasource_config))
        self.context.add_datasource(**datasource_config)

    @tracer.start_as_current_span(
        "GreatExpectationsHelpers::add_expectation_suite_if_not_exists"
    )
    def add_expectation_suite_if_not_exists(self) -> ExpectationSuite:
        """
        Purpose:
            Add expectation suite if it does not exist
        Input:
        Returns:
            saves expectation suite and identifier to self
        """
        self.expectation_suite_name = f"Manifest_test_suite_{uuid.uuid4()}"
        expectation_suite = self.context.add_expectation_suite(
            expectation_suite_name=self.expectation_suite_name,
        )
        self.suite = expectation_suite

        return self.suite

    @tracer.start_as_current_span("GreatExpectationsHelpers::build_expectation_suite")
    def build_expectation_suite(
        self,
    ) -> None:
        """
        Purpose:
            Construct an expectation suite to validate columns with rules that have expectations
            Add suite to object
        Input:

        Returns:
            saves expectation suite and identifier to self

        """

        # create blank expectation suite
        self.suite = self.add_expectation_suite_if_not_exists()

        # build expectation configurations for each expectation
        for col in self.manifest.columns:
            args = {}
            meta = {}

            # remove trailing/leading whitespaces from manifest
            self.manifest.map(lambda x: x.strip() if isinstance(x, str) else x)

            validation_rules = self.dmge.get_node_validation_rules(
                node_display_name=col
            )

            # check if attribute has any rules associated with it
            if validation_rules:
                # Check if the validation rule applies to this manifest
                if isinstance(validation_rules, dict):
                    validation_rules = extract_component_validation_rules(
                        manifest_component=self.manifest["Component"][0],
                        validation_rules_dict=validation_rules,
                    )
                # iterate through all validation rules for an attribute
                for rule in validation_rules:
                    base_rule = rule.split(" ")[0]

                    # check if rule has an implemented expectation
                    if rule_in_rule_list(
                        rule, self.unimplemented_expectations
                    ) or required_is_only_rule(
                        rule=rule,
                        attribute=col,
                        rule_modifiers=RULE_MODIFIERS,
                        validation_expectation=VALIDATION_EXPECTATION,
                    ):
                        continue

                    args["column"] = col
                    args["result_format"] = "COMPLETE"

                    # Validate num
                    if base_rule == "num":
                        args["mostly"] = 1.0
                        args["type_list"] = ["int", "int64", "float", "float64"]
                        meta = {
                            "notes": {
                                "format": "markdown",
                                "content": "Expect column values to be of int or float type. **Markdown** `Supported`",
                            },
                            "validation_rule": rule,
                        }

                    # Validate float
                    elif base_rule == "float":
                        args["mostly"] = 1.0
                        args["type_list"] = ["float", "float64"]
                        meta = {
                            "notes": {
                                "format": "markdown",
                                "content": "Expect column values to be of float type. **Markdown** `Supported`",
                            },
                            "validation_rule": rule,
                        }

                    # Validate int
                    elif base_rule == "int":
                        args["mostly"] = 1.0
                        args["type_list"] = ["int", "int64"]
                        meta = {
                            "notes": {
                                "format": "markdown",
                                "content": "Expect column values to be of int type. **Markdown** `Supported`",
                            },
                            "validation_rule": rule,
                        }

                    # Validate string
                    elif base_rule == "str":
                        args["mostly"] = 1.0
                        args["type_"] = "str"
                        meta = {
                            "notes": {
                                "format": "markdown",
                                "content": "Expect column values to be of string type. **Markdown** `Supported`",
                            },
                            "validation_rule": rule,
                        }

                    # Validate date
                    elif base_rule == "date":
                        args["mostly"] = 1.0
                        meta = {
                            "notes": {
                                "format": "markdown",
                                "content": (
                                    "Expect column values to be parsable by dateutils. "
                                    "**Markdown** `Supported`"
                                ),
                            },
                            "validation_rule": rule,
                        }

                    elif base_rule == ("recommended"):
                        args["mostly"] = 0.0000000001
                        meta = {
                            "notes": {
                                "format": "markdown",
                                "content": "Expect column to not be empty. **Markdown** `Supported`",
                            },
                            "validation_rule": rule,
                        }

                    elif base_rule == ("protectAges"):
                        # Function to convert to different age limit formats
                        min_age, max_age = self.get_age_limits()

                        args["mostly"] = 1.0
                        args["min_value"] = min_age
                        args["max_value"] = max_age
                        # args['allow_cross_type_comparisons']=True  # TODO Can allow after issue #980 is completed
                        meta = {
                            "notes": {
                                "format": "markdown",
                                "content": "Expect ages to be between 18 years (6,570 days) and 90 years (32,850 days) of age. **Markdown** `Supported`",
                            },
                            "validation_rule": rule,
                        }

                    elif base_rule == ("unique"):
                        args["mostly"] = 1.0
                        meta = {
                            "notes": {
                                "format": "markdown",
                                "content": "Expect column values to be Unique. **Markdown** `Supported`",
                            },
                            "validation_rule": rule,
                        }

                    elif base_rule == ("inRange"):
                        args["mostly"] = 1.0
                        args["min_value"] = (
                            float(rule.split(" ")[1])
                            if rule.split(" ")[1].lower() != "none"
                            else None
                        )
                        args["max_value"] = (
                            float(rule.split(" ")[2])
                            if rule.split(" ")[2].lower() != "none"
                            else None
                        )
                        args[
                            "allow_cross_type_comparisons"
                        ] = True  # TODO Should follow up with issue #980
                        meta = {
                            "notes": {
                                "format": "markdown",
                                "content": "Expect column values to be within a specified range. **Markdown** `Supported`",
                            },
                            "validation_rule": rule,
                        }

                    elif base_rule == ("IsNA"):
                        args["mostly"] = 1.0
                        args["regex_list"] = ["Not Applicable"]
                        meta = {
                            "notes": {
                                "format": "markdown",
                                "content": "Expect column values to be marked Not Applicable. **Markdown** `Supported`",
                            },
                            "validation_rule": rule,
                        }

                    # add expectation for attribute to suite
                    self.add_expectation(
                        rule=rule,
                        args=args,
                        meta=meta,
                        validation_expectation=VALIDATION_EXPECTATION,
                    )

        self.context.update_expectation_suite(
            expectation_suite=self.suite,
        )

        suite_identifier = ExpectationSuiteIdentifier(
            expectation_suite_name=self.expectation_suite_name
        )

        if logger.isEnabledFor(logging.DEBUG):
            self.context.build_data_docs(resource_identifiers=[suite_identifier])
            # Webpage DataDocs opened here:
            # self.context.open_data_docs(resource_identifier=suite_identifier)

    def add_expectation(
        self,
        rule: str,
        args: Dict,
        meta: Dict,
        validation_expectation: Dict,
    ):
        """
        Purpose:
            Add individual expectation for a rule to the suite
        Input:
            rule:
                validation rule
            args:
                dict of arguments specifying expectation behavior
            meta:
                dict of additional information for each expectation
            validation_expectation:
                dictionary to map between rules and expectations
        Returns:
            adds expectation to self.suite

        """
        # Create an Expectation
        expectation_configuration = ExpectationConfiguration(
            # Name of expectation type being added
            expectation_type=VALIDATION_EXPECTATION[rule.split(" ")[0]],
            # add arguments and meta message
            kwargs={**args},
            meta={**meta},
        )
        # Add the Expectation to the suite
        self.suite.add_expectation(expectation_configuration=expectation_configuration)

    def build_checkpoint(self):
        """
        Purpose:
            Build checkpoint to validate manifest
        Input:
        Returns:
            adds checkpoint to self
        """
        # create manifest checkpoint
        self.checkpoint_name = f"manifest_checkpoint_{uuid.uuid4()}"
        checkpoint_config = {
            "name": self.checkpoint_name,
            "config_version": 1,
            "class_name": "SimpleCheckpoint",
            "validations": [
                {
                    "batch_request": {
                        "datasource_name": "example_datasource",
                        "data_connector_name": "default_runtime_data_connector_name",
                        "data_asset_name": "Manifest",
                    },
                    "expectation_suite_name": self.expectation_suite_name,
                }
            ],
        }

        # self.context.test_yaml_config(yaml.dump(checkpoint_config),return_mode="report_object")
        self.context.add_checkpoint(**checkpoint_config)

    def generate_errors(
        self,
        validation_results: Dict,
        validation_types: Dict,
        errors: List,
        warnings: List,
        dmge: DataModelGraphExplorer,
    ):
        """
        Purpose:
            Parse results dictionary and generate errors for expectations
        Input:
            validation_results:
                dictionary of results for each expectation
            validation_types:
                dict of types of errors to generate for each validation rule
            errors:
                list of errors
            warnings:
                list of warnings
        Returns:
            errors:
                list of errors
            warnings:
                list of warnings
            self.manifest:
                manifest, possibly updated (censored ages)
        """

        type_dict = {
            "float64": float,
            "int64": int,
            "str": str,
        }
        for result_dict in validation_results[0]["results"]:
            indices = []
            values = []

            # if the expectation failed, get information to generate error message
            if not result_dict["success"]:
                errColumn = result_dict["expectation_config"]["kwargs"]["column"]
                rule = result_dict["expectation_config"]["meta"]["validation_rule"]

                if (
                    "exception_info" in result_dict.keys()
                    # This changes in 0.18.x of GE, details on this:
                    # https://docs.greatexpectations.io/docs/0.18/reference/learn/terms/validation_result/
                    and result_dict["exception_info"]["exception_message"]
                ):
                    raise GreatExpectationsError(
                        result_dict["exception_info"]["exception_traceback"]
                    )

                # only some expectations explicitly list unexpected values and indices, read or find if not present
                elif "unexpected_index_list" in result_dict["result"]:
                    indices = result_dict["result"]["unexpected_index_list"]
                    values = result_dict["result"]["unexpected_list"]

                # Technically, this shouldn't ever happen, but will keep as a failsafe in case many things go wrong
                # because type validation is column aggregate expectation and not column map expectation when columns are not of object type,
                # indices and values cannot be returned
                else:
                    # This changes in 0.17.x of GE, refactored code:
                    # for i, item in enumerate(self.manifest[errColumn]):
                    #     observed_type = result_dict.get("result", {}).get("observed_value", None)
                    #     is_instance_type = observed_type is not None and isinstance(
                    #         item, type_dict[observed_type]
                    #     )
                    #     indices.append(i) if is_instance_type else indices
                    #     values.append(item) if is_instance_type else values
                    for i, item in enumerate(self.manifest[errColumn]):
                        observed_type = result_dict["result"]["observed_value"]
                        indices.append(i) if isinstance(
                            item, type_dict[observed_type]
                        ) else indices
                        values.append(item) if isinstance(
                            item, type_dict[observed_type]
                        ) else values

                # call functions to generate error messages and add to error list
                if validation_types[rule.split(" ")[0]]["type"] == "type_validation":
                    for row, value in zip(indices, values):
                        vr_errors, vr_warnings = GenerateError.generate_type_error(
                            val_rule=rule,
                            row_num=str(row + 2),
                            attribute_name=errColumn,
                            invalid_entry=str(value),
                            dmge=dmge,
                        )
                        if vr_errors:
                            errors.append(vr_errors)
                        if vr_warnings:
                            warnings.append(vr_warnings)
                elif validation_types[rule.split(" ")[0]]["type"] == "regex_validation":
                    expression = result_dict["expectation_config"]["kwargs"]["regex"]
                    for row, value in zip(indices, values):
                        vr_errors, vr_warnings = GenerateError.generate_regex_error(
                            val_rule=rule,
                            reg_expression=expression,
                            row_num=str(row + 2),
                            module_to_call="match",
                            attribute_name=errColumn,
                            invalid_entry=value,
                            dmge=dmge,
                        )
                        if vr_errors:
                            errors.append(vr_errors)
                        if vr_warnings:
                            warnings.append(vr_warnings)
                elif (
                    validation_types[rule.split(" ")[0]]["type"] == "content_validation"
                ):
                    vr_errors, vr_warnings = GenerateError.generate_content_error(
                        val_rule=rule,
                        attribute_name=errColumn,
                        row_num=np_array_to_str_list(np.array(indices) + 2),
                        invalid_entry=iterable_to_str_list(values),
                        dmge=self.dmge,
                    )
                    if vr_errors:
                        errors.append(vr_errors)
                        if rule.startswith("protectAges"):
                            self.censor_ages(vr_errors, errColumn)

                    if vr_warnings:
                        warnings.append(vr_warnings)
                        if rule.startswith("protectAges"):
                            self.censor_ages(vr_warnings, errColumn)

        return errors, warnings

    def get_age_limits(
        self,
    ):
        """
        Purpose:
            Get boundaries of ages that need to be censored for different age formats
        Input:
        Returns:
            min_age:
                minimum age that will not be censored
            max_age:
                maximum age that will not be censored

        """

        min_age = 6550  # days
        max_age = 32849  # days

        return min_age, max_age

    def censor_ages(
        self,
        message: List,
        col: str,
    ):
        """
        Purpose:
            Censor ages in manifest as appropriate
        Input:
            message:
                error or warning message for age validation rule
            col:
                name of column containing ages
        Returns:
            updates self.manifest with censored ages
        TODO: Speed up conversion from str list to int list
        """
        censor_rows = []

        for row in message[0]:
            censor_rows.append(int(row) - 2)

        self.manifest.loc[censor_rows, (col)] = "age censored"

        # update the manifest file, so that ages are censored
        self.manifest.to_csv(
            self.manifestPath.replace(".csv", "_censored.csv"), index=False
        )
        logging.info("Sensitive ages have been censored.")

        return
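The VALIDATION_EXPECTATION mapping above is keyed by the first token of a validation rule, so any modifiers after the rule name are ignored when resolving the expectation. A minimal sketch of that lookup, using a subset of the module's mapping; the rule strings shown are illustrative:

# Resolve a rule string to its Great Expectations expectation name.
# Only the first token of the rule is used, matching rule.split(" ")[0] in the module.
VALIDATION_EXPECTATION = {
    "num": "expect_column_values_to_be_in_type_list",
    "inRange": "expect_column_values_to_be_between",
    "unique": "expect_column_values_to_be_unique",
}

for rule in ["num", "inRange 50 100 error", "unique warning"]:
    base_rule = rule.split(" ")[0]
    print(base_rule, "->", VALIDATION_EXPECTATION[base_rule])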
class GreatExpectationsHelpers(object):
Great Expectations helper class
Provides basic utilities to:
1) Create a GE workflow specific to the manifest, according to its validation rules
2) Parse the results dict to generate appropriate errors
def __init__(self, dmge, unimplemented_expectations, manifest, manifestPath):
Purpose:
Instantiate a great expectations helpers object
Arguments:
- dmge: DataModelGraphExplorer Object
- unimplemented_expectations: dictionary of validation rules that currently do not have expectations developed
- manifest: manifest being validated
- manifestPath: path to manifest being validated
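For orientation, here is a hedged sketch of how the helper is typically driven end to end; the dmge object, the manifest DataFrame, the manifest path, and the unimplemented-expectations list are assumed to already exist, and the values shown for them are illustrative only:

# Hypothetical driver code; assumes `dmge` (a DataModelGraphExplorer) and a
# manifest DataFrame loaded from `manifest_path` are already available.
ge_helpers = GreatExpectationsHelpers(
    dmge=dmge,
    unimplemented_expectations=["list", "regex", "url"],  # illustrative rule names
    manifest=manifest,
    manifestPath=manifest_path,
)

ge_helpers.build_context()            # DataContext plus runtime Pandas datasource
ge_helpers.build_expectation_suite()  # one expectation per implemented rule and column
ge_helpers.build_checkpoint()         # SimpleCheckpoint wired to the suite

# The checkpoint is then run by the caller (see the sketch after build_checkpoint),
# and the resulting results dict is handed to generate_errors().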
def build_context(self):
Purpose:
Create a DataContext and Datasource and add them to the object
Returns:
saves the DataContext and Datasource to self
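After build_context() the helper holds a BaseDataContext backed by a filesystem store under ./great_expectations, with a runtime Pandas datasource named "example_datasource". A small, hedged check of that state, assuming the ge_helpers object constructed above and the Great Expectations 0.15-era context API shown in the source:

ge_helpers.build_context()

# The runtime datasource added via add_datasource(**datasource_config)
# should now be registered on the context.
datasource_names = [ds["name"] for ds in ge_helpers.context.list_datasources()]
print(datasource_names)  # expected to include "example_datasource"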
def add_expectation_suite_if_not_exists(self) -> ExpectationSuite:
Purpose:
Add expectation suite if it does not exist
Input:
Returns:
saves expectation suite and identifier to self
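Each call generates a fresh, uuid-suffixed suite name, so repeated validations in the same process do not collide. A brief sketch, assuming build_context() has already run on ge_helpers:

suite = ge_helpers.add_expectation_suite_if_not_exists()

# The name is regenerated per call, e.g. "Manifest_test_suite_<uuid4>".
print(ge_helpers.expectation_suite_name)
print(suite.expectation_suite_name == ge_helpers.expectation_suite_name)  # True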
def build_expectation_suite(self) -> None:
Purpose:
Construct an expectation suite to validate columns whose rules have expectations, and add the suite to the object
Input:
Returns:
saves expectation suite and identifier to self
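To make the rule-to-expectation translation concrete, here is a standalone sketch of how an "inRange" rule is parsed into the args handed to the expectation, mirroring the inRange branch of the method; the rule string and column name are illustrative, and "none" bounds become open-ended:

rule = "inRange 50 none warning"  # illustrative rule string
args = {"column": "CheckRange", "result_format": "COMPLETE", "mostly": 1.0}

# same parsing as the inRange branch in build_expectation_suite
args["min_value"] = (
    float(rule.split(" ")[1]) if rule.split(" ")[1].lower() != "none" else None
)
args["max_value"] = (
    float(rule.split(" ")[2]) if rule.split(" ")[2].lower() != "none" else None
)

print(args["min_value"], args["max_value"])  # 50.0 None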
def add_expectation(self, rule: str, args: Dict, meta: Dict, validation_expectation: Dict):
Purpose:
Add individual expectation for a rule to the suite
Input:
- rule: validation rule
- args: dict of arguments specifying expectation behavior
- meta: dict of additional information for each expectation
- validation_expectation: dictionary to map between rules and expectations
Returns:
adds expectation to self.suite
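For example, a bare "unique" rule on a column would be added to the suite roughly as follows; this mirrors the ExpectationConfiguration call in the method, with an illustrative column name, a shortened note, and `suite` standing in for the suite returned by add_expectation_suite_if_not_exists():

from great_expectations.core.expectation_configuration import ExpectationConfiguration

expectation_configuration = ExpectationConfiguration(
    expectation_type=VALIDATION_EXPECTATION["unique"],  # "expect_column_values_to_be_unique"
    kwargs={"column": "CheckUnique", "result_format": "COMPLETE", "mostly": 1.0},
    meta={
        "notes": {"format": "markdown", "content": "Expect column values to be Unique."},
        "validation_rule": "unique",
    },
)
suite.add_expectation(expectation_configuration=expectation_configuration)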
def build_checkpoint(self):
Purpose:
Build checkpoint to validate manifest
Input:
Returns:
adds checkpoint to self
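The checkpoint only names a datasource, data connector, and suite; the manifest itself is supplied at run time as an in-memory batch. A hedged sketch of how a caller might run it (in schematic this happens outside this class, and the argument names follow the Great Expectations 0.15-era runtime-batch API, so treat this as illustrative):

ge_helpers.build_checkpoint()

results = ge_helpers.context.run_checkpoint(
    checkpoint_name=ge_helpers.checkpoint_name,
    batch_request={
        "runtime_parameters": {"batch_data": ge_helpers.manifest},  # in-memory DataFrame
        "batch_identifiers": {"default_identifier_name": "manifestID"},
    },
    result_format={"result_format": "COMPLETE"},
)
validation_results = results.list_validation_results()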
def generate_errors(self, validation_results: Dict, validation_types: Dict, errors: List, warnings: List, dmge: DataModelGraphExplorer):
Purpose:
Parse results dictionary and generate errors for expectations
Input:
- validation_results: dictionary of results for each expectation
- validation_types: dict of types of errors to generate for each validation rule
- errors: list of errors
- warnings: list of warnings
Returns:
- errors: list of errors
- warnings: list of warnings
- self.manifest: manifest, possibly updated (censored ages)
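Unexpected-value indices come back from Great Expectations as zero-based DataFrame positions; the error generators convert them to manifest row numbers by adding 2 (one for the header row, one for one-based numbering). A tiny illustration of that offset, independent of GE:

import numpy as np

indices = [0, 3, 7]                       # zero-based DataFrame indices from a failed expectation
row_nums = [str(i + 2) for i in indices]  # manifest rows as reported in errors: ["2", "5", "9"]

# content-validation errors use the vectorized equivalent (np.array(indices) + 2)
row_nums_vec = (np.array(indices) + 2).astype(str).tolist()
print(row_nums == row_nums_vec)  # True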
def get_age_limits(self):
Purpose:
Get boundaries of ages that need to be censored for different age formats
Input:
Returns:
- min_age: minimum age that will not be censored
- max_age: maximum age that will not be censored
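The bounds are expressed in days; for reference, the protectAges expectation note quotes 18 years (6,570 days) and 90 years (32,850 days), and the returned cutoffs sit just inside those values. A quick check of the arithmetic:

min_age, max_age = 6550, 32849   # days, as returned by get_age_limits()
print(18 * 365, 90 * 365)        # 6570 32850, the limits quoted in the protectAges note
print(min_age < 18 * 365, max_age < 90 * 365)  # True True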
def censor_ages(self, message: List, col: str):
Purpose:
Censor ages in manifest as appropriate
Input:
- message: error or warning message for age validation rule
- col: name of column containing ages
Returns:
updates self.manifest with censored ages
TODO: Speed up conversion from str list to int list
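A hedged sketch of the censoring step on a toy manifest: the message carries manifest row numbers (header = row 1, first data row = row 2), so censor_ages subtracts 2 to get DataFrame indices before overwriting the values and writing a *_censored.csv copy. The column name, file name, and message layout are illustrative; only message[0] holding the row numbers is relied on here:

import pandas as pd

# object dtype mirrors a manifest column read as raw CSV text
manifest = pd.DataFrame({"Patient Age": pd.Series([17, 45, 95], dtype="object")})

# e.g. a protectAges error referring to manifest rows 2 and 4
message = [["2", "4"], "Patient Age", "illustrative error text", ["17", "95"]]

censor_rows = [int(row) - 2 for row in message[0]]          # DataFrame indices [0, 2]
manifest.loc[censor_rows, "Patient Age"] = "age censored"

manifest.to_csv("example_manifest_censored.csv", index=False)
print(manifest)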