Pipeline Ecommerce Airflow 3.x Delta Lake dbt-duckdb
Arquitetura DAGs Gold KPIs

🏗️ Pipeline Ecommerce — Documentação Técnica

Arquitetura Medalhão completa: do OLTP transacional ao Data Warehouse analítico, passando por camadas progressivas de qualidade, Delta Lake com SCD2, dbt-duckdb e Semantic Layer.

📦 PostgreSQL OLTP 🦊 MinIO Data Lake 🔶 Delta Lake 🔷 dbt-duckdb ✈️ Apache Airflow 3.x 🧪 Great Expectations 🐘 Supabase DW 🔍 Trino
📋
8
Tabelas OLTP
✈️
6
DAGs Airflow
🗄️
4
Camadas Medalhão
🔶
8
Tabelas Silver
7
Modelos Gold
👁️
9
Views Semânticas
📊
13
KPIs MetricFlow
Time Travel

🚀 0. Jornada de um Dado — Do Pedido ao KPI

Por que ler esta seção primeiro? Antes de entrar nos detalhes técnicos, veja o ciclo de vida completo de um único pedido real percorrendo todas as camadas da pipeline. Isso torna todo o resto muito mais fácil de entender.

O cenário: cliente faz um pedido

Um cliente chamado João Silva (customer_id=42) faz um pedido de R$299,90 às 14:00 de uma terça-feira. Veja o que acontece com esse dado em cada camada:

📦 Origem — PostgreSQL OLTP (tempo real)

O sistema de e-commerce grava imediatamente na tabela ecommerce.orders:

order_idcustomer_idstatuschannelshipping_costdiscount_amountcreated_atupdated_at
1001 42 pending webR$ 19,90R$ 0,00 2026-03-16 14:00:00 2026-03-16 14:00:00

🗃️ Camada Raw — MinIO (extração a cada hora)

O DAG ecommerce_oltp_to_raw roda às 15:00 e detecta o pedido via watermark (updated_at > '14:00'). Salva o dado exatamente como veio do banco, sem nenhuma alteração:

s3://raw/ecommerce/orders/
  year=2026/month=03/day=16/hour=15/
    run_2026-03-16T15-00-00.parquet
# → 1 linha, idêntica ao OLTP
Por que guardar cru? A Raw é imutável e serve como "backup eterno". Se qualquer outra camada corrompida for, é possível reprocessar a partir daqui sem precisar acessar o OLTP novamente.

🥉 Camada Bronze — Validação Pydantic

O DAG ecommerce_raw_to_bronze lê o Parquet e passa cada linha pelo schema OrderSchema. O pedido é válido → vai para o bucket bronze com 2 colunas extras de rastreabilidade:

ColunaValorOrigem
order_id1001OLTP
customer_id42OLTP
statuspendingOLTP
channelwebOLTP
shipping_cost19.90OLTP
discount_amount0.00OLTP
created_at2026-03-16 14:00:00OLTP
updated_at2026-03-16 14:00:00OLTP
_bronze_loaded_at2026-03-16 15:03:22 UTCMETA pipeline
_source_files3://raw/ecommerce/orders/…/run_2026-03-16T15-00.parquetMETA pipeline

🥈 Camada Silver — Delta Lake (UPSERT)

O DAG ecommerce_bronze_to_silver faz MERGE no Delta Lake. Como o order_id=1001 não existe ainda, é um INSERT:

Coluna novaValorExplicação
_silver_loaded_at2026-03-16 15:05:00 UTCQuando entrou na Silver
_is_late_arrivalfalseorder_date e created_at com menos de 1h de diferença

🔄 Silver — Depois: o pedido é enviado (UPDATE)

3 horas depois, o pedido muda de pendingshipped no OLTP. Na próxima extração às 18:00, a pipeline detecta a mudança (watermark) e faz MERGE:

MERGE na fct_orders (upsert simples) Pedidos usam UPSERT — o registro existente é atualizado. O histórico fica no Delta Log (time travel), não na tabela em si. Para clientes e produtos (dados dimensionais), o comportamento é diferente: usa SCD Type 2.

🧬 Silver — SCD Type 2 para clientes (o mais importante!)

João Silva atualiza o email dele de joao@gmail.comjoao@empresa.com. Na Silver, o histórico é preservado:

_sk (surrogate)customer_idemail_is_current_valid_from_valid_to
a1b2c3d4…42joao@gmail.com false 2025-01-102026-03-16
e5f6a7b8…42joao@empresa.com true 2026-03-169999-12-31

Resultado: você pode responder "qual era o email do João quando ele fez o pedido em 2025?" — basta fazer JOIN pelo _valid_from / _valid_to.

🥇 Camada Gold — Star Schema via dbt-duckdb

O DAG ecommerce_silver_to_gold roda dbt que lê a Silver e produz o modelo star schema. O pedido 1001 aparece na fct_orders com surrogate keys resolvidas:

order_idcustomer_skorder_date_daystatusshipping_cost
1001 e5f6a7b8… (atual) 2026-03-16 shipped19.90
Por que surrogate key aqui? A fct_orders referencia o customer_sk (UUID da Silver), não o customer_id do OLTP. Isso permite joins históricos: se você quiser saber o endereço que o João tinha na época do pedido, basta usar o _sk correto da época.

🐘 Data Warehouse — Supabase

O DAG ecommerce_gold_to_dw faz TRUNCATE + INSERT das 7 tabelas Gold no schema ecommerce_analytics do Supabase. O pedido agora é acessível por ferramentas BI (Metabase, Grafana, etc.).

📊 KPI Final — Como o pedido vira número

O pedido 1001 contribui para vários KPIs via as views semânticas. Exemplo de como ele entra no KPI revenue:

-- View: v_sales_summary (schema semantic)
SELECT
    d.date_day,
    SUM(s.net_amount) AS revenue   -- Pedido 1001 contribui aqui
FROM ecommerce_analytics.fct_sales s
JOIN ecommerce_analytics.dim_date  d ON s.order_date_day = d.date_day
GROUP BY 1;
Resumo do ciclo completo OLTP (14:00) → Raw/15h (Parquet S3) → Bronze/15h (Pydantic validado) → Silver/15h (Delta MERGE) → Gold/09h (dbt star schema) → Supabase DW → KPI disponível. Latência total: ~19h para o dado aparecer no DW analítico (pipeline roda no dia seguinte às 09h).

🏗️ 1. Visão Geral da Arquitetura

Arquitetura Medalhão (Medallion Architecture) Dados fluem do OLTP transacional (PostgreSQL) até o DW analítico (Supabase), passando por camadas progressivas de qualidade. Cada camada adiciona rastreabilidade, validação e transformação.
flowchart LR OLTP[("`**PostgreSQL** :5440 8 tabelas`")]:::db subgraph MinIO["☁️ MinIO Data Lake"] RAW["📦 **raw/** Parquet bruto watermark incremental"]:::raw BRZ["🥉 **bronze/** Pydantic validado linhagem + quarantine"]:::bronze SLV["🥈 **silver/** Delta Lake SCD2 / Upsert / Append"]:::silver GLD["🥇 **gold/** Delta Lake Star Schema"]:::gold end SUP[("`**Supabase** ecommerce_analytics semantic views`")]:::dw TRN["🔍 **Trino** Time Travel SQL engine"]:::tool AIRFLOW["✈️ **Airflow 3.x** 6 DAGs encadeados"]:::orch OLTP -->|"incremental watermark"| RAW RAW -->|"Pydantic + quarantine"| BRZ BRZ -->|"SCD2 / Merge / Append Great Expectations"| SLV SLV -->|"dbt-duckdb net_amount calc"| GLD GLD -->|"TRUNCATE + INSERT bulk"| SUP GLD --- TRN AIRFLOW -.->|"orquestra"| OLTP AIRFLOW -.->|"orquestra"| MinIO classDef db fill:#0d419d,stroke:#58a6ff,color:#c9d1d9 classDef raw fill:#1b2a1b,stroke:#3fb950,color:#c9d1d9 classDef bronze fill:#2a1f0d,stroke:#e3b341,color:#c9d1d9 classDef silver fill:#1a1a2a,stroke:#58a6ff,color:#c9d1d9 classDef gold fill:#2a1f00,stroke:#e3b341,color:#c9d1d9 classDef dw fill:#1a0d2a,stroke:#bc8cff,color:#c9d1d9 classDef tool fill:#0d1a2a,stroke:#58a6ff,color:#c9d1d9 classDef orch fill:#1a1a1a,stroke:#8b949e,color:#c9d1d9

Componentes e Portas

ComponenteTecnologiaPortaPapel na Pipeline
postgres-oltpPostgreSQL 165440Fonte transacional (8 tabelas ecommerce)
airflow-apiAirflow 3.x Celery8080Orquestração de todos os DAGs
minioMinIO S3-compatible9000/9001Data Lake — raw/bronze/silver/gold
trinoTrino Delta Lake8090Query engine SQL com time travel
supabasePostgreSQL + PgBouncer5432Data Warehouse + Semantic Layer

🐘 2. OLTP — PostgreSQL (Fonte)

Schema: ecommerce Banco PostgreSQL porta 5440. 8 tabelas relacionadas simulando um ecommerce brasileiro. Dados gerados com Faker pt_BR a cada hora em dias úteis.
erDiagram customers { int customer_id PK string email UK string phone "30% NULL" string address_street "20% NULL" timestamp updated_at } products { int product_id PK string sku UK decimal price string category timestamp updated_at } inventory { int inventory_id PK int product_id FK int quantity_on_hand timestamp updated_at } orders { int order_id PK int customer_id FK timestamp order_date "10% late arrival" string status string channel timestamp updated_at } order_items { int order_item_id PK int order_id FK int product_id FK int quantity decimal unit_price decimal discount_pct } payments { int payment_id PK int order_id FK string payment_method decimal amount string status } reviews { int review_id PK int order_id FK int product_id FK int customer_id FK smallint rating text body "40% NULL" } sessions { uuid session_id PK int customer_id FK "60% NULL" string event_type string device_type } customers ||--o{ orders : "faz" customers ||--o{ reviews : "escreve" customers ||--o{ sessions : "navega" orders ||--o{ order_items : "contém" orders ||--|| payments : "pago com" orders ||--o{ reviews : "avaliado em" products ||--o{ order_items : "vendido em" products ||--|| inventory : "estocado" products ||--o{ reviews : "recebe" products ||--o{ sessions : "visitado"

Tabela: customers

ColunaTipoNulo?Descrição
customer_idPKSERIALNOT NULLAuto-incrementado — identificador único
first_nameVARCHAR(100)NOT NULLPrimeiro nome (Faker pt_BR)
last_nameVARCHAR(100)NOT NULLSobrenome (Faker pt_BR)
emailVARCHAR(255)UNIQUEEmail único — índice UNIQUE constraint
phoneDQVARCHAR(20)~30% NULLData Problem #2 — telefone ausente em ~30%
address_streetDQVARCHAR(255)~20% NULLData Problem #2 — endereço ausente em ~20%
address_cityVARCHAR(100)~20% NULLCidade
address_stateCHAR(2)~20% NULLUF — SP, RJ, MG, RS, PR, BA, SC, GO, PE, CE
address_zipVARCHAR(10)~20% NULLCEP
birth_dateDATEsimData de nascimento (18–70 anos)
is_activeBOOLEANNOT NULLFlag de cliente ativo (default true)
created_atTIMESTAMPNOT NULLCriação do registro no OLTP
updated_atTIMESTAMPNOT NULLÚltima modificação — watermark de extração incremental

Tabela: orders

⚠️ Data Problem #3 — Late Arrival 10% dos pedidos têm order_date retroativa (até 48h no passado vs. created_at). A Silver flageia esses casos com _is_late_arrival = true.
ColunaTipoNulo?Descrição
order_idPKSERIALNOT NULLIdentificador único do pedido
customer_idFKINTNOT NULL→ customers.customer_id
order_dateDQTIMESTAMPNOT NULL10% late arrival — até 48h no passado
statusVARCHAR(30)NOT NULLpending → confirmed → shipped → delivered / cancelled / returned
channelVARCHAR(30)simweb (45%), mobile (40%), marketplace (15%)
shipping_costNUMERIC(10,2)simFrete R$0–35; 30% gratuito
discount_amountNUMERIC(10,2)simDesconto R$0–30; 70% sem desconto
created_atTIMESTAMPNOT NULLCriação no sistema
updated_atTIMESTAMPNOT NULLÚltima atualização — watermark

Simulação de dados por ciclo (horário)

TabelaINSERTUPDATEObservação
customers~5~10 endereços2% chance de duplicata
products~2~5 preçosON CONFLICT DO NOTHING por SKU
inventory~30 estoquesVenda ou restock aleatório
orders~50~80 status10% late arrival
order_items~100–1501–3 itens por pedido
payments~50~20 status90% approved, 10% refused
reviews~1540% sem body
sessions~20060% anônimas (customer_id NULL)

✈️ 3. Airflow — DAGs e Orquestração

Airflow 3.x — CeleryExecutor — dag-processor separado Todos os DAGs ecommerce formam uma cadeia de triggers. O simulador dispara o raw, que dispara o bronze, e assim por diante. Cada DAG recebe o parent_run_id via conf para rastrear o batch correto no MinIO.

Cadeia de Triggers

ecommerce_data_simulator
@hourly 11–21 UTC
ecommerce_oltp_to_raw ecommerce_raw_to_bronze ecommerce_bronze_to_silver ecommerce_silver_to_gold ecommerce_gold_to_dw
flowchart TD SIM["⏰ **ecommerce_data_simulator** Faker CDC: INSERT/UPDATE/DELETE seg-sex 08:00–18:00 BRT"] RAW["📦 **ecommerce_oltp_to_raw** Extração incremental watermark 8 tabelas → Parquet particionado"] BRZ["🥉 **ecommerce_raw_to_bronze** Pydantic validation válidos → bronze / inválidos → quarantine"] SLV["🥈 **ecommerce_bronze_to_silver** SCD2 | Upsert | Append+Dedup Great Expectations soft-fail"] GLD["🥇 **ecommerce_silver_to_gold** dbt-duckdb run + test Parquet → Delta Lake"] DW["🐘 **ecommerce_gold_to_dw** TRUNCATE + INSERT bulk → Supabase Recria 9 semantic views"] SIM -->|"TriggerDagRunOperator"| RAW RAW -->|"conf: parent_run_id"| BRZ BRZ -->|"conf: parent_run_id"| SLV SLV -->|"TriggerDagRunOperator"| GLD GLD -->|"all_success"| DW style SIM fill:#1a2a1a,stroke:#3fb950 style RAW fill:#1a1a2a,stroke:#58a6ff style BRZ fill:#2a1a0a,stroke:#e3b341 style SLV fill:#0a1a2a,stroke:#58a6ff style GLD fill:#2a2a0a,stroke:#e3b341 style DW fill:#1a0a2a,stroke:#bc8cff

DAG 1: ecommerce_data_simulator

task_simulate_customers
PythonOperator

INSERT 5 clientes + UPDATE 10 endereços/telefones via Faker pt_BR

task_simulate_products
PythonOperator

INSERT 2 produtos novos + UPDATE 5 preços/status

task_simulate_inventory
PythonOperator

UPDATE 30 registros de estoque (venda ou restock)

task_simulate_orders
PythonOperator

INSERT 50 pedidos (items + payments) + UPDATE 80 status

task_simulate_payments
PythonOperator

UPDATE payments pending → approved (90%) / refused (10%)

task_simulate_sessions
PythonOperator

INSERT 200 eventos de navegação. 60% anônimos

task_simulate_reviews
PythonOperator

INSERT 15 avaliações de pedidos delivered. 40% sem body

task_branch_gdpr
BranchPython

~1/24 execuções: dispara GDPR delete; resto: skip

task_gdpr_delete
PythonOperator

Hard delete de 1 customer inativo (simula GDPR erasure)

task_trigger_raw_to_minio
TriggerDagRun

Dispara ecommerce_oltp_to_raw após ciclo completo

DAG 2: ecommerce_oltp_to_raw

Mecanismo de Watermark Cada tabela tem uma Airflow Variable ecommerce_watermark_{table} (default: epoch). Extrai apenas WHERE updated_at > watermark e atualiza ao max(timestamp) encontrado.
TabelaWatermark colCaminho MinIO
customers, products, inventory, ordersupdated_atraw/ecommerce/{table}/year=Y/month=M/day=D/hour=H/{batch}.parquet
order_items, payments, reviews, sessionscreated_atTabelas append-only — usa criação como watermark

DAG 3: ecommerce_raw_to_bronze

Lê cada arquivo Parquet do batch, valida com Pydantic v2 e separa válidos de inválidos:

flowchart LR subgraph bronze_flow["Para cada tabela"] P["📄 Parquet raw\nbatch_id"] V{Pydantic\nvalidation} OK["✅ bronze/\n+_bronze_loaded_at\n+_raw_file_path\n+_validation_passed"] QR["🚫 quarantine/\n+_validation_errors JSON"] end P --> V V -->|"linhas válidas"| OK V -->|"linhas inválidas"| QR style OK fill:#1b2a1b,stroke:#3fb950 style QR fill:#2a1a1a,stroke:#f78166

DAG 4: ecommerce_bronze_to_silver

EstratégiaTabelasComportamentoColunas extras
SCD Type 2 customers, products, inventory Mantém histórico de todas as versões. Alterações criam nova linha com _is_current=True _valid_from, _valid_to, _is_current, _sk, _silver_loaded_at
Upsert (Merge) orders, payments INSERT novos + UPDATE existentes via Delta MERGE. Idempotente _is_late_arrival (orders), _silver_loaded_at
Append + Dedup order_items, reviews, sessions Insere apenas PKs não existentes. Não sobrescreve histórico _silver_loaded_at

Como o SCD2 funciona passo a passo

Primeira carga

Todos os registros entram com _is_current=True, _valid_to=NULL, _sk=uuid(). Delta Lake criado em modo overwrite.

Detectar mudanças

Compara colunas monitoradas (nome, email, endereço, preço...) entre novo batch e versão _is_current=True existente. Usa merge do pandas por PK natural.

Fechar versão antiga

Para cada PK alterado: Delta MERGE atualiza _is_current=False e _valid_to=now() na versão ativa anterior.

Inserir nova versão

Linha nova (ou novos PKs) inserida com _is_current=True, _valid_from=now(), _valid_to=NULL, novo _sk=uuid().

Validação Great Expectations

Suite de expectations roda sobre a tabela Silver completa. Falhas geram WARN nos logs Airflow mas não bloqueiam a pipeline (soft failure).

DAG 5: ecommerce_silver_to_gold

fix_fct_orders
@task

Repara tabela fct_orders corrompida/vazia no silver se necessário

ensure_gold_bucket
@task

Cria bucket gold no MinIO se não existir

dbt_gold_run
@task.bash

dbt run — 7 modelos gold via dbt-duckdb com delta_scan()

dbt_gold_test
@task.bash

dbt test — valida qualidade dos 7 modelos (not_null, unique, accepted_values)

convert_gold_to_delta
@task

Converte Parquet gerado pelo dbt → Delta Lake (habilita time travel via Trino)

task_trigger_supabase
TriggerDagRun

Dispara ecommerce_gold_to_dw após all_success

DAG 6: ecommerce_gold_to_dw

Carrega Gold → Supabase via TRUNCATE + INSERT em bulk (psycopg2 execute_values, page_size=1000):

flowchart TD SETUP["setup_schema\nCria schemas ecommerce_analytics + semantic"] DD["load_dim_date"]:::dim DC["load_dim_customer"]:::dim DP["load_dim_product"]:::dim FO["load_fct_orders"]:::fct FS["load_fct_sales"]:::fct FP["load_fct_payments"]:::fct FR["load_fct_reviews"]:::fct SEM["refresh_semantic\nRecria 9 views analíticas"] SETUP --> DD & DC & DP DD & DC & DP --> FO & FS & FP & FR FO & FS & FP & FR --> SEM classDef dim fill:#1a1a0a,stroke:#e3b341 classDef fct fill:#0a0a1a,stroke:#bc8cff

📦 4. Camada Raw — MinIO Parquet

Bucket: s3://raw/ | Formato: Apache Parquet (engine pyarrow)

Particionamento: ecommerce/{table}/year=Y/month=M/day=D/hour=H/{batch_id}.parquet

Landing Zone — dados brutos imutáveis A camada Raw preserva exatamente o que estava no OLTP: nulos originais, duplicatas, late arrivals. Nenhuma transformação além dos metadados de ingestão. Essencial para auditoria e reprocessamento.

Metadados adicionados em cada arquivo:

# Adicionados após extração do PostgreSQL
df["_ingested_at"]   = ingested_at        # timestamp da ingestão
df["_batch_id"]      = batch_id           # derivado do run_id do Airflow
df["_source_table"]  = "ecommerce.orders" # nome completo da tabela

🥉 5. Camada Bronze — Validação Pydantic

Bucket: s3://bronze/ | Quarantine: s3://quarantine/

Schemas Pydantic por tabela

class OrderSchema(_BaseEcommerce):
    order_id:        int
    customer_id:     int
    order_date:      datetime
    status:          str
    channel:         Optional[str]   = None
    shipping_cost:   Optional[float] = None
    discount_amount: Optional[float] = None
    created_at:      datetime
    updated_at:      datetime

    @field_validator("status")
    @classmethod
    def valid_status(cls, v):
        valid = {"pending","confirmed","shipped","delivered","cancelled","returned"}
        if str(v).lower() not in valid:
            raise ValueError(f"invalid order status: {v}")
        return v
SchemaValidações principais
CustomerSchemacustomer_id > 0, email contém "@", phone/address opcionais
ProductSchemaproduct_id > 0, price >= 0, cost >= 0
InventorySchemainventory_id > 0, quantities >= 0
OrderSchemaorder_id > 0, status ∈ {pending,confirmed,...}, shipping >= 0
PaymentSchemapayment_id > 0, amount > 0
OrderItemSchemaorder_item_id > 0, quantity > 0, unit_price > 0
ReviewSchemareview_id > 0, rating ∈ [1,5]
SessionSchemasession_id não vazio, customer_id Optional (60% NULL esperado)

Exemplo de registro na quarantine:

{
  "review_id": 4521,
  "rating": 6,
  "_validation_errors": "[{\"type\":\"value_error\",\"loc\":[\"rating\"],\"msg\":\"rating must be 1-5, got 6\"}]"
}

🥈 6. Camada Silver — Delta Lake

Bucket: s3://silver/ | Formato: Delta Lake (Parquet + _delta_log/)

Por que Delta Lake? ACID transactions (writes atômicos), MERGE eficiente (sem reescrever toda a tabela), schema enforcement automático, e time travel para consultar versões anteriores dos dados.

Tabelas Silver e colunas de metadados

Tabela SilverEstratégiaGrainColunas extras notáveis
dim_customersSCD2customer_id + _sk_valid_from, _valid_to, _is_current, _sk, _dq_null_phone, _dq_null_address
dim_productsSCD2product_id + _sk_valid_from, _valid_to, _is_current, _sk
dim_inventorySCD2inventory_id + _sk_valid_from, _valid_to, _is_current, _sk
fct_ordersUpsertorder_id_is_late_arrival, _silver_loaded_at
fct_paymentsUpsertpayment_id_silver_loaded_at
fct_order_itemsAppendorder_item_id_silver_loaded_at
fct_reviewsAppendreview_id_silver_loaded_at
fct_sessionsAppendsession_id_silver_loaded_at

Código do MERGE SCD2 (Delta Lake Python):

# Fechar versão antiga (changed_pks)
df_close = df_active[df_active[pk].isin(changed_pks)].copy()
df_close["_is_current"] = False
df_close["_valid_to"]   = ts  # agora UTC

DeltaTable(silver_path, storage_options=STORAGE_OPTIONS)\
    .merge(_to_arrow(df_close), "target._sk = source._sk", "source", "target")\
    .when_matched_update_all().execute()

# Inserir nova versão
df_ins = df_new[df_new[pk].isin(to_insert)]
write_deltalake(silver_path, _to_arrow(df_ins), mode="append",
                storage_options=STORAGE_OPTIONS)

🧪 7. Qualidade de Dados — Great Expectations

Soft failure — não bloqueia a pipeline Falhas nas expectations geram WARN nos logs Airflow mas a pipeline continua. O objetivo é monitorar a qualidade, não parar o fluxo em produção.
TabelaExpectations
customerscustomer_id not_null, email not_null, _sk not_null + unique, row_count ≥ 1
productsproduct_id not_null, price ≥ 0, _sk not_null + unique, row_count ≥ 1
inventoryinventory_id not_null, quantity_on_hand ≥ 0, row_count ≥ 1
ordersorder_id not_null, status ∈ {pending,confirmed,shipped,delivered,cancelled,returned}, row_count ≥ 1
paymentspayment_id not_null, amount > 0, row_count ≥ 1
order_itemsorder_item_id unique, quantity ≥ 1, row_count ≥ 1
reviewsreview_id unique, rating ∈ [1,5], row_count ≥ 1
sessionssession_id unique, row_count ≥ 1

Modo ephemeral — sem armazenamento persistente:

ctx   = gx.get_context(mode="ephemeral")
ds    = ctx.data_sources.add_pandas(f"ds_{table}")
asset = ds.add_dataframe_asset(f"asset_{table}")
bd    = asset.add_batch_definition_whole_dataframe("batch")
suite = ctx.suites.add(gx.ExpectationSuite(name=f"{table}_suite"))
# adiciona expectations...
result = bd.get_batch(batch_parameters={"dataframe": df}).validate(suite)
# WARN nos logs se falhar, mas retorna True (não bloqueia)

🥇 8. Camada Gold — Star Schema via dbt-duckdb

Projeto dbt: airflow/dbt/ecommerce_dbt/ | Engine: DuckDB + extensão Delta Lake

erDiagram dim_customer { bigint customer_id PK text customer_sk text email text address_state boolean is_active boolean _dq_null_phone } dim_product { bigint product_id PK text product_sk text category text brand decimal price } dim_date { date date_day PK int year int month text month_name boolean is_weekend } fct_sales { bigint order_item_id PK bigint order_id FK bigint product_id FK bigint customer_id FK date order_date FK decimal net_amount int quantity decimal discount_pct } fct_orders { bigint order_id PK bigint customer_id FK date order_date FK text status text channel boolean _is_late_arrival } fct_payments { bigint payment_id PK bigint order_id FK bigint customer_id FK date payment_date FK decimal amount text payment_method } fct_reviews { bigint review_id PK bigint product_id FK bigint customer_id FK date review_date FK int rating } dim_customer ||--o{ fct_sales : "customer_id" dim_product ||--o{ fct_sales : "product_id" dim_date ||--o{ fct_sales : "order_date" dim_customer ||--o{ fct_orders : "customer_id" dim_date ||--o{ fct_orders : "order_date" dim_customer ||--o{ fct_payments : "customer_id" dim_date ||--o{ fct_payments : "payment_date" dim_product ||--o{ fct_reviews : "product_id" dim_customer ||--o{ fct_reviews : "customer_id" dim_date ||--o{ fct_reviews : "review_date"

Cálculo chave: net_amount em fct_sales

-- models/gold/facts/fct_sales.sql
SELECT
    oi.order_item_id,
    oi.order_id,
    oi.product_id,
    p._sk                                                    AS product_sk,
    o.customer_id,
    c._sk                                                    AS customer_sk,
    CAST(o.order_date AS DATE)                               AS order_date,
    oi.quantity,
    oi.unit_price,
    oi.discount_pct,
    ROUND(
        oi.quantity * oi.unit_price
        * (1 - COALESCE(oi.discount_pct, 0) / 100.0), 2
    )                                                        AS net_amount
FROM delta_scan('s3://silver/ecommerce/fct_order_items') oi
LEFT JOIN delta_scan('s3://silver/ecommerce/fct_orders')    o ON o.order_id   = oi.order_id
LEFT JOIN delta_scan('s3://silver/ecommerce/dim_products')  p ON p.product_id = oi.product_id AND p._is_current = true
LEFT JOIN delta_scan('s3://silver/ecommerce/dim_customers') c ON c.customer_id= o.customer_id AND c._is_current = true

Colunas de cada tabela Gold

dim_customer

ColunaTipoDescrição
customer_idPKBIGINTPK natural do cliente
customer_skSKTEXTSurrogate key SCD2 (= _sk da Silver)
emailTEXTEmail único
address_stateTEXTEstado (UF)
is_activeBOOLEANCliente ativo
_dq_null_phoneDQBOOLEANFlag: telefone ausente (~30%)
_dq_null_addressDQBOOLEANFlag: endereço ausente (~20%)

fct_sales (grain mais fino — receita por item)

ColunaTipoDescrição
order_item_idPKBIGINTPK do item
order_idFKBIGINT→ fct_orders
product_idFKBIGINT→ dim_product
customer_idFKBIGINT→ dim_customer
order_dateFKDATE→ dim_date
quantityINTUnidades vendidas
unit_priceNUMERIC(12,2)Preço unitário no momento da compra
discount_pctNUMERIC(5,2)Desconto em % (NULL tratado como 0)
net_amountCALCNUMERIC(14,2)qty × price × (1 - disc/100) — receita líquida

dim_date (spine completa)

ColunaTipoDescrição
date_dayPKDATEDia (2020-01-01 → 2030-12-31) — 4.018 linhas
yearINTAno
monthINTMês (1–12)
dayINTDia do mês
quarterINTTrimestre (1–4)
week_of_yearINTSemana do ano
month_nameTEXTJanuary, February...
day_nameTEXTMonday, Tuesday...
is_weekendBOOLEANSábado ou Domingo

🐘 9. Data Warehouse — Supabase

Host: supabase-pooler:5432 | Schema analytics: ecommerce_analytics | Schema semântico: semantic

Estratégia de carga: TRUNCATE + INSERT Cada tabela é truncada e recarregada em bulk a cada execução. Simples, idempotente e rápido para os volumes desta pipeline. psycopg2.extras.execute_values com page_size=1000.

Volume estimado de dados

TabelaGrainVolume típicoÍndices
dim_customercustomer_id~1.000PK
dim_productproduct_id~500PK
dim_datedate_day4.018PK
fct_ordersorder_id~10.000customer_id, order_date, status
fct_salesorder_item_id~25.000product_id, customer_id, order_date
fct_paymentspayment_id~10.000payment_date, payment_method
fct_reviewsreview_id~8.000product_id, rating

👁️ 10. Semantic Layer — Views Analíticas

Schema: semantic no Supabase | 9 views prontas para Metabase, Grafana, Superset

vw_overview
KPIs globais do ecommerce — painel único. Uma linha só.
total_customersactive_productstotal_orderstotal_revenueavg_order_valueavg_ratingtotal_reviews
vw_daily_revenue
Receita e volume por dia. Ideal para gráficos de tendência temporal.
order_dateyearmonthis_weekendtotal_ordersrevenueunits_sold
vw_revenue_by_product
Performance por produto — combina vendas com avaliações.
product_namecategorybrandrevenueunits_soldavg_ratingreview_count
vw_revenue_by_category
Receita agregada por categoria e subcategoria.
categorysubcategoryproduct_countrevenueavg_discount_pct
vw_customer_ltv
LTV e comportamento por cliente. Inclui primeiro/último pedido.
customer_idltvtotal_ordersavg_order_valuefirst_order_dateavg_rating_given
vw_payment_methods
Volume e taxa de sucesso por método de pagamento e status.
payment_methodstatustransactionstotal_amountavg_installments
vw_rating_by_product
Distribuição de ratings — cinco_star, four_star, three_or_less.
product_nameavg_ratingreview_countfive_starfour_starthree_or_less
vw_order_status
Pedidos agrupados por status e canal de venda.
statuschannelorder_counttotal_shippingtotal_discounts
vw_monthly_cohorts
Retenção mensal por coorte de aquisição de clientes (cohort analysis).
cohort_monthorder_monthmonths_since_firstactive_customersrevenue

Exemplo de cohort analysis SQL:

WITH first_order AS (
    SELECT customer_id,
           DATE_TRUNC('month', MIN(order_date))::DATE AS cohort_month
    FROM ecommerce_analytics.fct_orders
    GROUP BY customer_id
)
SELECT
    f.cohort_month,
    DATE_TRUNC('month', o.order_date)::DATE               AS order_month,
    EXTRACT(MONTH FROM AGE(o.order_date, f.cohort_month))::INT AS months_since_first,
    COUNT(DISTINCT o.customer_id)                         AS active_customers,
    SUM(s.net_amount)                                     AS revenue
FROM first_order f
JOIN ecommerce_analytics.fct_orders o ON o.customer_id = f.customer_id
JOIN ecommerce_analytics.fct_sales  s ON s.order_id    = o.order_id
GROUP BY 1, 2, 3
ORDER BY 1, 3;

🔷 11. Semantic Models dbt — MetricFlow

dbt MetricFlow + Semantic Layer Define formalmente entidades, dimensões e medidas que o MetricFlow usa para gerar SQL automaticamente. Permite dbt sl query --metrics revenue sem escrever SQL manualmente.
Semantic ModelGrainEntidadesMedidas disponíveis
salesorder_item_idorder_item(primary), order, product, customerrevenue, units_sold, order_items_count, avg_unit_price, avg_discount_pct
ordersorder_idorder(primary), customerorder_count, shipping_cost_total, discount_total, late_arrival_count
paymentspayment_idpayment(primary), order, customerpayment_amount, payment_count, avg_installments
reviewsreview_idreview(primary), product, customerreview_count, avg_rating
customersdimensioncustomer(primary)
productsdimensionproduct(primary)
# semantic_models.yml — trecho do modelo sales
- name: sales
  description: "Grain: order_item_id. Fonte central de receita."
  model: ref('fct_sales')
  defaults:
    agg_time_dimension: order_date

  entities:
    - name: order_item
      type: primary
      expr: order_item_id
    - name: product
      type: foreign
      expr: product_id

  measures:
    - name: revenue
      description: Receita líquida após descontos
      agg: sum
      expr: net_amount
    - name: units_sold
      agg: sum
      expr: quantity

📊 12. KPIs e Métricas — MetricFlow

Receita

revenue
Receita Total
simple
SUM(net_amount)
WHERE status != 'cancelled'
revenue_all
Receita Bruta (todos status)
simple
SUM(net_amount)
sem filtro de status
avg_order_value
Ticket Médio
derived
revenue / NULLIF(order_count, 0)

Pedidos

order_count
Total de Pedidos
simple
COUNT_DISTINCT(order_id)
orders_completed
Pedidos Entregues
simple
COUNT_DISTINCT(order_id)
WHERE status = 'delivered'
cancellation_rate
Taxa de Cancelamento
derived
orders_cancelled / NULLIF(order_count, 0)

Satisfação & Pagamentos

avg_rating
Rating Médio (1-5)
simple
AVG(rating)
positive_review_rate
Taxa de Reviews Positivas
ratio
COUNT(rating ≥ 4) / COUNT(review_id)
units_sold
Unidades Vendidas
simple
SUM(quantity)
payment_volume
Volume de Pagamentos
simple
SUM(amount)
payment_success_rate
Taxa de Aprovação
ratio
COUNT(status='completed') / COUNT(payment_id)

🔍 13. Trino — Query Engine e Time Travel

Porta: 8090 | Catálogos: gold e silver | Connector: delta_lake | Metastore: file-based

Time Travel sem HMS (Hive Metastore Server) Trino 405+ suporta metastore baseado em arquivo diretamente — sem necessidade de iniciar um Hive Metastore Service. O catálogo é descoberto automaticamente a partir do diretório raiz do bucket.

Configuração do catálogo gold:

connector.name=delta_lake
hive.metastore=file
hive.metastore.catalog.dir=s3://gold/
hive.s3.endpoint=http://minio:9000
hive.s3.path-style-access=true
hive.s3.aws-access-key=airflow
hive.s3.ssl.enabled=false
delta.enable-non-concurrent-writes=true

Exemplos de queries com time travel:

-- Versão atual
SELECT * FROM gold.ecommerce.fct_orders LIMIT 10;

-- Time travel por versão do Delta Log
SELECT * FROM gold.ecommerce.fct_orders FOR VERSION AS OF 2;

-- Time travel por timestamp
SELECT * FROM gold.ecommerce.fct_orders
FOR TIMESTAMP AS OF TIMESTAMP '2026-03-14 00:00:00';

-- Comparar receita: versão anterior vs. atual
SELECT 'v1' AS ver, SUM(net_amount) FROM gold.ecommerce.fct_sales FOR VERSION AS OF 1
UNION ALL
SELECT 'atual',     SUM(net_amount) FROM gold.ecommerce.fct_sales;

-- Clientes ativos vs. histórico completo SCD2
SELECT 'ativos',    COUNT(*) FROM silver.ecommerce.dim_customers WHERE _is_current = true
UNION ALL
SELECT 'histórico', COUNT(*) FROM silver.ecommerce.dim_customers;

-- Receita por canal — top 3
SELECT channel, SUM(s.net_amount) AS revenue
FROM gold.ecommerce.fct_orders  o
JOIN gold.ecommerce.fct_sales   s ON s.order_id = o.order_id
GROUP BY channel
ORDER BY revenue DESC
LIMIT 3;

⚠️ 14. Problemas de Dados Intencionais

A pipeline foi projetada para exercitar cenários reais de data quality:

#ProblemaOnde ocorre% / Freq.Como a pipeline trata
1 Duplicatas customers (INSERT) ~2% Bronze: ambas passam (emails diferentes). Silver: drop_duplicates(pk, keep=last)
2 phone = NULL customers ~30% Pydantic: Optional[str] — válido. Silver: _dq_null_phone=True
address = NULL customers ~20% Pydantic: campos Optional. Silver: _dq_null_address=True
review.body = NULL reviews ~40% Pydantic: Optional[str]. Propagado como NULL até o DW
customer_id = NULL sessions ~60% Pydantic: Optional[int]. Sessões anônimas preservadas
3 Late Arrival orders.order_date ~10% Silver: _is_late_arrival = order_date < created_at - 1h. MetricFlow: late_arrival_count
4 GDPR Delete customers (hard delete) ~1/24 exec Hard delete no OLTP. Sessions e reviews deletadas em cascata. Próxima ingestão não verá mais esses dados

💧 16. Padrão Watermark — Extração Incremental

O problema que o Watermark resolve Fazer SELECT * de uma tabela com milhões de linhas toda hora é inviável. O watermark é uma "marca d'água" — um timestamp salvo que diz: "da última vez que rodei, peguei tudo até este momento. Na próxima, pego só o que é mais novo."

Como funciona passo a passo

Primeira execução (carga inicial)

Watermark = 1970-01-01 00:00:00 (época Unix zero). A query extrai tudo: WHERE updated_at > '1970-01-01'. Ao final, salva o timestamp atual como novo watermark.

Execuções subsequentes (incremental)

Watermark = último valor salvo (ex: 2026-03-16 14:00:00). Query: WHERE updated_at > '2026-03-16 14:00:00'. Busca apenas o delta — pode ser 0 linhas ou poucas centenas.

Persistência no Airflow

O watermark é salvo como Airflow Variable: ecommerce_watermark_orders, ecommerce_watermark_customers, etc. Uma variável por tabela para controle independente.

Atualização do watermark

Só atualizado após sucesso da gravação no MinIO. Se o Airflow cair no meio, na próxima execução reprocessa o mesmo intervalo (idempotência). Nada se perde.

Código real da pipeline

from airflow.models import Variable

# Lê watermark atual (padrão: época zero se nunca rodou)
wm_key = f"ecommerce_watermark_{table}"
last_watermark = Variable.get(wm_key, default_var="1970-01-01 00:00:00")

# Query incremental no PostgreSQL OLTP
query = f"""
    SELECT * FROM ecommerce.{table}
    WHERE updated_at > '{last_watermark}'
    ORDER BY updated_at
"""
df = pd.read_sql(query, conn)

if not df.empty:
    # Salva Parquet no MinIO
    save_to_raw(df, table)

    # Atualiza watermark para o maior updated_at do lote
    new_watermark = df["updated_at"].max().isoformat()
    Variable.set(wm_key, new_watermark)

Exceção: tabela sessions (usa created_at)

Sessões são imutáveis — uma vez criadas, nunca são atualizadas. Por isso usa created_at como watermark em vez de updated_at. Os pedidos (orders) têm ambos: created_at para novos pedidos e updated_at para mudanças de status.

Risco: late arrivals (chegadas tardias)

O que é e como tratamos ~10% dos eventos chegam com timestamps antigos (ex: um pedido mobile que ficou em cache offline). O campo _is_late_arrival na Silver marca esses registros. A extração usa uma janela de segurança de 1h: se order_date for mais de 1h anterior ao created_at, é late arrival.

🚨 17. Fluxo de Quarentena — O que acontece com dados inválidos

~2% dos dados chegam com problemas Erros de sistema, bugs no app, dados legados — sempre há linhas problemáticas. A quarentena impede que esses dados corrompam as análises sem simplesmente descartá-los.

Tipos de erro capturados

email sem @

Campo email inválido no CustomerSchema. Ex: "joaogmail.com". Validador Pydantic rejeita.

rating fora de 1-5

ReviewSchema rejeita rating=0 ou rating=6. Pode vir de bug no frontend ou dados migrados.

status inválido

OrderSchema aceita só: pending, confirmed, shipped, delivered, cancelled, returned. Qualquer outro vai para quarentena.

preço negativo

ProductSchema rejeita price ou cost < 0. Pode acontecer com créditos/estornos mal mapeados.

IDs não positivos

Todos os schemas rejeitam IDs ≤ 0. Protege contra dados de teste ou registros deletados parcialmente.

tipo incorreto

Pydantic tenta coerção automática (string "123" → int 123). Se falhar, linha vai para quarentena.

Onde ficam os dados em quarentena?

s3://quarantine/ecommerce/{table}/
  year=2026/month=03/day=16/
    quarantine_2026-03-16T15-03-22.parquet

Estrutura do registro na quarentena

ColunaTipoConteúdo
… todas as colunas originais da linha …
_validation_errorsJSONLista de erros Pydantic: [{"field":"email","msg":"invalid email: joaogmail.com"}]
_quarantined_atTIMESTAMPQuando o erro foi detectado
_source_fileTEXTArquivo Parquet de origem no bucket raw

Como reprocessar dados da quarentena

# 1. Ler registros com erro
import pandas as pd
df_q = pd.read_parquet("s3://quarantine/ecommerce/customers/year=2026/...")

# 2. Ver erros por tipo
import json
df_q["errors_parsed"] = df_q["_validation_errors"].apply(json.loads)
df_q["error_field"] = df_q["errors_parsed"].apply(lambda e: e[0]["field"])
print(df_q.groupby("error_field").size())

# 3. Corrigir e reenviar para bronze
df_fixed = df_q.copy()
df_fixed["email"] = df_fixed["email"].str.strip().str.lower()
# ... correções ...
df_fixed.drop(columns=["_validation_errors","_quarantined_at","_source_file"]).to_parquet(
    "s3://bronze/ecommerce/customers/reprocessed_2026-03-16.parquet"
)
Princípio importante: falha suave (soft failure) A pipeline não para quando encontra dados inválidos. Linhas ruins vão para quarentena, linhas boas continuam o fluxo. Isso garante que 98% dos dados cheguem ao DW mesmo quando há problemas pontuais na fonte.

🔍 18. Como Usar o Data Warehouse — Guia Prático

3 formas de acessar os dados Você pode consultar via Supabase SQL Editor (browser), via Trino (query engine para Delta Lake com time travel) ou via Metabase (dashboards visuais).

Queries prontas — Supabase (schema ecommerce_analytics)

Receita dos últimos 30 dias

SELECT
    d.date_day,
    ROUND(SUM(s.net_amount)::numeric, 2) AS receita_diaria,
    COUNT(DISTINCT s.order_id)           AS pedidos
FROM ecommerce_analytics.fct_sales s
JOIN ecommerce_analytics.dim_date  d ON s.order_date_day = d.date_day
WHERE d.date_day >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY 1
ORDER BY 1 DESC;

Top 10 produtos por receita

SELECT
    p.name,
    p.category,
    ROUND(SUM(s.net_amount)::numeric, 2) AS receita_total,
    SUM(s.quantity)                       AS unidades_vendidas
FROM ecommerce_analytics.fct_sales    s
JOIN ecommerce_analytics.dim_product  p ON s.product_sk = p.product_sk
GROUP BY 1, 2
ORDER BY 3 DESC
LIMIT 10;

Ticket médio e NPS por canal

SELECT
    o.channel,
    COUNT(DISTINCT o.order_id)                      AS total_pedidos,
    ROUND(AVG(o.shipping_cost)::numeric, 2)         AS frete_medio,
    ROUND(AVG(r.rating)::numeric, 2)                AS rating_medio
FROM ecommerce_analytics.fct_orders   o
LEFT JOIN ecommerce_analytics.fct_reviews r
       ON r.order_id = o.order_id
WHERE o.channel IS NOT NULL
GROUP BY 1
ORDER BY 2 DESC;

Views semânticas prontas (sem precisar escrever joins)

-- Resumo de vendas já com joins feitos
SELECT * FROM semantic.v_sales_summary    LIMIT 100;

-- Análise RFM de clientes
SELECT * FROM semantic.v_customer_rfm
WHERE recency_days < 30 AND frequency > 5
ORDER BY monetary DESC;

-- Funil de conversão
SELECT * FROM semantic.v_order_funnel;

-- Cohort de retenção
SELECT * FROM semantic.v_cohort_retention
WHERE cohort_month >= '2026-01-01';

Queries Trino — Delta Lake com Time Travel

-- Conectar ao Trino: jdbc:trino://localhost:8080/gold

-- Ver versões disponíveis de uma tabela
DESCRIBE HISTORY gold.ecommerce.fct_orders;

-- Receita ontem vs. semana passada (time travel)
SELECT 'semana_passada' AS periodo, ROUND(SUM(net_amount),2) AS receita
FROM gold.ecommerce.fct_sales
FOR TIMESTAMP AS OF TIMESTAMP '2026-03-09 23:59:59'
UNION ALL
SELECT 'ontem', ROUND(SUM(net_amount),2)
FROM gold.ecommerce.fct_sales
FOR TIMESTAMP AS OF TIMESTAMP '2026-03-15 23:59:59';

-- Auditoria: ver estado de um cliente em data específica
SELECT customer_id, email, address_city
FROM silver.ecommerce.dim_customers
FOR TIMESTAMP AS OF TIMESTAMP '2025-06-01 00:00:00'
WHERE customer_id = 42;

Entendendo o Star Schema — Como fazer JOINs

erDiagram dim_customer { uuid customer_sk PK int customer_id string email string address_city } dim_product { uuid product_sk PK int product_id string name string category } dim_date { date date_day PK int year int month string month_name } fct_orders { int order_id PK uuid customer_sk FK date order_date_day FK string status float shipping_cost } fct_sales { int order_item_id PK int order_id uuid customer_sk FK uuid product_sk FK date order_date_day FK float net_amount } fct_reviews { int review_id PK uuid customer_sk FK uuid product_sk FK int rating } dim_customer ||--o{ fct_orders : "faz" dim_customer ||--o{ fct_sales : "compra" dim_customer ||--o{ fct_reviews : "avalia" dim_product ||--o{ fct_sales : "vendido em" dim_product ||--o{ fct_reviews : "avaliado em" dim_date ||--o{ fct_orders : "data do pedido" dim_date ||--o{ fct_sales : "data da venda"
Regra de ouro do Star Schema Sempre comece pela tabela de fato (fct_*) e faça JOIN com as dimensões (dim_*). Nunca faça JOIN de fato com fato diretamente — use uma dimensão como ponte. Ex: para ver produtos de clientes de SP, vá: fct_sales → dim_customer (WHERE address_state='SP') → dim_product.

Checklist: o que está disponível onde

PerguntaOnde buscarTabela/View
Qual a receita do mês?Supabase / Trinofct_sales + dim_date
Qual produto vende mais?Supabasesemantic.v_product_performance
Taxa de conversão por canal?Supabasesemantic.v_order_funnel
Clientes mais valiosos (LTV)?Supabasesemantic.v_customer_rfm
Evolução de estoque?Supabasesemantic.v_inventory_health
Como os dados estavam 1 semana atrás?TrinoFOR TIMESTAMP AS OF …
Registros em quarentena (erros)?MinIO / DuckDBs3://quarantine/ecommerce/…
Histórico de mudanças de cliente?Trino Silversilver.ecommerce.dim_customers (_is_current=false)

🩺 19. DAG de Monitoramento — Pipeline Health & SPC

Para que serve Uma única DAG que você dispara (ou que roda automaticamente ao final de cada ciclo) e cujo log já te diz tudo: se cada DAG rodou com sucesso, se os dados trafegaram em todas as camadas, se a quarentena está dentro do normal e se os números de hoje fazem sentido estatisticamente.

Fluxo de execução

flowchart TD A[check_dag_runs\nAirflow metadata DB] --> Z B[check_raw\nMinIO s3://raw] --> Z C[check_bronze_quarantine\nMinIO s3://bronze + quarantine] --> Z D[check_silver\nDelta Lake Silver] --> Z E[check_gold\nDelta Lake Gold] --> F F[check_supabase\nPostgreSQL DW] --> G C --> G D --> G G[run_spc\nControle Estatístico] Z[compile_report\nRelatório GREEN/YELLOW/RED] G --> Z E --> Z A --> Z B --> Z

O que cada task verifica

check_dag_runs
Airflow DB

Consulta o metadata DB do Airflow e lista o último estado de cada uma das 6 DAGs da pipeline: success, failed, running. Se qualquer DAG falhou → RED imediato.

check_raw
MinIO

Lista arquivos Parquet no bucket s3://raw/ para as 8 tabelas. Verifica se existem arquivos nas últimas 48h. WARN se faltou alguma tabela.

check_bronze_quarantine
MinIO

Conta registros no bronze e na quarentena por tabela. Lê os arquivos Parquet da quarentena e extrai o campo _validation_errors para listar erros por campo. WARN se taxa > 5%, FAIL se > 15%.

check_silver
Delta Lake

Abre cada uma das 8 Delta tables da Silver. Verifica contagem > 0, freshness (<48h) e integridade SCD2: nenhum customer_id ou product_id pode ter mais de 1 linha com _is_current=True.

check_gold
Delta Lake

Verifica as 7 tabelas Gold. Checa que fct_sales não tem surrogate keys nulos (null SK rate < 1%). Compara contagem com Silver para consistência.

check_supabase
PostgreSQL

Conecta ao Supabase e compara as contagens com o Gold (drift < 5%). Testa as 9 views semânticas. Verifica max(updated_at) em fct_orders para garantir frescor.

run_spc
Estatístico

Controle Estatístico de Processo: compara 4 métricas do dia contra a média e desvio padrão dos últimos 14 dias. Alerta se z-score > 2 (WARN) ou > 3 (FAIL). Histórico salvo em Airflow Variable.

compile_report
Relatório

Agrega todos os resultados e imprime o relatório completo no log. Salva o JSON do relatório e o status geral (GREEN/YELLOW/RED) como Airflow Variables.

Como o relatório aparece no log

╔══════════════════════════════════════════════════════════════╗
║       PIPELINE HEALTH REPORT — ecommerce — 2026-03-16       ║
╠══════════════════════════════════════════════════════════════╣
║  Overall Status: 🟢 GREEN                                    ║
╚══════════════════════════════════════════════════════════════╝

✈️  DAG RUNS             → 🟢 GREEN    (6/6 DAGs com sucesso)
📦 RAW LAYER           → 🟢 GREEN    (8/8 tables with files)
🥉 BRONZE + QUARANTINE → 🟡 YELLOW   (1 table above 5% quarantine)
🥈 SILVER DELTA        → 🟢 GREEN    (8/8 tables OK, SCD2 intact)
🥇 GOLD STAR SCHEMA    → 🟢 GREEN    (7/7 tables OK)
🐘 SUPABASE DW         → 🟢 GREEN    (7/7 counts match, 9 views OK)
📊 SPC CONTROL         → 🟢 GREEN

──── STATUS DAS DAGs DA PIPELINE ─────────────────────────────
  🟢 Data Simulator (OLTP)         [success  ]  2026-03-16 08:00  4.2 min
  🟢 OLTP → Raw                    [success  ]  2026-03-16 09:01  3.1 min
  🟢 Raw → Bronze                  [success  ]  2026-03-16 09:06  2.8 min
  🟢 Bronze → Silver               [success  ]  2026-03-16 09:10  5.4 min
  🟢 Silver → Gold                 [success  ]  2026-03-16 10:00  6.1 min
  🟢 Gold → Supabase DW            [success  ]  2026-03-16 10:08  2.4 min

──── QUARANTINE INVENTORY ────────────────────────────────────
  customers      3 rows  [email: 2, customer_id: 1]
  orders         0 rows
  products       0 rows
  reviews        7 rows  [rating: 5, review_id: 2]
  sessions       0 rows
  ...

──── SPC METRICS ─────────────────────────────────────────────
  total_orders:        today=1247.000  mean=1195.000  σ=89.000   z=+0.58  🟢
  total_revenue:       today=45230.000 mean=52100.000 σ=3120.000 z=-2.20  🟡
  quarantine_rate:     today=0.018     mean=0.021     σ=0.004    z=-0.75  🟢
  new_customers:       today=12.000    mean=11.000    σ=2.100    z=+0.47  🟢

Acionamento automático

A DAG é disparada automaticamente pelo TriggerDagRunOperator no final da ecommerce_gold_to_dw (último passo do ciclo diário). Fluxo completo:

ecommerce_data_simulator
ecommerce_oltp_to_raw
ecommerce_raw_to_bronze
ecommerce_bronze_to_silver
ecommerce_silver_to_gold
ecommerce_gold_to_dw
🩺 pipeline_health

SPC — Controle Estatístico de Processo explicado

Por que isso importa? A pipeline pode rodar sem erros técnicos mas os dados podem estar errados. Exemplo: por um bug no simulador, apenas 10 pedidos foram gerados hoje em vez dos 1.200 habituais. Sem o SPC, o pipeline marcaria tudo como verde. Com o SPC, o z-score seria altíssimo e você seria alertado.
MétricaFonteWARN (|z| > 2)FAIL (|z| > 3)Histórico
total_ordersSilver fct_orders COUNTDesvio de 2σ do normalDesvio de 3σ (crítico)14 dias
total_revenueSupabase SUM(net_amount)Queda/alta atípica de receitaAnomalia severa14 dias
quarantine_rateBronze quarantine/totalTaxa acima do padrão históricoDegradação de qualidade14 dias
new_customersSupabase dim_customer hojeAquisição atípicaAnomalia de cadastro14 dias

O histórico cresce automaticamente nas primeiras 14 execuções. Com menos de 3 pontos, o SPC retorna INFO (sem dados suficientes para calcular desvio padrão).

Variáveis Airflow geradas

VariableConteúdo
ecommerce_health_statusString: GREEN, YELLOW ou RED — status da última execução
ecommerce_health_last_reportJSON completo com todos os checks, contagens e resultados SPC do último run
spc_historyJSON com histórico de até 14 valores por métrica para cálculo de z-score

📖 15. Glossário Técnico

TermoDefinição
Medallion ArchitectureOrganização em camadas progressivas de qualidade: Raw → Bronze → Silver → Gold
CDCChange Data Capture — captura apenas dados novos/modificados desde a última extração
WatermarkTimestamp salvo como ponto de referência para extração incremental (Airflow Variable)
SCD Type 2Slowly Changing Dimension — mantém histórico de todas as versões de um registro
Surrogate Key (_sk)Chave artificial UUID gerada pela pipeline, independente da PK natural do OLTP
Delta LakeFormato sobre Parquet com suporte a ACID, MERGE, time travel e schema enforcement
Time TravelConsultar dados como estavam em uma versão ou timestamp anterior (Delta Log)
Great ExpectationsFramework de validação de dados — define e executa "expectativas" sobre DataFrames
dbt-duckdbdbt com DuckDB como engine — executa SQL in-memory sobre Delta Lake no S3
MetricFlowEngine do dbt Semantic Layer — gera SQL a partir de definições de métricas em YAML
Star SchemaModelo dimensional com dimensões ao redor de tabelas de fato
net_amountReceita líquida = quantity × unit_price × (1 - discount_pct / 100)
LTVLifetime Value — receita total gerada por um cliente desde o início
Cohort AnalysisAnálise de retenção de grupos de clientes agrupados por data de aquisição
QuarantineBucket onde vão registros que falharam na validação Pydantic
Late ArrivalEvento com timestamp anterior ao esperado (order_date mais de 1h antes do created_at)
TriggerDagRunOperatorOperador Airflow que dispara outro DAG programaticamente, passando conf
GrainO nível mais granular de um registro em uma tabela de fato (ex: order_item_id em fct_sales)