Why Data Quality Matters
Data quality problems compound. A small issue in your pipeline becomes a big problem in your dashboard, which becomes a trust problem with stakeholders.
The Cost of Bad Data
- Bad decisions: Wrong data leads to wrong conclusions
- Lost time: Debugging data issues instead of building
- Lost trust: Once stakeholders don't trust data, they stop using it
- Downstream failures: ML models trained on bad data produce bad predictions
Data Quality Dimensions
Data quality isn't binary. There are multiple dimensions to consider:
1. Completeness
Are all expected records present? Are required fields populated?
- Row counts match expectations
- Required fields are not null
- Expected date ranges are covered
2. Accuracy
Does the data correctly represent reality?
- Values are within expected ranges
- Calculations are correct
- Data matches source of truth
3. Consistency
Is the data internally consistent and consistent across systems?
- No duplicate primary keys
- Referential integrity maintained
- Related tables agree
4. Timeliness
Is the data fresh enough for its purpose?
- Data arrives within SLA
- No unexpectedly old records
- Timestamps are reasonable
5. Validity
Does the data conform to expected formats and constraints?
- Emails are valid email format
- Dates are parseable
- Enums contain only allowed values
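The validity rules above can be sketched as row-level predicates in Python with pandas. This is an illustrative sketch: the column names (`email`, `signup_date`, `status`), the allowed status values, and the email regex are assumptions, and the regex is a simple heuristic rather than full RFC 5322 validation.

```python
import pandas as pd

ALLOWED_STATUSES = {"pending", "completed", "cancelled"}
# Simple heuristic pattern, not full RFC 5322 email validation
EMAIL_PATTERN = r"^[^@\s]+@[^@\s]+\.[^@\s]+$"

def validity_report(df):
    """Count rows violating each validity rule."""
    return {
        "invalid_emails": int((~df["email"].astype(str).str.match(EMAIL_PATTERN)).sum()),
        "unparseable_dates": int(pd.to_datetime(df["signup_date"], errors="coerce").isna().sum()),
        "invalid_statuses": int((~df["status"].isin(ALLOWED_STATUSES)).sum()),
    }

# Invented sample rows for illustration
df = pd.DataFrame({
    "email": ["a@example.com", "not-an-email"],
    "signup_date": ["2024-01-15", "not-a-date"],
    "status": ["pending", "shipped"],
})
report = validity_report(df)
```

Each counter is independent, so one bad row can trip several rules at once; report the counts separately rather than as a single pass/fail flag.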
Types of Checks
Schema Checks
```sql
-- Verify expected columns exist with correct types
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name = 'orders'
ORDER BY ordinal_position;
```
Null Checks
```sql
-- Check for unexpected nulls in required fields
SELECT
  COUNT(*) AS total_rows,
  COUNT(order_id) AS orders_with_id,
  COUNT(customer_id) AS orders_with_customer,
  COUNT(amount) AS orders_with_amount
FROM orders
WHERE order_date = CURRENT_DATE - 1;
```
Uniqueness Checks
```sql
-- Find duplicate primary keys
SELECT order_id, COUNT(*) AS cnt
FROM orders
GROUP BY order_id
HAVING COUNT(*) > 1;
```
Range Checks
```sql
-- Verify values are within expected ranges
SELECT COUNT(*) AS invalid_amounts
FROM orders
WHERE amount < 0 OR amount > 1000000;

SELECT COUNT(*) AS future_orders
FROM orders
WHERE order_date > CURRENT_DATE;
```
Referential Integrity
```sql
-- Find orphaned records
SELECT o.order_id
FROM orders o
LEFT JOIN customers c ON o.customer_id = c.customer_id
WHERE c.customer_id IS NULL;
```
Freshness Checks
```sql
-- Verify data is recent
SELECT
  MAX(updated_at) AS last_update,
  CURRENT_TIMESTAMP - MAX(updated_at) AS staleness
FROM orders;
-- Alert if more than 1 hour old
-- (implement in monitoring layer)
```
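The alerting side can live in a small monitoring script. A minimal sketch, assuming the freshness query feeds it the latest timestamp and using the one-hour SLA from the comment above (the timestamps below are invented):

```python
from datetime import datetime, timedelta, timezone

STALENESS_SLA = timedelta(hours=1)

def check_freshness(last_update, now=None):
    """Return staleness and whether it breaches the SLA."""
    now = now or datetime.now(timezone.utc)
    staleness = now - last_update
    return {"staleness": staleness, "breached": staleness > STALENESS_SLA}

# Illustrative timestamps
now = datetime(2024, 1, 15, 12, 0, tzinfo=timezone.utc)
fresh = check_freshness(datetime(2024, 1, 15, 11, 30, tzinfo=timezone.utc), now)
stale = check_freshness(datetime(2024, 1, 15, 9, 0, tzinfo=timezone.utc), now)
```

Passing `now` explicitly keeps the check deterministic in tests; in production you would omit it and wire `breached` to your alerting channel.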
Reconciliation Testing
Reconciliation compares data between systems to verify accuracy. It is essential for migrations, ETL pipelines, and any data movement.
Row Count Reconciliation
```sql
-- Compare row counts between source and target
WITH source_counts AS (
  SELECT 'orders' AS table_name, COUNT(*) AS cnt FROM source.orders
  UNION ALL
  SELECT 'customers', COUNT(*) FROM source.customers
),
target_counts AS (
  SELECT 'orders' AS table_name, COUNT(*) AS cnt FROM target.orders
  UNION ALL
  SELECT 'customers', COUNT(*) FROM target.customers
)
SELECT
  s.table_name,
  s.cnt AS source_count,
  t.cnt AS target_count,
  s.cnt - t.cnt AS difference
FROM source_counts s
JOIN target_counts t ON s.table_name = t.table_name
WHERE s.cnt != t.cnt;
```
Aggregate Reconciliation
```sql
-- Compare key metrics between source and target
SELECT
  'source' AS system,
  SUM(amount) AS total_revenue,
  COUNT(DISTINCT customer_id) AS unique_customers,
  AVG(amount) AS avg_order_value
FROM source.orders
WHERE order_date = '2024-01-15'
UNION ALL
SELECT
  'target' AS system,
  SUM(amount) AS total_revenue,
  COUNT(DISTINCT customer_id) AS unique_customers,
  AVG(amount) AS avg_order_value
FROM target.orders
WHERE order_date = '2024-01-15';
```
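When the two result sets come back, exact equality on floating-point aggregates like AVG(amount) is fragile; a small tolerance avoids false alarms from rounding differences between engines. A sketch of the comparison step (metric values invented for illustration):

```python
import math

def reconcile_metrics(source, target, rel_tol=1e-9):
    """Return the names of metrics that differ beyond tolerance."""
    return [
        name for name in source
        if not math.isclose(source[name], target[name], rel_tol=rel_tol)
    ]

# Invented example values for the two systems
source = {"total_revenue": 125000.50, "unique_customers": 842.0, "avg_order_value": 148.46}
target = {"total_revenue": 125000.50, "unique_customers": 842.0, "avg_order_value": 148.47}
mismatched = reconcile_metrics(source, target)
```

Tune `rel_tol` per metric: counts should match exactly, while averages computed in different numeric types may legitimately differ in the last decimal places.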
Sample-Based Row Comparison
```python
# Python example for row-level comparison
import pandas as pd

def compare_samples(source_df, target_df, key_column, sample_size=1000):
    # Get a random sample of keys (capped at the number of source rows)
    n = min(sample_size, len(source_df))
    sample_keys = source_df[key_column].sample(n=n).tolist()
    source_sample = source_df[source_df[key_column].isin(sample_keys)]
    target_sample = target_df[target_df[key_column].isin(sample_keys)]

    # Merge on the key; keys missing from the target drop out of the inner
    # join, so verify row counts separately
    comparison = source_sample.merge(
        target_sample,
        on=key_column,
        suffixes=('_source', '_target')
    )

    # Find differences column by column
    differences = []
    for col in source_df.columns:
        if col == key_column:
            continue
        source_col = f"{col}_source"
        target_col = f"{col}_target"
        s, t = comparison[source_col], comparison[target_col]
        # Treat null == null as a match; a plain != would flag those rows
        mismatches = comparison[(s != t) & ~(s.isna() & t.isna())]
        if len(mismatches) > 0:
            differences.append({
                'column': col,
                'mismatch_count': len(mismatches),
                'examples': mismatches[[key_column, source_col, target_col]].head()
            })
    return differences
```
Automating Quality Checks
Using dbt Tests
```yaml
# schema.yml
version: 2

models:
  - name: orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('customers')
              field: customer_id
      - name: amount
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 1000000
      - name: status
        tests:
          - accepted_values:
              values: ['pending', 'completed', 'cancelled']
```

Run these with `dbt test`; each failing test reports the number of offending rows.
Using Great Expectations
```python
import great_expectations as gx

context = gx.get_context()

# Define expectations
expectation_suite = context.add_expectation_suite("orders_suite")
expectation_suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
expectation_suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="amount", min_value=0, max_value=1000000
    )
)
expectation_suite.add_expectation(
    gx.expectations.ExpectTableRowCountToBeBetween(
        min_value=1000, max_value=1000000
    )
)

# Validate data
checkpoint = context.add_checkpoint(name="orders_checkpoint", ...)
result = checkpoint.run()
```
Building a Quality Pipeline
```python
# Airflow example
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator

def run_quality_checks(**context):
    results = {
        'row_count': check_row_count(),
        'null_check': check_nulls(),
        'range_check': check_ranges(),
        'reconciliation': run_reconciliation()
    }
    failures = [k for k, v in results.items() if not v['passed']]
    context['ti'].xcom_push(key='failures', value=failures)
    return failures

def decide_next_step(**context):
    failures = context['ti'].xcom_pull(task_ids='run_quality_checks', key='failures')
    if failures:
        return 'alert_on_failure'   # task_id of the alerting task (not shown)
    return 'continue_pipeline'      # task_id of the downstream task (not shown)

with DAG('data_quality_pipeline', ...):
    quality_checks = PythonOperator(
        task_id='run_quality_checks',
        python_callable=run_quality_checks
    )
    branch = BranchPythonOperator(
        task_id='decide_next',
        python_callable=decide_next_step
    )
    quality_checks >> branch
```
Frequently Asked Questions
When should quality checks run?
Run checks at multiple points: after extraction (validate source), after transformation (validate logic), and after loading (validate completeness). Critical checks should block the pipeline; others can alert and continue.
How strict should checks be?
It depends on the use case. Financial data needs strict, blocking checks. Marketing analytics might tolerate some issues with alerts. Define thresholds based on business impact.
What about historical data issues?
Don't just check today's data. Periodically audit historical data for drift. Build trend monitoring to catch gradual degradation.
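Trend monitoring can be as simple as comparing today's metric against a trailing baseline. A minimal sketch, assuming a daily row-count series; the window size, deviation threshold, and counts below are invented for illustration:

```python
def detect_drift(daily_counts, window=7, max_deviation=0.3):
    """Flag the latest value if it strays too far from the trailing mean."""
    baseline = daily_counts[-(window + 1):-1]  # the window days before the latest
    mean = sum(baseline) / len(baseline)
    latest = daily_counts[-1]
    deviation = abs(latest - mean) / mean
    return {"baseline_mean": mean, "latest": latest,
            "deviation": deviation, "drifted": deviation > max_deviation}

# Invented series: a stable week, then a sudden drop
counts = [1000, 1020, 980, 1010, 990, 1005, 995, 600]
result = detect_drift(counts)
```

A fixed percentage threshold is crude but catches the gradual-degradation case the FAQ describes; for seasonal data, compare against the same weekday or use a proper anomaly-detection method instead.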
Need help with data quality?
I build data pipelines with quality checks and reconciliation testing built in. Get in touch to discuss your project.