schematic.models.GE_Helpers

  1import logging
  2import os
  3import uuid
  4
  5# allows specifying explicit variable types
  6from typing import Dict, List
  7
  8import numpy as np
  9from great_expectations.core import ExpectationSuite
 10from great_expectations.core.expectation_configuration import ExpectationConfiguration
 11from great_expectations.data_context import BaseDataContext
 12from great_expectations.data_context.types.base import (
 13    AnonymizedUsageStatisticsConfig,
 14    DataContextConfig,
 15    DatasourceConfig,
 16    FilesystemStoreBackendDefaults,
 17)
 18from great_expectations.data_context.types.resource_identifiers import (
 19    ExpectationSuiteIdentifier,
 20)
 21from great_expectations.exceptions.exceptions import GreatExpectationsError
 22from opentelemetry import trace
 23
 24import great_expectations as ge
 25from schematic.models.validate_attribute import GenerateError
 26from schematic.schemas.data_model_graph import DataModelGraphExplorer
 27from schematic.utils.schema_utils import extract_component_validation_rules
 28from schematic.utils.validate_utils import (
 29    iterable_to_str_list,
 30    np_array_to_str_list,
 31    required_is_only_rule,
 32    rule_in_rule_list,
 33)
 34
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# OpenTelemetry tracer used by the span decorators on the methods below.
tracer = trace.get_tracer("Schematic")

# List of modifiers that users can add to a rule that aren't rules themselves.
# As additional modifiers are added, this list will need to be updated.

RULE_MODIFIERS = ["error", "warning", "strict", "like", "set", "value"]
# Maps the base name of a validation rule (the first space-separated token of
# the rule string) to the name of the Great Expectations expectation type that
# is used to check it.
VALIDATION_EXPECTATION = {
    "int": "expect_column_values_to_be_in_type_list",
    "float": "expect_column_values_to_be_in_type_list",
    "str": "expect_column_values_to_be_of_type",
    "num": "expect_column_values_to_be_in_type_list",
    "date": "expect_column_values_to_be_dateutil_parseable",
    "recommended": "expect_column_values_to_not_be_null",
    "protectAges": "expect_column_values_to_be_between",
    "unique": "expect_column_values_to_be_unique",
    "inRange": "expect_column_values_to_be_between",
    "IsNA": "expect_column_values_to_match_regex_list",
    # To be implemented rules with possible expectations
    # "list": "expect_column_values_to_not_match_regex_list",
    # "regex": "expect_column_values_to_match_regex",
    # "url": "expect_column_values_to_be_valid_urls",
    # "matchAtLeastOne": "expect_foreign_keys_in_column_a_to_exist_in_column_b",
    # "matchExactlyOne": "expect_foreign_keys_in_column_a_to_exist_in_column_b",
    # "matchNone": "expect_compound_columns_to_be_unique",
}
 61
 62
 63class GreatExpectationsHelpers(object):
 64    """
 65    Great Expectations helper class
 66
 67    Provides basic utilities to:
 68        1) Create GE workflow specific to manifest according to validation rules
 69        2) Parse results dict to generate appropriate errors
 70    """
 71
 72    def __init__(self, dmge, unimplemented_expectations, manifest, manifestPath):
 73        """
 74        Purpose:
 75            Instantiate a great expectations helpers object
 76        Args:
 77            dmge:
 78                DataModelGraphExplorer Object
 79            unimplemented_expectations:
 80                dictionary of validation rules that currently do not have expectations developed
 81            manifest:
 82                manifest being validated
 83            manifestPath:
 84                path to manifest being validated
 85        """
 86        self.unimplemented_expectations = unimplemented_expectations
 87        self.dmge = dmge
 88        self.manifest = manifest
 89        self.manifestPath = manifestPath
 90
 91    @tracer.start_as_current_span("GreatExpectationsHelpers::build_context")
 92    def build_context(self):
 93        """
 94        Purpose:
 95            Create a dataContext and datasource and add to object
 96        Returns:
 97            saves dataContext and datasource to self
 98        """
 99        self.context = ge.get_context()
100
101        # create datasource configuration
102        datasource_config = {
103            "name": "example_datasource",
104            "class_name": "Datasource",
105            "module_name": "great_expectations.datasource",
106            "execution_engine": {
107                "module_name": "great_expectations.execution_engine",
108                "class_name": "PandasExecutionEngine",
109            },
110            "data_connectors": {
111                "default_runtime_data_connector_name": {
112                    "class_name": "RuntimeDataConnector",
113                    "batch_identifiers": ["default_identifier_name"],
114                },
115            },
116        }
117
118        # Setting this to False prevents extra data from leaving schematic
119        anonymous_usage_statistics = AnonymizedUsageStatisticsConfig(enabled=False)
120
121        # create data context configuration
122        data_context_config = DataContextConfig(
123            datasources={
124                "pandas": DatasourceConfig(
125                    class_name="Datasource",
126                    execution_engine={"class_name": "PandasExecutionEngine"},
127                    data_connectors={
128                        "default_runtime_data_connector_name": {
129                            "class_name": "RuntimeDataConnector",
130                            "batch_identifiers": ["default_identifier_name"],
131                        }
132                    },
133                )
134            },
135            store_backend_defaults=FilesystemStoreBackendDefaults(
136                root_directory=os.path.join(os.getcwd(), "great_expectations")
137            ),
138            anonymous_usage_statistics=anonymous_usage_statistics,
139        )
140
141        # build context and add data source
142        self.context = BaseDataContext(project_config=data_context_config)
143        # self.context.test_yaml_config(yaml.dump(datasource_config))
144        self.context.add_datasource(**datasource_config)
145
146    @tracer.start_as_current_span(
147        "GreatExpectationsHelpers::add_expectation_suite_if_not_exists"
148    )
149    def add_expectation_suite_if_not_exists(self) -> ExpectationSuite:
150        """
151        Purpose:
152            Add expectation suite if it does not exist
153        Input:
154        Returns:
155            saves expectation suite and identifier to self
156        """
157        self.expectation_suite_name = f"Manifest_test_suite_{uuid.uuid4()}"
158        expectation_suite = self.context.add_expectation_suite(
159            expectation_suite_name=self.expectation_suite_name,
160        )
161        self.suite = expectation_suite
162
163        return self.suite
164
165    @tracer.start_as_current_span("GreatExpectationsHelpers::build_expectation_suite")
166    def build_expectation_suite(
167        self,
168    ) -> None:
169        """
170        Purpose:
171            Construct an expectation suite to validate columns with rules that have expectations
172            Add suite to object
173        Input:
174
175        Returns:
176            saves expectation suite and identifier to self
177
178        """
179
180        # create blank expectation suite
181        self.suite = self.add_expectation_suite_if_not_exists()
182
183        # build expectation configurations for each expectation
184        for col in self.manifest.columns:
185            args = {}
186            meta = {}
187
188            # remove trailing/leading whitespaces from manifest
189            self.manifest.map(lambda x: x.strip() if isinstance(x, str) else x)
190
191            validation_rules = self.dmge.get_node_validation_rules(
192                node_display_name=col
193            )
194
195            # check if attribute has any rules associated with it
196            if validation_rules:
197                # Check if the validation rule applies to this manifest
198                if isinstance(validation_rules, dict):
199                    validation_rules = extract_component_validation_rules(
200                        manifest_component=self.manifest["Component"][0],
201                        validation_rules_dict=validation_rules,
202                    )
203                # iterate through all validation rules for an attribute
204                for rule in validation_rules:
205                    base_rule = rule.split(" ")[0]
206
207                    # check if rule has an implemented expectation
208                    if rule_in_rule_list(
209                        rule, self.unimplemented_expectations
210                    ) or required_is_only_rule(
211                        rule=rule,
212                        attribute=col,
213                        rule_modifiers=RULE_MODIFIERS,
214                        validation_expectation=VALIDATION_EXPECTATION,
215                    ):
216                        continue
217
218                    args["column"] = col
219                    args["result_format"] = "COMPLETE"
220
221                    # Validate num
222                    if base_rule == "num":
223                        args["mostly"] = 1.0
224                        args["type_list"] = ["int", "int64", "float", "float64"]
225                        meta = {
226                            "notes": {
227                                "format": "markdown",
228                                "content": "Expect column values to be of int or float type. **Markdown** `Supported`",
229                            },
230                            "validation_rule": rule,
231                        }
232
233                    # Validate float
234                    elif base_rule == "float":
235                        args["mostly"] = 1.0
236                        args["type_list"] = ["float", "float64"]
237                        meta = {
238                            "notes": {
239                                "format": "markdown",
240                                "content": "Expect column values to be of float type. **Markdown** `Supported`",
241                            },
242                            "validation_rule": rule,
243                        }
244
245                    # Validate int
246                    elif base_rule == "int":
247                        args["mostly"] = 1.0
248                        args["type_list"] = ["int", "int64"]
249                        meta = {
250                            "notes": {
251                                "format": "markdown",
252                                "content": "Expect column values to be of int type. **Markdown** `Supported`",
253                            },
254                            "validation_rule": rule,
255                        }
256
257                    # Validate string
258                    elif base_rule == "str":
259                        args["mostly"] = 1.0
260                        args["type_"] = "str"
261                        meta = {
262                            "notes": {
263                                "format": "markdown",
264                                "content": "Expect column values to be of string type. **Markdown** `Supported`",
265                            },
266                            "validation_rule": rule,
267                        }
268
269                    # Validate date
270                    elif base_rule == "date":
271                        args["mostly"] = 1.0
272                        meta = {
273                            "notes": {
274                                "format": "markdown",
275                                "content": (
276                                    "Expect column values to be parsable by dateutils. "
277                                    "**Markdown** `Supported`"
278                                ),
279                            },
280                            "validation_rule": rule,
281                        }
282
283                    elif base_rule == ("recommended"):
284                        args["mostly"] = 0.0000000001
285                        meta = {
286                            "notes": {
287                                "format": "markdown",
288                                "content": "Expect column to not be empty. **Markdown** `Supported`",
289                            },
290                            "validation_rule": rule,
291                        }
292
293                    elif base_rule == ("protectAges"):
294                        # Function to convert to different age limit formats
295                        min_age, max_age = self.get_age_limits()
296
297                        args["mostly"] = 1.0
298                        args["min_value"] = min_age
299                        args["max_value"] = max_age
300                        # args['allow_cross_type_comparisons']=True # TODO Can allow after issue #980 is completed
301                        meta = {
302                            "notes": {
303                                "format": "markdown",
304                                "content": "Expect ages to be between 18 years (6,570 days) and 90 years (32,850 days) of age. **Markdown** `Supported`",
305                            },
306                            "validation_rule": rule,
307                        }
308
309                    elif base_rule == ("unique"):
310                        args["mostly"] = 1.0
311                        meta = {
312                            "notes": {
313                                "format": "markdown",
314                                "content": "Expect column values to be Unique. **Markdown** `Supported`",
315                            },
316                            "validation_rule": rule,
317                        }
318
319                    elif base_rule == ("inRange"):
320                        args["mostly"] = 1.0
321                        args["min_value"] = (
322                            float(rule.split(" ")[1])
323                            if rule.split(" ")[1].lower() != "none"
324                            else None
325                        )
326                        args["max_value"] = (
327                            float(rule.split(" ")[2])
328                            if rule.split(" ")[2].lower() != "none"
329                            else None
330                        )
331                        args[
332                            "allow_cross_type_comparisons"
333                        ] = True  # TODO Should follow up with issue #980
334                        meta = {
335                            "notes": {
336                                "format": "markdown",
337                                "content": "Expect column values to be within a specified range. **Markdown** `Supported`",
338                            },
339                            "validation_rule": rule,
340                        }
341
342                    elif base_rule == ("IsNA"):
343                        args["mostly"] = 1.0
344                        args["regex_list"] = ["Not Applicable"]
345                        meta = {
346                            "notes": {
347                                "format": "markdown",
348                                "content": "Expect column values to be marked Not Applicable. **Markdown** `Supported`",
349                            },
350                            "validation_rule": rule,
351                        }
352
353                    # add expectation for attribute to suite
354                    self.add_expectation(
355                        rule=rule,
356                        args=args,
357                        meta=meta,
358                        validation_expectation=VALIDATION_EXPECTATION,
359                    )
360
361        self.context.update_expectation_suite(
362            expectation_suite=self.suite,
363        )
364
365        suite_identifier = ExpectationSuiteIdentifier(
366            expectation_suite_name=self.expectation_suite_name
367        )
368
369        if logger.isEnabledFor(logging.DEBUG):
370            self.context.build_data_docs(resource_identifiers=[suite_identifier])
371            # Webpage DataDocs opened here:
372            # self.context.open_data_docs(resource_identifier=suite_identifier)
373
374    def add_expectation(
375        self,
376        rule: str,
377        args: Dict,
378        meta: Dict,
379        validation_expectation: Dict,
380    ):
381        """
382        Purpose:
383            Add individual expectation for a rule to the suite
384        Input:
385            rule:
386                validation rule
387            args:
388                dict of arguments specifying expectation behavior
389            meta:
390                dict of additional information for each expectation
391            validation_expectation:
392                dictionary to map between rules and expectations
393        Returns:
394            adds expectation to self.suite
395
396        """
397        # Create an Expectation
398        expectation_configuration = ExpectationConfiguration(
399            # Name of expectation type being added
400            expectation_type=VALIDATION_EXPECTATION[rule.split(" ")[0]],
401            # add arguments and meta message
402            kwargs={**args},
403            meta={**meta},
404        )
405        # Add the Expectation to the suite
406        self.suite.add_expectation(expectation_configuration=expectation_configuration)
407
408    def build_checkpoint(self):
409        """
410        Purpose:
411            Build checkpoint to validate manifest
412        Input:
413        Returns:
414            adds checkpoint to self
415        """
416        # create manifest checkpoint
417        self.checkpoint_name = f"manifest_checkpoint_{uuid.uuid4()}"
418        checkpoint_config = {
419            "name": self.checkpoint_name,
420            "config_version": 1,
421            "class_name": "SimpleCheckpoint",
422            "validations": [
423                {
424                    "batch_request": {
425                        "datasource_name": "example_datasource",
426                        "data_connector_name": "default_runtime_data_connector_name",
427                        "data_asset_name": "Manifest",
428                    },
429                    "expectation_suite_name": self.expectation_suite_name,
430                }
431            ],
432        }
433
434        # self.context.test_yaml_config(yaml.dump(checkpoint_config),return_mode="report_object")
435        self.context.add_checkpoint(**checkpoint_config)
436
437    def generate_errors(
438        self,
439        validation_results: Dict,
440        validation_types: Dict,
441        errors: List,
442        warnings: List,
443        dmge: DataModelGraphExplorer,
444    ):
445        """
446        Purpose:
447            Parse results dictionary and generate errors for expectations
448        Input:
449            validation_results:
450                dictionary of results for each expectation
451            validation_types:
452                dict of types of errors to generate for each validation rule
453            errors:
454                list of errors
455            warnings:
456                list of warnings
457        Returns:
458            errors:
459                list of errors
460            warnings:
461                list of warnings
462            self.manifest:
463                manifest, possibly updated (censored ages)
464        """
465
466        type_dict = {
467            "float64": float,
468            "int64": int,
469            "str": str,
470        }
471        for result_dict in validation_results[0]["results"]:
472            indices = []
473            values = []
474
475            # if the expectaion failed, get infromation to generate error message
476            if not result_dict["success"]:
477                errColumn = result_dict["expectation_config"]["kwargs"]["column"]
478                rule = result_dict["expectation_config"]["meta"]["validation_rule"]
479
480                if (
481                    "exception_info" in result_dict.keys()
482                    # This changes in 0.18.x of GE, details on this:
483                    # https://docs.greatexpectations.io/docs/0.18/reference/learn/terms/validation_result/
484                    and result_dict["exception_info"]["exception_message"]
485                ):
486                    raise GreatExpectationsError(
487                        result_dict["exception_info"]["exception_traceback"]
488                    )
489
490                # only some expectations explicitly list unexpected values and indices, read or find if not present
491                elif "unexpected_index_list" in result_dict["result"]:
492                    indices = result_dict["result"]["unexpected_index_list"]
493                    values = result_dict["result"]["unexpected_list"]
494
495                # Technically, this shouldn't ever happen, but will keep as a failsafe in case many things go wrong
496                # because type validation is column aggregate expectation and not column map expectation when columns are not of object type,
497                # indices and values cannot be returned
498                else:
499                    # This changes in 0.17.x of GE, refactored code:
500                    # for i, item in enumerate(self.manifest[errColumn]):
501                    #     observed_type = result_dict.get("result", {}).get("observed_value", None)
502                    #     is_instance_type = observed_type is not None and isinstance(
503                    #         item, type_dict[observed_type]
504                    #     )
505                    #     indices.append(i) if is_instance_type else indices
506                    #     values.append(item) if is_instance_type else values
507                    for i, item in enumerate(self.manifest[errColumn]):
508                        observed_type = result_dict["result"]["observed_value"]
509                        indices.append(i) if isinstance(
510                            item, type_dict[observed_type]
511                        ) else indices
512                        values.append(item) if isinstance(
513                            item, type_dict[observed_type]
514                        ) else values
515
516                # call functions to generate error messages and add to error list
517                if validation_types[rule.split(" ")[0]]["type"] == "type_validation":
518                    for row, value in zip(indices, values):
519                        vr_errors, vr_warnings = GenerateError.generate_type_error(
520                            val_rule=rule,
521                            row_num=str(row + 2),
522                            attribute_name=errColumn,
523                            invalid_entry=str(value),
524                            dmge=dmge,
525                        )
526                        if vr_errors:
527                            errors.append(vr_errors)
528                        if vr_warnings:
529                            warnings.append(vr_warnings)
530                elif validation_types[rule.split(" ")[0]]["type"] == "regex_validation":
531                    expression = result_dict["expectation_config"]["kwargs"]["regex"]
532                    for row, value in zip(indices, values):
533                        vr_errors, vr_warnings = GenerateError.generate_regex_error(
534                            val_rule=rule,
535                            reg_expression=expression,
536                            row_num=str(row + 2),
537                            module_to_call="match",
538                            attribute_name=errColumn,
539                            invalid_entry=value,
540                            dmge=dmge,
541                        )
542                        if vr_errors:
543                            errors.append(vr_errors)
544                        if vr_warnings:
545                            warnings.append(vr_warnings)
546                elif (
547                    validation_types[rule.split(" ")[0]]["type"] == "content_validation"
548                ):
549                    vr_errors, vr_warnings = GenerateError.generate_content_error(
550                        val_rule=rule,
551                        attribute_name=errColumn,
552                        row_num=np_array_to_str_list(np.array(indices) + 2),
553                        invalid_entry=iterable_to_str_list(values),
554                        dmge=self.dmge,
555                    )
556                    if vr_errors:
557                        errors.append(vr_errors)
558                        if rule.startswith("protectAges"):
559                            self.censor_ages(vr_errors, errColumn)
560
561                    if vr_warnings:
562                        warnings.append(vr_warnings)
563                        if rule.startswith("protectAges"):
564                            self.censor_ages(vr_warnings, errColumn)
565
566        return errors, warnings
567
568    def get_age_limits(
569        self,
570    ):
571        """
572        Purpose:
573            Get boundaries of ages that need to be censored for different age formats
574        Input:
575        Returns:
576            min_age:
577                minimum age that will not be censored
578            max age:
579                maximum age that will not be censored
580
581        """
582
583        min_age = 6550  # days
584        max_age = 32849  # days
585
586        return min_age, max_age
587
588    def censor_ages(
589        self,
590        message: List,
591        col: str,
592    ):
593        """
594        Purpose:
595            Censor ages in manifest as appropriate
596        Input:
597            message:
598                error or warning message for age validation rule
599            col:
600                name of column containing ages
601        Returns:
602            updates self.manifest with censored ages
603        TODO: Speed up conversion from str list to int list
604        """
605        censor_rows = []
606
607        for row in message[0]:
608            censor_rows.append(int(row) - 2)
609
610        self.manifest.loc[censor_rows, (col)] = "age censored"
611
612        # update the manifest file, so that ages are censored
613        self.manifest.to_csv(
614            self.manifestPath.replace(".csv", "_censored.csv"), index=False
615        )
616        logging.info("Sensitive ages have been censored.")
617
618        return
logger = <Logger schematic.models.GE_Helpers (WARNING)>
tracer = <opentelemetry.trace.ProxyTracer object>
RULE_MODIFIERS = ['error', 'warning', 'strict', 'like', 'set', 'value']
VALIDATION_EXPECTATION = {'int': 'expect_column_values_to_be_in_type_list', 'float': 'expect_column_values_to_be_in_type_list', 'str': 'expect_column_values_to_be_of_type', 'num': 'expect_column_values_to_be_in_type_list', 'date': 'expect_column_values_to_be_dateutil_parseable', 'recommended': 'expect_column_values_to_not_be_null', 'protectAges': 'expect_column_values_to_be_between', 'unique': 'expect_column_values_to_be_unique', 'inRange': 'expect_column_values_to_be_between', 'IsNA': 'expect_column_values_to_match_regex_list'}
class GreatExpectationsHelpers:
 64class GreatExpectationsHelpers(object):
 65    """
 66    Great Expectations helper class
 67
 68    Provides basic utilities to:
 69        1) Create GE workflow specific to manifest according to validation rules
 70        2) Parse results dict to generate appropriate errors
 71    """
 72
 73    def __init__(self, dmge, unimplemented_expectations, manifest, manifestPath):
 74        """
 75        Purpose:
 76            Instantiate a great expectations helpers object
 77        Args:
 78            dmge:
 79                DataModelGraphExplorer Object
 80            unimplemented_expectations:
 81                dictionary of validation rules that currently do not have expectations developed
 82            manifest:
 83                manifest being validated
 84            manifestPath:
 85                path to manifest being validated
 86        """
 87        self.unimplemented_expectations = unimplemented_expectations
 88        self.dmge = dmge
 89        self.manifest = manifest
 90        self.manifestPath = manifestPath
 91
 92    @tracer.start_as_current_span("GreatExpectationsHelpers::build_context")
 93    def build_context(self):
 94        """
 95        Purpose:
 96            Create a dataContext and datasource and add to object
 97        Returns:
 98            saves dataContext and datasource to self
 99        """
100        self.context = ge.get_context()
101
102        # create datasource configuration
103        datasource_config = {
104            "name": "example_datasource",
105            "class_name": "Datasource",
106            "module_name": "great_expectations.datasource",
107            "execution_engine": {
108                "module_name": "great_expectations.execution_engine",
109                "class_name": "PandasExecutionEngine",
110            },
111            "data_connectors": {
112                "default_runtime_data_connector_name": {
113                    "class_name": "RuntimeDataConnector",
114                    "batch_identifiers": ["default_identifier_name"],
115                },
116            },
117        }
118
119        # Setting this to False prevents extra data from leaving schematic
120        anonymous_usage_statistics = AnonymizedUsageStatisticsConfig(enabled=False)
121
122        # create data context configuration
123        data_context_config = DataContextConfig(
124            datasources={
125                "pandas": DatasourceConfig(
126                    class_name="Datasource",
127                    execution_engine={"class_name": "PandasExecutionEngine"},
128                    data_connectors={
129                        "default_runtime_data_connector_name": {
130                            "class_name": "RuntimeDataConnector",
131                            "batch_identifiers": ["default_identifier_name"],
132                        }
133                    },
134                )
135            },
136            store_backend_defaults=FilesystemStoreBackendDefaults(
137                root_directory=os.path.join(os.getcwd(), "great_expectations")
138            ),
139            anonymous_usage_statistics=anonymous_usage_statistics,
140        )
141
142        # build context and add data source
143        self.context = BaseDataContext(project_config=data_context_config)
144        # self.context.test_yaml_config(yaml.dump(datasource_config))
145        self.context.add_datasource(**datasource_config)
146
147    @tracer.start_as_current_span(
148        "GreatExpectationsHelpers::add_expectation_suite_if_not_exists"
149    )
150    def add_expectation_suite_if_not_exists(self) -> ExpectationSuite:
151        """
152        Purpose:
153            Add expectation suite if it does not exist
154        Input:
155        Returns:
156            saves expectation suite and identifier to self
157        """
158        self.expectation_suite_name = f"Manifest_test_suite_{uuid.uuid4()}"
159        expectation_suite = self.context.add_expectation_suite(
160            expectation_suite_name=self.expectation_suite_name,
161        )
162        self.suite = expectation_suite
163
164        return self.suite
165
166    @tracer.start_as_current_span("GreatExpectationsHelpers::build_expectation_suite")
167    def build_expectation_suite(
168        self,
169    ) -> None:
170        """
171        Purpose:
172            Construct an expectation suite to validate columns with rules that have expectations
173            Add suite to object
174        Input:
175
176        Returns:
177            saves expectation suite and identifier to self
178
179        """
180
181        # create blank expectation suite
182        self.suite = self.add_expectation_suite_if_not_exists()
183
184        # build expectation configurations for each expectation
185        for col in self.manifest.columns:
186            args = {}
187            meta = {}
188
189            # remove trailing/leading whitespaces from manifest
190            self.manifest.map(lambda x: x.strip() if isinstance(x, str) else x)
191
192            validation_rules = self.dmge.get_node_validation_rules(
193                node_display_name=col
194            )
195
196            # check if attribute has any rules associated with it
197            if validation_rules:
198                # Check if the validation rule applies to this manifest
199                if isinstance(validation_rules, dict):
200                    validation_rules = extract_component_validation_rules(
201                        manifest_component=self.manifest["Component"][0],
202                        validation_rules_dict=validation_rules,
203                    )
204                # iterate through all validation rules for an attribute
205                for rule in validation_rules:
206                    base_rule = rule.split(" ")[0]
207
208                    # check if rule has an implemented expectation
209                    if rule_in_rule_list(
210                        rule, self.unimplemented_expectations
211                    ) or required_is_only_rule(
212                        rule=rule,
213                        attribute=col,
214                        rule_modifiers=RULE_MODIFIERS,
215                        validation_expectation=VALIDATION_EXPECTATION,
216                    ):
217                        continue
218
219                    args["column"] = col
220                    args["result_format"] = "COMPLETE"
221
222                    # Validate num
223                    if base_rule == "num":
224                        args["mostly"] = 1.0
225                        args["type_list"] = ["int", "int64", "float", "float64"]
226                        meta = {
227                            "notes": {
228                                "format": "markdown",
229                                "content": "Expect column values to be of int or float type. **Markdown** `Supported`",
230                            },
231                            "validation_rule": rule,
232                        }
233
234                    # Validate float
235                    elif base_rule == "float":
236                        args["mostly"] = 1.0
237                        args["type_list"] = ["float", "float64"]
238                        meta = {
239                            "notes": {
240                                "format": "markdown",
241                                "content": "Expect column values to be of float type. **Markdown** `Supported`",
242                            },
243                            "validation_rule": rule,
244                        }
245
246                    # Validate int
247                    elif base_rule == "int":
248                        args["mostly"] = 1.0
249                        args["type_list"] = ["int", "int64"]
250                        meta = {
251                            "notes": {
252                                "format": "markdown",
253                                "content": "Expect column values to be of int type. **Markdown** `Supported`",
254                            },
255                            "validation_rule": rule,
256                        }
257
258                    # Validate string
259                    elif base_rule == "str":
260                        args["mostly"] = 1.0
261                        args["type_"] = "str"
262                        meta = {
263                            "notes": {
264                                "format": "markdown",
265                                "content": "Expect column values to be of string type. **Markdown** `Supported`",
266                            },
267                            "validation_rule": rule,
268                        }
269
270                    # Validate date
271                    elif base_rule == "date":
272                        args["mostly"] = 1.0
273                        meta = {
274                            "notes": {
275                                "format": "markdown",
276                                "content": (
277                                    "Expect column values to be parsable by dateutils. "
278                                    "**Markdown** `Supported`"
279                                ),
280                            },
281                            "validation_rule": rule,
282                        }
283
284                    elif base_rule == ("recommended"):
285                        args["mostly"] = 0.0000000001
286                        meta = {
287                            "notes": {
288                                "format": "markdown",
289                                "content": "Expect column to not be empty. **Markdown** `Supported`",
290                            },
291                            "validation_rule": rule,
292                        }
293
294                    elif base_rule == ("protectAges"):
295                        # Function to convert to different age limit formats
296                        min_age, max_age = self.get_age_limits()
297
298                        args["mostly"] = 1.0
299                        args["min_value"] = min_age
300                        args["max_value"] = max_age
301                        # args['allow_cross_type_comparisons']=True # TODO Can allow after issue #980 is completed
302                        meta = {
303                            "notes": {
304                                "format": "markdown",
305                                "content": "Expect ages to be between 18 years (6,570 days) and 90 years (32,850 days) of age. **Markdown** `Supported`",
306                            },
307                            "validation_rule": rule,
308                        }
309
310                    elif base_rule == ("unique"):
311                        args["mostly"] = 1.0
312                        meta = {
313                            "notes": {
314                                "format": "markdown",
315                                "content": "Expect column values to be Unique. **Markdown** `Supported`",
316                            },
317                            "validation_rule": rule,
318                        }
319
320                    elif base_rule == ("inRange"):
321                        args["mostly"] = 1.0
322                        args["min_value"] = (
323                            float(rule.split(" ")[1])
324                            if rule.split(" ")[1].lower() != "none"
325                            else None
326                        )
327                        args["max_value"] = (
328                            float(rule.split(" ")[2])
329                            if rule.split(" ")[2].lower() != "none"
330                            else None
331                        )
332                        args[
333                            "allow_cross_type_comparisons"
334                        ] = True  # TODO Should follow up with issue #980
335                        meta = {
336                            "notes": {
337                                "format": "markdown",
338                                "content": "Expect column values to be within a specified range. **Markdown** `Supported`",
339                            },
340                            "validation_rule": rule,
341                        }
342
343                    elif base_rule == ("IsNA"):
344                        args["mostly"] = 1.0
345                        args["regex_list"] = ["Not Applicable"]
346                        meta = {
347                            "notes": {
348                                "format": "markdown",
349                                "content": "Expect column values to be marked Not Applicable. **Markdown** `Supported`",
350                            },
351                            "validation_rule": rule,
352                        }
353
354                    # add expectation for attribute to suite
355                    self.add_expectation(
356                        rule=rule,
357                        args=args,
358                        meta=meta,
359                        validation_expectation=VALIDATION_EXPECTATION,
360                    )
361
362        self.context.update_expectation_suite(
363            expectation_suite=self.suite,
364        )
365
366        suite_identifier = ExpectationSuiteIdentifier(
367            expectation_suite_name=self.expectation_suite_name
368        )
369
370        if logger.isEnabledFor(logging.DEBUG):
371            self.context.build_data_docs(resource_identifiers=[suite_identifier])
372            # Webpage DataDocs opened here:
373            # self.context.open_data_docs(resource_identifier=suite_identifier)
374
375    def add_expectation(
376        self,
377        rule: str,
378        args: Dict,
379        meta: Dict,
380        validation_expectation: Dict,
381    ):
382        """
383        Purpose:
384            Add individual expectation for a rule to the suite
385        Input:
386            rule:
387                validation rule
388            args:
389                dict of arguments specifying expectation behavior
390            meta:
391                dict of additional information for each expectation
392            validation_expectation:
393                dictionary to map between rules and expectations
394        Returns:
395            adds expectation to self.suite
396
397        """
398        # Create an Expectation
399        expectation_configuration = ExpectationConfiguration(
400            # Name of expectation type being added
401            expectation_type=VALIDATION_EXPECTATION[rule.split(" ")[0]],
402            # add arguments and meta message
403            kwargs={**args},
404            meta={**meta},
405        )
406        # Add the Expectation to the suite
407        self.suite.add_expectation(expectation_configuration=expectation_configuration)
408
409    def build_checkpoint(self):
410        """
411        Purpose:
412            Build checkpoint to validate manifest
413        Input:
414        Returns:
415            adds checkpoint to self
416        """
417        # create manifest checkpoint
418        self.checkpoint_name = f"manifest_checkpoint_{uuid.uuid4()}"
419        checkpoint_config = {
420            "name": self.checkpoint_name,
421            "config_version": 1,
422            "class_name": "SimpleCheckpoint",
423            "validations": [
424                {
425                    "batch_request": {
426                        "datasource_name": "example_datasource",
427                        "data_connector_name": "default_runtime_data_connector_name",
428                        "data_asset_name": "Manifest",
429                    },
430                    "expectation_suite_name": self.expectation_suite_name,
431                }
432            ],
433        }
434
435        # self.context.test_yaml_config(yaml.dump(checkpoint_config),return_mode="report_object")
436        self.context.add_checkpoint(**checkpoint_config)
437
438    def generate_errors(
439        self,
440        validation_results: Dict,
441        validation_types: Dict,
442        errors: List,
443        warnings: List,
444        dmge: DataModelGraphExplorer,
445    ):
446        """
447        Purpose:
448            Parse results dictionary and generate errors for expectations
449        Input:
450            validation_results:
451                dictionary of results for each expectation
452            validation_types:
453                dict of types of errors to generate for each validation rule
454            errors:
455                list of errors
456            warnings:
457                list of warnings
458        Returns:
459            errors:
460                list of errors
461            warnings:
462                list of warnings
463            self.manifest:
464                manifest, possibly updated (censored ages)
465        """
466
467        type_dict = {
468            "float64": float,
469            "int64": int,
470            "str": str,
471        }
472        for result_dict in validation_results[0]["results"]:
473            indices = []
474            values = []
475
476            # if the expectaion failed, get infromation to generate error message
477            if not result_dict["success"]:
478                errColumn = result_dict["expectation_config"]["kwargs"]["column"]
479                rule = result_dict["expectation_config"]["meta"]["validation_rule"]
480
481                if (
482                    "exception_info" in result_dict.keys()
483                    # This changes in 0.18.x of GE, details on this:
484                    # https://docs.greatexpectations.io/docs/0.18/reference/learn/terms/validation_result/
485                    and result_dict["exception_info"]["exception_message"]
486                ):
487                    raise GreatExpectationsError(
488                        result_dict["exception_info"]["exception_traceback"]
489                    )
490
491                # only some expectations explicitly list unexpected values and indices, read or find if not present
492                elif "unexpected_index_list" in result_dict["result"]:
493                    indices = result_dict["result"]["unexpected_index_list"]
494                    values = result_dict["result"]["unexpected_list"]
495
496                # Technically, this shouldn't ever happen, but will keep as a failsafe in case many things go wrong
497                # because type validation is column aggregate expectation and not column map expectation when columns are not of object type,
498                # indices and values cannot be returned
499                else:
500                    # This changes in 0.17.x of GE, refactored code:
501                    # for i, item in enumerate(self.manifest[errColumn]):
502                    #     observed_type = result_dict.get("result", {}).get("observed_value", None)
503                    #     is_instance_type = observed_type is not None and isinstance(
504                    #         item, type_dict[observed_type]
505                    #     )
506                    #     indices.append(i) if is_instance_type else indices
507                    #     values.append(item) if is_instance_type else values
508                    for i, item in enumerate(self.manifest[errColumn]):
509                        observed_type = result_dict["result"]["observed_value"]
510                        indices.append(i) if isinstance(
511                            item, type_dict[observed_type]
512                        ) else indices
513                        values.append(item) if isinstance(
514                            item, type_dict[observed_type]
515                        ) else values
516
517                # call functions to generate error messages and add to error list
518                if validation_types[rule.split(" ")[0]]["type"] == "type_validation":
519                    for row, value in zip(indices, values):
520                        vr_errors, vr_warnings = GenerateError.generate_type_error(
521                            val_rule=rule,
522                            row_num=str(row + 2),
523                            attribute_name=errColumn,
524                            invalid_entry=str(value),
525                            dmge=dmge,
526                        )
527                        if vr_errors:
528                            errors.append(vr_errors)
529                        if vr_warnings:
530                            warnings.append(vr_warnings)
531                elif validation_types[rule.split(" ")[0]]["type"] == "regex_validation":
532                    expression = result_dict["expectation_config"]["kwargs"]["regex"]
533                    for row, value in zip(indices, values):
534                        vr_errors, vr_warnings = GenerateError.generate_regex_error(
535                            val_rule=rule,
536                            reg_expression=expression,
537                            row_num=str(row + 2),
538                            module_to_call="match",
539                            attribute_name=errColumn,
540                            invalid_entry=value,
541                            dmge=dmge,
542                        )
543                        if vr_errors:
544                            errors.append(vr_errors)
545                        if vr_warnings:
546                            warnings.append(vr_warnings)
547                elif (
548                    validation_types[rule.split(" ")[0]]["type"] == "content_validation"
549                ):
550                    vr_errors, vr_warnings = GenerateError.generate_content_error(
551                        val_rule=rule,
552                        attribute_name=errColumn,
553                        row_num=np_array_to_str_list(np.array(indices) + 2),
554                        invalid_entry=iterable_to_str_list(values),
555                        dmge=self.dmge,
556                    )
557                    if vr_errors:
558                        errors.append(vr_errors)
559                        if rule.startswith("protectAges"):
560                            self.censor_ages(vr_errors, errColumn)
561
562                    if vr_warnings:
563                        warnings.append(vr_warnings)
564                        if rule.startswith("protectAges"):
565                            self.censor_ages(vr_warnings, errColumn)
566
567        return errors, warnings
568
569    def get_age_limits(
570        self,
571    ):
572        """
573        Purpose:
574            Get boundaries of ages that need to be censored for different age formats
575        Input:
576        Returns:
577            min_age:
578                minimum age that will not be censored
579            max age:
580                maximum age that will not be censored
581
582        """
583
584        min_age = 6550  # days
585        max_age = 32849  # days
586
587        return min_age, max_age
588
589    def censor_ages(
590        self,
591        message: List,
592        col: str,
593    ):
594        """
595        Purpose:
596            Censor ages in manifest as appropriate
597        Input:
598            message:
599                error or warning message for age validation rule
600            col:
601                name of column containing ages
602        Returns:
603            updates self.manifest with censored ages
604        TODO: Speed up conversion from str list to int list
605        """
606        censor_rows = []
607
608        for row in message[0]:
609            censor_rows.append(int(row) - 2)
610
611        self.manifest.loc[censor_rows, (col)] = "age censored"
612
613        # update the manifest file, so that ages are censored
614        self.manifest.to_csv(
615            self.manifestPath.replace(".csv", "_censored.csv"), index=False
616        )
617        logging.info("Sensitive ages have been censored.")
618
619        return

Great Expectations helper class

Provides basic utilities to:

1) Create GE workflow specific to manifest according to validation rules
2) Parse results dict to generate appropriate errors

GreatExpectationsHelpers(dmge, unimplemented_expectations, manifest, manifestPath)
73    def __init__(self, dmge, unimplemented_expectations, manifest, manifestPath):
74        """
75        Purpose:
76            Instantiate a great expectations helpers object
77        Args:
78            dmge:
79                DataModelGraphExplorer Object
80            unimplemented_expectations:
81                dictionary of validation rules that currently do not have expectations developed
82            manifest:
83                manifest being validated
84            manifestPath:
85                path to manifest being validated
86        """
87        self.unimplemented_expectations = unimplemented_expectations
88        self.dmge = dmge
89        self.manifest = manifest
90        self.manifestPath = manifestPath
Purpose:

Instantiate a great expectations helpers object

Arguments:
  • dmge: DataModelGraphExplorer Object
  • unimplemented_expectations: dictionary of validation rules that currently do not have expectations developed
  • manifest: manifest being validated
  • manifestPath: path to manifest being validated
unimplemented_expectations
dmge
manifest
manifestPath
@tracer.start_as_current_span('GreatExpectationsHelpers::build_context')
def build_context(self):
 92    @tracer.start_as_current_span("GreatExpectationsHelpers::build_context")
 93    def build_context(self):
 94        """
 95        Purpose:
 96            Create a dataContext and datasource and add to object
 97        Returns:
 98            saves dataContext and datasource to self
 99        """
100        self.context = ge.get_context()
101
102        # create datasource configuration
103        datasource_config = {
104            "name": "example_datasource",
105            "class_name": "Datasource",
106            "module_name": "great_expectations.datasource",
107            "execution_engine": {
108                "module_name": "great_expectations.execution_engine",
109                "class_name": "PandasExecutionEngine",
110            },
111            "data_connectors": {
112                "default_runtime_data_connector_name": {
113                    "class_name": "RuntimeDataConnector",
114                    "batch_identifiers": ["default_identifier_name"],
115                },
116            },
117        }
118
119        # Setting this to False prevents extra data from leaving schematic
120        anonymous_usage_statistics = AnonymizedUsageStatisticsConfig(enabled=False)
121
122        # create data context configuration
123        data_context_config = DataContextConfig(
124            datasources={
125                "pandas": DatasourceConfig(
126                    class_name="Datasource",
127                    execution_engine={"class_name": "PandasExecutionEngine"},
128                    data_connectors={
129                        "default_runtime_data_connector_name": {
130                            "class_name": "RuntimeDataConnector",
131                            "batch_identifiers": ["default_identifier_name"],
132                        }
133                    },
134                )
135            },
136            store_backend_defaults=FilesystemStoreBackendDefaults(
137                root_directory=os.path.join(os.getcwd(), "great_expectations")
138            ),
139            anonymous_usage_statistics=anonymous_usage_statistics,
140        )
141
142        # build context and add data source
143        self.context = BaseDataContext(project_config=data_context_config)
144        # self.context.test_yaml_config(yaml.dump(datasource_config))
145        self.context.add_datasource(**datasource_config)
Purpose:

Create a dataContext and datasource and add to object

Returns:

saves dataContext and datasource to self

@tracer.start_as_current_span('GreatExpectationsHelpers::add_expectation_suite_if_not_exists')
def add_expectation_suite_if_not_exists(self) -> great_expectations.core.expectation_suite.ExpectationSuite:
147    @tracer.start_as_current_span(
148        "GreatExpectationsHelpers::add_expectation_suite_if_not_exists"
149    )
150    def add_expectation_suite_if_not_exists(self) -> ExpectationSuite:
151        """
152        Purpose:
153            Add expectation suite if it does not exist
154        Input:
155        Returns:
156            saves expectation suite and identifier to self
157        """
158        self.expectation_suite_name = f"Manifest_test_suite_{uuid.uuid4()}"
159        expectation_suite = self.context.add_expectation_suite(
160            expectation_suite_name=self.expectation_suite_name,
161        )
162        self.suite = expectation_suite
163
164        return self.suite
Purpose:

Add expectation suite if it does not exist

Input:

Returns:

saves expectation suite and identifier to self

@tracer.start_as_current_span('GreatExpectationsHelpers::build_expectation_suite')
def build_expectation_suite(self) -> None:
166    @tracer.start_as_current_span("GreatExpectationsHelpers::build_expectation_suite")
167    def build_expectation_suite(
168        self,
169    ) -> None:
170        """
171        Purpose:
172            Construct an expectation suite to validate columns with rules that have expectations
173            Add suite to object
174        Input:
175
176        Returns:
177            saves expectation suite and identifier to self
178
179        """
180
181        # create blank expectation suite
182        self.suite = self.add_expectation_suite_if_not_exists()
183
184        # build expectation configurations for each expectation
185        for col in self.manifest.columns:
186            args = {}
187            meta = {}
188
189            # remove trailing/leading whitespaces from manifest
190            self.manifest.map(lambda x: x.strip() if isinstance(x, str) else x)
191
192            validation_rules = self.dmge.get_node_validation_rules(
193                node_display_name=col
194            )
195
196            # check if attribute has any rules associated with it
197            if validation_rules:
198                # Check if the validation rule applies to this manifest
199                if isinstance(validation_rules, dict):
200                    validation_rules = extract_component_validation_rules(
201                        manifest_component=self.manifest["Component"][0],
202                        validation_rules_dict=validation_rules,
203                    )
204                # iterate through all validation rules for an attribute
205                for rule in validation_rules:
206                    base_rule = rule.split(" ")[0]
207
208                    # check if rule has an implemented expectation
209                    if rule_in_rule_list(
210                        rule, self.unimplemented_expectations
211                    ) or required_is_only_rule(
212                        rule=rule,
213                        attribute=col,
214                        rule_modifiers=RULE_MODIFIERS,
215                        validation_expectation=VALIDATION_EXPECTATION,
216                    ):
217                        continue
218
219                    args["column"] = col
220                    args["result_format"] = "COMPLETE"
221
222                    # Validate num
223                    if base_rule == "num":
224                        args["mostly"] = 1.0
225                        args["type_list"] = ["int", "int64", "float", "float64"]
226                        meta = {
227                            "notes": {
228                                "format": "markdown",
229                                "content": "Expect column values to be of int or float type. **Markdown** `Supported`",
230                            },
231                            "validation_rule": rule,
232                        }
233
234                    # Validate float
235                    elif base_rule == "float":
236                        args["mostly"] = 1.0
237                        args["type_list"] = ["float", "float64"]
238                        meta = {
239                            "notes": {
240                                "format": "markdown",
241                                "content": "Expect column values to be of float type. **Markdown** `Supported`",
242                            },
243                            "validation_rule": rule,
244                        }
245
246                    # Validate int
247                    elif base_rule == "int":
248                        args["mostly"] = 1.0
249                        args["type_list"] = ["int", "int64"]
250                        meta = {
251                            "notes": {
252                                "format": "markdown",
253                                "content": "Expect column values to be of int type. **Markdown** `Supported`",
254                            },
255                            "validation_rule": rule,
256                        }
257
258                    # Validate string
259                    elif base_rule == "str":
260                        args["mostly"] = 1.0
261                        args["type_"] = "str"
262                        meta = {
263                            "notes": {
264                                "format": "markdown",
265                                "content": "Expect column values to be of string type. **Markdown** `Supported`",
266                            },
267                            "validation_rule": rule,
268                        }
269
270                    # Validate date
271                    elif base_rule == "date":
272                        args["mostly"] = 1.0
273                        meta = {
274                            "notes": {
275                                "format": "markdown",
276                                "content": (
277                                    "Expect column values to be parsable by dateutils. "
278                                    "**Markdown** `Supported`"
279                                ),
280                            },
281                            "validation_rule": rule,
282                        }
283
284                    elif base_rule == ("recommended"):
285                        args["mostly"] = 0.0000000001
286                        meta = {
287                            "notes": {
288                                "format": "markdown",
289                                "content": "Expect column to not be empty. **Markdown** `Supported`",
290                            },
291                            "validation_rule": rule,
292                        }
293
294                    elif base_rule == ("protectAges"):
295                        # Function to convert to different age limit formats
296                        min_age, max_age = self.get_age_limits()
297
298                        args["mostly"] = 1.0
299                        args["min_value"] = min_age
300                        args["max_value"] = max_age
301                        # args['allow_cross_type_comparisons']=True # TODO Can allow after issue #980 is completed
302                        meta = {
303                            "notes": {
304                                "format": "markdown",
305                                "content": "Expect ages to be between 18 years (6,570 days) and 90 years (32,850 days) of age. **Markdown** `Supported`",
306                            },
307                            "validation_rule": rule,
308                        }
309
310                    elif base_rule == ("unique"):
311                        args["mostly"] = 1.0
312                        meta = {
313                            "notes": {
314                                "format": "markdown",
315                                "content": "Expect column values to be Unique. **Markdown** `Supported`",
316                            },
317                            "validation_rule": rule,
318                        }
319
320                    elif base_rule == ("inRange"):
321                        args["mostly"] = 1.0
322                        args["min_value"] = (
323                            float(rule.split(" ")[1])
324                            if rule.split(" ")[1].lower() != "none"
325                            else None
326                        )
327                        args["max_value"] = (
328                            float(rule.split(" ")[2])
329                            if rule.split(" ")[2].lower() != "none"
330                            else None
331                        )
332                        args[
333                            "allow_cross_type_comparisons"
334                        ] = True  # TODO Should follow up with issue #980
335                        meta = {
336                            "notes": {
337                                "format": "markdown",
338                                "content": "Expect column values to be within a specified range. **Markdown** `Supported`",
339                            },
340                            "validation_rule": rule,
341                        }
342
343                    elif base_rule == ("IsNA"):
344                        args["mostly"] = 1.0
345                        args["regex_list"] = ["Not Applicable"]
346                        meta = {
347                            "notes": {
348                                "format": "markdown",
349                                "content": "Expect column values to be marked Not Applicable. **Markdown** `Supported`",
350                            },
351                            "validation_rule": rule,
352                        }
353
354                    # add expectation for attribute to suite
355                    self.add_expectation(
356                        rule=rule,
357                        args=args,
358                        meta=meta,
359                        validation_expectation=VALIDATION_EXPECTATION,
360                    )
361
362        self.context.update_expectation_suite(
363            expectation_suite=self.suite,
364        )
365
366        suite_identifier = ExpectationSuiteIdentifier(
367            expectation_suite_name=self.expectation_suite_name
368        )
369
370        if logger.isEnabledFor(logging.DEBUG):
371            self.context.build_data_docs(resource_identifiers=[suite_identifier])
372            # Webpage DataDocs opened here:
373            # self.context.open_data_docs(resource_identifier=suite_identifier)
Purpose:

Construct an expectation suite to validate columns with rules that have expectations. Add suite to object.

Input:

Returns:

saves expectation suite and identifier to self

def add_expectation( self, rule: str, args: Dict, meta: Dict, validation_expectation: Dict):
375    def add_expectation(
376        self,
377        rule: str,
378        args: Dict,
379        meta: Dict,
380        validation_expectation: Dict,
381    ):
382        """
383        Purpose:
384            Add individual expectation for a rule to the suite
385        Input:
386            rule:
387                validation rule
388            args:
389                dict of arguments specifying expectation behavior
390            meta:
391                dict of additional information for each expectation
392            validation_expectation:
393                dictionary to map between rules and expectations
394        Returns:
395            adds expectation to self.suite
396
397        """
398        # Create an Expectation
399        expectation_configuration = ExpectationConfiguration(
400            # Name of expectation type being added
401            expectation_type=VALIDATION_EXPECTATION[rule.split(" ")[0]],
402            # add arguments and meta message
403            kwargs={**args},
404            meta={**meta},
405        )
406        # Add the Expectation to the suite
407        self.suite.add_expectation(expectation_configuration=expectation_configuration)
Purpose:

Add individual expectation for a rule to the suite

Input:

rule: validation rule; args: dict of arguments specifying expectation behavior; meta: dict of additional information for each expectation; validation_expectation: dictionary to map between rules and expectations

Returns:

adds expectation to self.suite

def build_checkpoint(self):
409    def build_checkpoint(self):
410        """
411        Purpose:
412            Build checkpoint to validate manifest
413        Input:
414        Returns:
415            adds checkpoint to self
416        """
417        # create manifest checkpoint
418        self.checkpoint_name = f"manifest_checkpoint_{uuid.uuid4()}"
419        checkpoint_config = {
420            "name": self.checkpoint_name,
421            "config_version": 1,
422            "class_name": "SimpleCheckpoint",
423            "validations": [
424                {
425                    "batch_request": {
426                        "datasource_name": "example_datasource",
427                        "data_connector_name": "default_runtime_data_connector_name",
428                        "data_asset_name": "Manifest",
429                    },
430                    "expectation_suite_name": self.expectation_suite_name,
431                }
432            ],
433        }
434
435        # self.context.test_yaml_config(yaml.dump(checkpoint_config),return_mode="report_object")
436        self.context.add_checkpoint(**checkpoint_config)
Purpose:

Build checkpoint to validate manifest

Input:

Returns:

adds checkpoint to self

def generate_errors( self, validation_results: Dict, validation_types: Dict, errors: List, warnings: List, dmge: schematic.schemas.data_model_graph.DataModelGraphExplorer):
438    def generate_errors(
439        self,
440        validation_results: Dict,
441        validation_types: Dict,
442        errors: List,
443        warnings: List,
444        dmge: DataModelGraphExplorer,
445    ):
446        """
447        Purpose:
448            Parse results dictionary and generate errors for expectations
449        Input:
450            validation_results:
451                dictionary of results for each expectation
452            validation_types:
453                dict of types of errors to generate for each validation rule
454            errors:
455                list of errors
456            warnings:
457                list of warnings
458        Returns:
459            errors:
460                list of errors
461            warnings:
462                list of warnings
463            self.manifest:
464                manifest, possibly updated (censored ages)
465        """
466
467        type_dict = {
468            "float64": float,
469            "int64": int,
470            "str": str,
471        }
472        for result_dict in validation_results[0]["results"]:
473            indices = []
474            values = []
475
476            # if the expectaion failed, get infromation to generate error message
477            if not result_dict["success"]:
478                errColumn = result_dict["expectation_config"]["kwargs"]["column"]
479                rule = result_dict["expectation_config"]["meta"]["validation_rule"]
480
481                if (
482                    "exception_info" in result_dict.keys()
483                    # This changes in 0.18.x of GE, details on this:
484                    # https://docs.greatexpectations.io/docs/0.18/reference/learn/terms/validation_result/
485                    and result_dict["exception_info"]["exception_message"]
486                ):
487                    raise GreatExpectationsError(
488                        result_dict["exception_info"]["exception_traceback"]
489                    )
490
491                # only some expectations explicitly list unexpected values and indices, read or find if not present
492                elif "unexpected_index_list" in result_dict["result"]:
493                    indices = result_dict["result"]["unexpected_index_list"]
494                    values = result_dict["result"]["unexpected_list"]
495
496                # Technically, this shouldn't ever happen, but will keep as a failsafe in case many things go wrong
497                # because type validation is column aggregate expectation and not column map expectation when columns are not of object type,
498                # indices and values cannot be returned
499                else:
500                    # This changes in 0.17.x of GE, refactored code:
501                    # for i, item in enumerate(self.manifest[errColumn]):
502                    #     observed_type = result_dict.get("result", {}).get("observed_value", None)
503                    #     is_instance_type = observed_type is not None and isinstance(
504                    #         item, type_dict[observed_type]
505                    #     )
506                    #     indices.append(i) if is_instance_type else indices
507                    #     values.append(item) if is_instance_type else values
508                    for i, item in enumerate(self.manifest[errColumn]):
509                        observed_type = result_dict["result"]["observed_value"]
510                        indices.append(i) if isinstance(
511                            item, type_dict[observed_type]
512                        ) else indices
513                        values.append(item) if isinstance(
514                            item, type_dict[observed_type]
515                        ) else values
516
517                # call functions to generate error messages and add to error list
518                if validation_types[rule.split(" ")[0]]["type"] == "type_validation":
519                    for row, value in zip(indices, values):
520                        vr_errors, vr_warnings = GenerateError.generate_type_error(
521                            val_rule=rule,
522                            row_num=str(row + 2),
523                            attribute_name=errColumn,
524                            invalid_entry=str(value),
525                            dmge=dmge,
526                        )
527                        if vr_errors:
528                            errors.append(vr_errors)
529                        if vr_warnings:
530                            warnings.append(vr_warnings)
531                elif validation_types[rule.split(" ")[0]]["type"] == "regex_validation":
532                    expression = result_dict["expectation_config"]["kwargs"]["regex"]
533                    for row, value in zip(indices, values):
534                        vr_errors, vr_warnings = GenerateError.generate_regex_error(
535                            val_rule=rule,
536                            reg_expression=expression,
537                            row_num=str(row + 2),
538                            module_to_call="match",
539                            attribute_name=errColumn,
540                            invalid_entry=value,
541                            dmge=dmge,
542                        )
543                        if vr_errors:
544                            errors.append(vr_errors)
545                        if vr_warnings:
546                            warnings.append(vr_warnings)
547                elif (
548                    validation_types[rule.split(" ")[0]]["type"] == "content_validation"
549                ):
550                    vr_errors, vr_warnings = GenerateError.generate_content_error(
551                        val_rule=rule,
552                        attribute_name=errColumn,
553                        row_num=np_array_to_str_list(np.array(indices) + 2),
554                        invalid_entry=iterable_to_str_list(values),
555                        dmge=self.dmge,
556                    )
557                    if vr_errors:
558                        errors.append(vr_errors)
559                        if rule.startswith("protectAges"):
560                            self.censor_ages(vr_errors, errColumn)
561
562                    if vr_warnings:
563                        warnings.append(vr_warnings)
564                        if rule.startswith("protectAges"):
565                            self.censor_ages(vr_warnings, errColumn)
566
567        return errors, warnings
Purpose:

Parse results dictionary and generate errors for expectations

Input:

validation_results: dictionary of results for each expectation; validation_types: dict of types of errors to generate for each validation rule; errors: list of errors; warnings: list of warnings

Returns:

errors: list of errors; warnings: list of warnings; self.manifest: manifest, possibly updated (censored ages)

def get_age_limits(self):
569    def get_age_limits(
570        self,
571    ):
572        """
573        Purpose:
574            Get boundaries of ages that need to be censored for different age formats
575        Input:
576        Returns:
577            min_age:
578                minimum age that will not be censored
579            max age:
580                maximum age that will not be censored
581
582        """
583
584        min_age = 6550  # days
585        max_age = 32849  # days
586
587        return min_age, max_age
Purpose:

Get boundaries of ages that need to be censored for different age formats

Input:

Returns:

min_age: minimum age that will not be censored; max_age: maximum age that will not be censored

def censor_ages(self, message: List, col: str):
589    def censor_ages(
590        self,
591        message: List,
592        col: str,
593    ):
594        """
595        Purpose:
596            Censor ages in manifest as appropriate
597        Input:
598            message:
599                error or warning message for age validation rule
600            col:
601                name of column containing ages
602        Returns:
603            updates self.manifest with censored ages
604        TODO: Speed up conversion from str list to int list
605        """
606        censor_rows = []
607
608        for row in message[0]:
609            censor_rows.append(int(row) - 2)
610
611        self.manifest.loc[censor_rows, (col)] = "age censored"
612
613        # update the manifest file, so that ages are censored
614        self.manifest.to_csv(
615            self.manifestPath.replace(".csv", "_censored.csv"), index=False
616        )
617        logging.info("Sensitive ages have been censored.")
618
619        return
Purpose:

Censor ages in manifest as appropriate

Input:

message: error or warning message for age validation rule; col: name of column containing ages

Returns:

updates self.manifest with censored ages

TODO: Speed up conversion from str list to int list