Upstream Data Source Validation
If you want to run data validations based on data generated or data from another data source, you can use the upstream
data source validations. An example would be generating a Parquet file that gets ingested by a job and inserted into
Postgres. The validations can then check for each account_id
generated in the Parquet, it exists in account_number
column in Postgres. The validations can be chained with basic and group by validations or even other upstream data
sources, to cover any complex validations.
Basic join
Join across datasets by particular columns. Then run validations on the joined dataset. You will notice that the data
source name is appended onto the column names when joined (i.e. my_first_json_customer_details
), to ensure column
names do not clash and make it obvious which columns are being validated.
In the below example, we check that the for the same account_id
, then customer_details.name
in the my_first_json
dataset should equal to the name
column in the my_second_json
.
var firstJsonTask = json("my_first_json", "/tmp/data/first_json")
.schema(
field().name("account_id").regex("ACC[0-9]{8}"),
field().name("customer_details")
.schema(
field().name("name").expression("#{Name.name}")
)
);
var secondJsonTask = json("my_second_json", "/tmp/data/second_json")
.validations(
validation().upstreamData(firstJsonTask)
.joinColumns("account_id")
.withValidation(
validation().col("my_first_json_customer_details.name")
.isEqualCol("name")
)
);
val firstJsonTask = json("my_first_json", "/tmp/data/first_json")
.schema(
field.name("account_id").regex("ACC[0-9]{8}"),
field.name("customer_details")
.schema(
field.name("name").expression("#{Name.name}")
)
)
val secondJsonTask = json("my_second_json", "/tmp/data/second_json")
.validations(
validation.upstreamData(firstJsonTask)
.joinColumns("account_id")
.withValidation(
validation.col("my_first_json_customer_details.name")
.isEqualCol("name")
)
)
Join expression
Define join expression to link two datasets together. This can be any SQL expression that returns a boolean value. Useful in situations where join is based on transformations or complex logic.
In the below example, we have to use CONCAT
SQL function to combine 'ACC'
and account_number
to join with
account_id
column in my_first_json
dataset.
var firstJsonTask = json("my_first_json", "/tmp/data/first_json")
.schema(
field().name("account_id").regex("ACC[0-9]{8}"),
field().name("customer_details")
.schema(
field().name("name").expression("#{Name.name}")
)
);
var secondJsonTask = json("my_second_json", "/tmp/data/second_json")
.validations(
validation().upstreamData(firstJsonTask)
.joinExpr("my_first_json_account_id == CONCAT('ACC', account_number)")
.withValidation(
validation().col("my_first_json_customer_details.name")
.isEqualCol("name")
)
);
val firstJsonTask = json("my_first_json", "/tmp/data/first_json")
.schema(
field.name("account_id").regex("ACC[0-9]{8}"),
field.name("customer_details")
.schema(
field.name("name").expression("#{Name.name}")
)
)
val secondJsonTask = json("my_second_json", "/tmp/data/second_json")
.validations(
validation.upstreamData(firstJsonTask)
.joinExpr("my_first_json_account_id == CONCAT('ACC', account_number)")
.withValidation(
validation.col("my_first_json_customer_details.name")
.isEqualCol("name")
)
)
Different join type
By default, an outer join is used to gather columns from both datasets together for validation. But there may be scenarios where you want to control the join type.
Possible join types include: - inner - outer, full, fullouter, full_outer - leftouter, left, left_outer - rightouter, right, right_outer - leftsemi, left_semi, semi - leftanti, left_anti, anti - cross
In the example below, we do an anti
join by column account_id
and check if there are no records. This essentially
checks that all account_id
's from my_second_json
exist in my_first_json
. The second validation also does something
similar but does an outer
join (by default) and checks that the joined dataset has 30 records.
var firstJsonTask = json("my_first_json", "/tmp/data/first_json")
.schema(
field().name("account_id").regex("ACC[0-9]{8}"),
field().name("customer_details")
.schema(
field().name("name").expression("#{Name.name}")
)
);
var secondJsonTask = json("my_second_json", "/tmp/data/second_json")
.validations(
validation().upstreamData(firstJsonTask)
.joinColumns("account_id")
.joinType("anti")
.withValidation(validation().count().isEqual(0)),
validation().upstreamData(firstJsonTask)
.joinColumns("account_id")
.withValidation(validation().count().isEqual(30))
);
val firstJsonTask = json("my_first_json", "/tmp/data/first_json")
.schema(
field.name("account_id").regex("ACC[0-9]{8}"),
field.name("customer_details")
.schema(
field.name("name").expression("#{Name.name}")
)
)
val secondJsonTask = json("my_second_json", "/tmp/data/second_json")
.validations(
validation.upstreamData(firstJsonTask)
.joinColumns("account_id")
.joinType("anti")
.withValidation(validation.count().isEqual(0)),
validation.upstreamData(firstJsonTask)
.joinColumns("account_id")
.withValidation(validation.count().isEqual(30))
)
Join then group by validation
We can apply aggregate or group by validations to the resulting joined dataset as the withValidation
method accepts
any type of validation.
Here we group by account_id, my_first_json_balance
to check that when the amount
field is summed up per group, it is
between 0.8 and 1.2 times the balance.
var firstJsonTask = json("my_first_json", "/tmp/data/first_json")
.schema(
field().name("account_id").regex("ACC[0-9]{8}"),
field().name("balance").type(DoubleType.instance()).min(10).max(1000),
field().name("customer_details")
.schema(
field().name("name").expression("#{Name.name}")
)
);
var secondJsonTask = json("my_second_json", "/tmp/data/second_json")
.validations(
validation().upstreamData(firstJsonTask).joinColumns("account_id")
.withValidation(
validation().groupBy("account_id", "my_first_json_balance")
.sum("amount")
.betweenCol("my_first_json_balance * 0.8", "my_first_json_balance * 1.2")
)
);
val firstJsonTask = json("my_first_json", "/tmp/data/first_json")
.schema(
field.name("account_id").regex("ACC[0-9]{8}"),
field.name("balance").`type`(DoubleType).min(10).max(1000),
field.name("customer_details")
.schema(
field.name("name").expression("#{Name.name}")
)
)
val secondJsonTask = json("my_second_json", "/tmp/data/second_json")
.validations(
validation.upstreamData(firstJsonTask).joinColumns("account_id")
.withValidation(
validation.groupBy("account_id", "my_first_json_balance")
.sum("amount")
.betweenCol("my_first_json_balance * 0.8", "my_first_json_balance * 1.2")
)
)
Chained validations
Given that the withValidation
method accepts any other type of validation, you can chain other upstream data sources
with it. Here we will show a third upstream data source being checked to ensure 30 records exists after joining them
together by account_id
.
var firstJsonTask = json("my_first_json", "/tmp/data/first_json")
.schema(
field().name("account_id").regex("ACC[0-9]{8}"),
field().name("balance").type(DoubleType.instance()).min(10).max(1000),
field().name("customer_details")
.schema(
field().name("name").expression("#{Name.name}")
)
)
.count(count().records(10));
var thirdJsonTask = json("my_third_json", "/tmp/data/third_json")
.schema(
field().name("account_id"),
field().name("amount").type(IntegerType.instance()).min(1).max(100),
field().name("name").expression("#{Name.name}")
)
.count(count().records(10));
var secondJsonTask = json("my_second_json", "/tmp/data/second_json")
.validations(
validation().upstreamData(firstJsonTask)
.joinColumns("account_id")
.withValidation(
validation().upstreamData(thirdJsonTask)
.joinColumns("account_id")
.withValidation(validation().count().isEqual(30))
)
);
val firstJsonTask = json("my_first_json", "/tmp/data/first_json")
.schema(
field.name("account_id").regex("ACC[0-9]{8}"),
field.name("balance").`type`(DoubleType).min(10).max(1000),
field.name("customer_details")
.schema(
field.name("name").expression("#{Name.name}")
)
)
.count(count.records(10))
val thirdJsonTask = json("my_third_json", "/tmp/data/third_json")
.schema(
field.name("account_id"),
field.name("amount").`type`(IntegerType).min(1).max(100),
field.name("name").expression("#{Name.name}"),
)
.count(count.records(10))
val secondJsonTask = json("my_second_json", "/tmp/data/second_json")
.validations(
validation.upstreamData(firstJsonTask).joinColumns("account_id")
.withValidation(
validation.groupBy("account_id", "my_first_json_balance")
.sum("amount")
.betweenCol("my_first_json_balance * 0.8", "my_first_json_balance * 1.2")
),
)