Skill: data-pipeline
ETL/ELT pipeline design. Trigger when the user wants to create data flows, transformations, or orchestration.
Configuration
| Property | Value |
|---|---|
| Context | fork |
| Allowed tools | Read, Write, Edit, Bash, Glob, Grep |
| Keywords | data, pipeline |
Detailed description
Data Pipeline
ETL vs ELT
| Pattern | When to use |
|---|---|
| ETL | Transform before loading: complex transformations, or sensitive data that must be cleaned/masked before it reaches the warehouse |
| ELT | Load raw, transform inside the warehouse: large volumes, cloud data warehouses (BigQuery, Snowflake) |
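To make the trade-off concrete, here is a minimal sketch of both patterns against BigQuery. The dataset and table names are hypothetical and the transformation is deliberately trivial; it only illustrates where the transform step runs.

# etl_vs_elt.py -- illustrative sketch; dataset/table names are made up
import pandas as pd
from google.cloud import bigquery

client = bigquery.Client()

def etl_style(df: pd.DataFrame):
    # ETL: transform in application code, then load only the clean result
    df = df.dropna(subset=['customer_id'])
    df['total'] = df['total'].clip(lower=0)
    client.load_table_from_dataframe(df, 'analytics.orders_clean').result()

def elt_style(df: pd.DataFrame):
    # ELT: load raw data as-is, then transform inside the warehouse with SQL
    client.load_table_from_dataframe(df, 'raw.orders').result()
    client.query("""
        CREATE OR REPLACE TABLE analytics.orders_clean AS
        SELECT * EXCEPT(total), GREATEST(total, 0) AS total
        FROM raw.orders
        WHERE customer_id IS NOT NULL
    """).result()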
Airflow DAG
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'daily_etl',
    default_args=default_args,
    schedule_interval='0 2 * * *',
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_from_source,
    )
    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_data,
    )
    load = PythonOperator(
        task_id='load',
        python_callable=load_to_warehouse,
    )
    extract >> transform >> load
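The DAG references extract_from_source, transform_data and load_to_warehouse without showing them; they are assumed to be defined in the same module. A minimal sketch of their shape follows (bodies deliberately elided; the full ETL example further down fills in the real logic):

# Hypothetical stubs for the callables referenced above
def extract_from_source(**context):
    ...  # pull rows for the execution date and stage them (file, table, or XCom)

def transform_data(**context):
    ...  # clean and enrich the staged rows

def load_to_warehouse(**context):
    ...  # write the transformed rows to the warehouse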
dbt Transformation
-- models/staging/stg_orders.sql
{{ config(materialized='view') }}

SELECT
    id AS order_id,
    customer_id,
    order_date,
    CAST(total AS DECIMAL(10,2)) AS total_amount
FROM {{ source('raw', 'orders') }}
WHERE order_date >= '2023-01-01'
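If the dbt project is orchestrated from the same Airflow DAG, one common option is a BashOperator that shells out to the dbt CLI. The project path and the wiring after the load task are assumptions, and the snippet is meant to live inside the daily_etl DAG shown above:

from airflow.operators.bash import BashOperator

# Hypothetical: rebuild the staging model after fresh data has been loaded
dbt_run = BashOperator(
    task_id='dbt_run',
    bash_command='cd /opt/dbt/project && dbt run --select stg_orders+',
)
load >> dbt_run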
Data Quality
def validate_data(df):
    assert df['order_id'].is_unique, "Duplicate IDs"
    assert df['amount'].ge(0).all(), "Negative amounts"
    assert df['customer_id'].notna().all(), "Null customers"
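The asserts above stop at the first failure; for debugging it can help to collect every violation before raising. A minimal sketch (the function name is ours, not part of the skill, and it reuses the same column names):

def validate_data_verbose(df):
    """Collect every failed check, then raise once with the full list."""
    checks = {
        "Duplicate IDs": df['order_id'].is_unique,
        "Negative amounts": df['amount'].ge(0).all(),
        "Null customers": df['customer_id'].notna().all(),
    }
    failures = [name for name, passed in checks.items() if not passed]
    if failures:
        raise ValueError(f"Data quality checks failed: {failures}")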
Automatic triggering
This skill is automatically activated when:
- The matching keywords are detected in the conversation
- The task context matches the skill's domain
Triggering examples
- "I want to data..."
- "I want to pipeline..."
Context fork
Fork means the skill runs in an isolated context:
- Does not pollute the main conversation
- Results are returned cleanly
- Ideal for autonomous tasks
Practical examples
1. Example: ETL Pipeline
Scenario
Build a pipeline to extract orders from a PostgreSQL database, transform for analytics, and load into a data warehouse.
Pipeline Architecture
Source (PostgreSQL) -> Extract -> Transform -> Validate -> Load (BigQuery)
                                                   |
                                                   v
                                           Dead Letter Queue
Extract
# pipeline/extract.py
import logging
import os

import pandas as pd
from sqlalchemy import create_engine

logger = logging.getLogger(__name__)

def extract_orders(since: str, until: str) -> pd.DataFrame:
    """Extract orders within date range from source DB."""
    engine = create_engine(os.environ['SOURCE_DATABASE_URL'])
    query = """
        SELECT o.id, o.user_id, o.total, o.status, o.created_at,
               u.country, u.signup_date
        FROM orders o
        JOIN users u ON o.user_id = u.id
        WHERE o.created_at BETWEEN %(since)s AND %(until)s
    """
    df = pd.read_sql(query, engine, params={'since': since, 'until': until})
    logger.info(f"Extracted {len(df)} orders from {since} to {until}")
    return df
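For source tables too large to pull in one query, pandas can stream the same extract in fixed-size chunks so the whole result never has to fit in memory at once. A sketch reusing the imports from pipeline/extract.py above (the function name and chunk size are illustrative):

def extract_orders_chunked(since: str, until: str, chunksize: int = 50_000):
    """Hypothetical streaming variant of extract_orders."""
    engine = create_engine(os.environ['SOURCE_DATABASE_URL'])
    query = """
        SELECT o.id, o.user_id, o.total, o.status, o.created_at,
               u.country, u.signup_date
        FROM orders o
        JOIN users u ON o.user_id = u.id
        WHERE o.created_at BETWEEN %(since)s AND %(until)s
    """
    # chunksize makes read_sql return an iterator of DataFrames instead of one frame
    for chunk in pd.read_sql(query, engine, params={'since': since, 'until': until},
                             chunksize=chunksize):
        yield chunk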
Transform
# pipeline/transform.py
import pandas as pd

def transform_orders(df: pd.DataFrame) -> pd.DataFrame:
    """Clean and enrich order data for analytics."""
    # Clean
    df = df.dropna(subset=['user_id', 'total'])
    df['total'] = df['total'].clip(lower=0)

    # Enrich
    df['order_date'] = pd.to_datetime(df['created_at']).dt.date
    df['days_since_signup'] = (
        pd.to_datetime(df['created_at']) - pd.to_datetime(df['signup_date'])
    ).dt.days
    df['order_bucket'] = pd.cut(
        df['total'], bins=[0, 25, 100, 500, float('inf')],
        labels=['small', 'medium', 'large', 'enterprise']
    )

    # Aggregate
    daily = df.groupby(['order_date', 'country', 'order_bucket']).agg(
        order_count=('id', 'count'),
        revenue=('total', 'sum'),
        avg_order_value=('total', 'mean'),
    ).reset_index()
    return daily
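Because the transform is a pure DataFrame-to-DataFrame function, it is easy to sanity-check on a tiny in-memory frame, for example in a pytest test. The sample values below are made up:

# Quick check of transform_orders on a tiny, made-up frame
import pandas as pd

sample = pd.DataFrame({
    'id': [1, 2],
    'user_id': [10, 11],
    'total': [30.0, -5.0],              # the -5.0 should be clipped to 0
    'created_at': ['2024-01-02', '2024-01-02'],
    'signup_date': ['2023-12-20', '2024-01-01'],
    'country': ['FR', 'DE'],
})
daily = transform_orders(sample)
assert (daily['revenue'] >= 0).all()
assert set(daily.columns) >= {'order_count', 'revenue', 'avg_order_value'}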
Validate
# pipeline/validate.py
import logging

import pandas as pd

logger = logging.getLogger(__name__)

def validate(df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Split into valid rows and invalid rows (dead letter)."""
    invalid_mask = (
        df['revenue'].isna() |
        (df['order_count'] <= 0) |
        (df['revenue'] < 0)
    )
    valid = df[~invalid_mask]
    dead_letter = df[invalid_mask]
    if len(dead_letter) > 0:
        logger.warning(f"{len(dead_letter)} rows sent to dead letter queue")
    return valid, dead_letter
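Dead-lettered rows should also be monitored, not just stored. A sketch of a guardrail that fails the run (so the orchestrator retries or alerts) when too large a share of rows is invalid; the 5% threshold is an arbitrary assumption and the helper would sit alongside pipeline/validate.py:

# Hypothetical guardrail: abort instead of silently loading a tiny "valid" set
MAX_DEAD_LETTER_RATIO = 0.05  # assumption: tolerate up to 5% bad rows

def check_dead_letter_ratio(valid: pd.DataFrame, dead_letter: pd.DataFrame):
    total = len(valid) + len(dead_letter)
    ratio = len(dead_letter) / total if total else 0.0
    if ratio > MAX_DEAD_LETTER_RATIO:
        raise RuntimeError(
            f"{ratio:.1%} of rows failed validation (threshold {MAX_DEAD_LETTER_RATIO:.0%})"
        )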
Load
# pipeline/load.py
import logging

import pandas as pd
from google.cloud import bigquery

logger = logging.getLogger(__name__)

def load_to_bigquery(df: pd.DataFrame, table_id: str):
    """Append validated data to BigQuery; new columns are added automatically."""
    client = bigquery.Client()
    job_config = bigquery.LoadJobConfig(
        write_disposition='WRITE_APPEND',
        schema_update_options=[bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION],
    )
    job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
    job.result()  # Wait for completion
    logger.info(f"Loaded {len(df)} rows to {table_id}")
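WRITE_APPEND on its own would duplicate rows if a day is re-run; the Key Decisions below rely on date-level idempotence. One way to get it (a sketch, not the only option; table and column names come from this example) is to delete the target date's rows before appending:

# Hypothetical idempotence helper: clear the target date before appending,
# so re-running a day does not duplicate rows (pairs with WRITE_APPEND above)
def delete_partition(client: bigquery.Client, table_id: str, order_date: str):
    client.query(
        f"DELETE FROM `{table_id}` WHERE order_date = @order_date",
        job_config=bigquery.QueryJobConfig(
            query_parameters=[
                bigquery.ScalarQueryParameter('order_date', 'DATE', order_date)
            ]
        ),
    ).result()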
Orchestration
# pipeline/run.py
from pipeline.extract import extract_orders
from pipeline.load import load_to_bigquery
from pipeline.transform import transform_orders
from pipeline.validate import validate

def run_daily_pipeline(execution_date: str):
    # next_day(): small date helper (assumed defined nearby) returning the ISO date one day later
    raw = extract_orders(since=execution_date, until=next_day(execution_date))
    transformed = transform_orders(raw)
    valid, dead_letter = validate(transformed)
    load_to_bigquery(valid, 'analytics.daily_orders')
    if len(dead_letter) > 0:
        load_to_bigquery(dead_letter, 'analytics.dead_letter_orders')
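run_daily_pipeline plugs into the Airflow DAG from the top of this skill via a templated execution date: '{{ ds }}' renders to the run's logical date (YYYY-MM-DD). The wiring below is a sketch that assumes the function is importable as pipeline.run and that the task is declared inside the with DAG(...) block:

from airflow.operators.python import PythonOperator
from pipeline.run import run_daily_pipeline

run_etl = PythonOperator(
    task_id='run_daily_pipeline',
    python_callable=run_daily_pipeline,
    op_kwargs={'execution_date': '{{ ds }}'},  # Airflow templates op_kwargs
)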
Key Decisions
- Parameterized queries: Prevent SQL injection, enable date-range backfills
- Dead letter queue: Invalid rows saved for investigation, not silently dropped
- Aggregation in transform: Reduce row count before loading (cost optimization)
- Idempotent loads: Date-partitioned tables allow safe re-runs
- Pandas over Spark: Dataset fits in memory (< 1M rows/day); Spark for larger scale
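Because extract_orders takes an explicit date range and each day's load is kept idempotent, backfilling history reduces to a loop over dates. A minimal sketch (the date range is illustrative):

# Hypothetical backfill: reprocess a range of days one at a time
from datetime import date, timedelta

from pipeline.run import run_daily_pipeline

def backfill(start: date, end: date):
    day = start
    while day <= end:
        run_daily_pipeline(day.isoformat())
        day += timedelta(days=1)

backfill(date(2024, 1, 1), date(2024, 1, 31))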