Main Data Structures and Classes
The Tsumugi project is built on top of classes generated by the protobuf compiler from protobuf messages. While the generated code is part of the Tsumugi python-client distribution, it is strongly recommended not to use these classes directly. From a developer's perspective, protobuf messages should be considered part of the internal API, not the public API. I cannot provide any guarantees that these messages will always remain backward compatible.
Analyzers
Each analyzer in the python-client is both a dataclass and a subclass of the parent class `AbstractAnalyzer`:
```python
class AbstractAnalyzer(ABC):
    """Abstract class for all analyzers in tsumugi."""

    @abstractmethod
    def _to_proto(self) -> proto.Analyzer: ...
```
The majority of Tsumugi's python-client code serves as a thin facade over the generated code. As a result, there is minimal implementation: each analyzer contains only the information necessary to create a corresponding protobuf message. The only private method is `_to_proto(self) -> proto.Analyzer`, which creates a message from the fields of the dataclass.
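For illustration, here is a self-contained sketch of this facade pattern. The names `AnalyzerMessage` and `Completeness` are illustrative stand-ins, not the actual generated protobuf classes or Tsumugi analyzers:

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass


# Stand-in for a generated protobuf message (illustrative only).
@dataclass
class AnalyzerMessage:
    kind: str
    column: str


class AbstractAnalyzer(ABC):
    """Abstract class for all analyzers."""

    @abstractmethod
    def _to_proto(self) -> AnalyzerMessage: ...


@dataclass
class Completeness(AbstractAnalyzer):
    """A user-facing analyzer: just data plus a conversion method."""

    column: str

    def _to_proto(self) -> AnalyzerMessage:
        # The facade does nothing but map dataclass fields to the message.
        return AnalyzerMessage(kind="Completeness", column=self.column)
```

The real classes carry more fields, but the shape is the same: a dataclass holding user input and a single conversion method.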
Analyzer Options
Most of the Deequ analyzers accept an `AnalyzerOptions` object as an optional argument. This object defines how the analyzer should handle NULL values in the checked columns and how it should operate if the user requests row-level results.
```python
@dataclass
class AnalyzerOptions:
    """Container for Analyzer Options."""

    null_behaviour: NullBehaviour
    filtered_row_outcome: FilteredRowOutcome
```
`null_behaviour` should be one of:

- `IGNORE`: NULLs in the column are simply ignored;
- `EMPTY_STRING`: an empty string is passed instead of NULL;
- `FAIL`: the analyzer fails if there is a NULL.

`filtered_row_outcome` should be one of:

- `NULL`: filtered rows are marked as NULL;
- `TRUE`: filtered rows are marked as Boolean `True`.

The defaults are `NullBehaviour.IGNORE` and `FilteredRowOutcome.NULL`.
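A self-contained sketch of how these options might be modeled. The enum members match the ones listed above; the class layout itself is illustrative, not the actual Tsumugi source:

```python
from dataclasses import dataclass
from enum import Enum, auto


class NullBehaviour(Enum):
    IGNORE = auto()
    EMPTY_STRING = auto()
    FAIL = auto()


class FilteredRowOutcome(Enum):
    NULL = auto()
    TRUE = auto()


@dataclass
class AnalyzerOptions:
    """Container for analyzer options with the documented defaults."""

    null_behaviour: NullBehaviour = NullBehaviour.IGNORE
    filtered_row_outcome: FilteredRowOutcome = FilteredRowOutcome.NULL
```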
Where option
Most analyzers accept `where` as an optional parameter. It should be an SQL-like predicate that is applied before the metric is computed and compared with the constraint. This can be useful, for example, when you need to compute metrics for different groups.
Let's imagine we have the following data:
+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
| 5.1| 3.5| 1.4| 0.2| setosa|
| 4.9| 3.0| 1.4| 0.2| setosa|
| 4.7| 3.2| 1.3| 0.2| setosa|
| 4.6| 3.1| 1.5| 0.2| setosa|
| 5.0| 3.6| 1.4| 0.2| setosa|
| 5.4| 3.9| 1.7| 0.4| setosa|
| 4.6| 3.4| 1.4| 0.3| setosa|
| 5.0| 3.4| 1.5| 0.2| setosa|
| 4.4| 2.9| 1.4| 0.2| setosa|
| 4.9| 3.1| 1.5| 0.1| setosa|
| 5.4| 3.7| 1.5| 0.2| setosa|
| 4.8| 3.4| 1.6| 0.2| setosa|
| 4.8| 3.0| 1.4| 0.1| setosa|
| 4.3| 3.0| 1.1| 0.1| setosa|
| 5.8| 4.0| 1.2| 0.2| setosa|
| 5.7| 4.4| 1.5| 0.4| setosa|
| 5.4| 3.9| 1.3| 0.4| setosa|
| 5.1| 3.5| 1.4| 0.3| setosa|
| 5.7| 3.8| 1.7| 0.3| setosa|
| 5.1| 3.8| 1.5| 0.3| setosa|
+------------+-----------+------------+-----------+-------+
It is the well-known "Iris Dataset" (Fisher, Ronald A. "The use of multiple measurements in taxonomic problems." Annals of Eugenics 7.2 (1936): 179-188, link). It contains data about different kinds of irises.
Our goal will be to perform data profiling on this dataset, not for all rows together, but based on the different species.
```python
from tsumugi.verification import VerificationSuite
from tsumugi.analyzers import Mean

suite = (
    VerificationSuite
    .on_data(iris)
    .add_required_analyzer(
        Mean(column="sepal_length", where="species = 'setosa'")
    )
    .add_required_analyzer(
        Mean(column="sepal_length", where="species = 'versicolor'")
    )
    .add_required_analyzer(
        Mean(column="sepal_length", where="species = 'virginica'")
    )
)
result = suite.run()
```
As a result, we will get the following values for each metric:
```
   entity  instance      name                                   value
0  Column  sepal_length  Mean (where: species = 'setosa')       5.006
1  Column  sepal_length  Mean (where: species = 'versicolor')   5.936
2  Column  sepal_length  Mean (where: species = 'virginica')    6.588
```
Constraints
Constraints in Tsumugi are a combination of an analyzer, an assertion, and additional metadata. To create a new constraint object, it is highly recommended to use the `ConstraintBuilder`, a helper class with user-friendly methods. Under the hood, this class creates a new protobuf message with the constraint.
```python
class ConstraintBuilder:
    def __init__(self) -> None: ...

    def for_analyzer(self, analyzer: AbstractAnalyzer) -> Self:
        """Set an analyzer."""

    def with_name(self, name: str) -> Self:
        """Set a name for the constraint."""

    def with_hint(self, hint: str) -> Self:
        """Set a hint for the constraint.

        A hint can be helpful when one needs to understand
        the purpose of the constraint or why it failed.
        """

    def should_be_gt_than(self, value) -> Self: ...
    def should_be_geq_than(self, value) -> Self: ...
    def should_be_eq_to(self, value) -> Self: ...
    def should_be_lt_than(self, value) -> Self: ...
    def should_be_leq_than(self, value) -> Self: ...

    def build(self) -> suite.Check.Constraint: ...
```
WARNING: The `should_be_*` methods add a constraint value and a comparison sign to the proto message. These methods are type-sensitive! They work slightly differently depending on the type passed. For example, passing a `float` constraint for a `Size` analyzer, which computes a `Long` metric under the hood, will always return `False` because, for the JVM, `1 != 1.0`!
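This pitfall can be mimicked in plain Python. The function below is an illustrative stand-in, not Tsumugi code: it models a comparison in which the type must match before the value is even considered, which is effectively what happens when the proto message stores the expected value in a type-tagged field:

```python
def type_sensitive_eq(metric_value, expected_value) -> bool:
    """Compare like the JVM side: types must match before values do."""
    if type(metric_value) is not type(expected_value):
        return False
    return metric_value == expected_value


# A Size analyzer yields an integer (Long) metric:
size_metric = 150

print(type_sensitive_eq(size_metric, 150))    # True: int vs int
print(type_sensitive_eq(size_metric, 150.0))  # False: int vs float, never matches
```

The takeaway: pass an `int` to `should_be_eq_to` for analyzers that compute integer metrics.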
Alternative definition of constraints
Alternatively, users can call a syntactic sugar method from the `CheckBuilder` class, which we will explain in detail later. For example, the `has_size` method is simply syntactic sugar on top of defining a `Constraint` for the `Size` analyzer, where the metric should be equal to the passed value:
```python
def has_size(
    self, expected_size: int, hint: str = "", name: str | None = None
) -> Self:
    """Add a constraint that the DataFrame has the expected size."""
    return self.with_constraint(
        ConstraintBuilder()
        .for_analyzer(Size())
        .with_hint(hint)
        .with_name(name or "Size")
        .should_be_eq_to(expected_size)
        .build()
    )
```
However, not all possible constraints can be created this way. You may expect more syntactic sugar like this in future versions.
Checks
Although `Check` is a top-level structure in Deequ, it is not directly exposed in Tsumugi. Instead, it is recommended to use `CheckBuilder`. This helper class allows for easy generation of the completed `proto.suite.Check` object, which is actually a protobuf message class.
Level
Each check has its own severity level. Currently, upstream Deequ only supports `Error` and `Warning` levels, with `Warning` being the default in both Tsumugi and upstream Deequ. You can modify the severity level of a `Check` in the builder by calling the method `with_level(level)`, where `level` is a value from the `CheckLevel` enumeration.
Description
Each check can have its own description that will be exported to the results. The description is simply a regular string. You can modify it using the `with_description(description)` method of the `CheckBuilder` class.
Constraints
`CheckBuilder` provides two generic methods for adding constraints:

- `with_constraint(constraint)`: expects an already built `Constraint` and adds it to the list of constraints.
- `with_constraints(constraints)`: expects a collection of built constraints. It's important to note that this method replaces any existing constraints!
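The difference between the two methods can be sketched with a minimal stand-in. `MiniCheckBuilder` is illustrative only, not the real `CheckBuilder`, and plain strings stand in for built constraints:

```python
class MiniCheckBuilder:
    """Illustrative stand-in showing append vs replace semantics."""

    def __init__(self) -> None:
        self._constraints: list[str] = []

    def with_constraint(self, constraint: str) -> "MiniCheckBuilder":
        self._constraints.append(constraint)  # appends to the list
        return self

    def with_constraints(self, constraints: list[str]) -> "MiniCheckBuilder":
        self._constraints = list(constraints)  # replaces everything!
        return self


builder = MiniCheckBuilder().with_constraint("a").with_constraint("b")
builder.with_constraints(["c"])
print(builder._constraints)  # ['c'] -- the earlier constraints are gone
```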
Additionally, `CheckBuilder` provides a lot of syntactic sugar, such as `has_size` or `is_complete`. These methods offer a simpler approach to adding constraints. However, the trade-off is that not all possible constraints have a corresponding syntactic sugar method.
VerificationSuite
The `VerificationSuite` is not directly accessible in Tsumugi. Instead, it is recommended to use the `VerificationRunBuilder`. You can create a builder either by directly providing a `DataFrame` or by calling the backward-compatibility helper `VerificationSuite.on_data(df)`, which mimics the behavior of `python-deequ`.
Row-level results
Row-level results are not returned by default. To request them, simply call the `with_row_level_results()` method of the builder. Otherwise, the `row_level_result` attribute of the object returned by Tsumugi will be `None`.
Required analyzers
There are two methods for adding required analyzers. The first is `add_required_analyzers(analyzers)`, which replaces all existing required analyzers with the values from the provided collection. The second is `add_required_analyzer`, which appends an additional analyzer to the existing ones. Both methods expect `AbstractAnalyzer` objects, so users don't need to manually create protobuf classes from analyzers.
Checks
The same two methods are provided for adding checks:

- `add_check(check)`: appends a single check to the suite;
- `add_checks(checks)`: replaces all the already added checks with the provided collection.

When adding a `Check`, it is expected to be a protobuf class. Therefore, users need to explicitly call `CheckBuilder()....build()` themselves.
Anomaly Detection
Similar to required analyzers and checks, two methods are provided: `add_anomaly_detection`, which adds a single anomaly detection case, and `add_anomaly_detections`, which replaces all anomaly detection cases with the provided collection.
Key and repository
By design of upstream Deequ, any anomaly detection requires both a repository and a dataset key. There are two separate methods for this in the `VerificationRunBuilder`. One adds a `FileSystemRepository`, which expects a path to a JSON file, while the other adds a `SparkTableRepository`, which expects a Spark table name.
```python
def with_fs_repository_and_key(
    self,
    filepath: str,
    dataset_date: int,
    dataset_tags: dict[str, str] | None = None,
) -> Self:
    """Add a FileSystem repository and a date and tags for the ResultKey."""

def with_table_repository_and_key(
    self,
    table_name: str,
    dataset_date: int,
    dataset_tags: dict[str, str] | None = None,
) -> Self:
    """Add a Table repository and a date and tags for the ResultKey."""
```
The `dataset_date` is the most crucial element here, as it is the value Deequ uses to filter the time series of metrics required for anomaly detection. This value should be a valid Java `Long` (less than or equal to `9,223,372,036,854,775,807`). Passing a larger value may lead to unexpected behavior. It is also not recommended to use negative values, since by design it should represent something similar to a timestamp.
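A simple guard can catch invalid values before they reach the JVM. `validate_dataset_date` is a hypothetical helper for illustration, not part of the Tsumugi API:

```python
import time

JAVA_LONG_MAX = 9_223_372_036_854_775_807  # 2**63 - 1


def validate_dataset_date(dataset_date: int) -> int:
    """Reject values that do not fit the documented expectations."""
    if dataset_date > JAVA_LONG_MAX:
        raise ValueError("dataset_date does not fit into a Java Long")
    if dataset_date < 0:
        raise ValueError("dataset_date should be non-negative, like a timestamp")
    return dataset_date


# A millisecond timestamp is a typical, safely-sized dataset_date.
print(validate_dataset_date(int(time.time() * 1000)))
```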
The `dataset_tags` parameter is not used within Deequ itself but is exposed to the output repository. If you plan to use the repository directly (as a table or as JSON), you can include additional information here.
Anomaly Detection Builder
As described in the concepts section, anomaly detection is a combination of an analyzer, a strategy, and additional metadata. To simplify the creation of AD-checks, an `AnomalyDetectionBuilder` is provided. Calling `build()` on this builder will generate a complete protobuf class for the defined anomaly detection case.
```python
class AnomalyDetectionBuilder:
    """Helper object to build an AnomalyDetection check."""

    def for_analyzer(self, analyzer: AbstractAnalyzer) -> Self:
        """Add an analyzer."""

    def with_strategy(self, strategy: AbstractStrategy) -> Self:
        """Add a strategy."""

    def with_check_level(self, level: CheckLevel) -> Self:
        """Set a severity level."""

    def with_description(self, description: str) -> Self:
        """Add a description."""

    def with_tags(self, tags: dict[str, str]) -> Self:
        """Add tags."""

    def after_date(self, dt: int) -> Self:
        """Set a minimal dataset date value.

        This value will be used to filter out part of
        the time series of metrics.
        """

    def before_date(self, dt: int) -> Self:
        """Set a maximal dataset date value.

        This value will be used to filter out part of
        the time series of metrics.
        """
```
All the strategies are implemented in the same way as the analyzers: they are just dataclasses with a single private method, required by the abstract parent class, that converts the dataclass to the corresponding protobuf class.
Note: This is not a bug, but a feature of upstream Deequ. If you run your anomaly detection case with the same `dataset_date` twice in a row, the first run will be compared to a previous run, but the second run will be compared with the first run. To avoid this behavior, pass any value less than your `dataset_date` as the `before_date` parameter in the builder.