Skip to content

Main Data Structures and classes

The Tsumugi project is built on top of classes generated by the protobuf compiler from protobuf messages. While the generated code is part of the Tsumugi python-client distribution, it is strongly recommended not to use these classes directly. From a developer's perspective, protobuf messages should be considered part of the internal API, not the public API. I cannot provide any guarantees that these messages will always remain backward compatible.

Analyzers

Each analyzer in the python-client is at the same moment a dataclass but also it inherits a parent class AbstractAnalyzer.

  class AbstractAnalyzer(ABC):
      """Abstract class for all analyzers in tsumugi."""

      @abstractmethod
      def _to_proto(self) -> proto.Analyzer: ...

The majority of Tusumugi's python-client code serves as a thin facade over generated code. As a result, there is minimal implementation: each analyzer contains only the information necessary to create a corresponding protobuf message. The only private method is _to_proto(self) -> proto.Analyzer, creates a message from the fields of the dataclass.

Analyzer Options

Most of the Deequ analyzers accept an AnalyzerOption object as an optional argument. This object defines how the analyzer should handle NULL values in the checked columns and how it should operate if the user requests row-level results.

  @dataclass
  class AnalyzerOptions:
      """Container for Analyzer Options."""

      null_behaviour: NullBehaviour
      filtered_row_outcome: FilteredRowOutcome

null_behaviour: should be one of:

  • IGNORE: we just ignore nulls in the column;
  • EMPTY_STRING: we should pass empty string instead of NULL;
  • FAIL: our analyzer should fail if there is NULL;

filtered_row_outcome: should be one of:

  • NULL: in that case filtered rows should be marked as NULL;
  • TRUE: in that case filtered rows should be marked as Boolean True;

The default implementation use NullBehaviour.IGNORE and FilteredRowOutcome.NULL values.

Where option

Most analyzers accept where as an optional parameter. It should be an SQL-like predicate that is applied before the computation of the metric and comparison with the constraint. This can be useful, for example, when you need to compute metrics for different groups.

Let's imagine we heave the following data:

  +------------+-----------+------------+-----------+-------+
  |sepal_length|sepal_width|petal_length|petal_width|species|
  +------------+-----------+------------+-----------+-------+
  |         5.1|        3.5|         1.4|        0.2| setosa|
  |         4.9|        3.0|         1.4|        0.2| setosa|
  |         4.7|        3.2|         1.3|        0.2| setosa|
  |         4.6|        3.1|         1.5|        0.2| setosa|
  |         5.0|        3.6|         1.4|        0.2| setosa|
  |         5.4|        3.9|         1.7|        0.4| setosa|
  |         4.6|        3.4|         1.4|        0.3| setosa|
  |         5.0|        3.4|         1.5|        0.2| setosa|
  |         4.4|        2.9|         1.4|        0.2| setosa|
  |         4.9|        3.1|         1.5|        0.1| setosa|
  |         5.4|        3.7|         1.5|        0.2| setosa|
  |         4.8|        3.4|         1.6|        0.2| setosa|
  |         4.8|        3.0|         1.4|        0.1| setosa|
  |         4.3|        3.0|         1.1|        0.1| setosa|
  |         5.8|        4.0|         1.2|        0.2| setosa|
  |         5.7|        4.4|         1.5|        0.4| setosa|
  |         5.4|        3.9|         1.3|        0.4| setosa|
  |         5.1|        3.5|         1.4|        0.3| setosa|
  |         5.7|        3.8|         1.7|        0.3| setosa|
  |         5.1|        3.8|         1.5|        0.3| setosa|
  +------------+-----------+------------+-----------+-------+

It is a well-know "Irir Dataset" (Fisher, Ronald A. "The use of multiple measurements in taxonomic problems." Annals of eugenics 7.2 (1936): 179-188., link). It contains data about different kinds of irises.

Our goal will be to perform data profiling on this dataset, not for all rows together, but based on the different species.

  from tsumugi.verification import VerificationSuite
  from tsumugi.analyzers import Mean
  from tsumugi.checks import CheckBuilder

  suite = (
      VerificationSuite
      .on_data(iris)
      .add_required_analyzer(
          Mean(column="sepal_length", where="species = 'setosa'")
      )
      .add_required_analyzer(
          Mean(column="sepal_length", where="species = 'versicolor'")
      )
  )

  result = suite.run()

As a a result we will ge the following values for each metric:

     entity      instance                                  name  value
  0  Column  sepal_length      Mean (where: species = 'setosa')  5.006
  1  Column  sepal_length  Mean (where: species = 'versicolor')  5.936
  2  Column  sepal_length   Mean (where: species = 'virginica')  6.588

Constraints

Constraints in Tsumugi are a combination of an analyzer, assertion, and additional metadata. To create a new constraint object, it is highly recommended to use the ConstraintBuilder, which is a helper class with user-friendly methods. Under the hood, this class will create a new proto class with a constraint.

  class ConstraintBuilder:
      def __init__(self) -> None:

      def for_analyzer(self, analyzer: AbstractAnalyzer) -> Self:
          """Set an analyzer."""

      def with_name(self, name: str) -> Self:
          """Set a name of the constraint."""

      def with_hint(self, hint: str) -> Self:
          """Set a hint for the constraint.

          Hint can be helpful in the case when one needs
          to realize the reason of the constraint or why did it fail.
          """

      def should_be_gt_than(self, value) -> Self:
      def should_be_geq_than(self, value) -> Self:
      def should_be_eq_to(self, value) -> Self:
      def should_be_lt_than(self, value) -> Self:
      def should_be_leq_than(self, value) -> Self:

      def build(self) -> suite.Check.Constraint:

WARNING: The should_be_* methods add a constraint value and a comparison sign to the proto message. These methods are type-sensitive! They work slightly differently depending on the type passed. For example, passing a float constraint for a Size analyzer that computes a Long metric under the hood will always return False because, for the JVM, 1 != 1.0!

Alternative definition of constraints

Alternatively, users can call a syntactic sugar method from the CheckBuilder class, which we will explain in detail later. For example, the hasSize method is simply a syntactic sugar on top of the definition of the Constraint for the Size analyzer, where the metric should be equal to the passed value:

  def has_size(
      self, expected_size: int, hint: str = "", name: str | None = None
  ) -> Self:
      """Add a constraint that the DataFrame has size like expected."""
      return self.with_constraint(
          ConstraintBuilder()
          .for_analyzer(Size())
          .with_hint(hint)
          .with_name(name or "Size")
          .should_be_eq_to(expected_size)
          .build()
      )

But not all the possible constraints can be created in this way. You may expecte more syntax sugar like this in the future versions.

Checks

Although Check is a top-level structure in Deequ, it is not directly exposed in Tsumugi. Instead, it is recommended to use CheckBuilder. This helper class allows for easy generation of the completed proto.suite.Check object, which is actually a protobuf message class.

Level

Each check has its own severity level. Currently, upstream Deequ only supports Error and Warning levels, with Warning being the default in both Tsumugi and upstream Deequ. You can modify the severity level of a Check in the builder by calling the method with_level(level), where level is a value from the CheckLevel enumeration.

Description

Each check can have its own description that will be exported to the results. The description is simply a regular string. You can modify it using the with_description(description) method of the CheckBuilder class.

Constraints

CheckBuilder provides two generic methods for adding constraints:

  1. with_constraint(constraint): This method expects an already built Constraint and adds it to the list of constraints.
  2. with_constraints(constraints): This method expects a collection of built constraints. It's important to note that this method replaces any existing constraints!

Additionally, CheckBuilder provides a lot of syntactic sugar, such as has_size or is_complete. These methods offer a simpler approach to adding constraints. However, the trade-off is that not all possible constraints have a corresponding syntactic sugar method.

VerificationSuite

The VerificationSuite is not directly accessible in Tsumugi. Instead, it is recommended to use the VerificationRunBuilder. You can create a builder either by directly providing a DataFrame or by calling the backward-compatibility helper VerificationSuite.on_date(df), which mimics the behavior of python-deequ.

Row-level results

Row-level results are not returned by default. To request them, simply call the with_row_level_results() method of the builder. Otherwise, the row_level_result attribute of the object returned by Tsumugi will be None.

Required analyzers

There are two methods for adding required analyzers. The first is add_required_analyzers(analyzers), which replaces all existing required analyzers with values from the provided collection. The second method is add_required_analyzer, which appends an additional analyzer to the existing ones. Both methods expect an AbstractAnalyzer object, so users don't need to manually create protobuf classes from analyzers.

Checks

The same two methods are provided for adding checks:

  • add_check(check) appends a single check to the suite
  • add_checks(checks) replace all the already added checks by the provided collection

When adding a Check, it is expected to be a protobuf class. Therefore, users need to explicitly call CheckBuilder()....build() themselves.

Anomaly Detection

Similar to required analyzers and checks, two methods are provided: one for adding a single anomaly detection case, and another for replacing all anomaly detection cases with the provided collection:

  • add_anomaly_detection
  • add_anomaly_detections

Key and repository

By design of the upstream Deequ, any anomaly detection requires both a repository and a dataset key. There are two separate methods for this in the VerificationRunBuilder. One method adds a FileSystemRepository that should be a path to the JSON file, while the other adds a SparkTableRepository that expects a Spark table name.

  def with_fs_repository_and_key(
      self,
      filepath: str,
      dataset_date: int,
      dataset_tags: dict[str, str] | None = None,
  ) -> Self:
      """Add a FileSystem repository and date and tags for the ResultKey."""

  def with_table_repository_and_key(
      self,
      table_name: str,
      dataset_date: int,
      dateset_tags: dict[str, str] | None = None,
  ) -> Self:
      """Add a Table repository and date and tags for the ResultKey."""

The dataset_date is the most crucial element here, as it serves as the value Deequ will use to filter the required time series data of metrics for anomaly detection. This value should be a valid Java Long (less than or equal to 9,223,372,036,854,775,807). Passing a larger value may lead to unexpected behavior. It is also not recommended to use negative values, as by design and logic, it should represent something similar to a timestamp.

The dataset_tags parameter is not used within Deequ itself but is exposed to the output repository. If you plan to use the repository directly (as a table or as JSON), you can include additional information here.

Anomaly Detection Builder

As described in the concepts section, anomaly detection is a combination of an analyzer, strategy, and additional metadata. To simplify the creation of AD-checks, an AnomalyDetectionBuilder is provided. Calling build() on this builder will generate a complete protobuf class for the defined anomaly detection case.

  class AnomalyDetectionBuilder:
      """Helper object to build AnomalyDetection check."""

      def for_analyzer(self, analyzer: AbstractAnalyzer) -> Self:
          """Add an analyzer."""

      def with_strategy(self, strategy: AbstractStrategy) -> Self:
          """Add a strategy."""

      def with_check_level(self, level: CheckLevel) -> Self:
          """Set a severity level."""

      def with_description(self, description: str) -> Self:
          """Add a description."""

      def with_tags(self, tags: dict[str, str]) -> Self:
          """Add tags."""

      def after_date(self, dt: int) -> Self:
          """Set a minimal dataset date value

          This value will be used to filter out part of
          the timeseries of metrics.
          """

      def before_date(self, dt: int) -> Self:
          """Set a maximal dataset date value

          This value will be used to filter out part of
          the timeseries of metrics.
          """

All the strategies are implemented in the same way as analyzers. They are actually just dataclasses with a single inherited private method that converts a dataclass to the corresponding protobuf class.

Note: This is not a bug, but a feature of upstream Deequ. If you run your anomaly detection case with the same dataset_tag twice in a row, the first run will be compared to a previous run, but the second run will be compared with the first run. To avoid this behavior, add any value less than your dataset_tag to the before_date parameter in the builder.