Why Data Quality Matters

Data quality problems compound. A small issue in your pipeline becomes a big problem in your dashboard, which becomes a trust problem with stakeholders.

The Cost of Bad Data

  • Bad decisions: Wrong data leads to wrong conclusions
  • Lost time: Debugging data issues instead of building
  • Lost trust: Once stakeholders don't trust data, they stop using it
  • Downstream failures: ML models trained on bad data produce bad predictions

Data Quality Dimensions

Data quality isn't binary. There are multiple dimensions to consider:

1. Completeness

Are all expected records present? Are required fields populated?

  • Row counts match expectations
  • Required fields are not null
  • Expected date ranges are covered

2. Accuracy

Does the data correctly represent reality?

  • Values are within expected ranges
  • Calculations are correct
  • Data matches source of truth

3. Consistency

Is the data internally consistent and consistent across systems?

  • No duplicate primary keys
  • Referential integrity maintained
  • Related tables agree

4. Timeliness

Is the data fresh enough for its purpose?

  • Data arrives within SLA
  • No unexpectedly old records
  • Timestamps are reasonable

5. Validity

Does the data conform to expected formats and constraints?

  • Email addresses match a valid format
  • Dates are parseable
  • Enums contain only allowed values
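
These rules are easy to script. A minimal sketch with pandas, assuming illustrative column names (`email`, `order_date`, `status`) and a deliberately simple email regex:

```python
import pandas as pd

# Illustrative data; column names and values are assumptions, not a real schema
orders = pd.DataFrame({
    "email": ["a@example.com", "not-an-email", "b@example.com"],
    "order_date": ["2024-01-15", "2024-13-40", "2024-02-01"],
    "status": ["pending", "completed", "shipped"],
})

# Simple format check, not full RFC 5322 validation
EMAIL_PATTERN = r"^[^@\s]+@[^@\s]+\.[^@\s]+$"
ALLOWED_STATUSES = {"pending", "completed", "cancelled"}

def validity_report(df: pd.DataFrame) -> dict:
    """Count the rows violating each validity rule."""
    bad_email = (~df["email"].str.match(EMAIL_PATTERN)).sum()
    # errors="coerce" turns unparseable dates into NaT
    bad_date = pd.to_datetime(df["order_date"], errors="coerce").isna().sum()
    bad_status = (~df["status"].isin(ALLOWED_STATUSES)).sum()
    return {
        "invalid_email": int(bad_email),
        "invalid_date": int(bad_date),
        "invalid_status": int(bad_status),
    }
```

The counts can feed directly into an alerting threshold: zero for strict fields, a small tolerance for noisy ones.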

Types of Checks

Schema Checks

-- Verify expected columns exist with correct types
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name = 'orders'
ORDER BY ordinal_position;

Null Checks

-- Check for unexpected nulls in required fields
-- (COUNT(column) skips NULLs, so each count below should equal total_rows)
SELECT
  COUNT(*) as total_rows,
  COUNT(order_id) as orders_with_id,
  COUNT(customer_id) as orders_with_customer,
  COUNT(amount) as orders_with_amount
FROM orders
WHERE order_date = CURRENT_DATE - 1;

Uniqueness Checks

-- Find duplicate primary keys
SELECT order_id, COUNT(*) as cnt
FROM orders
GROUP BY order_id
HAVING COUNT(*) > 1;

Range Checks

-- Verify values are within expected ranges
SELECT COUNT(*) as invalid_amounts
FROM orders
WHERE amount < 0 OR amount > 1000000;

SELECT COUNT(*) as future_orders
FROM orders
WHERE order_date > CURRENT_DATE;

Referential Integrity

-- Find orphaned records
SELECT o.order_id
FROM orders o
LEFT JOIN customers c ON o.customer_id = c.customer_id
WHERE c.customer_id IS NULL;

Freshness Checks

-- Verify data is recent
SELECT MAX(updated_at) as last_update,
       CURRENT_TIMESTAMP - MAX(updated_at) as staleness
FROM orders;

-- Alert if more than 1 hour old
-- (implement in monitoring layer)
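
That monitoring-layer alert could be as small as this sketch (the one-hour SLA and UTC timestamps are assumptions; wire `last_update` to the query above):

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

FRESHNESS_SLA = timedelta(hours=1)  # assumed SLA; tune per table

def check_freshness(last_update: datetime, now: Optional[datetime] = None):
    """Return (passed, staleness) for a table's latest update timestamp."""
    now = now or datetime.now(timezone.utc)
    staleness = now - last_update
    return staleness <= FRESHNESS_SLA, staleness
```

The `now` parameter exists so the check is testable with a fixed clock; in production it defaults to the current time.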

Reconciliation Testing

Reconciliation compares data between systems to verify accuracy. It is essential for migrations, ETL pipelines, and any process that moves data.

Row Count Reconciliation

-- Compare row counts between source and target
WITH source_counts AS (
  SELECT 'orders' as table_name, COUNT(*) as cnt FROM source.orders
  UNION ALL
  SELECT 'customers', COUNT(*) FROM source.customers
),
target_counts AS (
  SELECT 'orders' as table_name, COUNT(*) as cnt FROM target.orders
  UNION ALL
  SELECT 'customers', COUNT(*) FROM target.customers
)
SELECT
  s.table_name,
  s.cnt as source_count,
  t.cnt as target_count,
  s.cnt - t.cnt as difference
FROM source_counts s
JOIN target_counts t ON s.table_name = t.table_name
WHERE s.cnt != t.cnt;

Aggregate Reconciliation

-- Compare key metrics between source and target
SELECT
  'source' as system,
  SUM(amount) as total_revenue,
  COUNT(DISTINCT customer_id) as unique_customers,
  AVG(amount) as avg_order_value
FROM source.orders
WHERE order_date = '2024-01-15'

UNION ALL

SELECT
  'target' as system,
  SUM(amount) as total_revenue,
  COUNT(DISTINCT customer_id) as unique_customers,
  AVG(amount) as avg_order_value
FROM target.orders
WHERE order_date = '2024-01-15';
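
Exact equality is the right bar for counts, but floating-point aggregates like SUM(amount) can differ in the last decimal place across engines. A sketch that compares metric dictionaries within a relative tolerance (the tolerance value is a starting point, not a recommendation):

```python
import math

def reconcile_metrics(source: dict, target: dict, rel_tol: float = 1e-9) -> list:
    """Return the names of metrics that differ beyond the relative tolerance."""
    return [name for name in source
            if not math.isclose(source[name], target[name], rel_tol=rel_tol)]
```

Loosen `rel_tol` for metrics where engines legitimately round differently, and keep it at zero-tolerance (exact match) for integer counts.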

Sample-Based Row Comparison

# Python example for row-level comparison
import pandas as pd

def compare_samples(source_df, target_df, key_column, sample_size=1000):
    # Get a random sample of keys, capped at the available row count
    # (sample(n=...) raises if n exceeds the population)
    n = min(sample_size, len(source_df))
    sample_keys = source_df[key_column].sample(n=n).tolist()

    source_sample = source_df[source_df[key_column].isin(sample_keys)]
    target_sample = target_df[target_df[key_column].isin(sample_keys)]

    # Merge and compare
    comparison = source_sample.merge(
        target_sample,
        on=key_column,
        suffixes=('_source', '_target')
    )

    # Find differences
    differences = []
    for col in source_df.columns:
        if col == key_column:
            continue
        source_col = f"{col}_source"
        target_col = f"{col}_target"
        # Note: NaN != NaN in pandas, so rows that are null on both
        # sides still count as mismatches here
        mismatches = comparison[comparison[source_col] != comparison[target_col]]
        if len(mismatches) > 0:
            differences.append({
                'column': col,
                'mismatch_count': len(mismatches),
                'examples': mismatches[[key_column, source_col, target_col]].head()
            })

    return differences

Automating Quality Checks

Using dbt Tests

# schema.yml
version: 2

models:
  - name: orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('customers')
              field: customer_id
      - name: amount
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 1000000
      - name: status
        tests:
          - accepted_values:
              values: ['pending', 'completed', 'cancelled']

Using Great Expectations

import great_expectations as gx

context = gx.get_context()

# Define expectations
# (API names follow Great Expectations 1.x; earlier versions differ)
suite = context.suites.add(gx.ExpectationSuite(name="orders_suite"))

suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="amount", min_value=0, max_value=1000000
    )
)
suite.add_expectation(
    gx.expectations.ExpectTableRowCountToBeBetween(
        min_value=1000, max_value=1000000
    )
)

# Validate data (checkpoint configuration elided)
checkpoint = context.checkpoints.add(gx.Checkpoint(name="orders_checkpoint", ...))
result = checkpoint.run()

Building a Quality Pipeline

# Airflow example
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator

def run_quality_checks(**context):
    # check_row_count() etc. are project-specific helpers (not shown)
    results = {
        'row_count': check_row_count(),
        'null_check': check_nulls(),
        'range_check': check_ranges(),
        'reconciliation': run_reconciliation()
    }

    failures = [k for k, v in results.items() if not v['passed']]
    context['ti'].xcom_push(key='failures', value=failures)
    return failures

def decide_next_step(**context):
    failures = context['ti'].xcom_pull(key='failures')
    if failures:
        return 'alert_on_failure'
    return 'continue_pipeline'

with DAG('data_quality_pipeline', ...):
    quality_checks = PythonOperator(
        task_id='run_quality_checks',
        python_callable=run_quality_checks
    )

    branch = BranchPythonOperator(
        task_id='decide_next',
        python_callable=decide_next_step
    )

    # The 'alert_on_failure' and 'continue_pipeline' tasks the branch
    # returns must also be defined in this DAG (omitted here)
    quality_checks >> branch

Frequently Asked Questions

When should quality checks run?

Run checks at multiple points: after extraction (validate source), after transformation (validate logic), and after loading (validate completeness). Critical checks should block the pipeline; others can alert and continue.
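
One way to encode that blocking-versus-alerting split is to tag each check with a severity. The check names and structure here are illustrative, not a prescribed framework:

```python
from dataclasses import dataclass
from typing import Callable, List, Tuple

@dataclass
class Check:
    name: str
    fn: Callable[[], bool]   # returns True when the check passes
    blocking: bool           # blocking checks stop the pipeline; others only alert

def run_checks(checks: List[Check]) -> Tuple[List[str], List[str]]:
    """Run all checks; return (blocking_failures, warnings)."""
    blocking, warnings = [], []
    for check in checks:
        if not check.fn():
            (blocking if check.blocking else warnings).append(check.name)
    return blocking, warnings
```

The caller fails the pipeline if `blocking_failures` is non-empty and routes `warnings` to a notification channel.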

How strict should checks be?

It depends on the use case. Financial data needs strict, blocking checks. Marketing analytics might tolerate some issues with alerts. Define thresholds based on business impact.

What about historical data issues?

Don't just check today's data. Periodically audit historical data for drift. Build trend monitoring to catch gradual degradation.
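
Trend monitoring can start very simply: compare today's value of a metric (row count, null rate) against its recent history. A sketch, with an arbitrary window and z-score threshold:

```python
import statistics
from typing import List

def is_anomalous(history: List[float], today: float,
                 window: int = 14, z_threshold: float = 3.0) -> bool:
    """Flag today's value if it sits more than z_threshold standard
    deviations from the mean of the last `window` observations."""
    recent = history[-window:]
    if len(recent) < 2:
        return False  # not enough history to judge
    mean = statistics.mean(recent)
    stdev = statistics.stdev(recent)
    if stdev == 0:
        return today != mean
    return abs(today - mean) / stdev > z_threshold
```

A z-score check like this catches sudden jumps; for slow drift, compare longer windows (this week's mean versus last month's) instead.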

Need help with data quality?

I build data pipelines with quality checks and reconciliation testing built in. Get in touch to discuss your project.

Book a Call