Esta sección documenta los aspectos operativos y técnicos del proyecto: instalación, despliegue, estructura del código y comandos de operación diaria. Si los demás artículos responden al “qué” y al “por qué”, esta sección responde al “cómo”.
1. Manual de instalación y despliegue
1.1 Configuraciones importantes
Antes de empezar, es importante tener claras las decisiones de arquitectura:
Stack: Python 3.13+, gestor de dependencias uv, FastAPI para la API web, DuckDB como motor analítico, Plotly para visualización en frontend, scikit-learn para modelado, MongoDB como fuente de datos transaccionales.
Persistencia: Parquet para todos los datos derivados (raw histórico + outputs analíticos). No usamos base de datos relacional para la capa analítica.
Sistema operativo objetivo: AlmaLinux 9 (rama RHEL/CentOS). El proyecto también corre en Windows/Ubuntu en desarrollo local.
Proceso supervisado: systemd para el dashboard, cron para el pipeline diario.
Modo SELinux recomendado: permissive (registra violaciones sin bloquear, suficiente para intranet).
1.2 Requisitos del sistema
Componente
Versión mínima
Notas
Sistema operativo
AlmaLinux 9 / RHEL 9 / Ubuntu 22.04+
Linux estable
Python
3.13
Definido en pyproject.toml
uv
0.4+
Gestor de dependencias y entornos
Git
Cualquier versión reciente
Para clonar el repo
Acceso a MongoDB
Red interna o cadena de conexión
Fuente de datos transaccionales
Memoria RAM
8 GB mínimo
El pipeline carga ~1.8M items en memoria
Disco
5 GB libres
Para parquets, logs y dependencias
1.3 Dependencias principales del proyecto
Lista declarada en pyproject.toml:
Librería
Uso
pandas, numpy, pyarrow
Manipulación de datos y formato Parquet
scikit-learn
Pipeline de transformación + K-Means
mlxtend
FP-Growth y association rules para MBA
fastapi, uvicorn, jinja2
Servidor web del dashboard
duckdb
Motor SQL embebido para queries del dashboard
pymongo
Cliente MongoDB para ingesta
python-dotenv
Manejo de credenciales fuera del código
pytest
Suite de tests automáticos
1.4 Instalación
1.4.1 Clonar el repositorio
cd /home/<usuario>git clone <url-del-repo> ct-analyticscd ct-analytics
Si el repo es privado, autentica con gh auth login o usa SSH key configurada.
1.4.2 Instalar dependencias con uv
uv sync
Este comando lee pyproject.toml, crea un entorno virtual en .venv/ e instala todas las dependencias en versiones bloqueadas por uv.lock. No es necesario activar el venv manualmente: todos los comandos posteriores usan uv run.
Advertencia
Si por accidente clonaste el repo con sudo, los archivos quedan como root:root y uv sync va a fallar con “Permission denied”. Solución:
sudo chown -R<usuario>:<usuario> /home/<usuario>/ct-analyticssudo rm -rf .venv # solo si el venv ya existía con permisos rotosuv sync
1.4.3 Configurar variables de entorno
Crea un archivo .env en la raíz del proyecto. Nunca subir este archivo a git (debe estar en .gitignore desde el primer commit):
# Conexión a MongoDBMONGO_URI=mongodb://usuario:password@host:puerto/dbMONGO_DB=nombre_databaseMONGO_COLLECTION=nombre_collection# Opcional: zona horaria local para agregados temporalesTIMEZONE_LOCAL=America/Mexico_City
1.4.4 Subir artefactos de modelo y datos históricos
Si el servidor no va a hacer un backfill completo desde MongoDB, sube los siguientes archivos con FileZilla, SCP o rsync:
Por qué se sube el raw histórico: el ingest incremental usa watermarks. Si arrancamos sin histórico, la primera corrida intenta extraer ~780K pedidos desde MongoDB, lo cual toma 5-10 minutos y carga la BD. Subir el parquet evita ese arranque pesado.
1.4.5 Primera corrida del pipeline
mkdir-p logsuv run python -m pulse.pipeline weekly --log-file logs/primera_corrida.log
Resultado esperado: en 45-90 segundos verás un log similar a:
✅ Ingesta OK: 784,088 → 784,088 pedidos (cambio +0.0%)
✅ Segmentación OK: 18,638 clientes, sin nulos
✅ MBA guardado: 3,860 reglas totales, 1,516 exclusivas, 141 accionables
✅ Cross-check temporalidad-RFM OK (diferencia 0.00%)
✅ OK Pipeline · modo=weekly · duración=42.6s
Verifica que se generaron los 7 outputs analíticos:
ls datos/processed/*.parquet |wc-l# debe dar 10 (3 base + 7 generados)
En modo permissive, SELinux registra todas las violaciones en /var/log/audit/audit.log sin bloquearlas. Si en el futuro migras a enforcing, esos logs te dicen exactamente qué contextos configurar.
1.4.7 Verificación final del despliegue
Desde una máquina cliente (no el servidor mismo):
http://<IP-del-servidor>:<PUERTO>
El navegador debe redirigir a /dashboard/overview y mostrar los KPIs cargados.
Test de supervivencia a reboot (la prueba real):
sudo reboot
Espera 1-2 minutos y vuelve a entrar a la URL. Si carga sin que hayas hecho nada en el servidor, el deployment es robusto.
1.4.8 Configurar cron diario del pipeline
Como tu usuario (no como root):
crontab-e
Agrega esta línea (un solo renglón):
0 3 * * * cd /home/<usuario>/ct-analytics && /home/<usuario>/.local/bin/uv run python -m pulse.pipeline daily --log-file logs/cron_$(date +\%Y\%m\%d).log 2>&1
Verifica:
crontab-l
El pipeline correrá todos los días a las 3am hora del servidor.
1.5 Comandos útiles de operación diaria
# Estado del dashboardsudo systemctl status pulse-dashboard# Reiniciar el dashboard (por ejemplo después de un git pull)sudo systemctl restart pulse-dashboard# Ver logs en vivosudo journalctl -u pulse-dashboard -f# Correr pipeline manualmente (sin esperar al cron)cd /home/<usuario>/ct-analyticsuv run python -m pulse.pipeline daily# Forzar recálculo completo (incluyendo MBA, ~45-60s)uv run python -m pulse.pipeline weekly# Saltar la ingesta y solo recalcular agregadosuv run python -m pulse.pipeline weekly --skip-ingest# Actualizar código desde gitcd /home/<usuario>/ct-analyticsgit pullsudo systemctl restart pulse-dashboard# Ver corridas recientes del cronls-lt logs/cron_*.log |head-5
extraction.py — Cliente de MongoDB. Lee documentos crudos del orden transaccional y devuelve un iterable.
transform.py — Función build_both_dfs() que en una sola pasada construye df_orders y df_items. Aplica decisiones críticas:
Excluye CARGO100 (cargo financiero por tarjeta) del conteo de productos.
Excluye items con clave is None para preservar la invariante len(items) == num_productos.
incremental.py — extract_incremental() lee el watermark (timestamp del último pedido procesado) y solo extrae los pedidos nuevos. Si no existe watermark, hace backfill completo.
ingest.py — run_ingest() orquesta el flujo: lee watermark → extrae nuevo → deduplica → concatena con histórico → persiste → actualiza watermark. Devuelve un objeto IngestResult con métricas.
2.2.2 Analytics (src/pulse/analytics/)
familia.py — Deriva la “familia” de cada producto desde su clave (clave sin dígitos finales). Por ejemplo, ESDKPK4710 → familia ESDKPK. Esta granularidad es clave para MBA.
Recency: días desde la última compra hasta fecha_ref.
Frequency: número de pedidos en la ventana.
Monetary: suma del pago_total en la ventana.
Dias_entre_compras: mediana de la diferencia entre compras consecutivas (cadencia personalizada).
Es_single_buyer: flag para clientes con una sola compra (cadencia imputada al p95 de la base).
segmentacion.py — segmentar_clientes(df_rfm, version='v1'). Carga el SegmentadorClientes congelado y aplica predict(). Devuelve el dataframe con cluster_id y segmento_cluster.
mba.py — calcular_mba(df_items, df_segmentos, df_orders). Para cada cluster:
Construye canastas (pedidos multi-familia).
Aplica FP-Growth con umbrales min_support_count=30, min_support_pct=0.001.
Genera reglas con min_confidence=0.20 y min_lift=3.0.
Deduplica reglas simétricas.
Para las top 30 por lift, calcula ticket_medio y revenue_total.
Devuelve tres dataframes: por_segmento, exclusivas, accionables.
temporalidad.py — Agrega pedidos por hora-día-mes después de convertir a hora local CDMX. Genera tres outputs: temp_hora_dia, temp_mensual, temp_bundles (este último cruza reglas MBA top con su evolución mensual).
2.2.3 Modeling (src/pulse/modeling/)
segmentador.py — Clase SegmentadorClientes que encapsula:
Métodos públicos: fit(), predict(), save(), load(). Persiste el pipeline serializado con joblib y los metadatos (versión, features, nombres de clusters) en JSON.
2.2.4 Pipeline (src/pulse/pipeline/)
runner.py — run(modo). Tres modos:
daily: ingest + segmentación + temporalidad. MBA solo si no existe.
weekly: daily + recálculo completo de MBA.
monthly: weekly + validación de drift contra el snapshot.
Cada paso emite logs con métricas y dispara validaciones de calidad (validacion.py).
validacion.py — Quality checks que disparan al final de cada paso:
Ingest: el conteo de pedidos no debe disminuir más de 1%.
Segmentación: cero clientes con cluster_id nulo; ningún segmento debe absorber más del 50%.
Temporalidad: cross-check contra RFM (suma de pedidos por segmento debe coincidir al 0%).
2.2.5 Dashboard (src/pulse/dashboard/)
db.py — Una sola conexión DuckDB compartida por proceso. Cada parquet en datos/processed/ se registra como una vista SQL con nombre estable (segmentos, orders, mba_accionables, etc.).
queries.py — Funciones puras que devuelven list[dict]. Toda interacción SQL pasa por fetch_dicts(sql, params) para parametrizar (evita inyección).
routers/api.py — Endpoints JSON consumidos por el JS del cliente para filtros y drill-downs.
routers/pages.py — Endpoints HTML que renderizan templates Jinja2 con un payload initial_data embebido (evita un fetch extra en la primera pintura).
2.3 Decisiones técnicas importantes
Algunas decisiones específicas del proyecto que vale la pena conocer si se va a mantener o extender el código:
CARGO100 se filtra al entrar al runner, no en cada módulo. Una sola fuente de verdad.
Timezone: los pedidos en MongoDB están en UTC. La conversión a CDMX se hace en temporalidad.py antes de extraer hora/día/mes.
El log-transform vive dentro del Pipeline de sklearn, no fuera. Importante para que predict() funcione idénticamente al fit().
Ratio de alertas: recency / GREATEST(dias_entre_compras, 1) para evitar división por cero en clientes B2B que compran varias veces al día.
Cuadrantes del scatter de bundles: división por mediana dinámica (replica el notebook de referencia). No usamos umbrales fijos.
3. Diagrama de arquitectura
3.1 Vista de alto nivel
flowchart LR A[MongoDB<br/>Transacciones] -->|ingest incremental| B[ETL Pipeline] B --> C[orders_historicos.parquet<br/>items_historicos.parquet] C --> D[Segmentación<br/>K-Means k=5] D --> E[MBA<br/>FP-Growth por segmento] D --> F[Temporalidad<br/>hora-día-mes] E --> G[7 Parquets analíticos] F --> G D --> G G --> H[Dashboard FastAPI<br/>+ DuckDB + Plotly] H --> I[Marketing] M[Modelo v1 congelado<br/>pipeline.pkl] -.->|predict| D style A fill:#e3f2fd style M fill:#fff3e0 style H fill:#e8f5e9 style I fill:#f3e5f5
flowchart LR
A[MongoDB<br/>Transacciones] -->|ingest incremental| B[ETL Pipeline]
B --> C[orders_historicos.parquet<br/>items_historicos.parquet]
C --> D[Segmentación<br/>K-Means k=5]
D --> E[MBA<br/>FP-Growth por segmento]
D --> F[Temporalidad<br/>hora-día-mes]
E --> G[7 Parquets analíticos]
F --> G
D --> G
G --> H[Dashboard FastAPI<br/>+ DuckDB + Plotly]
H --> I[Marketing]
M[Modelo v1 congelado<br/>pipeline.pkl] -.->|predict| D
style A fill:#e3f2fd
style M fill:#fff3e0
style H fill:#e8f5e9
style I fill:#f3e5f5
3.2 Componentes clave
Componente
Responsabilidad
Stack
MongoDB
Fuente única de pedidos transaccionales
Externo
ETL Pipeline
Extraer, transformar, validar, persistir
Python + pandas
Segmentación
Asignar cluster a cada cliente
scikit-learn (KMeans)
MBA
Generar reglas de asociación por segmento
mlxtend (FP-Growth)
Temporalidad
Agregar por hora-día-mes
pandas
Dashboard
Servir 7 vistas interactivas
FastAPI + DuckDB + Plotly
systemd
Supervisar el proceso del dashboard
OS
cron
Ejecutar el pipeline diariamente
OS
3.3 Flujo de interacción principal
3:00 AM (cron): el pipeline daily corre. Lee watermark, extrae pedidos nuevos de MongoDB, actualiza histórico, recalcula segmentación + temporalidad, regenera los 7 parquets.
Cualquier hora: un usuario abre el dashboard en su navegador.
FastAPI sirve el HTML inicial con initial_data embebido en <script>.
JavaScript en el cliente llama a endpoints /api/* cuando el usuario cambia filtros.
DuckDB ejecuta queries SQL sobre los parquets (sin cargar todo en memoria, gracias a lectura columnar).
Plotly renderiza las visualizaciones inline.
3.4 Flujo de datos
sequenceDiagram participant MongoDB participant Pipeline participant Parquet participant DuckDB participant FastAPI participant Browser Note over Pipeline: 3:00 AM (cron) Pipeline->>MongoDB: ¿Pedidos > watermark? MongoDB-->>Pipeline: 1,300 pedidos nuevos Pipeline->>Parquet: orders_historicos.parquet<br/>items_historicos.parquet Pipeline->>Pipeline: segmentación + MBA + temporalidad Pipeline->>Parquet: 7 parquets analíticos Note over Browser: Usuario abre dashboard Browser->>FastAPI: GET /dashboard/bundles FastAPI->>DuckDB: SELECT FROM mba_accionables... DuckDB->>Parquet: read columns Parquet-->>DuckDB: rows DuckDB-->>FastAPI: results FastAPI-->>Browser: HTML + initial_data + Plotly Browser->>FastAPI: GET /api/bundles?segmento=MVPs FastAPI->>DuckDB: filtered query DuckDB-->>FastAPI: filtered rows FastAPI-->>Browser: JSON
sequenceDiagram
participant MongoDB
participant Pipeline
participant Parquet
participant DuckDB
participant FastAPI
participant Browser
Note over Pipeline: 3:00 AM (cron)
Pipeline->>MongoDB: ¿Pedidos > watermark?
MongoDB-->>Pipeline: 1,300 pedidos nuevos
Pipeline->>Parquet: orders_historicos.parquet<br/>items_historicos.parquet
Pipeline->>Pipeline: segmentación + MBA + temporalidad
Pipeline->>Parquet: 7 parquets analíticos
Note over Browser: Usuario abre dashboard
Browser->>FastAPI: GET /dashboard/bundles
FastAPI->>DuckDB: SELECT FROM mba_accionables...
DuckDB->>Parquet: read columns
Parquet-->>DuckDB: rows
DuckDB-->>FastAPI: results
FastAPI-->>Browser: HTML + initial_data + Plotly
Browser->>FastAPI: GET /api/bundles?segmento=MVPs
FastAPI->>DuckDB: filtered query
DuckDB-->>FastAPI: filtered rows
FastAPI-->>Browser: JSON
4. Suite de tests
El proyecto incluye 60+ tests unitarios cubriendo cada módulo analítico y los componentes del pipeline. Para correrlos:
uv run pytest
Para correr con reporte de cobertura:
uv run pytest --cov=pulse --cov-report=html
Los tests cubren especialmente los invariantes críticos:
El conteo de pedidos en orders coincide con el conteo en items.
La distribución de segmentos suma 100%.
Las reglas MBA cumplen los umbrales definidos.
El cross-check temporalidad-RFM tiene 0% de divergencia.
El modelo v1 produce los mismos clusters dados los mismos inputs (reproducibilidad).
Tip
Si vas a modificar mba.py, rfm.py o temporalidad.py, siempre corre los tests antes de mergear. La mayoría de los bugs históricos del proyecto se detectaron por estos tests.