Basic usage
This notebook contains a minimal example of how Tsumugi can be used with PySpark Connect.
In [1]:
from pyspark.sql import SparkSession
In [2]:
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
In [3]:
test_data = spark.createDataFrame(
    [
        ["foo", 1, 4],
        ["bar", 2, 6],
        ["baz", 3, None],
    ],
    schema="struct<a:string, b:int, c:int>",
)
In [4]:
test_data.show()
+---+---+----+
|  a|  b|   c|
+---+---+----+
|foo|  1|   4|
|bar|  2|   6|
|baz|  3|NULL|
+---+---+----+
In [5]:
from tsumugi.verification import VerificationSuite
from tsumugi.analyzers import Size, Minimum, Completeness, CustomSql, ConstraintBuilder
from tsumugi.checks import CheckBuilder
In [6]:
suite = (
    VerificationSuite
    .on_data(test_data)  # the DataFrame to verify
    .with_row_level_results()  # also request per-row results
    .add_check(
        CheckBuilder()
        .with_description("Basic checks")
        .has_size(expected_size=3, name="Size(*)")
        .is_primary_key(column="b", name="col(b) should be PK-like")
        .build()
    )
    .add_check(
        CheckBuilder()
        .with_description("Additional checks")
        .is_complete(column="c", name="col(c) should be complete")
        .with_constraint(
            ConstraintBuilder()
            .for_analyzer(Minimum(column="b"))
            .should_be_eq_to(0.0)  # the minimum of b is 1, so this constraint fails
            .build()
        )
        .build()
    )
)
In [7]:
result = suite.run()
In [8]:
result.check_results_as_pandas()
Out[8]:
| | level | check_description | constraint_message | metric_name | metric_instance | metric_entity | metric_value | status | constraint |
|---|---|---|---|---|---|---|---|---|---|
| 0 | Warning | Basic checks | | Size | * | Dataset | 3.000000 | Success | SizeConstraint(Size(None)) |
| 1 | Warning | Basic checks | | Uniqueness | b | Column | 1.000000 | Success | UniquenessConstraint(Uniqueness(Buffer(b),None... |
| 2 | Warning | Basic checks | | Completeness | b | Column | 1.000000 | Success | CompletenessConstraint(Completeness(b,None,Som... |
| 3 | Warning | Additional checks | Value: 0.6666666666666666 does not meet the co... | Completeness | c | Column | 0.666667 | Failure | CompletenessConstraint(Completeness(c,None,Som... |
| 4 | Warning | Additional checks | Value: 1.0 does not meet the constraint requir... | Minimum | b | Column | 1.000000 | Failure | MinimumConstraint(Minimum(b,None,Some(Analyzer... |
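The metric values in this table can be reproduced by hand from the three sample rows. The sketch below recomputes them in plain Python, without Spark or Tsumugi, only to illustrate what each metric measures. The helper names are illustrative, and the uniqueness definition used here (share of values that occur exactly once) is an assumption about the metric rather than something taken from the library source.

```python
from collections import Counter

# The same rows as test_data: columns a, b, c.
rows = [("foo", 1, 4), ("bar", 2, 6), ("baz", 3, None)]

b_values = [row[1] for row in rows]
c_values = [row[2] for row in rows]


def completeness(values):
    """Fraction of values that are not null."""
    return sum(v is not None for v in values) / len(values)


def uniqueness(values):
    """Fraction of values that occur exactly once (assumed definition)."""
    counts = Counter(values)
    return sum(1 for v in values if counts[v] == 1) / len(values)


metrics = {
    "Size(*)": float(len(rows)),
    "Completeness(b)": completeness(b_values),
    "Completeness(c)": completeness(c_values),
    "Uniqueness(b)": uniqueness(b_values),
    "Minimum(b)": float(min(b_values)),
}

for name, value in metrics.items():
    print(f"{name}: {value:.6f}")
```

Column c has one null in three rows, so its completeness is 2/3 and the `is_complete` constraint fails. The smallest value of b is 1, so the `should_be_eq_to(0.0)` constraint on `Minimum` fails as well.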
In [9]:
result.checks_as_pandas()
Out[9]:
| | check | check_level | check_status | constraint | constraint_status | constraint_message |
|---|---|---|---|---|---|---|
| 0 | Basic checks | Warning | Success | SizeConstraint(Size(None)) | Success | |
| 1 | Basic checks | Warning | Success | UniquenessConstraint(Uniqueness(Buffer(b),None... | Success | |
| 2 | Basic checks | Warning | Success | CompletenessConstraint(Completeness(b,None,Som... | Success | |
| 3 | Additional checks | Warning | Warning | CompletenessConstraint(Completeness(c,None,Som... | Failure | Value: 0.6666666666666666 does not meet the co... |
| 4 | Additional checks | Warning | Warning | MinimumConstraint(Minimum(b,None,Some(Analyzer... | Failure | Value: 1.0 does not meet the constraint requir... |
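A common follow-up is to pick out the failed constraints, for example to stop a pipeline. The sketch below is a minimal illustration, assuming the table has first been converted to a list of dicts (for a pandas DataFrame, `to_dict("records")` produces this shape). The records are copied from the output above with truncated strings left as displayed, and `failed_constraints` is a hypothetical helper, not part of Tsumugi.

```python
# Records mirroring the checks table (truncated text kept as displayed).
checks = [
    {"check": "Basic checks", "check_status": "Success",
     "constraint": "SizeConstraint(Size(None))", "constraint_status": "Success"},
    {"check": "Basic checks", "check_status": "Success",
     "constraint": "UniquenessConstraint(Uniqueness(Buffer(b),None...", "constraint_status": "Success"},
    {"check": "Basic checks", "check_status": "Success",
     "constraint": "CompletenessConstraint(Completeness(b,None,Som...", "constraint_status": "Success"},
    {"check": "Additional checks", "check_status": "Warning",
     "constraint": "CompletenessConstraint(Completeness(c,None,Som...", "constraint_status": "Failure"},
    {"check": "Additional checks", "check_status": "Warning",
     "constraint": "MinimumConstraint(Minimum(b,None,Some(Analyzer...", "constraint_status": "Failure"},
]


def failed_constraints(records):
    """Return the records whose constraint did not succeed (hypothetical helper)."""
    return [r for r in records if r["constraint_status"] != "Success"]


failures = failed_constraints(checks)
for record in failures:
    print(f'{record["check"]}: {record["constraint"]}')

if failures:
    print(f"{len(failures)} constraint(s) failed")
```

Filtering on `constraint_status` rather than `check_status` keeps the result at constraint granularity, so each failing rule is reported separately even when several belong to the same check.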
In [10]:
result.metrics_as_pandas()
Out[10]:
| | entity | instance | name | value |
|---|---|---|---|---|
| 0 | Column | c | Completeness | 0.666667 |
| 1 | Column | b | Uniqueness | 1.000000 |
| 2 | Dataset | * | Size | 3.000000 |
| 3 | Column | b | Completeness | 1.000000 |
| 4 | Column | b | Minimum | 1.000000 |
In [11]:
result.row_level_results.toPandas()
Out[11]:
| | a | b | c | Basic checks | Additional checks |
|---|---|---|---|---|---|
| 0 | foo | 1 | 4.0 | True | False |
| 1 | bar | 2 | 6.0 | True | False |
| 2 | baz | 3 | NaN | True | False |