🏗️ Pipeline Ecommerce — Documentação Técnica
Arquitetura Medalhão completa: do OLTP transacional ao Data Warehouse analítico, passando por camadas progressivas de qualidade, Delta Lake com SCD2, dbt-duckdb e Semantic Layer.
0. Jornada de um Dado — Do Pedido ao KPI
O cenário: cliente faz um pedido
Um cliente chamado João Silva (customer_id=42) faz um pedido de R$299,90 às 14:00 de uma terça-feira. Veja o que acontece com esse dado em cada camada:
📦 Origem — PostgreSQL OLTP (tempo real)
O sistema de e-commerce grava imediatamente na tabela ecommerce.orders:
| order_id | customer_id | status | channel | shipping_cost | discount_amount | created_at | updated_at |
|---|---|---|---|---|---|---|---|
| 1001 | 42 | pending | web | R$ 19,90 | R$ 0,00 | 2026-03-16 14:00:00 | 2026-03-16 14:00:00 |
🗃️ Camada Raw — MinIO (extração a cada hora)
O DAG ecommerce_oltp_to_raw roda às 15:00 e detecta o pedido via watermark (updated_at > '14:00'). Salva o dado exatamente como veio do banco, sem nenhuma alteração:
s3://raw/ecommerce/orders/
year=2026/month=03/day=16/hour=15/
run_2026-03-16T15-00-00.parquet
# → 1 linha, idêntica ao OLTP
🥉 Camada Bronze — Validação Pydantic
O DAG ecommerce_raw_to_bronze lê o Parquet e passa cada linha pelo schema OrderSchema. O pedido é válido → vai para o bucket bronze com 2 colunas extras de rastreabilidade:
| Coluna | Valor | Origem |
|---|---|---|
| order_id | 1001 | OLTP |
| customer_id | 42 | OLTP |
| status | pending | OLTP |
| channel | web | OLTP |
| shipping_cost | 19.90 | OLTP |
| discount_amount | 0.00 | OLTP |
| created_at | 2026-03-16 14:00:00 | OLTP |
| updated_at | 2026-03-16 14:00:00 | OLTP |
| _bronze_loaded_at | 2026-03-16 15:03:22 UTC | pipeline |
| _source_file | s3://raw/ecommerce/orders/…/run_2026-03-16T15-00.parquet | pipeline |
🥈 Camada Silver — Delta Lake (UPSERT)
O DAG ecommerce_bronze_to_silver faz MERGE no Delta Lake. Como o order_id=1001 não existe ainda, é um INSERT:
| Coluna nova | Valor | Explicação |
|---|---|---|
| _silver_loaded_at | 2026-03-16 15:05:00 UTC | Quando entrou na Silver |
| _is_late_arrival | false | order_date e created_at com menos de 1h de diferença |
🔄 Silver — Depois: o pedido é enviado (UPDATE)
3 horas depois, o pedido muda de pending → shipped no OLTP. Na próxima extração às 18:00, a pipeline detecta a mudança (watermark) e faz MERGE:
🧬 Silver — SCD Type 2 para clientes (o mais importante!)
João Silva atualiza o email dele de joao@gmail.com → joao@empresa.com. Na Silver, o histórico é preservado:
| _sk (surrogate) | customer_id | _is_current | _valid_from | _valid_to | |
|---|---|---|---|---|---|
| a1b2c3d4… | 42 | joao@gmail.com | false | 2025-01-10 | 2026-03-16 |
| e5f6a7b8… | 42 | joao@empresa.com | true | 2026-03-16 | 9999-12-31 |
Resultado: você pode responder "qual era o email do João quando ele fez o pedido em 2025?" — basta fazer JOIN pelo _valid_from / _valid_to.
🥇 Camada Gold — Star Schema via dbt-duckdb
O DAG ecommerce_silver_to_gold roda dbt que lê a Silver e produz o modelo star schema. O pedido 1001 aparece na fct_orders com surrogate keys resolvidas:
| order_id | customer_sk | order_date_day | status | shipping_cost |
|---|---|---|---|---|
| 1001 | e5f6a7b8… (atual) | 2026-03-16 | shipped | 19.90 |
fct_orders referencia o customer_sk (UUID da Silver), não o customer_id do OLTP. Isso permite joins históricos: se você quiser saber o endereço que o João tinha na época do pedido, basta usar o _sk correto da época.
🐘 Data Warehouse — Supabase
O DAG ecommerce_gold_to_dw faz TRUNCATE + INSERT das 7 tabelas Gold no schema ecommerce_analytics do Supabase. O pedido agora é acessível por ferramentas BI (Metabase, Grafana, etc.).
📊 KPI Final — Como o pedido vira número
O pedido 1001 contribui para vários KPIs via as views semânticas. Exemplo de como ele entra no KPI revenue:
-- View: v_sales_summary (schema semantic)
SELECT
d.date_day,
SUM(s.net_amount) AS revenue -- Pedido 1001 contribui aqui
FROM ecommerce_analytics.fct_sales s
JOIN ecommerce_analytics.dim_date d ON s.order_date_day = d.date_day
GROUP BY 1;
1. Visão Geral da Arquitetura
Componentes e Portas
| Componente | Tecnologia | Porta | Papel na Pipeline |
|---|---|---|---|
| postgres-oltp | PostgreSQL 16 | 5440 | Fonte transacional (8 tabelas ecommerce) |
| airflow-api | Airflow 3.x Celery | 8080 | Orquestração de todos os DAGs |
| minio | MinIO S3-compatible | 9000/9001 | Data Lake — raw/bronze/silver/gold |
| trino | Trino Delta Lake | 8090 | Query engine SQL com time travel |
| supabase | PostgreSQL + PgBouncer | 5432 | Data Warehouse + Semantic Layer |
2. OLTP — PostgreSQL (Fonte)
Tabela: customers
| Coluna | Tipo | Nulo? | Descrição |
|---|---|---|---|
customer_idPK | SERIAL | NOT NULL | Auto-incrementado — identificador único |
first_name | VARCHAR(100) | NOT NULL | Primeiro nome (Faker pt_BR) |
last_name | VARCHAR(100) | NOT NULL | Sobrenome (Faker pt_BR) |
email | VARCHAR(255) | UNIQUE | Email único — índice UNIQUE constraint |
phoneDQ | VARCHAR(20) | ~30% NULL | Data Problem #2 — telefone ausente em ~30% |
address_streetDQ | VARCHAR(255) | ~20% NULL | Data Problem #2 — endereço ausente em ~20% |
address_city | VARCHAR(100) | ~20% NULL | Cidade |
address_state | CHAR(2) | ~20% NULL | UF — SP, RJ, MG, RS, PR, BA, SC, GO, PE, CE |
address_zip | VARCHAR(10) | ~20% NULL | CEP |
birth_date | DATE | sim | Data de nascimento (18–70 anos) |
is_active | BOOLEAN | NOT NULL | Flag de cliente ativo (default true) |
created_at | TIMESTAMP | NOT NULL | Criação do registro no OLTP |
updated_at | TIMESTAMP | NOT NULL | Última modificação — watermark de extração incremental |
Tabela: orders
order_date retroativa (até 48h no passado vs. created_at). A Silver flageia esses casos com _is_late_arrival = true.
| Coluna | Tipo | Nulo? | Descrição |
|---|---|---|---|
order_idPK | SERIAL | NOT NULL | Identificador único do pedido |
customer_idFK | INT | NOT NULL | → customers.customer_id |
order_dateDQ | TIMESTAMP | NOT NULL | 10% late arrival — até 48h no passado |
status | VARCHAR(30) | NOT NULL | pending → confirmed → shipped → delivered / cancelled / returned |
channel | VARCHAR(30) | sim | web (45%), mobile (40%), marketplace (15%) |
shipping_cost | NUMERIC(10,2) | sim | Frete R$0–35; 30% gratuito |
discount_amount | NUMERIC(10,2) | sim | Desconto R$0–30; 70% sem desconto |
created_at | TIMESTAMP | NOT NULL | Criação no sistema |
updated_at | TIMESTAMP | NOT NULL | Última atualização — watermark |
Simulação de dados por ciclo (horário)
| Tabela | INSERT | UPDATE | Observação |
|---|---|---|---|
customers | ~5 | ~10 endereços | 2% chance de duplicata |
products | ~2 | ~5 preços | ON CONFLICT DO NOTHING por SKU |
inventory | — | ~30 estoques | Venda ou restock aleatório |
orders | ~50 | ~80 status | 10% late arrival |
order_items | ~100–150 | — | 1–3 itens por pedido |
payments | ~50 | ~20 status | 90% approved, 10% refused |
reviews | ~15 | — | 40% sem body |
sessions | ~200 | — | 60% anônimas (customer_id NULL) |
3. Airflow — DAGs e Orquestração
parent_run_id via conf para rastrear o batch correto no MinIO.
Cadeia de Triggers
@hourly 11–21 UTC → ecommerce_oltp_to_raw → ecommerce_raw_to_bronze → ecommerce_bronze_to_silver → ecommerce_silver_to_gold → ecommerce_gold_to_dw
DAG 1: ecommerce_data_simulator
INSERT 5 clientes + UPDATE 10 endereços/telefones via Faker pt_BR
INSERT 2 produtos novos + UPDATE 5 preços/status
UPDATE 30 registros de estoque (venda ou restock)
INSERT 50 pedidos (items + payments) + UPDATE 80 status
UPDATE payments pending → approved (90%) / refused (10%)
INSERT 200 eventos de navegação. 60% anônimos
INSERT 15 avaliações de pedidos delivered. 40% sem body
~1/24 execuções: dispara GDPR delete; resto: skip
Hard delete de 1 customer inativo (simula GDPR erasure)
Dispara ecommerce_oltp_to_raw após ciclo completo
DAG 2: ecommerce_oltp_to_raw
ecommerce_watermark_{table} (default: epoch). Extrai apenas WHERE updated_at > watermark e atualiza ao max(timestamp) encontrado.
| Tabela | Watermark col | Caminho MinIO |
|---|---|---|
customers, products, inventory, orders | updated_at | raw/ecommerce/{table}/year=Y/month=M/day=D/hour=H/{batch}.parquet |
order_items, payments, reviews, sessions | created_at | Tabelas append-only — usa criação como watermark |
DAG 3: ecommerce_raw_to_bronze
Lê cada arquivo Parquet do batch, valida com Pydantic v2 e separa válidos de inválidos:
DAG 4: ecommerce_bronze_to_silver
| Estratégia | Tabelas | Comportamento | Colunas extras |
|---|---|---|---|
| SCD Type 2 | customers, products, inventory |
Mantém histórico de todas as versões. Alterações criam nova linha com _is_current=True |
_valid_from, _valid_to, _is_current, _sk, _silver_loaded_at |
| Upsert (Merge) | orders, payments |
INSERT novos + UPDATE existentes via Delta MERGE. Idempotente | _is_late_arrival (orders), _silver_loaded_at |
| Append + Dedup | order_items, reviews, sessions |
Insere apenas PKs não existentes. Não sobrescreve histórico | _silver_loaded_at |
Como o SCD2 funciona passo a passo
Primeira carga
Todos os registros entram com _is_current=True, _valid_to=NULL, _sk=uuid(). Delta Lake criado em modo overwrite.
Detectar mudanças
Compara colunas monitoradas (nome, email, endereço, preço...) entre novo batch e versão _is_current=True existente. Usa merge do pandas por PK natural.
Fechar versão antiga
Para cada PK alterado: Delta MERGE atualiza _is_current=False e _valid_to=now() na versão ativa anterior.
Inserir nova versão
Linha nova (ou novos PKs) inserida com _is_current=True, _valid_from=now(), _valid_to=NULL, novo _sk=uuid().
Validação Great Expectations
Suite de expectations roda sobre a tabela Silver completa. Falhas geram WARN nos logs Airflow mas não bloqueiam a pipeline (soft failure).
DAG 5: ecommerce_silver_to_gold
Repara tabela fct_orders corrompida/vazia no silver se necessário
Cria bucket gold no MinIO se não existir
dbt run — 7 modelos gold via dbt-duckdb com delta_scan()
dbt test — valida qualidade dos 7 modelos (not_null, unique, accepted_values)
Converte Parquet gerado pelo dbt → Delta Lake (habilita time travel via Trino)
Dispara ecommerce_gold_to_dw após all_success
DAG 6: ecommerce_gold_to_dw
Carrega Gold → Supabase via TRUNCATE + INSERT em bulk (psycopg2 execute_values, page_size=1000):
4. Camada Raw — MinIO Parquet
Bucket: s3://raw/ | Formato: Apache Parquet (engine pyarrow)
Particionamento: ecommerce/{table}/year=Y/month=M/day=D/hour=H/{batch_id}.parquet
Metadados adicionados em cada arquivo:
# Adicionados após extração do PostgreSQL
df["_ingested_at"] = ingested_at # timestamp da ingestão
df["_batch_id"] = batch_id # derivado do run_id do Airflow
df["_source_table"] = "ecommerce.orders" # nome completo da tabela
5. Camada Bronze — Validação Pydantic
Bucket: s3://bronze/ | Quarantine: s3://quarantine/
Schemas Pydantic por tabela
class OrderSchema(_BaseEcommerce):
order_id: int
customer_id: int
order_date: datetime
status: str
channel: Optional[str] = None
shipping_cost: Optional[float] = None
discount_amount: Optional[float] = None
created_at: datetime
updated_at: datetime
@field_validator("status")
@classmethod
def valid_status(cls, v):
valid = {"pending","confirmed","shipped","delivered","cancelled","returned"}
if str(v).lower() not in valid:
raise ValueError(f"invalid order status: {v}")
return v
| Schema | Validações principais |
|---|---|
CustomerSchema | customer_id > 0, email contém "@", phone/address opcionais |
ProductSchema | product_id > 0, price >= 0, cost >= 0 |
InventorySchema | inventory_id > 0, quantities >= 0 |
OrderSchema | order_id > 0, status ∈ {pending,confirmed,...}, shipping >= 0 |
PaymentSchema | payment_id > 0, amount > 0 |
OrderItemSchema | order_item_id > 0, quantity > 0, unit_price > 0 |
ReviewSchema | review_id > 0, rating ∈ [1,5] |
SessionSchema | session_id não vazio, customer_id Optional (60% NULL esperado) |
Exemplo de registro na quarantine:
{
"review_id": 4521,
"rating": 6,
"_validation_errors": "[{\"type\":\"value_error\",\"loc\":[\"rating\"],\"msg\":\"rating must be 1-5, got 6\"}]"
}
6. Camada Silver — Delta Lake
Bucket: s3://silver/ | Formato: Delta Lake (Parquet + _delta_log/)
Tabelas Silver e colunas de metadados
| Tabela Silver | Estratégia | Grain | Colunas extras notáveis |
|---|---|---|---|
dim_customers | SCD2 | customer_id + _sk | _valid_from, _valid_to, _is_current, _sk, _dq_null_phone, _dq_null_address |
dim_products | SCD2 | product_id + _sk | _valid_from, _valid_to, _is_current, _sk |
dim_inventory | SCD2 | inventory_id + _sk | _valid_from, _valid_to, _is_current, _sk |
fct_orders | Upsert | order_id | _is_late_arrival, _silver_loaded_at |
fct_payments | Upsert | payment_id | _silver_loaded_at |
fct_order_items | Append | order_item_id | _silver_loaded_at |
fct_reviews | Append | review_id | _silver_loaded_at |
fct_sessions | Append | session_id | _silver_loaded_at |
Código do MERGE SCD2 (Delta Lake Python):
# Fechar versão antiga (changed_pks)
df_close = df_active[df_active[pk].isin(changed_pks)].copy()
df_close["_is_current"] = False
df_close["_valid_to"] = ts # agora UTC
DeltaTable(silver_path, storage_options=STORAGE_OPTIONS)\
.merge(_to_arrow(df_close), "target._sk = source._sk", "source", "target")\
.when_matched_update_all().execute()
# Inserir nova versão
df_ins = df_new[df_new[pk].isin(to_insert)]
write_deltalake(silver_path, _to_arrow(df_ins), mode="append",
storage_options=STORAGE_OPTIONS)
7. Qualidade de Dados — Great Expectations
| Tabela | Expectations |
|---|---|
customers | customer_id not_null, email not_null, _sk not_null + unique, row_count ≥ 1 |
products | product_id not_null, price ≥ 0, _sk not_null + unique, row_count ≥ 1 |
inventory | inventory_id not_null, quantity_on_hand ≥ 0, row_count ≥ 1 |
orders | order_id not_null, status ∈ {pending,confirmed,shipped,delivered,cancelled,returned}, row_count ≥ 1 |
payments | payment_id not_null, amount > 0, row_count ≥ 1 |
order_items | order_item_id unique, quantity ≥ 1, row_count ≥ 1 |
reviews | review_id unique, rating ∈ [1,5], row_count ≥ 1 |
sessions | session_id unique, row_count ≥ 1 |
Modo ephemeral — sem armazenamento persistente:
ctx = gx.get_context(mode="ephemeral")
ds = ctx.data_sources.add_pandas(f"ds_{table}")
asset = ds.add_dataframe_asset(f"asset_{table}")
bd = asset.add_batch_definition_whole_dataframe("batch")
suite = ctx.suites.add(gx.ExpectationSuite(name=f"{table}_suite"))
# adiciona expectations...
result = bd.get_batch(batch_parameters={"dataframe": df}).validate(suite)
# WARN nos logs se falhar, mas retorna True (não bloqueia)
8. Camada Gold — Star Schema via dbt-duckdb
Projeto dbt: airflow/dbt/ecommerce_dbt/ | Engine: DuckDB + extensão Delta Lake
Cálculo chave: net_amount em fct_sales
-- models/gold/facts/fct_sales.sql
SELECT
oi.order_item_id,
oi.order_id,
oi.product_id,
p._sk AS product_sk,
o.customer_id,
c._sk AS customer_sk,
CAST(o.order_date AS DATE) AS order_date,
oi.quantity,
oi.unit_price,
oi.discount_pct,
ROUND(
oi.quantity * oi.unit_price
* (1 - COALESCE(oi.discount_pct, 0) / 100.0), 2
) AS net_amount
FROM delta_scan('s3://silver/ecommerce/fct_order_items') oi
LEFT JOIN delta_scan('s3://silver/ecommerce/fct_orders') o ON o.order_id = oi.order_id
LEFT JOIN delta_scan('s3://silver/ecommerce/dim_products') p ON p.product_id = oi.product_id AND p._is_current = true
LEFT JOIN delta_scan('s3://silver/ecommerce/dim_customers') c ON c.customer_id= o.customer_id AND c._is_current = true
Colunas de cada tabela Gold
dim_customer
| Coluna | Tipo | Descrição |
|---|---|---|
customer_idPK | BIGINT | PK natural do cliente |
customer_sk | TEXT | Surrogate key SCD2 (= _sk da Silver) |
email | TEXT | Email único |
address_state | TEXT | Estado (UF) |
is_active | BOOLEAN | Cliente ativo |
_dq_null_phoneDQ | BOOLEAN | Flag: telefone ausente (~30%) |
_dq_null_addressDQ | BOOLEAN | Flag: endereço ausente (~20%) |
fct_sales (grain mais fino — receita por item)
| Coluna | Tipo | Descrição |
|---|---|---|
order_item_idPK | BIGINT | PK do item |
order_idFK | BIGINT | → fct_orders |
product_idFK | BIGINT | → dim_product |
customer_idFK | BIGINT | → dim_customer |
order_dateFK | DATE | → dim_date |
quantity | INT | Unidades vendidas |
unit_price | NUMERIC(12,2) | Preço unitário no momento da compra |
discount_pct | NUMERIC(5,2) | Desconto em % (NULL tratado como 0) |
net_amountCALC | NUMERIC(14,2) | qty × price × (1 - disc/100) — receita líquida |
dim_date (spine completa)
| Coluna | Tipo | Descrição |
|---|---|---|
date_dayPK | DATE | Dia (2020-01-01 → 2030-12-31) — 4.018 linhas |
year | INT | Ano |
month | INT | Mês (1–12) |
day | INT | Dia do mês |
quarter | INT | Trimestre (1–4) |
week_of_year | INT | Semana do ano |
month_name | TEXT | January, February... |
day_name | TEXT | Monday, Tuesday... |
is_weekend | BOOLEAN | Sábado ou Domingo |
9. Data Warehouse — Supabase
Host: supabase-pooler:5432 | Schema analytics: ecommerce_analytics | Schema semântico: semantic
psycopg2.extras.execute_values com page_size=1000.
Volume estimado de dados
| Tabela | Grain | Volume típico | Índices |
|---|---|---|---|
dim_customer | customer_id | ~1.000 | PK |
dim_product | product_id | ~500 | PK |
dim_date | date_day | 4.018 | PK |
fct_orders | order_id | ~10.000 | customer_id, order_date, status |
fct_sales | order_item_id | ~25.000 | product_id, customer_id, order_date |
fct_payments | payment_id | ~10.000 | payment_date, payment_method |
fct_reviews | review_id | ~8.000 | product_id, rating |
10. Semantic Layer — Views Analíticas
Schema: semantic no Supabase | 9 views prontas para Metabase, Grafana, Superset
Exemplo de cohort analysis SQL:
WITH first_order AS (
SELECT customer_id,
DATE_TRUNC('month', MIN(order_date))::DATE AS cohort_month
FROM ecommerce_analytics.fct_orders
GROUP BY customer_id
)
SELECT
f.cohort_month,
DATE_TRUNC('month', o.order_date)::DATE AS order_month,
EXTRACT(MONTH FROM AGE(o.order_date, f.cohort_month))::INT AS months_since_first,
COUNT(DISTINCT o.customer_id) AS active_customers,
SUM(s.net_amount) AS revenue
FROM first_order f
JOIN ecommerce_analytics.fct_orders o ON o.customer_id = f.customer_id
JOIN ecommerce_analytics.fct_sales s ON s.order_id = o.order_id
GROUP BY 1, 2, 3
ORDER BY 1, 3;
11. Semantic Models dbt — MetricFlow
dbt sl query --metrics revenue sem escrever SQL manualmente.
| Semantic Model | Grain | Entidades | Medidas disponíveis |
|---|---|---|---|
sales | order_item_id | order_item(primary), order, product, customer | revenue, units_sold, order_items_count, avg_unit_price, avg_discount_pct |
orders | order_id | order(primary), customer | order_count, shipping_cost_total, discount_total, late_arrival_count |
payments | payment_id | payment(primary), order, customer | payment_amount, payment_count, avg_installments |
reviews | review_id | review(primary), product, customer | review_count, avg_rating |
customers | dimension | customer(primary) | — |
products | dimension | product(primary) | — |
# semantic_models.yml — trecho do modelo sales
- name: sales
description: "Grain: order_item_id. Fonte central de receita."
model: ref('fct_sales')
defaults:
agg_time_dimension: order_date
entities:
- name: order_item
type: primary
expr: order_item_id
- name: product
type: foreign
expr: product_id
measures:
- name: revenue
description: Receita líquida após descontos
agg: sum
expr: net_amount
- name: units_sold
agg: sum
expr: quantity
12. KPIs e Métricas — MetricFlow
Receita
WHERE status != 'cancelled'
sem filtro de status
Pedidos
WHERE status = 'delivered'
Satisfação & Pagamentos
13. Trino — Query Engine e Time Travel
Porta: 8090 | Catálogos: gold e silver | Connector: delta_lake | Metastore: file-based
Configuração do catálogo gold:
connector.name=delta_lake
hive.metastore=file
hive.metastore.catalog.dir=s3://gold/
hive.s3.endpoint=http://minio:9000
hive.s3.path-style-access=true
hive.s3.aws-access-key=airflow
hive.s3.ssl.enabled=false
delta.enable-non-concurrent-writes=true
Exemplos de queries com time travel:
-- Versão atual
SELECT * FROM gold.ecommerce.fct_orders LIMIT 10;
-- Time travel por versão do Delta Log
SELECT * FROM gold.ecommerce.fct_orders FOR VERSION AS OF 2;
-- Time travel por timestamp
SELECT * FROM gold.ecommerce.fct_orders
FOR TIMESTAMP AS OF TIMESTAMP '2026-03-14 00:00:00';
-- Comparar receita: versão anterior vs. atual
SELECT 'v1' AS ver, SUM(net_amount) FROM gold.ecommerce.fct_sales FOR VERSION AS OF 1
UNION ALL
SELECT 'atual', SUM(net_amount) FROM gold.ecommerce.fct_sales;
-- Clientes ativos vs. histórico completo SCD2
SELECT 'ativos', COUNT(*) FROM silver.ecommerce.dim_customers WHERE _is_current = true
UNION ALL
SELECT 'histórico', COUNT(*) FROM silver.ecommerce.dim_customers;
-- Receita por canal — top 3
SELECT channel, SUM(s.net_amount) AS revenue
FROM gold.ecommerce.fct_orders o
JOIN gold.ecommerce.fct_sales s ON s.order_id = o.order_id
GROUP BY channel
ORDER BY revenue DESC
LIMIT 3;
14. Problemas de Dados Intencionais
A pipeline foi projetada para exercitar cenários reais de data quality:
| # | Problema | Onde ocorre | % / Freq. | Como a pipeline trata |
|---|---|---|---|---|
| 1 | Duplicatas | customers (INSERT) | ~2% | Bronze: ambas passam (emails diferentes). Silver: drop_duplicates(pk, keep=last) |
| 2 | phone = NULL | customers | ~30% | Pydantic: Optional[str] — válido. Silver: _dq_null_phone=True |
| address = NULL | customers | ~20% | Pydantic: campos Optional. Silver: _dq_null_address=True |
|
| review.body = NULL | reviews | ~40% | Pydantic: Optional[str]. Propagado como NULL até o DW |
|
| customer_id = NULL | sessions | ~60% | Pydantic: Optional[int]. Sessões anônimas preservadas |
|
| 3 | Late Arrival | orders.order_date | ~10% | Silver: _is_late_arrival = order_date < created_at - 1h. MetricFlow: late_arrival_count |
| 4 | GDPR Delete | customers (hard delete) | ~1/24 exec | Hard delete no OLTP. Sessions e reviews deletadas em cascata. Próxima ingestão não verá mais esses dados |
16. Padrão Watermark — Extração Incremental
Como funciona passo a passo
Primeira execução (carga inicial)
Watermark = 1970-01-01 00:00:00 (época Unix zero). A query extrai tudo: WHERE updated_at > '1970-01-01'. Ao final, salva o timestamp atual como novo watermark.
Execuções subsequentes (incremental)
Watermark = último valor salvo (ex: 2026-03-16 14:00:00). Query: WHERE updated_at > '2026-03-16 14:00:00'. Busca apenas o delta — pode ser 0 linhas ou poucas centenas.
Persistência no Airflow
O watermark é salvo como Airflow Variable: ecommerce_watermark_orders, ecommerce_watermark_customers, etc. Uma variável por tabela para controle independente.
Atualização do watermark
Só atualizado após sucesso da gravação no MinIO. Se o Airflow cair no meio, na próxima execução reprocessa o mesmo intervalo (idempotência). Nada se perde.
Código real da pipeline
from airflow.models import Variable
# Lê watermark atual (padrão: época zero se nunca rodou)
wm_key = f"ecommerce_watermark_{table}"
last_watermark = Variable.get(wm_key, default_var="1970-01-01 00:00:00")
# Query incremental no PostgreSQL OLTP
query = f"""
SELECT * FROM ecommerce.{table}
WHERE updated_at > '{last_watermark}'
ORDER BY updated_at
"""
df = pd.read_sql(query, conn)
if not df.empty:
# Salva Parquet no MinIO
save_to_raw(df, table)
# Atualiza watermark para o maior updated_at do lote
new_watermark = df["updated_at"].max().isoformat()
Variable.set(wm_key, new_watermark)
Exceção: tabela sessions (usa created_at)
Sessões são imutáveis — uma vez criadas, nunca são atualizadas. Por isso usa created_at como watermark em vez de updated_at. Os pedidos (orders) têm ambos: created_at para novos pedidos e updated_at para mudanças de status.
Risco: late arrivals (chegadas tardias)
_is_late_arrival na Silver marca esses registros. A extração usa uma janela de segurança de 1h: se order_date for mais de 1h anterior ao created_at, é late arrival.
17. Fluxo de Quarentena — O que acontece com dados inválidos
Tipos de erro capturados
Campo email inválido no CustomerSchema. Ex: "joaogmail.com". Validador Pydantic rejeita.
ReviewSchema rejeita rating=0 ou rating=6. Pode vir de bug no frontend ou dados migrados.
OrderSchema aceita só: pending, confirmed, shipped, delivered, cancelled, returned. Qualquer outro vai para quarentena.
ProductSchema rejeita price ou cost < 0. Pode acontecer com créditos/estornos mal mapeados.
Todos os schemas rejeitam IDs ≤ 0. Protege contra dados de teste ou registros deletados parcialmente.
Pydantic tenta coerção automática (string "123" → int 123). Se falhar, linha vai para quarentena.
Onde ficam os dados em quarentena?
s3://quarantine/ecommerce/{table}/
year=2026/month=03/day=16/
quarantine_2026-03-16T15-03-22.parquet
Estrutura do registro na quarentena
| Coluna | Tipo | Conteúdo |
|---|---|---|
| … todas as colunas originais da linha … | ||
| _validation_errors | JSON | Lista de erros Pydantic: [{"field":"email","msg":"invalid email: joaogmail.com"}] |
| _quarantined_at | TIMESTAMP | Quando o erro foi detectado |
| _source_file | TEXT | Arquivo Parquet de origem no bucket raw |
Como reprocessar dados da quarentena
# 1. Ler registros com erro
import pandas as pd
df_q = pd.read_parquet("s3://quarantine/ecommerce/customers/year=2026/...")
# 2. Ver erros por tipo
import json
df_q["errors_parsed"] = df_q["_validation_errors"].apply(json.loads)
df_q["error_field"] = df_q["errors_parsed"].apply(lambda e: e[0]["field"])
print(df_q.groupby("error_field").size())
# 3. Corrigir e reenviar para bronze
df_fixed = df_q.copy()
df_fixed["email"] = df_fixed["email"].str.strip().str.lower()
# ... correções ...
df_fixed.drop(columns=["_validation_errors","_quarantined_at","_source_file"]).to_parquet(
"s3://bronze/ecommerce/customers/reprocessed_2026-03-16.parquet"
)
18. Como Usar o Data Warehouse — Guia Prático
Queries prontas — Supabase (schema ecommerce_analytics)
Receita dos últimos 30 dias
SELECT
d.date_day,
ROUND(SUM(s.net_amount)::numeric, 2) AS receita_diaria,
COUNT(DISTINCT s.order_id) AS pedidos
FROM ecommerce_analytics.fct_sales s
JOIN ecommerce_analytics.dim_date d ON s.order_date_day = d.date_day
WHERE d.date_day >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY 1
ORDER BY 1 DESC;
Top 10 produtos por receita
SELECT
p.name,
p.category,
ROUND(SUM(s.net_amount)::numeric, 2) AS receita_total,
SUM(s.quantity) AS unidades_vendidas
FROM ecommerce_analytics.fct_sales s
JOIN ecommerce_analytics.dim_product p ON s.product_sk = p.product_sk
GROUP BY 1, 2
ORDER BY 3 DESC
LIMIT 10;
Ticket médio e NPS por canal
SELECT
o.channel,
COUNT(DISTINCT o.order_id) AS total_pedidos,
ROUND(AVG(o.shipping_cost)::numeric, 2) AS frete_medio,
ROUND(AVG(r.rating)::numeric, 2) AS rating_medio
FROM ecommerce_analytics.fct_orders o
LEFT JOIN ecommerce_analytics.fct_reviews r
ON r.order_id = o.order_id
WHERE o.channel IS NOT NULL
GROUP BY 1
ORDER BY 2 DESC;
Views semânticas prontas (sem precisar escrever joins)
-- Resumo de vendas já com joins feitos
SELECT * FROM semantic.v_sales_summary LIMIT 100;
-- Análise RFM de clientes
SELECT * FROM semantic.v_customer_rfm
WHERE recency_days < 30 AND frequency > 5
ORDER BY monetary DESC;
-- Funil de conversão
SELECT * FROM semantic.v_order_funnel;
-- Cohort de retenção
SELECT * FROM semantic.v_cohort_retention
WHERE cohort_month >= '2026-01-01';
Queries Trino — Delta Lake com Time Travel
-- Conectar ao Trino: jdbc:trino://localhost:8080/gold
-- Ver versões disponíveis de uma tabela
DESCRIBE HISTORY gold.ecommerce.fct_orders;
-- Receita ontem vs. semana passada (time travel)
SELECT 'semana_passada' AS periodo, ROUND(SUM(net_amount),2) AS receita
FROM gold.ecommerce.fct_sales
FOR TIMESTAMP AS OF TIMESTAMP '2026-03-09 23:59:59'
UNION ALL
SELECT 'ontem', ROUND(SUM(net_amount),2)
FROM gold.ecommerce.fct_sales
FOR TIMESTAMP AS OF TIMESTAMP '2026-03-15 23:59:59';
-- Auditoria: ver estado de um cliente em data específica
SELECT customer_id, email, address_city
FROM silver.ecommerce.dim_customers
FOR TIMESTAMP AS OF TIMESTAMP '2025-06-01 00:00:00'
WHERE customer_id = 42;
Entendendo o Star Schema — Como fazer JOINs
Checklist: o que está disponível onde
| Pergunta | Onde buscar | Tabela/View |
|---|---|---|
| Qual a receita do mês? | Supabase / Trino | fct_sales + dim_date |
| Qual produto vende mais? | Supabase | semantic.v_product_performance |
| Taxa de conversão por canal? | Supabase | semantic.v_order_funnel |
| Clientes mais valiosos (LTV)? | Supabase | semantic.v_customer_rfm |
| Evolução de estoque? | Supabase | semantic.v_inventory_health |
| Como os dados estavam 1 semana atrás? | Trino | FOR TIMESTAMP AS OF … |
| Registros em quarentena (erros)? | MinIO / DuckDB | s3://quarantine/ecommerce/… |
| Histórico de mudanças de cliente? | Trino Silver | silver.ecommerce.dim_customers (_is_current=false) |
19. DAG de Monitoramento — Pipeline Health & SPC
Fluxo de execução
O que cada task verifica
Consulta o metadata DB do Airflow e lista o último estado de cada uma das 6 DAGs da pipeline: success, failed, running. Se qualquer DAG falhou → RED imediato.
Lista arquivos Parquet no bucket s3://raw/ para as 8 tabelas. Verifica se existem arquivos nas últimas 48h. WARN se faltou alguma tabela.
Conta registros no bronze e na quarentena por tabela. Lê os arquivos Parquet da quarentena e extrai o campo _validation_errors para listar erros por campo. WARN se taxa > 5%, FAIL se > 15%.
Abre cada uma das 8 Delta tables da Silver. Verifica contagem > 0, freshness (<48h) e integridade SCD2: nenhum customer_id ou product_id pode ter mais de 1 linha com _is_current=True.
Verifica as 7 tabelas Gold. Checa que fct_sales não tem surrogate keys nulos (null SK rate < 1%). Compara contagem com Silver para consistência.
Conecta ao Supabase e compara as contagens com o Gold (drift < 5%). Testa as 9 views semânticas. Verifica max(updated_at) em fct_orders para garantir frescor.
Controle Estatístico de Processo: compara 4 métricas do dia contra a média e desvio padrão dos últimos 14 dias. Alerta se z-score > 2 (WARN) ou > 3 (FAIL). Histórico salvo em Airflow Variable.
Agrega todos os resultados e imprime o relatório completo no log. Salva o JSON do relatório e o status geral (GREEN/YELLOW/RED) como Airflow Variables.
Como o relatório aparece no log
╔══════════════════════════════════════════════════════════════╗
║ PIPELINE HEALTH REPORT — ecommerce — 2026-03-16 ║
╠══════════════════════════════════════════════════════════════╣
║ Overall Status: 🟢 GREEN ║
╚══════════════════════════════════════════════════════════════╝
✈️ DAG RUNS → 🟢 GREEN (6/6 DAGs com sucesso)
📦 RAW LAYER → 🟢 GREEN (8/8 tables with files)
🥉 BRONZE + QUARANTINE → 🟡 YELLOW (1 table above 5% quarantine)
🥈 SILVER DELTA → 🟢 GREEN (8/8 tables OK, SCD2 intact)
🥇 GOLD STAR SCHEMA → 🟢 GREEN (7/7 tables OK)
🐘 SUPABASE DW → 🟢 GREEN (7/7 counts match, 9 views OK)
📊 SPC CONTROL → 🟢 GREEN
──── STATUS DAS DAGs DA PIPELINE ─────────────────────────────
🟢 Data Simulator (OLTP) [success ] 2026-03-16 08:00 4.2 min
🟢 OLTP → Raw [success ] 2026-03-16 09:01 3.1 min
🟢 Raw → Bronze [success ] 2026-03-16 09:06 2.8 min
🟢 Bronze → Silver [success ] 2026-03-16 09:10 5.4 min
🟢 Silver → Gold [success ] 2026-03-16 10:00 6.1 min
🟢 Gold → Supabase DW [success ] 2026-03-16 10:08 2.4 min
──── QUARANTINE INVENTORY ────────────────────────────────────
customers 3 rows [email: 2, customer_id: 1]
orders 0 rows
products 0 rows
reviews 7 rows [rating: 5, review_id: 2]
sessions 0 rows
...
──── SPC METRICS ─────────────────────────────────────────────
total_orders: today=1247.000 mean=1195.000 σ=89.000 z=+0.58 🟢
total_revenue: today=45230.000 mean=52100.000 σ=3120.000 z=-2.20 🟡
quarantine_rate: today=0.018 mean=0.021 σ=0.004 z=-0.75 🟢
new_customers: today=12.000 mean=11.000 σ=2.100 z=+0.47 🟢
Acionamento automático
A DAG é disparada automaticamente pelo TriggerDagRunOperator no final da ecommerce_gold_to_dw (último passo do ciclo diário). Fluxo completo:
SPC — Controle Estatístico de Processo explicado
| Métrica | Fonte | WARN (|z| > 2) | FAIL (|z| > 3) | Histórico |
|---|---|---|---|---|
| total_orders | Silver fct_orders COUNT | Desvio de 2σ do normal | Desvio de 3σ (crítico) | 14 dias |
| total_revenue | Supabase SUM(net_amount) | Queda/alta atípica de receita | Anomalia severa | 14 dias |
| quarantine_rate | Bronze quarantine/total | Taxa acima do padrão histórico | Degradação de qualidade | 14 dias |
| new_customers | Supabase dim_customer hoje | Aquisição atípica | Anomalia de cadastro | 14 dias |
O histórico cresce automaticamente nas primeiras 14 execuções. Com menos de 3 pontos, o SPC retorna INFO (sem dados suficientes para calcular desvio padrão).
Variáveis Airflow geradas
| Variable | Conteúdo |
|---|---|
| ecommerce_health_status | String: GREEN, YELLOW ou RED — status da última execução |
| ecommerce_health_last_report | JSON completo com todos os checks, contagens e resultados SPC do último run |
| spc_history | JSON com histórico de até 14 valores por métrica para cálculo de z-score |
15. Glossário Técnico
| Termo | Definição |
|---|---|
Medallion Architecture | Organização em camadas progressivas de qualidade: Raw → Bronze → Silver → Gold |
CDC | Change Data Capture — captura apenas dados novos/modificados desde a última extração |
Watermark | Timestamp salvo como ponto de referência para extração incremental (Airflow Variable) |
SCD Type 2 | Slowly Changing Dimension — mantém histórico de todas as versões de um registro |
Surrogate Key (_sk) | Chave artificial UUID gerada pela pipeline, independente da PK natural do OLTP |
Delta Lake | Formato sobre Parquet com suporte a ACID, MERGE, time travel e schema enforcement |
Time Travel | Consultar dados como estavam em uma versão ou timestamp anterior (Delta Log) |
Great Expectations | Framework de validação de dados — define e executa "expectativas" sobre DataFrames |
dbt-duckdb | dbt com DuckDB como engine — executa SQL in-memory sobre Delta Lake no S3 |
MetricFlow | Engine do dbt Semantic Layer — gera SQL a partir de definições de métricas em YAML |
Star Schema | Modelo dimensional com dimensões ao redor de tabelas de fato |
net_amount | Receita líquida = quantity × unit_price × (1 - discount_pct / 100) |
LTV | Lifetime Value — receita total gerada por um cliente desde o início |
Cohort Analysis | Análise de retenção de grupos de clientes agrupados por data de aquisição |
Quarantine | Bucket onde vão registros que falharam na validação Pydantic |
Late Arrival | Evento com timestamp anterior ao esperado (order_date mais de 1h antes do created_at) |
TriggerDagRunOperator | Operador Airflow que dispara outro DAG programaticamente, passando conf |
Grain | O nível mais granular de um registro em uma tabela de fato (ex: order_item_id em fct_sales) |