senior-data-engineerData engineering skill for building scalable data pipelines, ETL/ELT systems, and data infrastructure. Expertise in Python, SQL, Spark, Airflow, dbt, Kafka, and modern data stack. Includes data modeling, pipeline orchestration, data quality, and DataOps. Use when designing data architectures, building data pipelines, optimizing data workflows, implementing data governance, or troubleshooting data issues.
Install via ClawdBot CLI:
clawdbot install alirezarezvani/senior-data-engineerProduction-grade data engineering skill for building scalable, reliable data systems.
Activate this skill when you see:
Pipeline Design:
Architecture:
Data Modeling:
Data Quality:
Performance:
# Generate pipeline orchestration config
python scripts/pipeline_orchestrator.py generate \
--type airflow \
--source postgres \
--destination snowflake \
--schedule "0 5 * * *"
# Validate data quality
python scripts/data_quality_validator.py validate \
--input data/sales.parquet \
--schema schemas/sales.json \
--checks freshness,completeness,uniqueness
# Optimize ETL performance
python scripts/etl_performance_optimizer.py analyze \
--query queries/daily_aggregation.sql \
--engine spark \
--recommend
Scenario: Extract data from PostgreSQL, transform with dbt, load to Snowflake.
-- Document source tables
SELECT
table_name,
column_name,
data_type,
is_nullable
FROM information_schema.columns
WHERE table_schema = 'source_schema'
ORDER BY table_name, ordinal_position;
python scripts/pipeline_orchestrator.py generate \
--type airflow \
--source postgres \
--tables orders,customers,products \
--mode incremental \
--watermark updated_at \
--output dags/extract_source.py
-- models/staging/stg_orders.sql
WITH source AS (
SELECT * FROM {{ source('postgres', 'orders') }}
),
renamed AS (
SELECT
order_id,
customer_id,
order_date,
total_amount,
status,
_extracted_at
FROM source
WHERE order_date >= DATEADD(day, -3, CURRENT_DATE)
)
SELECT * FROM renamed
-- models/marts/fct_orders.sql
{{
config(
materialized='incremental',
unique_key='order_id',
cluster_by=['order_date']
)
}}
SELECT
o.order_id,
o.customer_id,
c.customer_segment,
o.order_date,
o.total_amount,
o.status
FROM {{ ref('stg_orders') }} o
LEFT JOIN {{ ref('dim_customers') }} c
ON o.customer_id = c.customer_id
{% if is_incremental() %}
WHERE o._extracted_at > (SELECT MAX(_extracted_at) FROM {{ this }})
{% endif %}
# models/marts/schema.yml
version: 2
models:
- name: fct_orders
description: "Order fact table"
columns:
- name: order_id
tests:
- unique
- not_null
- name: total_amount
tests:
- not_null
- dbt_utils.accepted_range:
min_value: 0
max_value: 1000000
- name: order_date
tests:
- not_null
- dbt_utils.recency:
datepart: day
field: order_date
interval: 1
# dags/daily_etl.py
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'email_on_failure': True,
'email': ['data-alerts@company.com'],
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'daily_etl_pipeline',
default_args=default_args,
description='Daily ETL from PostgreSQL to Snowflake',
schedule_interval='0 5 * * *',
start_date=days_ago(1),
catchup=False,
tags=['etl', 'daily'],
) as dag:
extract = BashOperator(
task_id='extract_source_data',
bash_command='python /opt/airflow/scripts/extract.py --date {{ ds }}',
)
transform = BashOperator(
task_id='run_dbt_models',
bash_command='cd /opt/airflow/dbt && dbt run --select marts.*',
)
test = BashOperator(
task_id='run_dbt_tests',
bash_command='cd /opt/airflow/dbt && dbt test --select marts.*',
)
notify = BashOperator(
task_id='send_notification',
bash_command='python /opt/airflow/scripts/notify.py --status success',
trigger_rule='all_success',
)
extract >> transform >> test >> notify
# Test locally
dbt run --select stg_orders fct_orders
dbt test --select fct_orders
# Validate data quality
python scripts/data_quality_validator.py validate \
--table fct_orders \
--checks all \
--output reports/quality_report.json
Scenario: Stream events from Kafka, process with Flink/Spark Streaming, sink to data lake.
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "UserEvent",
"type": "object",
"required": ["event_id", "user_id", "event_type", "timestamp"],
"properties": {
"event_id": {"type": "string", "format": "uuid"},
"user_id": {"type": "string"},
"event_type": {"type": "string", "enum": ["page_view", "click", "purchase"]},
"timestamp": {"type": "string", "format": "date-time"},
"properties": {"type": "object"}
}
}
# Create topic with appropriate partitions
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic user-events \
--partitions 12 \
--replication-factor 3 \
--config retention.ms=604800000 \
--config cleanup.policy=delete
# Verify topic
kafka-topics.sh --describe \
--bootstrap-server localhost:9092 \
--topic user-events
# streaming/user_events_processor.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
from_json, col, window, count, avg,
to_timestamp, current_timestamp
)
from pyspark.sql.types import (
StructType, StructField, StringType,
TimestampType, MapType
)
# Initialize Spark
spark = SparkSession.builder \
.appName("UserEventsProcessor") \
.config("spark.sql.streaming.checkpointLocation", "/checkpoints/user-events") \
.config("spark.sql.shuffle.partitions", "12") \
.getOrCreate()
# Define schema
event_schema = StructType([
StructField("event_id", StringType(), False),
StructField("user_id", StringType(), False),
StructField("event_type", StringType(), False),
StructField("timestamp", StringType(), False),
StructField("properties", MapType(StringType(), StringType()), True)
])
# Read from Kafka
events_df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "user-events") \
.option("startingOffsets", "latest") \
.option("failOnDataLoss", "false") \
.load()
# Parse JSON
parsed_df = events_df \
.select(from_json(col("value").cast("string"), event_schema).alias("data")) \
.select("data.*") \
.withColumn("event_timestamp", to_timestamp(col("timestamp")))
# Windowed aggregation
aggregated_df = parsed_df \
.withWatermark("event_timestamp", "10 minutes") \
.groupBy(
window(col("event_timestamp"), "5 minutes"),
col("event_type")
) \
.agg(
count("*").alias("event_count"),
approx_count_distinct("user_id").alias("unique_users")
)
# Write to Delta Lake
query = aggregated_df.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/checkpoints/user-events-aggregated") \
.option("path", "/data/lake/user_events_aggregated") \
.trigger(processingTime="1 minute") \
.start()
query.awaitTermination()
# Dead letter queue for failed records
from pyspark.sql.functions import current_timestamp, lit
def process_with_error_handling(batch_df, batch_id):
try:
# Attempt processing
valid_df = batch_df.filter(col("event_id").isNotNull())
invalid_df = batch_df.filter(col("event_id").isNull())
# Write valid records
valid_df.write \
.format("delta") \
.mode("append") \
.save("/data/lake/user_events")
# Write invalid to DLQ
if invalid_df.count() > 0:
invalid_df \
.withColumn("error_timestamp", current_timestamp()) \
.withColumn("error_reason", lit("missing_event_id")) \
.write \
.format("delta") \
.mode("append") \
.save("/data/lake/dlq/user_events")
except Exception as e:
# Log error, alert, continue
logger.error(f"Batch {batch_id} failed: {e}")
raise
# Use foreachBatch for custom processing
query = parsed_df.writeStream \
.foreachBatch(process_with_error_handling) \
.option("checkpointLocation", "/checkpoints/user-events") \
.start()
# monitoring/stream_metrics.py
from prometheus_client import Gauge, Counter, start_http_server
# Define metrics
RECORDS_PROCESSED = Counter(
'stream_records_processed_total',
'Total records processed',
['stream_name', 'status']
)
PROCESSING_LAG = Gauge(
'stream_processing_lag_seconds',
'Current processing lag',
['stream_name']
)
BATCH_DURATION = Gauge(
'stream_batch_duration_seconds',
'Last batch processing duration',
['stream_name']
)
def emit_metrics(query):
"""Emit Prometheus metrics from streaming query."""
progress = query.lastProgress
if progress:
RECORDS_PROCESSED.labels(
stream_name='user-events',
status='success'
).inc(progress['numInputRows'])
if progress['sources']:
# Calculate lag from latest offset
for source in progress['sources']:
end_offset = source.get('endOffset', {})
# Parse Kafka offsets and calculate lag
Scenario: Implement comprehensive data quality monitoring with Great Expectations.
# Install and initialize
pip install great_expectations
great_expectations init
# Connect to data source
great_expectations datasource new
# expectations/orders_suite.py
import great_expectations as gx
context = gx.get_context()
# Create expectation suite
suite = context.add_expectation_suite("orders_quality_suite")
# Add expectations
validator = context.get_validator(
batch_request={
"datasource_name": "warehouse",
"data_asset_name": "orders",
},
expectation_suite_name="orders_quality_suite"
)
# Schema expectations
validator.expect_table_columns_to_match_ordered_list(
column_list=[
"order_id", "customer_id", "order_date",
"total_amount", "status", "created_at"
]
)
# Completeness expectations
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_not_be_null("order_date")
# Uniqueness expectations
validator.expect_column_values_to_be_unique("order_id")
# Range expectations
validator.expect_column_values_to_be_between(
"total_amount",
min_value=0,
max_value=1000000
)
# Categorical expectations
validator.expect_column_values_to_be_in_set(
"status",
["pending", "confirmed", "shipped", "delivered", "cancelled"]
)
# Freshness expectation
validator.expect_column_max_to_be_between(
"order_date",
min_value={"$PARAMETER": "now - timedelta(days=1)"},
max_value={"$PARAMETER": "now"}
)
# Referential integrity
validator.expect_column_values_to_be_in_set(
"customer_id",
value_set={"$PARAMETER": "valid_customer_ids"}
)
validator.save_expectation_suite(discard_failed_expectations=False)
# models/marts/schema.yml
version: 2
models:
- name: fct_orders
description: "Order fact table with data quality checks"
tests:
# Row count check
- dbt_utils.equal_rowcount:
compare_model: ref('stg_orders')
# Freshness check
- dbt_utils.recency:
datepart: hour
field: created_at
interval: 24
columns:
- name: order_id
description: "Unique order identifier"
tests:
- unique
- not_null
- relationships:
to: ref('dim_orders')
field: order_id
- name: total_amount
tests:
- not_null
- dbt_utils.accepted_range:
min_value: 0
max_value: 1000000
inclusive: true
- dbt_expectations.expect_column_values_to_be_between:
min_value: 0
row_condition: "status != 'cancelled'"
- name: customer_id
tests:
- not_null
- relationships:
to: ref('dim_customers')
field: customer_id
severity: warn
# contracts/orders_contract.yaml
contract:
name: orders_data_contract
version: "1.0.0"
owner: data-team@company.com
schema:
type: object
properties:
order_id:
type: string
format: uuid
description: "Unique order identifier"
customer_id:
type: string
not_null: true
order_date:
type: date
not_null: true
total_amount:
type: decimal
precision: 10
scale: 2
minimum: 0
status:
type: string
enum: ["pending", "confirmed", "shipped", "delivered", "cancelled"]
sla:
freshness:
max_delay_hours: 1
completeness:
min_percentage: 99.9
accuracy:
duplicate_tolerance: 0.01
consumers:
- name: analytics-team
usage: "Daily reporting dashboards"
- name: ml-team
usage: "Churn prediction model"
# monitoring/quality_dashboard.py
from datetime import datetime, timedelta
import pandas as pd
def generate_quality_report(connection, table_name: str) -> dict:
"""Generate comprehensive data quality report."""
report = {
"table": table_name,
"timestamp": datetime.now().isoformat(),
"checks": {}
}
# Row count check
row_count = connection.execute(
f"SELECT COUNT(*) FROM {table_name}"
).fetchone()[0]
report["checks"]["row_count"] = {
"value": row_count,
"status": "pass" if row_count > 0 else "fail"
}
# Freshness check
max_date = connection.execute(
f"SELECT MAX(created_at) FROM {table_name}"
).fetchone()[0]
hours_old = (datetime.now() - max_date).total_seconds() / 3600
report["checks"]["freshness"] = {
"max_timestamp": max_date.isoformat(),
"hours_old": round(hours_old, 2),
"status": "pass" if hours_old < 24 else "fail"
}
# Null rate check
null_query = f"""
SELECT
SUM(CASE WHEN order_id IS NULL THEN 1 ELSE 0 END) as null_order_id,
SUM(CASE WHEN customer_id IS NULL THEN 1 ELSE 0 END) as null_customer_id,
COUNT(*) as total
FROM {table_name}
"""
null_result = connection.execute(null_query).fetchone()
report["checks"]["null_rates"] = {
"order_id": null_result[0] / null_result[2] if null_result[2] > 0 else 0,
"customer_id": null_result[1] / null_result[2] if null_result[2] > 0 else 0,
"status": "pass" if null_result[0] == 0 and null_result[1] == 0 else "fail"
}
# Duplicate check
dup_query = f"""
SELECT COUNT(*) - COUNT(DISTINCT order_id) as duplicates
FROM {table_name}
"""
duplicates = connection.execute(dup_query).fetchone()[0]
report["checks"]["duplicates"] = {
"count": duplicates,
"status": "pass" if duplicates == 0 else "fail"
}
# Overall status
all_passed = all(
check["status"] == "pass"
for check in report["checks"].values()
)
report["overall_status"] = "pass" if all_passed else "fail"
return report
Use this framework to choose the right approach for your data pipeline.
| Criteria | Batch | Streaming |
|----------|-------|-----------|
| Latency requirement | Hours to days | Seconds to minutes |
| Data volume | Large historical datasets | Continuous event streams |
| Processing complexity | Complex transformations, ML | Simple aggregations, filtering |
| Cost sensitivity | More cost-effective | Higher infrastructure cost |
| Error handling | Easier to reprocess | Requires careful design |
Decision Tree:
Is real-time insight required?
├── Yes → Use streaming
│ └── Is exactly-once semantics needed?
│ ├── Yes → Kafka + Flink/Spark Structured Streaming
│ └── No → Kafka + consumer groups
└── No → Use batch
└── Is data volume > 1TB daily?
├── Yes → Spark/Databricks
└── No → dbt + warehouse compute
| Aspect | Lambda | Kappa |
|--------|--------|-------|
| Complexity | Two codebases (batch + stream) | Single codebase |
| Maintenance | Higher (sync batch/stream logic) | Lower |
| Reprocessing | Native batch layer | Replay from source |
| Use case | ML training + real-time serving | Pure event-driven |
When to choose Lambda:
When to choose Kappa:
| Feature | Warehouse (Snowflake/BigQuery) | Lakehouse (Delta/Iceberg) |
|---------|-------------------------------|---------------------------|
| Best for | BI, SQL analytics | ML, unstructured data |
| Storage cost | Higher (proprietary format) | Lower (open formats) |
| Flexibility | Schema-on-write | Schema-on-read |
| Performance | Excellent for SQL | Good, improving |
| Ecosystem | Mature BI tools | Growing ML tooling |
| Category | Technologies |
|----------|--------------|
| Languages | Python, SQL, Scala |
| Orchestration | Airflow, Prefect, Dagster |
| Transformation | dbt, Spark, Flink |
| Streaming | Kafka, Kinesis, Pub/Sub |
| Storage | S3, GCS, Delta Lake, Iceberg |
| Warehouses | Snowflake, BigQuery, Redshift, Databricks |
| Quality | Great Expectations, dbt tests, Monte Carlo |
| Monitoring | Prometheus, Grafana, Datadog |
See references/data_pipeline_architecture.md for:
See references/data_modeling_patterns.md for:
See references/dataops_best_practices.md for:
Symptom: Airflow DAG fails with timeout
Task exceeded max execution time
Solution:
# Increase timeout
default_args = {
'execution_timeout': timedelta(hours=2),
}
# Or use incremental loads
WHERE updated_at > '{{ prev_ds }}'
Symptom: Spark job OOM
java.lang.OutOfMemoryError: Java heap space
Solution:
spark.conf.set("spark.executor.memory", "8g")
spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.conf.set("spark.memory.fraction", "0.8")
Symptom: Kafka consumer lag increasing
Consumer lag: 1000000 messages
Solution:
# Add more partitions
kafka-topics.sh --alter \
--bootstrap-server localhost:9092 \
--topic user-events \
--partitions 24
Symptom: Duplicate records appearing
Expected unique, found 150 duplicates
Solution:
-- dbt incremental with dedup
{{
config(
materialized='incremental',
unique_key='order_id'
)
}}
SELECT * FROM (
SELECT
*,
ROW_NUMBER() OVER (
PARTITION BY order_id
ORDER BY updated_at DESC
) as rn
FROM {{ source('raw', 'orders') }}
) WHERE rn = 1
Symptom: Stale data in tables
Last update: 3 days ago
Solution:
# dbt freshness check
sources:
- name: raw
freshness:
warn_after: {count: 12, period: hour}
error_after: {count: 24, period: hour}
loaded_at_field: _loaded_at
Symptom: Schema drift detected
Column 'new_field' not in expected schema
Solution:
# Handle schema evolution
df = spark.read.format("delta") \
.option("mergeSchema", "true") \
.load("/data/orders")
Symptom: Query takes hours
Query runtime: 4 hours (expected: 30 minutes)
Solution:
-- Before: Full table scan
SELECT * FROM orders WHERE order_date = '2024-01-15';
-- After: Partition pruning
-- Table partitioned by order_date
SELECT * FROM orders WHERE order_date = '2024-01-15';
-- Add clustering for frequent filters
ALTER TABLE orders CLUSTER BY (customer_id);
Symptom: dbt model takes too long
Model fct_orders completed in 45 minutes
Solution:
-- Convert to incremental
{{
config(
materialized='incremental',
unique_key='order_id',
on_schema_change='sync_all_columns'
)
}}
SELECT * FROM {{ ref('stg_orders') }}
{% if is_incremental() %}
WHERE _loaded_at > (SELECT MAX(_loaded_at) FROM {{ this }})
{% endif %}
Generated Mar 1, 2026
A retail company needs to analyze daily sales data from multiple online platforms to track revenue, customer behavior, and inventory trends. This involves building a batch ETL pipeline to ingest data from PostgreSQL databases, transform it using dbt models for dimensional modeling, and load it into Snowflake for BI dashboards, with data quality checks to ensure accuracy.
A financial services firm requires a streaming data pipeline to monitor transactions in real-time for fraudulent activities. Using Kafka for event ingestion and Spark for processing, the system analyzes patterns and triggers alerts, with Airflow orchestrating batch jobs for historical data reconciliation and compliance reporting.
A healthcare provider aims to consolidate patient records from various sources like EHR systems and IoT devices into a unified data lakehouse. This scenario involves designing an ELT pipeline with data quality frameworks to ensure HIPAA compliance, using dbt for transformations and Airflow for scheduling incremental loads to support analytics on patient outcomes.
A logistics company seeks to optimize supply chain operations by processing real-time sensor data from shipments. The pipeline uses Kafka for streaming IoT events, Spark for aggregating metrics like delivery times, and batch ETL with dbt to model data in a data warehouse, enabling predictive analytics for route planning and inventory management.
A subscription-based service offering data engineering tools and managed pipelines to businesses, generating revenue through tiered pricing based on data volume and features like real-time processing or advanced analytics. This model leverages the skill's expertise in scalable infrastructure to provide turnkey solutions for clients.
Providing expert data engineering consulting to enterprises for designing and implementing custom data architectures, such as building ETL pipelines or setting up DataOps practices. Revenue comes from project-based contracts or hourly rates, utilizing the skill's workflows for pipeline development and troubleshooting.
Creating and selling proprietary data products, like pre-built analytics dashboards or data quality frameworks, that integrate with clients' existing systems. This model monetizes the skill's capabilities in data modeling and pipeline orchestration to deliver value-added insights and tools.
💬 Integration Tip
Integrate this skill with existing CI/CD pipelines to automate deployment of data workflows, and ensure compatibility with cloud platforms like AWS or GCP for scalable infrastructure management.
Use the @steipete/oracle CLI to bundle a prompt plus the right files and get a second-model review (API or browser) for debugging, refactors, design checks, or cross-validation.
Manage Things 3 via the `things` CLI on macOS (add/update projects+todos via URL scheme; read/search/list from the local Things database). Use when a user asks Clawdbot to add a task to Things, list inbox/today/upcoming, search tasks, or inspect projects/areas/tags.
Local search/indexing CLI (BM25 + vectors + rerank) with MCP mode.
Use when designing database schemas, writing migrations, optimizing SQL queries, fixing N+1 problems, creating indexes, setting up PostgreSQL, configuring EF Core, implementing caching, partitioning tables, or any database performance question.
Connect to Supabase for database operations, vector search, and storage. Use for storing data, running SQL queries, similarity search with pgvector, and managing tables. Triggers on requests involving databases, vector stores, embeddings, or Supabase specifically.
Query, design, migrate, and optimize SQL databases. Use when working with SQLite, PostgreSQL, or MySQL — schema design, writing queries, creating migrations, indexing, backup/restore, and debugging slow queries. No ORMs required.