| Field | Value |
|---|---|
| Crates.io | datafusion-quality |
| lib.rs | datafusion-quality |
| version | 0.1.1 |
| created_at | 2025-04-22 10:45:14.745603+00 |
| updated_at | 2025-04-22 10:52:06.919889+00 |
| description | Data quality tools for DataFusion |
| homepage | https://github.com/eadgbear/datafusion_quality |
| repository | |
| max_upload_size | |
| id | 1643869 |
| size | 222,051 |
A data quality framework for DataFusion, inspired by Great Expectations and Spark Expectations.
Add the following to your `Cargo.toml` (the examples below also depend on `datafusion` and `tokio`):

```toml
[dependencies]
datafusion-quality = "0.1.0"
```
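You can create rules with either the traditional API or the newer fluent API. The first example below uses the fluent API; the second uses the traditional `add_column_rule` API. For a complete program, see the basic example in the repository.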
```rust
use datafusion_quality::{rules::{column::{dfq_in_range, dfq_not_null}, dfq_gt}, RuleSet};
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a new DataFusion context
    let ctx = SessionContext::new();

    // Read a sample DataFrame from a CSV file
    let df = ctx.read_csv("data.csv", CsvReadOptions::new()).await?;

    // Create a new RuleSet instance
    let mut rule_set = RuleSet::new();

    // Add rules using the fluent API
    rule_set
        .with_column_rule("name", dfq_not_null())
        .with_column_rule("age", dfq_in_range(18.0, 100.0))
        .with_column_rule("score", dfq_gt(lit(50.0)));

    // Apply the rules: one boolean column per rule, plus `dq_pass`
    let result_df = rule_set.apply(&df).await?;

    // Show the results
    result_df.show().await?;

    // Partition the data into good and bad records
    let (good_data, bad_data) = rule_set.partition(&df).await?;

    Ok(())
}
```
```rust
use datafusion_quality::{RuleSet, rules::column::{NotNullRule, RangeRule, PatternRule, CustomRule}};
// Schema types are not part of datafusion::prelude, so import them from the Arrow re-export
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::prelude::*;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a new DataFusion context
    let ctx = SessionContext::new();

    // Describe the CSV layout explicitly instead of relying on schema inference
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
        Field::new("age", DataType::Int32, false),
        Field::new("email", DataType::Utf8, false),
    ]);
    let df = ctx.read_csv("data.csv", CsvReadOptions::new().schema(&schema)).await?;

    // Create a new RuleSet instance
    let mut rule_set = RuleSet::new();

    // Add rules
    rule_set.add_column_rule(Arc::new(NotNullRule::new("name")));
    rule_set.add_column_rule(Arc::new(RangeRule::new("age", 18.0, 100.0)));
    // SQL LIKE pattern: something, then '@', then something, then '.', then something
    rule_set.add_column_rule(Arc::new(PatternRule::new("email", "%@%.%")));
    rule_set.add_column_rule(Arc::new(CustomRule::new("age", "age > 25")));

    // Apply rules
    let result_df = rule_set.apply(&df).await?;

    // Show the results
    result_df.show().await?;

    Ok(())
}
```
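The `%@%.%` pattern passed to `PatternRule` uses SQL `LIKE` syntax: `%` matches any run of characters (including none) and `_` matches exactly one character. The std-only sketch below illustrates those semantics so you can reason about what a pattern accepts. It is not the crate's implementation: `like_match` and `ilike_match` are hypothetical helpers, escape sequences are not handled, and case-insensitive matching is approximated by lowercasing both sides.

```rust
/// Hypothetical helper: case-sensitive SQL LIKE matching (`%` = any run, `_` = one char).
/// Escape sequences are deliberately not supported in this sketch.
fn like_match(value: &str, pattern: &str) -> bool {
    let v: Vec<char> = value.chars().collect();
    let p: Vec<char> = pattern.chars().collect();
    // dp[i][j] is true when the first i chars of the value match the first j chars of the pattern.
    let mut dp = vec![vec![false; p.len() + 1]; v.len() + 1];
    dp[0][0] = true;
    // A leading run of `%` can match the empty string.
    for j in 1..=p.len() {
        dp[0][j] = p[j - 1] == '%' && dp[0][j - 1];
    }
    for i in 1..=v.len() {
        for j in 1..=p.len() {
            dp[i][j] = match p[j - 1] {
                // `%` either matches nothing (dp[i][j - 1]) or absorbs one more char (dp[i - 1][j]).
                '%' => dp[i][j - 1] || dp[i - 1][j],
                // `_` matches exactly one character.
                '_' => dp[i - 1][j - 1],
                // Any other character must match literally.
                c => dp[i - 1][j - 1] && v[i - 1] == c,
            };
        }
    }
    dp[v.len()][p.len()]
}

/// Hypothetical case-insensitive variant in the spirit of ILIKE: lowercase both sides first.
fn ilike_match(value: &str, pattern: &str) -> bool {
    like_match(&value.to_lowercase(), &pattern.to_lowercase())
}

fn main() {
    let pattern = "%@%.%";
    for email in ["alice@example.com", "bob@localhost", "no-at-sign.org", "@."] {
        println!("{email:>20} -> {}", like_match(email, pattern));
    }
    println!("{}", ilike_match("ALICE@EXAMPLE.COM", "%@example.com"));
}
```

Note that `"@."` also matches `%@%.%`, which shows how permissive this pattern is as an email check.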
Value checks:

- `dfq_not_null()`: Checks if values in a column are not null
- `dfq_null()`: Checks if values in a column are null
- `dfq_in_range(min, max)`: Checks if values in a column fall within a specified range
- `dfq_not_in_range(min, max)`: Checks if values in a column fall outside a specified range
- `dfq_like(pattern)`: Checks if string values match a case-sensitive pattern
- `dfq_not_like(pattern)`: Checks if string values do not match a case-sensitive pattern
- `dfq_ilike(pattern)`: Checks if string values match a case-insensitive pattern
- `dfq_not_ilike(pattern)`: Checks if string values do not match a case-insensitive pattern
- `dfq_lt(value)`: Checks if values are less than a specified value
- `dfq_lte(value)`: Checks if values are less than or equal to a specified value
- `dfq_not_lt(value)`: Checks if values are not less than a specified value
- `dfq_not_lte(value)`: Checks if values are not less than or equal to a specified value
- `dfq_gt(value)`: Checks if values are greater than a specified value
- `dfq_gte(value)`: Checks if values are greater than or equal to a specified value
- `dfq_not_gt(value)`: Checks if values are not greater than a specified value
- `dfq_not_gte(value)`: Checks if values are not greater than or equal to a specified value
- `dfq_eq(value)`: Checks if values are equal to a specified value
- `dfq_not_eq(value)`: Checks if values are not equal to a specified value
- `dfq_str_length(min, max)`: Checks if string length is within specified bounds
- `dfq_str_min_length(min)`: Checks if string length is at least the specified minimum
- `dfq_str_max_length(max)`: Checks if string length is at most the specified maximum
- `dfq_str_empty()`: Checks if strings are empty
- `dfq_str_not_empty()`: Checks if strings are not empty
- `dfq_custom(rule_name, expression)`: Applies a custom SQL expression to a column

Aggregate rules:

- `dfq_null_count()`: Counts the number of null values in a column
- `dfq_not_null_count()`: Counts the number of non-null values in a column
- `dfq_count()`: Counts the total number of rows in a column
- `dfq_count_distinct()`: Counts the number of distinct values in a column
- `dfq_avg()`: Calculates the average value of a column
- `dfq_stddev()`: Calculates the standard deviation of a column
- `dfq_max()`: Finds the maximum value in a column
- `dfq_min()`: Finds the minimum value in a column
- `dfq_sum()`: Calculates the sum of values in a column
- `dfq_median()`: Calculates the median value of a column
- `dfq_last_value()`: Gets the last value in a column
- `dfq_stddev_pop()`: Calculates the population standard deviation of a column
- `dfq_var_pop()`: Calculates the population variance of a column
- `dfq_var_samp()`: Calculates the sample variance of a column
- `dfq_covar_pop(x, y)`: Calculates the population covariance between two columns
- `dfq_covar_samp(x, y)`: Calculates the sample covariance between two columns
- `dfq_regr_avgx(x, y)`: Calculates the average of x values in a linear regression
- `dfq_regr_avgy(x, y)`: Calculates the average of y values in a linear regression
- `dfq_regr_count(x, y)`: Counts the number of rows used in a linear regression
- `dfq_regr_intercept(x, y)`: Calculates the intercept of a linear regression
- `dfq_regr_r2(x, y)`: Calculates the R-squared value of a linear regression
- `dfq_regr_slope(x, y)`: Calculates the slope of a linear regression
- `dfq_regr_sxx(x, y)`: Calculates the sum of squared deviations from the mean for x values
- `dfq_regr_sxy(x, y)`: Calculates the sum of products of deviations from the mean for x and y values
- `dfq_regr_syy(x, y)`: Calculates the sum of squared deviations from the mean for y values
- `dfq_nth_value(n, sort_exprs)`: Gets the nth value in a column with optional sorting
- `dfq_first_value(sort_exprs)`: Gets the first value in a column with optional sorting
- `dfq_custom_agg(aggregation, rule_name)`: Creates a custom aggregation rule with a specified expression and name

Schema rules:

- `ColumnExistsRule`: Checks if a column exists in the schema
- `ColumnTypeRule`: Checks if a column has a specific data type
- `ColumnNullableRule`: Checks if a column is nullable

You can create custom rules by implementing the appropriate trait (`ColumnRule`, `TableRule`, or `SchemaRule`):
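```rust
use datafusion_quality::{ColumnRule, ValidationError};
use datafusion::prelude::*;

/// A column rule that evaluates an arbitrary SQL boolean expression for each row.
pub struct CustomColumnRule {
    column_name: String,
    expression: String,
}

impl CustomColumnRule {
    pub fn new(column_name: &str, expression: &str) -> Self {
        Self {
            column_name: column_name.to_string(),
            expression: expression.to_string(),
        }
    }
}

impl ColumnRule for CustomColumnRule {
    fn apply(&self, df: &DataFrame) -> Result<DataFrame, ValidationError> {
        // Output column follows the `<column_name>_<rule_name>` convention
        let result_col = format!("{}_{}", self.column_name, self.name());
        // Parse the SQL text into a DataFusion expression against the DataFrame's schema.
        // Assumes `ValidationError` implements `From<DataFusionError>` so `?` can convert errors.
        let expr = df.parse_sql_expr(&self.expression)?;
        // `with_column` takes the DataFrame by value, so clone the borrowed handle first
        Ok(df.clone().with_column(&result_col, expr)?)
    }

    fn name(&self) -> &str {
        "custom"
    }

    fn description(&self) -> &str {
        "Applies a custom SQL expression to a column"
    }

    fn column_name(&self) -> &str {
        &self.column_name
    }
}
```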
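Several of the aggregate rules listed earlier differ only in their denominator or in which variable plays which role. The std-only sketch below shows the textbook formulas: population statistics divide by n, sample statistics by n - 1, and the least-squares slope is S_xy / S_xx. It assumes non-null input with at least two values, and every function name is a hypothetical helper, not part of the crate. In standard SQL, and in DataFusion's `regr_*` functions, the dependent variable comes first (`regr_slope(y, x)`), so check the crate's argument order before relying on it.

```rust
/// Arithmetic mean of a non-empty slice.
fn mean(xs: &[f64]) -> f64 {
    xs.iter().sum::<f64>() / xs.len() as f64
}

/// Σ (x - x̄)(y - ȳ); with ys == xs this is the sum of squared deviations Σ (x - x̄)².
fn sum_dev_products(xs: &[f64], ys: &[f64]) -> f64 {
    let (mx, my) = (mean(xs), mean(ys));
    xs.iter().zip(ys).map(|(x, y)| (x - mx) * (y - my)).sum()
}

/// Population variance: divide by n.
fn var_pop(xs: &[f64]) -> f64 {
    sum_dev_products(xs, xs) / xs.len() as f64
}

/// Sample variance: divide by n - 1 (Bessel's correction).
fn var_samp(xs: &[f64]) -> f64 {
    sum_dev_products(xs, xs) / (xs.len() as f64 - 1.0)
}

/// Population covariance: divide by n.
fn covar_pop(xs: &[f64], ys: &[f64]) -> f64 {
    sum_dev_products(xs, ys) / xs.len() as f64
}

/// Sample covariance: divide by n - 1.
fn covar_samp(xs: &[f64], ys: &[f64]) -> f64 {
    sum_dev_products(xs, ys) / (xs.len() as f64 - 1.0)
}

/// Least-squares slope of y on x, S_xy / S_xx (dependent variable first, as in SQL's REGR_SLOPE(y, x)).
fn regr_slope(ys: &[f64], xs: &[f64]) -> f64 {
    sum_dev_products(xs, ys) / sum_dev_products(xs, xs)
}

/// Least-squares intercept: ȳ - slope * x̄.
fn regr_intercept(ys: &[f64], xs: &[f64]) -> f64 {
    mean(ys) - regr_slope(ys, xs) * mean(xs)
}

fn main() {
    let x = [1.0, 2.0, 3.0, 4.0];
    let y = [2.0, 4.0, 6.0, 8.0];
    println!("var_pop = {}, var_samp = {}", var_pop(&x), var_samp(&x));
    // Population standard deviation is the square root of the population variance.
    println!("stddev_pop = {}", var_pop(&x).sqrt());
    println!("covar_pop = {}, covar_samp = {}", covar_pop(&x, &y), covar_samp(&x, &y));
    println!("slope = {}, intercept = {}", regr_slope(&y, &x), regr_intercept(&y, &x));
}
```

For `x = [1, 2, 3, 4]` the squared deviations sum to 5, so `var_pop` is 1.25 while `var_samp` is 5/3; with `y = 2x` the slope is exactly 2 and the intercept 0.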
Each rule adds a new boolean column to the DataFrame, named `<column_name>_<rule_name>`, whose value indicates whether that row passed the rule. A final column, `dq_pass`, holds the logical AND of all rule columns, so it is true only for rows that pass every rule.
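The output convention can be modeled without DataFusion. The std-only sketch below is hypothetical and is not the crate's code: `RowRule`, `NotNull`, `InRange`, and the rule names `not_null` and `in_range` are invented for illustration, columns are modeled as `Vec<Option<f64>>`, and range bounds are assumed inclusive. It builds one `<column>_<rule>` column per rule, folds them into `dq_pass`, and splits row indices into passing and failing sets, similar in spirit to `RuleSet::partition`.

```rust
use std::collections::BTreeMap;
use std::sync::Arc;

/// Hypothetical, simplified stand-in for a column rule: one boolean per row.
trait RowRule {
    fn name(&self) -> &str;
    fn column_name(&self) -> &str;
    fn check(&self, value: Option<f64>) -> bool;
}

struct NotNull { column: String }
impl RowRule for NotNull {
    fn name(&self) -> &str { "not_null" }
    fn column_name(&self) -> &str { &self.column }
    fn check(&self, value: Option<f64>) -> bool { value.is_some() }
}

struct InRange { column: String, min: f64, max: f64 }
impl RowRule for InRange {
    fn name(&self) -> &str { "in_range" }
    fn column_name(&self) -> &str { &self.column }
    // Inclusive bounds are an assumption of this sketch; NULLs fail the check.
    fn check(&self, value: Option<f64>) -> bool {
        matches!(value, Some(x) if x >= self.min && x <= self.max)
    }
}

/// Column name -> values; a toy stand-in for a DataFrame.
type Table = BTreeMap<String, Vec<Option<f64>>>;

/// Adds one `<column>_<rule>` boolean column per rule, plus `dq_pass` (AND of all rule columns).
/// Panics if a rule refers to a column that is not in the table.
fn apply(rules: &[Arc<dyn RowRule>], table: &Table, n_rows: usize) -> BTreeMap<String, Vec<bool>> {
    let mut out = BTreeMap::new();
    let mut dq_pass = vec![true; n_rows];
    for rule in rules {
        let input = &table[rule.column_name()];
        let passed: Vec<bool> = input.iter().map(|v| rule.check(*v)).collect();
        // A row keeps dq_pass == true only if it passes this rule too.
        for (acc, p) in dq_pass.iter_mut().zip(&passed) {
            *acc &= *p;
        }
        out.insert(format!("{}_{}", rule.column_name(), rule.name()), passed);
    }
    out.insert("dq_pass".to_string(), dq_pass);
    out
}

/// Splits row indices into (passing, failing) according to `dq_pass`.
fn partition(dq_pass: &[bool]) -> (Vec<usize>, Vec<usize>) {
    (0..dq_pass.len()).partition(|&i| dq_pass[i])
}

fn main() {
    let mut table = Table::new();
    table.insert("age".to_string(), vec![Some(30.0), None, Some(150.0), Some(18.0)]);
    let rules: Vec<Arc<dyn RowRule>> = vec![
        Arc::new(NotNull { column: "age".into() }),
        Arc::new(InRange { column: "age".into(), min: 18.0, max: 100.0 }),
    ];
    let result = apply(&rules, &table, 4);
    for (name, values) in &result {
        println!("{name}: {values:?}");
    }
    let (good, bad) = partition(&result["dq_pass"]);
    println!("good rows: {good:?}, bad rows: {bad:?}");
}
```

Here row 1 (null age) fails `age_not_null` and `age_in_range`, row 2 (age 150) fails only `age_in_range`, and both end up in the failing set because `dq_pass` requires every rule to pass.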
Contributions are welcome! Please feel free to submit a Pull Request.
This project is licensed under the Apache License - see the LICENSE file for details.