data-engineering-medallion-pipeline

End-to-end data engineering pipeline using MinIO, Airbyte, PostgreSQL, DBT, and Airflow with medallion architecture (Bronze/Silver/Gold layers)

---
name: data-engineering-medallion-pipeline
description: End-to-end data engineering pipeline using MinIO, Airbyte, PostgreSQL, DBT, and Airflow with medallion architecture (Bronze/Silver/Gold layers)
triggers:
  - "set up a medallion architecture data pipeline"
  - "configure airbyte with minio and postgres"
  - "create dbt bronze silver gold models"
  - "orchestrate data pipeline with airflow"
  - "implement data quality tests in dbt"
  - "build an elt pipeline with docker compose"
  - "create a data lakehouse with medallion layers"
  - "deploy data engineering stack locally"
---

# Data Engineering Medallion Pipeline Skill

> Skill by [ara.so](https://ara.so) — Data Skills collection.

This skill enables AI agents to work with a complete data engineering pipeline implementing the Medallion Architecture (Bronze → Silver → Gold) using modern open-source tools: MinIO (S3-compatible storage), Airbyte (data ingestion), PostgreSQL (data warehouse), DBT (transformations), Apache Airflow (orchestration), and Grafana (monitoring).

## What This Project Does

The data-engineering-medallion project provides a complete end-to-end data pipeline that:

1. **Ingests** raw data from MinIO object storage into PostgreSQL using Airbyte
2. **Transforms** data through three layers (Bronze/Silver/Gold) using DBT
3. **Orchestrates** the entire pipeline with Apache Airflow DAGs
4. **Validates** data quality with automated DBT tests
5. **Monitors** infrastructure health with Prometheus and Grafana
6. **Visualizes** business metrics in Power BI dashboards

The architecture follows the ELT (Extract-Load-Transform) pattern with a clear separation of concerns:
- **Bronze**: Raw immutable data from sources (JSONB format)
- **Silver**: Cleaned, validated, and typed data
- **Gold**: Business-ready aggregated metrics and KPIs
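
As a rough illustration of how a record changes across the three layers, the sketch below walks two made-up order payloads through Bronze, Silver, and Gold in plain Python. The field names mirror the `orders` examples used later in this document; `to_silver` and `to_gold` are hypothetical helpers for illustration, not part of the project.

```python
import json
from collections import defaultdict
from datetime import date
from decimal import Decimal

# Bronze: raw payloads stored untouched (JSON text stands in for JSONB here).
bronze = [
    '{"order_id": "1", "order_date": "2024-01-05", "total_amount": "120.50", "status": " completed "}',
    '{"order_id": "2", "order_date": "2024-01-05", "total_amount": "80.00", "status": "pending"}',
]


def to_silver(raw: str) -> dict:
    """Parse one raw payload, cast each field, and normalize the status."""
    rec = json.loads(raw)
    return {
        "order_id": int(rec["order_id"]),
        "order_date": date.fromisoformat(rec["order_date"]),
        "total_amount": Decimal(rec["total_amount"]),
        "status": rec["status"].strip().upper(),
    }


def to_gold(silver_rows: list) -> dict:
    """Aggregate typed rows into total revenue per status."""
    revenue = defaultdict(Decimal)  # Decimal() starts each bucket at 0
    for row in silver_rows:
        revenue[row["status"]] += row["total_amount"]
    return dict(revenue)


silver = [to_silver(raw) for raw in bronze]
gold = to_gold(silver)
print(gold)
```

The point of the split is that Bronze can always be replayed: if a cast or cleaning rule in `to_silver` turns out to be wrong, the raw payloads are still there to reprocess.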

## Installation & Setup

### Prerequisites

```bash
# Required
docker --version  # 20.10+
docker-compose --version  # 2.0+
# 8GB RAM minimum, 16GB recommended
```

### Clone and Initialize

```bash
git clone https://github.com/LucasGoulartCouto/data-engineering-medallion.git
cd data-engineering-medallion

# Setup environment and start all services
make setup
make start

# Verify all containers are healthy
make status
```

### Service URLs

After startup, access these interfaces:

- **Airflow**: http://localhost:8080 (admin/admin)
- **MinIO**: http://localhost:9001 (minioadmin/[from .env])
- **Airbyte**: http://localhost:8000 (create account on first visit)
- **Grafana**: http://localhost:3000 (admin/admin)
- **Prometheus**: http://localhost:9090
- **DBT Docs**: http://localhost:8085 (after `make dbt-docs`)

## Key Commands (Makefile)

```bash
# Infrastructure
make setup          # Create .env, directories, install dependencies
make start          # Start all Docker services
make stop           # Stop all services
make restart        # Restart all services
make status         # Check container health
make logs SERVICE=airflow  # View logs for specific service

# DBT Operations
make dbt-run        # Run all DBT models (bronze → silver → gold)
make dbt-test       # Run data quality tests
make dbt-docs       # Generate and serve documentation
make dbt-snapshot   # Capture SCD Type 2 snapshots
make dbt-clean      # Clean compiled artifacts

# Data Pipeline
make upload-data    # Upload sample data to MinIO
make trigger-dag DAG_ID=bronze_ingestion_dag  # Manually trigger Airflow DAG

# Development
make lint           # Lint Python and SQL code
make format         # Format Python code with black
make validate       # Validate Airflow DAGs and DBT models

# Cleanup
make clean          # Remove volumes and stop services
make clean-all      # Full cleanup including Docker images
```

## Project Structure

```
data-engineering-medallion/
├── airflow/
│   └── dags/
│       ├── bronze_ingestion_dag.py      # Triggers Airbyte sync
│       ├── silver_transformation_dag.py  # Runs DBT silver models
│       └── gold_aggregation_dag.py       # Runs DBT gold models
├── dbt/
│   ├── models/
│   │   ├── bronze/         # Extract JSONB → columnar
│   │   ├── silver/         # Clean, validate, dedupe
│   │   └── gold/           # Business metrics
│   ├── macros/             # Reusable SQL functions
│   ├── snapshots/          # SCD Type 2 history
│   └── tests/              # Custom data quality tests
├── scripts/
│   ├── upload_to_minio.py  # Upload CSV/JSON to MinIO
│   └── test_connections.py # Verify service connectivity
├── postgres/init/          # Database initialization SQL
├── monitoring/
│   ├── grafana/provisioning/
│   └── prometheus/
├── docker-compose.yml
├── Makefile
└── .env                    # Configuration (create from .env.example)
```

## DBT Model Development

### Bronze Layer (Raw Extraction)

Bronze models unpack the JSONB payload written by Airbyte into named columns. The `->>` operator returns `text`, so type casting is deferred to the Silver layer:

```sql
-- dbt/models/bronze/bronze_orders.sql
{{ config(
    materialized='view',
    schema='bronze'
) }}

SELECT
    _airbyte_ab_id,
    _airbyte_emitted_at,
    _airbyte_data->>'order_id' AS order_id,
    _airbyte_data->>'customer_id' AS customer_id,
    _airbyte_data->>'order_date' AS order_date,
    _airbyte_data->>'total_amount' AS total_amount,
    _airbyte_data->>'status' AS status,
    _airbyte_data AS raw_data
FROM {{ source('airbyte_raw', '_airbyte_raw_orders') }}
```

### Silver Layer (Cleaned & Validated)

Silver models clean, cast types, deduplicate, and add calculated fields:

```sql
-- dbt/models/silver/silver_orders.sql
{{ config(
    materialized='table',
    schema='silver'
) }}

WITH deduplicated AS (
    SELECT *,
           ROW_NUMBER() OVER (
               PARTITION BY order_id 
               ORDER BY _airbyte_emitted_at DESC
           ) AS rn
    FROM {{ ref('bronze_orders') }}
)

SELECT
    order_id::INTEGER,
    customer_id::INTEGER,
    order_date::DATE,
    total_amount::DECIMAL(10,2),
    UPPER(TRIM(status)) AS status,
    CASE 
        WHEN total_amount::DECIMAL > 1000 THEN 'high_value'
        WHEN total_amount::DECIMAL > 500 THEN 'medium_value'
        ELSE 'low_value'
    END AS order_value_segment,
    _airbyte_emitted_at AS ingested_at,
    CURRENT_TIMESTAMP AS transformed_at
FROM deduplicated
WHERE rn = 1
    AND order_id IS NOT NULL
    AND order_date::DATE <= CURRENT_DATE
```
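
The deduplication and segmentation rules in this model can be sketched in plain Python to make the edge cases easier to see. This is an illustrative equivalent, not project code; `latest_per_key` and `order_value_segment` are hypothetical names.

```python
from datetime import datetime
from decimal import Decimal


def latest_per_key(rows, key="order_id", ts="_airbyte_emitted_at"):
    """Keep the most recently emitted row per key (the `rn = 1` filter)."""
    latest = {}
    for row in rows:
        k = row[key]
        if k is None:
            continue  # mirrors: AND order_id IS NOT NULL
        if k not in latest or row[ts] > latest[k][ts]:
            latest[k] = row
    return list(latest.values())


def order_value_segment(total_amount: Decimal) -> str:
    """Same thresholds as the CASE expression; both comparisons are strict."""
    if total_amount > 1000:
        return "high_value"
    if total_amount > 500:
        return "medium_value"
    return "low_value"


rows = [
    {"order_id": "7", "_airbyte_emitted_at": datetime(2024, 1, 1), "total_amount": "400"},
    {"order_id": "7", "_airbyte_emitted_at": datetime(2024, 1, 2), "total_amount": "1000"},
    {"order_id": None, "_airbyte_emitted_at": datetime(2024, 1, 3), "total_amount": "50"},
]
deduped = latest_per_key(rows)
print(deduped[0]["total_amount"], order_value_segment(Decimal(deduped[0]["total_amount"])))
```

Because the comparisons are strict, an order of exactly 1000 lands in `medium_value` and one of exactly 500 in `low_value`. When two rows share the same `_airbyte_emitted_at`, the sketch keeps the first one seen, whereas `ROW_NUMBER()` picks one arbitrarily unless the `ORDER BY` gets a tiebreaker.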

### Gold Layer (Business Metrics)

Gold models aggregate data for business consumption:

```sql
-- dbt/models/gold/gold_product_performance.sql
{{ config(
    materialized='table',
    schema='gold'
) }}

SELECT
    p.product_id,
    p.product_name,
    p.category,
    p.supplier,
    COUNT(DISTINCT oi.order_id) AS total_orders,
    SUM(oi.quantity) AS total_quantity_sold,
    SUM(oi.line_total) AS total_revenue,
    SUM(oi.quantity * p.cost) AS total_cost,
    SUM(oi.line_total) - SUM(oi.quantity * p.cost) AS total_profit,
    ROUND(
        (SUM(oi.line_total) - SUM(oi.quantity * p.cost)) / 
        NULLIF(SUM(oi.line_total), 0) * 100, 
        2
    ) AS profit_margin_percentage,
    AVG(oi.unit_price) AS avg_unit_price,
    MAX(o.order_date) AS last_sale_date
FROM {{ ref('silver_products') }} p
INNER JOIN {{ ref('silver_order_items') }} oi 
    ON p.product_id = oi.product_id
INNER JOIN {{ ref('silver_orders') }} o 
    ON oi.order_id = o.order_id
WHERE o.status = 'COMPLETED'  -- silver_orders stores status upper-cased
GROUP BY p.product_id, p.product_name, p.category, p.supplier
```
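
The `NULLIF` guard in `profit_margin_percentage` is easy to overlook, so here is a small Python sketch of the same arithmetic. The function name is taken from the column alias; the implementation is an illustration under the assumption that revenue and cost arrive as `Decimal` values.

```python
from decimal import Decimal, ROUND_HALF_UP
from typing import Optional


def profit_margin_percentage(total_revenue: Decimal, total_cost: Decimal) -> Optional[Decimal]:
    """Return (revenue - cost) / revenue * 100 rounded to 2 places, or None."""
    if total_revenue == 0:
        # NULLIF(SUM(oi.line_total), 0) turns a zero divisor into NULL,
        # so the SQL expression yields NULL instead of a division error.
        return None
    margin = (total_revenue - total_cost) / total_revenue * 100
    return margin.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)


print(profit_margin_percentage(Decimal("200"), Decimal("150")))
print(profit_margin_percentage(Decimal("0"), Decimal("10")))
```

Products with zero revenue therefore show an empty margin rather than failing the whole model, which matters for any downstream filter such as `profit_margin_percentage > 20`: NULL rows are silently excluded.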

## DBT Testing & Data Quality

### Schema Tests (schema.yml)

```yaml
# dbt/models/silver/schema.yml
version: 2

models:
  - name: silver_orders
    description: "Cleaned and validated orders"
    columns:
      - name: order_id
        description: "Primary key"
        tests:
          - unique
          - not_null
      - name: customer_id
        description: "Foreign key to customers"
        tests:
          - not_null
          - relationships:
              to: ref('silver_customers')
              field: customer_id
      - name: status
        tests:
          - accepted_values:
              values: ['PENDING', 'COMPLETED', 'CANCELLED', 'REFUNDED']
      - name: total_amount
        tests:
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 1000000
```
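
To make it clearer what each generic test above actually flags, the following sketch reimplements their semantics over a list of dictionaries. It is an approximation for illustration only (dbt compiles these tests to SQL); the helper names and the sample rows are made up.

```python
from collections import Counter

ACCEPTED_STATUSES = {"PENDING", "COMPLETED", "CANCELLED", "REFUNDED"}


def unique_failures(rows, column):
    """Values that occur more than once (NULLs are ignored, as in dbt)."""
    counts = Counter(r[column] for r in rows if r[column] is not None)
    return [value for value, n in counts.items() if n > 1]


def not_null_failures(rows, column):
    """Rows where the column is missing."""
    return [r for r in rows if r[column] is None]


def accepted_values_failures(rows, column, values):
    """Rows whose non-NULL value is outside the allowed set."""
    return [r for r in rows if r[column] is not None and r[column] not in values]


def accepted_range_failures(rows, column, min_value, max_value):
    """Rows whose non-NULL value falls outside [min_value, max_value]."""
    return [
        r for r in rows
        if r[column] is not None and not (min_value <= r[column] <= max_value)
    ]


sample = [
    {"order_id": 1, "status": "PENDING", "total_amount": 10},
    {"order_id": 1, "status": "SHIPPED", "total_amount": -5},
    {"order_id": None, "status": None, "total_amount": None},
]
print(unique_failures(sample, "order_id"))
print(len(accepted_values_failures(sample, "status", ACCEPTED_STATUSES)))
```

Note that only `not_null` reports NULLs; the other three tests skip them, which is why `not_null` is usually paired with `unique` on a primary key.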

### Custom Tests

```sql
-- dbt/tests/assert_no_future_dates_in_sales.sql
SELECT order_id, order_date
FROM {{ ref('silver_orders') }}
WHERE order_date > CURRENT_DATE
```

```sql
-- dbt/tests/assert_no_negative_profit.sql
SELECT product_id, total_profit
FROM {{ ref('gold_product_performance') }}
WHERE total_profit < 0
```

### Running Tests

```bash
# Run all tests
make dbt-test

# Run tests for specific model
docker-compose exec dbt dbt test --select silver_orders

# Run specific test type
docker-compose exec dbt dbt test --select test_type:generic
docker-compose exec dbt dbt test --select test_type:singular
```

## Airflow DAG Development

### Bronze Ingestion DAG

```python
# airflow/dags/bronze_ingestion_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'bronze_ingestion_dag',
    default_args=default_args,
    description='Ingest data from MinIO to PostgreSQL Bronze layer',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['bronze', 'ingestion', 'airbyte'],
) as dag:

    trigger_airbyte_sync = AirbyteTriggerSyncOperator(
        task_id='trigger_airbyte_orders_sync',
        airbyte_conn_id='airbyte_default',
        connection_id='{{ var.value.airbyte_connection_id }}',
        asynchronous=False,
        timeout=3600,
    )

    def validate_ingestion(**context):
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        pg_hook = PostgresHook(postgres_conn_id='postgres_default')
        
        # Check row count
        result = pg_hook.get_first(
            "SELECT COUNT(*) FROM airbyte_raw._airbyte_raw_orders"
        )
        if result[0] == 0:
            raise ValueError("No data ingested to bronze layer")
        
        print(f"Validated {result[0]} rows in bronze layer")

    validate_task = PythonOperator(
        task_id='validate_bronze_ingestion',
        python_callable=validate_ingestion,
    )

    trigger_airbyte_sync >> validate_task
```

### Silver Transformation DAG

```python
# airflow/dags/silver_transformation_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'data-engineering',
    'retries': 2,
    'retry_delay': timedelta(minutes=3),
}

with DAG(
    'silver_transformation_dag',
    default_args=default_args,
    description='Transform bronze to silver layer with DBT',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['silver', 'transformation', 'dbt'],
) as dag:

    run_silver_models = BashOperator(
        task_id='run_dbt_silver_models',
        bash_command='cd /opt/dbt && dbt run --select silver.*',
    )

    test_silver_models = BashOperator(
        task_id='test_dbt_silver_models',
        bash_command='cd /opt/dbt && dbt test --select silver.*',
    )

    run_silver_models >> test_silver_models
```

### Gold Aggregation DAG

```python
# airflow/dags/gold_aggregation_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'data-engineering',
    'retries': 2,
    'retry_delay': timedelta(minutes=3),
}

with DAG(
    'gold_aggregation_dag',
    default_args=default_args,
    description='Build gold layer business metrics',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['gold', 'aggregation', 'metrics'],
) as dag:

    run_gold_models = BashOperator(
        task_id='run_dbt_gold_models',
        bash_command='cd /opt/dbt && dbt run --select gold.*',
    )

    test_gold_models = BashOperator(
        task_id='test_dbt_gold_models',
        bash_command='cd /opt/dbt && dbt test --select gold.*',
    )

    snapshot_gold = BashOperator(
        task_id='snapshot_gold_metrics',
        bash_command='cd /opt/dbt && dbt snapshot',
    )

    run_gold_models >> test_gold_models >> snapshot_gold
```

## Data Upload to MinIO

```python
# scripts/upload_to_minio.py
import os
from pathlib import Path

from minio import Minio
from minio.error import S3Error

def upload_data_to_minio():
    """Upload sample CSV data to MinIO bucket"""
    
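    # Initialize MinIO client. Falls back to the MINIO_ROOT_* variables from
    # .env when dedicated access keys are not set.
    client = Minio(
        os.getenv('MINIO_ENDPOINT', 'localhost:9000'),
        access_key=os.getenv('MINIO_ACCESS_KEY', os.getenv('MINIO_ROOT_USER', 'minioadmin')),
        secret_key=os.getenv('MINIO_SECRET_KEY', os.getenv('MINIO_ROOT_PASSWORD')),
        secure=False  # plain HTTP; suitable for the local stack only
    )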
    
    bucket_name = os.getenv('MINIO_BUCKET', 'raw-data')
    
    # Create bucket if not exists
    try:
        if not client.bucket_exists(bucket_name):
            client.make_bucket(bucket_name)
            print(f"Created bucket: {bucket_name}")
    except S3Error as e:
        print(f"Error creating bucket: {e}")
        return
    
    # Upload files from data directory
    data_dir = Path('data/raw')
    for file_path in data_dir.glob('*.csv'):
        try:
            client.fput_object(
                bucket_name,
                file_path.name,
                str(file_path),
                content_type='text/csv'
            )
            print(f"Uploaded: {file_path.name}")
        except S3Error as e:
            print(f"Error uploading {file_path.name}: {e}")

if __name__ == '__main__':
    upload_data_to_minio()
```

Run the upload:

```bash
make upload-data
# or
python scripts/upload_to_minio.py
```

## Configuration

### Environment Variables (.env)

```bash
# PostgreSQL
POSTGRES_USER=dataeng
POSTGRES_PASSWORD=<your-secure-password>
POSTGRES_DB=datawarehouse
POSTGRES_HOST=postgres
POSTGRES_PORT=5432

# MinIO
MINIO_ROOT_USER=minioadmin
MINIO_ROOT_PASSWORD=<your-secure-password>
MINIO_ENDPOINT=minio:9000
MINIO_BUCKET=raw-data

# Airflow
AIRFLOW_UID=50000
AIRFLOW_GID=0
AIRFLOW__CORE__EXECUTOR=LocalExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://${POSTGRES_USER}:${POSTGRES_PASSWORD}@postgres:5432/airflow
AIRFLOW__CORE__FERNET_KEY=<generate-with-python-cryptography>

# Airbyte
AIRBYTE_VERSION=0.50.0

# DBT
DBT_PROFILES_DIR=/opt/dbt
DBT_PROJECT_DIR=/opt/dbt

# Grafana
GF_SECURITY_ADMIN_PASSWORD=<your-secure-password>
```
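
The `AIRFLOW__CORE__FERNET_KEY` placeholder needs a real key before Airflow can encrypt connection secrets. A Fernet key is 32 random bytes in URL-safe base64; with the `cryptography` package installed, `Fernet.generate_key()` is the usual way to produce one. The sketch below produces the same format with the standard library only, and adds a hypothetical helper, `unfilled_placeholders`, that lists variables still set to a `<...>` placeholder.

```python
import base64
import os


def generate_fernet_key() -> str:
    """Return 32 random bytes, URL-safe base64 encoded (the Fernet key format)."""
    return base64.urlsafe_b64encode(os.urandom(32)).decode("ascii")


def unfilled_placeholders(env_text: str) -> list:
    """List variable names whose value is still a <...> placeholder."""
    names = []
    for line in env_text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue  # skip blanks, comments, and malformed lines
        name, value = line.split("=", 1)
        if value.startswith("<") and value.endswith(">"):
            names.append(name)
    return names


example_env = "POSTGRES_USER=dataeng\nPOSTGRES_PASSWORD=<your-secure-password>\n"
print(unfilled_placeholders(example_env))
print(len(generate_fernet_key()))
```

Running a check like `unfilled_placeholders` before `make start` is one way to catch a half-edited `.env`; whether the project's `make setup` already does something similar is not stated here.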

### DBT Profile Configuration

```yaml
# dbt/profiles.yml
datawarehouse:
  target: dev
  outputs:
    dev:
      type: postgres
      host: "{{ env_var('POSTGRES_HOST') }}"
      port: "{{ env_var('POSTGRES_PORT') | int }}"
      user: "{{ env_var('POSTGRES_USER') }}"
      password: "{{ env_var('POSTGRES_PASSWORD') }}"
      dbname: "{{ env_var('POSTGRES_DB') }}"
      schema: public
      threads: 4
      keepalives_idle: 0
```

### Airbyte Connection Setup

1. Access Airbyte UI: http://localhost:8000
2. Create MinIO source:
   - Connector: S3
   - Endpoint: http://minio:9000
   - Bucket: raw-data
   - Access Key: ${MINIO_ROOT_USER}
   - Secret Key: ${MINIO_ROOT_PASSWORD}

3. Create PostgreSQL destination:
   - Host: postgres
   - Port: 5432
   - Database: datawarehouse
   - Schema: airbyte_raw
   - Username: ${POSTGRES_USER}
   - Password: ${POSTGRES_PASSWORD}

4. Create connection with sync mode: Full Refresh | Overwrite

## Common Patterns

### Incremental Model Pattern

```sql
-- dbt/models/silver/silver_orders_incremental.sql
{{ config(
    materialized='incremental',
    unique_key='order_id',
    schema='silver',
    on_schema_change='append_new_columns'
) }}

SELECT
    order_id,
    customer_id,
    order_date,
    total_amount,
    status,
    _airbyte_emitted_at AS ingested_at
FROM {{ ref('bronze_orders') }}
{% if is_incremental() %}
WHERE _airbyte_emitted_at > (SELECT MAX(ingested_at) FROM {{ this }})
{% endif %}
```
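
The incremental pattern combines two ideas: a high-water mark on `ingested_at` and an upsert on `unique_key`. The sketch below models both with a dictionary standing in for the target table. It is a simplification for illustration: an empty dictionary plays the role of the first run, whereas dbt's `is_incremental()` checks that the table already exists and that `--full-refresh` was not passed.

```python
from datetime import datetime


def incremental_merge(target: dict, source_rows: list, unique_key: str = "order_id") -> int:
    """Apply only rows newer than the target's high-water mark; return how many."""
    if target:  # stands in for is_incremental()
        watermark = max(row["ingested_at"] for row in target.values())
        new_rows = [r for r in source_rows if r["ingested_at"] > watermark]
    else:
        new_rows = list(source_rows)  # first run: load everything
    for row in new_rows:
        target[row[unique_key]] = row  # replace the existing key, else insert
    return len(new_rows)


table = {}
day1 = [{"order_id": 1, "status": "PENDING", "ingested_at": datetime(2024, 1, 1)}]
day2 = day1 + [
    {"order_id": 1, "status": "COMPLETED", "ingested_at": datetime(2024, 1, 2)},
    {"order_id": 2, "status": "PENDING", "ingested_at": datetime(2024, 1, 2)},
]
first = incremental_merge(table, day1)
second = incremental_merge(table, day2)
print(first, second, table[1]["status"])
```

One consequence of the strict `>` comparison is that rows arriving late with a timestamp equal to or older than the current maximum are skipped; a lookback window on the filter is a common mitigation.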

### Macro for Common Transformations

```sql
-- dbt/macros/clean_string.sql
{% macro clean_string(column_name) %}
    UPPER(TRIM(REGEXP_REPLACE({{ column_name }}, '\s+', ' ', 'g')))
{% endmacro %}

-- Usage in model
SELECT {{ clean_string('customer_name') }} AS customer_name
FROM {{ ref('bronze_customers') }}
```
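
For comparison, the same normalization can be written in Python, which is handy when checking expected values in a unit test outside the warehouse. This is an approximate equivalent, not project code: Python's `\s` also matches some Unicode whitespace that PostgreSQL's may not.

```python
import re
from typing import Optional


def clean_string(value: Optional[str]) -> Optional[str]:
    """Collapse whitespace runs to one space, trim spaces, and upper-case."""
    if value is None:
        return None  # SQL string functions propagate NULL
    collapsed = re.sub(r"\s+", " ", value)
    return collapsed.strip(" ").upper()


print(clean_string("  acme \t\n corp "))
```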

### Snapshot (SCD Type 2)

```sql
-- dbt/snapshots/snapshot_customer_segments.sql
{% snapshot snapshot_customer_segments %}

{{
    config(
      target_schema='snapshots',
      unique_key='customer_id',
      strategy='timestamp',
      updated_at='updated_at',
    )
}}

SELECT * FROM {{ ref('gold_customer_metrics') }}

{% endsnapshot %}
```

### Custom Test Macro

```sql
-- dbt/macros/test_no_orphans.sql
{% test no_orphan_records(model, column_name, parent_model, parent_column) %}

SELECT {{ column_name }}
FROM {{ model }}
WHERE {{ column_name }} IS NOT NULL
  AND {{ column_name }} NOT IN (
      SELECT {{ parent_column }}
      FROM {{ parent_model }}
      WHERE {{ parent_column }} IS NOT NULL  -- a NULL here would make NOT IN match nothing
  )

{% endtest %}
```
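
The orphan check is an anti-join: return child keys that have no matching parent key. A minimal Python sketch, with a hypothetical `orphan_values` helper and made-up rows, looks like this. In SQL, `NOT IN` against a subquery that yields a NULL matches nothing, so the sketch skips NULL parent keys explicitly.

```python
def orphan_values(child_rows, column_name, parent_rows, parent_column):
    """Return non-NULL child values that do not appear among the parent keys."""
    parent_keys = {p[parent_column] for p in parent_rows if p[parent_column] is not None}
    return [
        c[column_name]
        for c in child_rows
        if c[column_name] is not None and c[column_name] not in parent_keys
    ]


parents = [{"customer_id": 1}, {"customer_id": None}]
children = [{"customer_id": 1}, {"customer_id": 2}, {"customer_id": None}]
print(orphan_values(children, "customer_id", parents, "customer_id"))
```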

## Querying the Data Warehouse

### Bronze Layer Query

```sql
-- Raw JSONB data from Airbyte
SELECT 
    _airbyte_ab_id,
    _airbyte_emitted_at,
    _airbyte_data->>'order_id' AS order_id,
    _airbyte_data
FROM airbyte_raw._airbyte_raw_orders
LIMIT 10;
```

### Silver Layer Query

```sql
-- Cleaned typed data
SELECT 
    order_id,
    customer_id,
    order_date,
    total_amount,
    status,
    order_value_segment
FROM silver.silver_orders
WHERE order_date >= CURRENT_DATE - INTERVAL '30 days'
ORDER BY order_date DESC;
```

### Gold Layer Query

```sql
-- Business metrics
SELECT 
    product_name,
    category,
    total_revenue,
    total_profit,
    profit_margin_percentage,
    total_quantity_sold
FROM gold.gold_product_performance
WHERE profit_margin_percentage > 20
ORDER BY total_revenue DESC
LIMIT 10;

-- Customer segmentation
SELECT 
    customer_segment,
    COUNT(*) AS customer_count,
    AVG(total_spent) AS avg_lifetime_value,
    AVG(recency_days) AS avg_recency
FROM gold.gold_customer_metrics
GROUP BY customer_segment
ORDER BY avg_lifetime_value DESC;
```

## Troubleshooting

### Container Won't Start

```bash
# Check logs
make logs SERVICE=postgres
make logs SERVICE=airflow-webserver

# Verify resource allocation
docker system df
docker system prune  # Clean up if needed

# Reset specific service
docker-compose restart postgres
```

### Airbyte Connection Failing

```bash
# Test MinIO connectivity
docker-compose exec airbyte-worker curl http://minio:9000/minio/health/live

# Test PostgreSQL connectivity
docker-compose exec airbyte-worker \
  psql -h postgres -U ${POSTGRES_USER} -d ${POSTGRES_DB} -c "SELECT 1"

# Check Airbyte logs
docker-compose logs airbyte-worker | grep ERROR
```

### DBT Model Failing

```bash
# Run with debug logging
docker-compose exec dbt dbt run --select silver_orders --debug

# Check compiled SQL
cat dbt/target/compiled/datawarehouse/models/silver/silver_orders.sql

# Rebuild a single model from scratch
docker-compose exec dbt dbt run --select silver_orders --full-refresh

# Validate syntax without execution
docker-compose exec dbt dbt parse
```

### Airflow DAG Not Appearing

```bash
# Check DAG parsing errors
docker-compose exec airflow-webserver airflow dags list-import-errors

# Validate DAG Python syntax
python -m py_compile airflow/dags/bronze_ingestion_dag.py

# Trigger a run manually once the DAG is listed (this does not force a re-parse)
docker-compose exec airflow-webserver airflow dags trigger <dag_id>
```
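
To run the syntax check over every DAG file at once, something like the following sketch can help. It uses the built-in `compile()` rather than `py_compile` so that no `.pyc` files are written; `find_syntax_errors` is a hypothetical helper. It only catches syntax errors: a missing provider package or a bad import still surfaces only through `airflow dags list-import-errors`.

```python
from pathlib import Path


def find_syntax_errors(dag_dir: str) -> dict:
    """Compile every .py file under dag_dir; return {path: error description}."""
    errors = {}
    for path in sorted(Path(dag_dir).rglob("*.py")):
        try:
            compile(path.read_text(encoding="utf-8"), str(path), "exec")
        except SyntaxError as exc:
            errors[str(path)] = f"line {exc.lineno}: {exc.msg}"
    return errors


# Path taken from the project structure above; an absent directory yields {}.
for file_name, problem in find_syntax_errors("airflow/dags").items():
    print(f"{file_name}: {problem}")
```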

### PostgreSQL Connection Refused

```bash
# Wait for PostgreSQL to be ready
docker-compose exec postgres pg_isready -U ${POSTGRES_USER}

# Check connection from within container
docker-compose exec dbt psql -h postgres -U ${POSTGRES_USER} -d ${POSTGRES_DB}

# Verify connection string in .env
grep POSTGRES .env
```

### Data Quality Test Failures

```bash
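# Run tests and persist the failing rows to the database
docker-compose exec dbt dbt test --select silver_orders --store-failures

# Query the stored failures. dbt writes one table per test and prints the exact
# relation in the test output; by default the schema is the target schema
# suffixed with _dbt_test__audit, so adjust the name below to what dbt reports.
docker-compose exec postgres psql -U ${POSTGRES_USER} -d ${POSTGRES_DB} -c \
  "SELECT * FROM public_dbt_test__audit.unique_silver_orders_order_id;"

# Re-run a single test by its generated name
docker-compose exec dbt dbt test --select unique_silver_orders_order_id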
```

### MinIO Upload Failing

```bash
# Check MinIO service status
docker-compose ps minio

# Test MinIO API directly
docker-compose exec minio mc alias set local http://localhost:9000 \
  ${MINIO_ROOT_USER} ${MINIO_ROOT_PASSWORD}

docker-compose exec minio mc ls local/${MINIO_BUCKET}

# Upload test file
docker-compose exec minio mc cp /tmp/test.csv local/${MINIO_BUCKET}/
```

## Monitoring & Observability

### Check Pipeline Health

```bash
# Airflow task status
docker-compose exec airflow-webserver \
  airflow tasks state bronze_ingestion_dag trigger_airbyte_orders_sync 2024-01-01

# DBT run results
docker-compose exec dbt dbt run-operation log_run_results

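# PostgreSQL query performance (needs the pg_stat_statements extension;
# on PostgreSQL 12 and earlier the column is named total_time)
docker-compose exec postgres psql -U ${POSTGRES_USER} -d ${POSTGRES_DB} -c \
  "SELECT query, calls, total_exec_time FROM pg_stat_statements ORDER BY total_exec_time DESC LIMIT 10;"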
```

### Grafana Dashboard Access

1. Access http://localhost:3000
2. Default dashboard: PostgreSQL Overview
3. Key metrics:
   - Cache hit rate (should be >95%)
   - Active connections
   - TPS (transactions per second)
   - Query duration percentiles
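
For reference, a cache hit rate is commonly derived from the `blks_hit` and `blks_read` counters in PostgreSQL's `pg_stat_database` view. The sketch below shows that arithmetic; whether the provisioned Grafana dashboard computes it exactly this way is an assumption.

```python
def cache_hit_rate(blks_hit: int, blks_read: int) -> float:
    """Percentage of block requests served from shared buffers."""
    total = blks_hit + blks_read
    if total == 0:
        return 0.0  # no activity yet; avoid dividing by zero
    return 100.0 * blks_hit / total


print(cache_hit_rate(970, 30))
```

A value that stays below the 95% guideline on a warmed-up database usually points at a working set larger than `shared_buffers`.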

### Custom Alerts

Add to `monitoring/prometheus/alerts.yml`:

```yaml
groups:
  - name: dbt_pipeline
    rules:
      - alert: DBTModelFailed
        expr: dbt_model_run_status{status="error"} > 0
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "DBT model {{ $labels.model }} failed"
```

This skill covers the complete data-engineering-medallion pipeline from setup through production operations, enabling AI agents to assist with implementation, troubleshooting, and extension of medallion architecture data pipelines.

Source

Creator's repository · aradotso/data-skills
