Documentación Técnica

Esta sección documenta los aspectos operativos y técnicos del proyecto: instalación, despliegue, estructura del código y comandos de operación diaria. Si los demás artículos responden al “qué” y al “por qué”, esta sección responde al “cómo”.

1. Manual de instalación y despliegue

1.1 Configuraciones importantes

Antes de empezar, es importante tener claras las decisiones de arquitectura:

  • Stack: Python 3.13+, gestor de dependencias uv, FastAPI para la API web, DuckDB como motor analítico, Plotly para visualización en frontend, scikit-learn para modelado, MongoDB como fuente de datos transaccionales.
  • Persistencia: Parquet para todos los datos derivados (raw histórico + outputs analíticos). No usamos base de datos relacional para la capa analítica.
  • Sistema operativo objetivo: AlmaLinux 9 (rama RHEL/CentOS). El proyecto también corre en Windows/Ubuntu en desarrollo local.
  • Proceso supervisado: systemd para el dashboard, cron para el pipeline diario.
  • Modo SELinux recomendado: permissive (registra violaciones sin bloquear, suficiente para intranet).

1.2 Requisitos del sistema

Componente Versión mínima Notas
Sistema operativo AlmaLinux 9 / RHEL 9 / Ubuntu 22.04+ Linux estable
Python 3.13 Definido en pyproject.toml
uv 0.4+ Gestor de dependencias y entornos
Git Cualquier versión reciente Para clonar el repo
Acceso a MongoDB Red interna o cadena de conexión Fuente de datos transaccionales
Memoria RAM 8 GB mínimo El pipeline carga ~1.8M items en memoria
Disco 5 GB libres Para parquets, logs y dependencias

1.3 Dependencias principales del proyecto

Lista declarada en pyproject.toml:

Librería Uso
pandas, numpy, pyarrow Manipulación de datos y formato Parquet
scikit-learn Pipeline de transformación + K-Means
mlxtend FP-Growth y association rules para MBA
fastapi, uvicorn, jinja2 Servidor web del dashboard
duckdb Motor SQL embebido para queries del dashboard
pymongo Cliente MongoDB para ingesta
python-dotenv Manejo de credenciales fuera del código
pytest Suite de tests automáticos

1.4 Instalación

1.4.1 Clonar el repositorio

cd /home/<usuario>
git clone <url-del-repo> ct-analytics
cd ct-analytics

Si el repo es privado, autentica con gh auth login o usa SSH key configurada.

1.4.2 Instalar dependencias con uv

uv sync

Este comando lee pyproject.toml, crea un entorno virtual en .venv/ e instala todas las dependencias en versiones bloqueadas por uv.lock. No es necesario activar el venv manualmente: todos los comandos posteriores usan uv run.

Advertencia

Si por accidente clonaste el repo con sudo, los archivos quedan como root:root y uv sync va a fallar con “Permission denied”. Solución:

sudo chown -R <usuario>:<usuario> /home/<usuario>/ct-analytics
sudo rm -rf .venv  # solo si el venv ya existía con permisos rotos
uv sync

1.4.3 Configurar variables de entorno

Crea un archivo .env en la raíz del proyecto. Nunca subir este archivo a git (debe estar en .gitignore desde el primer commit):

# Conexión a MongoDB
MONGO_URI=mongodb://usuario:password@host:puerto/db
MONGO_DB=nombre_database
MONGO_COLLECTION=nombre_collection

# Opcional: zona horaria local para agregados temporales
TIMEZONE_LOCAL=America/Mexico_City

1.4.4 Subir artefactos de modelo y datos históricos

Si el servidor no va a hacer un backfill completo desde MongoDB, sube los siguientes archivos con FileZilla, SCP o rsync:

ct-analytics/
├── .env                                        # credenciales
├── datos/processed/
│   ├── orders_historicos.parquet              # raw histórico (~20 MB)
│   └── items_historicos.parquet               # raw histórico (~40 MB)
└── models/v1/
    ├── pipeline.pkl                            # modelo congelado
    └── metadata.json                           # metadata del modelo
Nota

Por qué se sube el raw histórico: el ingest incremental usa watermarks. Si arrancamos sin histórico, la primera corrida intenta extraer ~780K pedidos desde MongoDB, lo cual toma 5-10 minutos y carga la BD. Subir el parquet evita ese arranque pesado.

1.4.5 Primera corrida del pipeline

mkdir -p logs
uv run python -m pulse.pipeline weekly --log-file logs/primera_corrida.log

Resultado esperado: en 45-90 segundos verás un log similar a:

✅ Ingesta OK: 784,088 → 784,088 pedidos (cambio +0.0%)
✅ Segmentación OK: 18,638 clientes, sin nulos
✅ MBA guardado: 3,860 reglas totales, 1,516 exclusivas, 141 accionables
✅ Cross-check temporalidad-RFM OK (diferencia 0.00%)
✅ OK Pipeline · modo=weekly · duración=42.6s

Verifica que se generaron los 7 outputs analíticos:

ls datos/processed/*.parquet | wc -l   # debe dar 10 (3 base + 7 generados)

1.4.6 Despliegue persistente con systemd

Crear el servicio
sudo nano /etc/systemd/system/pulse-dashboard.service

Pega el siguiente contenido (ajusta el puerto a tu elección y el usuario al tuyo):

[Unit]
Description=Pulse Dashboard - FastAPI
After=network.target

[Service]
Type=simple
User=<usuario>
Group=<usuario>
WorkingDirectory=/home/<usuario>/ct-analytics
Environment="PATH=/home/<usuario>/.local/bin:/usr/local/bin:/usr/bin"
EnvironmentFile=/home/<usuario>/ct-analytics/.env
ExecStart=/home/<usuario>/.local/bin/uv run uvicorn pulse.dashboard.app:app --host 0.0.0.0 --port <PUERTO>
Restart=always
RestartSec=10
StandardOutput=append:/home/<usuario>/ct-analytics/logs/dashboard.log
StandardError=append:/home/<usuario>/ct-analytics/logs/dashboard.error.log

[Install]
WantedBy=multi-user.target
Habilitar e iniciar
sudo systemctl daemon-reload
sudo systemctl enable pulse-dashboard
sudo systemctl start pulse-dashboard
sudo systemctl status pulse-dashboard

El estado debe mostrar Active: active (running) y el contador de reinicios en 0.

Apertura persistente del puerto con firewalld
sudo firewall-cmd --permanent --add-port=<PUERTO>/tcp
sudo firewall-cmd --reload
sudo firewall-cmd --list-ports   # verificar
Consideraciones de SELinux

En AlmaLinux 9, SELinux viene activo en modo enforcing por default. Para un despliegue en intranet recomendamos modo permissive:

# Cambio inmediato
sudo setenforce 0

# Cambio persistente: editar /etc/selinux/config
# SELINUX=enforcing  →  SELINUX=permissive

sestatus   # verificar: "Current mode: permissive"

En modo permissive, SELinux registra todas las violaciones en /var/log/audit/audit.log sin bloquearlas. Si en el futuro migras a enforcing, esos logs te dicen exactamente qué contextos configurar.

1.4.7 Verificación final del despliegue

Desde una máquina cliente (no el servidor mismo):

http://<IP-del-servidor>:<PUERTO>

El navegador debe redirigir a /dashboard/overview y mostrar los KPIs cargados.

Test de supervivencia a reboot (la prueba real):

sudo reboot

Espera 1-2 minutos y vuelve a entrar a la URL. Si carga sin que hayas hecho nada en el servidor, el deployment es robusto.

1.4.8 Configurar cron diario del pipeline

Como tu usuario (no como root):

crontab -e

Agrega esta línea (un solo renglón):

0 3 * * * cd /home/<usuario>/ct-analytics && /home/<usuario>/.local/bin/uv run python -m pulse.pipeline daily --log-file logs/cron_$(date +\%Y\%m\%d).log 2>&1

Verifica:

crontab -l

El pipeline correrá todos los días a las 3am hora del servidor.

1.5 Comandos útiles de operación diaria

# Estado del dashboard
sudo systemctl status pulse-dashboard

# Reiniciar el dashboard (por ejemplo después de un git pull)
sudo systemctl restart pulse-dashboard

# Ver logs en vivo
sudo journalctl -u pulse-dashboard -f

# Correr pipeline manualmente (sin esperar al cron)
cd /home/<usuario>/ct-analytics
uv run python -m pulse.pipeline daily

# Forzar recálculo completo (incluyendo MBA, ~45-60s)
uv run python -m pulse.pipeline weekly

# Saltar la ingesta y solo recalcular agregados
uv run python -m pulse.pipeline weekly --skip-ingest

# Actualizar código desde git
cd /home/<usuario>/ct-analytics
git pull
sudo systemctl restart pulse-dashboard

# Ver corridas recientes del cron
ls -lt logs/cron_*.log | head -5

1.6 Solución de problemas comunes

Síntoma Diagnóstico Solución
No module named pulse.pipeline Paquete no instalado en venv uv sync con pyproject.toml correcto
Permission denied al subir archivos Owner es root, no usuario sudo chown -R user:user .
Address already in use al arrancar Proceso anterior tomó el puerto sudo ss -tlnp | grep <PUERTO> y sudo kill <PID>
Dashboard responde 500 Parquets faltantes o corruptos Re-correr uv run python -m pulse.pipeline weekly
Pipeline falla en MongoDB Credenciales o red Verificar .env y nc -zv <host> 27017
Servicio crashea en bucle Error en código o config sudo journalctl -u pulse-dashboard -n 100

2. Documentación técnica del código

2.1 Estructura de carpetas

ct-analytics/
├── src/pulse/
│   ├── config/
│   │   └── paths.py              # Rutas centralizadas (PROCESSED, MODELS, etc.)
│   ├── etl/
│   │   ├── extraction.py         # Extracción desde MongoDB
│   │   ├── transform.py          # build_both_dfs + enrich_items
│   │   ├── load.py               # save_parquet
│   │   ├── incremental.py        # leer_watermark + extract_incremental
│   │   └── ingest.py             # run_ingest con IngestResult
│   ├── analytics/
│   │   ├── familia.py            # Derivar familia desde clave de producto
│   │   ├── rfm.py                # calcular_rfm_completo + imputación
│   │   ├── segmentacion.py       # segmentar_clientes (carga + predict)
│   │   ├── mba.py                # calcular_mba por segmento
│   │   └── temporalidad.py       # Agregados hora/día/mes + bundles
│   ├── modeling/
│   │   └── segmentador.py        # SegmentadorClientes (wrapper sklearn)
│   ├── pipeline/
│   │   ├── __main__.py           # python -m pulse.pipeline
│   │   ├── cli.py                # parser de argumentos
│   │   ├── runner.py             # orquestador con modos daily/weekly/monthly
│   │   └── validacion.py         # quality checks
│   └── dashboard/
│       ├── app.py                # FastAPI app
│       ├── db.py                 # DuckDB + registro de vistas sobre parquets
│       ├── queries.py            # Queries SQL parametrizadas
│       ├── routers/
│       │   ├── api.py            # Endpoints JSON
│       │   └── pages.py          # Endpoints HTML
│       ├── templates/            # Jinja2 templates
│       └── static/
│           ├── css/styles.css
│           └── js/{charts,filters}.js
│
├── datos/
│   ├── processed/                # Producción (10 parquets)
│   └── processed_pruebas/        # Auditoría histórica
├── models/v1/
│   ├── pipeline.pkl              # Modelo congelado
│   └── metadata.json
├── tests/                        # 60+ tests (pytest)
├── logs/                         # Logs de pipeline + dashboard
├── pyproject.toml
├── uv.lock
└── .env                          # NUNCA en git

2.2 Módulos principales

2.2.1 ETL (src/pulse/etl/)

extraction.py — Cliente de MongoDB. Lee documentos crudos del orden transaccional y devuelve un iterable.

transform.py — Función build_both_dfs() que en una sola pasada construye df_orders y df_items. Aplica decisiones críticas:

  • Excluye CARGO100 (cargo financiero por tarjeta) del conteo de productos.
  • Excluye items con clave is None para preservar la invariante len(items) == num_productos.

incremental.pyextract_incremental() lee el watermark (timestamp del último pedido procesado) y solo extrae los pedidos nuevos. Si no existe watermark, hace backfill completo.

ingest.pyrun_ingest() orquesta el flujo: lee watermark → extrae nuevo → deduplica → concatena con histórico → persiste → actualiza watermark. Devuelve un objeto IngestResult con métricas.

2.2.2 Analytics (src/pulse/analytics/)

familia.py — Deriva la “familia” de cada producto desde su clave (clave sin dígitos finales). Por ejemplo, ESDKPK4710 → familia ESDKPK. Esta granularidad es clave para MBA.

rfm.pycalcular_rfm_completo(df_orders, fecha_ref, ventana_meses=30). Calcula:

  • Recency: días desde la última compra hasta fecha_ref.
  • Frequency: número de pedidos en la ventana.
  • Monetary: suma del pago_total en la ventana.
  • Dias_entre_compras: mediana de la diferencia entre compras consecutivas (cadencia personalizada).
  • Es_single_buyer: flag para clientes con una sola compra (cadencia imputada al p95 de la base).

segmentacion.pysegmentar_clientes(df_rfm, version='v1'). Carga el SegmentadorClientes congelado y aplica predict(). Devuelve el dataframe con cluster_id y segmento_cluster.

mba.pycalcular_mba(df_items, df_segmentos, df_orders). Para cada cluster:

  1. Construye canastas (pedidos multi-familia).
  2. Aplica FP-Growth con umbrales min_support_count=30, min_support_pct=0.001.
  3. Genera reglas con min_confidence=0.20 y min_lift=3.0.
  4. Deduplica reglas simétricas.
  5. Para las top 30 por lift, calcula ticket_medio y revenue_total.

Devuelve tres dataframes: por_segmento, exclusivas, accionables.

temporalidad.py — Agrega pedidos por hora-día-mes después de convertir a hora local CDMX. Genera tres outputs: temp_hora_dia, temp_mensual, temp_bundles (este último cruza reglas MBA top con su evolución mensual).

2.2.3 Modeling (src/pulse/modeling/)

segmentador.py — Clase SegmentadorClientes que encapsula:

Pipeline([
    ('log_transform', FunctionTransformer(np.log1p)),
    ('scaler', StandardScaler()),
    ('kmeans', KMeans(n_clusters=5, random_state=42)),
])

Métodos públicos: fit(), predict(), save(), load(). Persiste el pipeline serializado con joblib y los metadatos (versión, features, nombres de clusters) en JSON.

2.2.4 Pipeline (src/pulse/pipeline/)

runner.pyrun(modo). Tres modos:

  • daily: ingest + segmentación + temporalidad. MBA solo si no existe.
  • weekly: daily + recálculo completo de MBA.
  • monthly: weekly + validación de drift contra el snapshot.

Cada paso emite logs con métricas y dispara validaciones de calidad (validacion.py).

validacion.py — Quality checks que disparan al final de cada paso:

  • Ingest: el conteo de pedidos no debe disminuir más de 1%.
  • Segmentación: cero clientes con cluster_id nulo; ningún segmento debe absorber más del 50%.
  • Temporalidad: cross-check contra RFM (suma de pedidos por segmento debe coincidir al 0%).

2.2.5 Dashboard (src/pulse/dashboard/)

db.py — Una sola conexión DuckDB compartida por proceso. Cada parquet en datos/processed/ se registra como una vista SQL con nombre estable (segmentos, orders, mba_accionables, etc.).

queries.py — Funciones puras que devuelven list[dict]. Toda interacción SQL pasa por fetch_dicts(sql, params) para parametrizar (evita inyección).

routers/api.py — Endpoints JSON consumidos por el JS del cliente para filtros y drill-downs.

routers/pages.py — Endpoints HTML que renderizan templates Jinja2 con un payload initial_data embebido (evita un fetch extra en la primera pintura).

2.3 Decisiones técnicas importantes

Algunas decisiones específicas del proyecto que vale la pena conocer si se va a mantener o extender el código:

  • CARGO100 se filtra al entrar al runner, no en cada módulo. Una sola fuente de verdad.
  • Timezone: los pedidos en MongoDB están en UTC. La conversión a CDMX se hace en temporalidad.py antes de extraer hora/día/mes.
  • El log-transform vive dentro del Pipeline de sklearn, no fuera. Importante para que predict() funcione idénticamente al fit().
  • Ratio de alertas: recency / GREATEST(dias_entre_compras, 1) para evitar división por cero en clientes B2B que compran varias veces al día.
  • Cuadrantes del scatter de bundles: división por mediana dinámica (replica el notebook de referencia). No usamos umbrales fijos.

3. Diagrama de arquitectura

3.1 Vista de alto nivel

flowchart LR
    A[MongoDB<br/>Transacciones] -->|ingest incremental| B[ETL Pipeline]
    B --> C[orders_historicos.parquet<br/>items_historicos.parquet]
    C --> D[Segmentación<br/>K-Means k=5]
    D --> E[MBA<br/>FP-Growth por segmento]
    D --> F[Temporalidad<br/>hora-día-mes]
    E --> G[7 Parquets analíticos]
    F --> G
    D --> G
    G --> H[Dashboard FastAPI<br/>+ DuckDB + Plotly]
    H --> I[Marketing]

    M[Modelo v1 congelado<br/>pipeline.pkl] -.->|predict| D

    style A fill:#e3f2fd
    style M fill:#fff3e0
    style H fill:#e8f5e9
    style I fill:#f3e5f5

flowchart LR
    A[MongoDB<br/>Transacciones] -->|ingest incremental| B[ETL Pipeline]
    B --> C[orders_historicos.parquet<br/>items_historicos.parquet]
    C --> D[Segmentación<br/>K-Means k=5]
    D --> E[MBA<br/>FP-Growth por segmento]
    D --> F[Temporalidad<br/>hora-día-mes]
    E --> G[7 Parquets analíticos]
    F --> G
    D --> G
    G --> H[Dashboard FastAPI<br/>+ DuckDB + Plotly]
    H --> I[Marketing]

    M[Modelo v1 congelado<br/>pipeline.pkl] -.->|predict| D

    style A fill:#e3f2fd
    style M fill:#fff3e0
    style H fill:#e8f5e9
    style I fill:#f3e5f5

3.2 Componentes clave

Componente Responsabilidad Stack
MongoDB Fuente única de pedidos transaccionales Externo
ETL Pipeline Extraer, transformar, validar, persistir Python + pandas
Segmentación Asignar cluster a cada cliente scikit-learn (KMeans)
MBA Generar reglas de asociación por segmento mlxtend (FP-Growth)
Temporalidad Agregar por hora-día-mes pandas
Dashboard Servir 7 vistas interactivas FastAPI + DuckDB + Plotly
systemd Supervisar el proceso del dashboard OS
cron Ejecutar el pipeline diariamente OS

3.3 Flujo de interacción principal

  1. 3:00 AM (cron): el pipeline daily corre. Lee watermark, extrae pedidos nuevos de MongoDB, actualiza histórico, recalcula segmentación + temporalidad, regenera los 7 parquets.
  2. Cualquier hora: un usuario abre el dashboard en su navegador.
  3. FastAPI sirve el HTML inicial con initial_data embebido en <script>.
  4. JavaScript en el cliente llama a endpoints /api/* cuando el usuario cambia filtros.
  5. DuckDB ejecuta queries SQL sobre los parquets (sin cargar todo en memoria, gracias a lectura columnar).
  6. Plotly renderiza las visualizaciones inline.

3.4 Flujo de datos

sequenceDiagram
    participant MongoDB
    participant Pipeline
    participant Parquet
    participant DuckDB
    participant FastAPI
    participant Browser

    Note over Pipeline: 3:00 AM (cron)
    Pipeline->>MongoDB: ¿Pedidos > watermark?
    MongoDB-->>Pipeline: 1,300 pedidos nuevos
    Pipeline->>Parquet: orders_historicos.parquet<br/>items_historicos.parquet
    Pipeline->>Pipeline: segmentación + MBA + temporalidad
    Pipeline->>Parquet: 7 parquets analíticos

    Note over Browser: Usuario abre dashboard
    Browser->>FastAPI: GET /dashboard/bundles
    FastAPI->>DuckDB: SELECT FROM mba_accionables...
    DuckDB->>Parquet: read columns
    Parquet-->>DuckDB: rows
    DuckDB-->>FastAPI: results
    FastAPI-->>Browser: HTML + initial_data + Plotly

    Browser->>FastAPI: GET /api/bundles?segmento=MVPs
    FastAPI->>DuckDB: filtered query
    DuckDB-->>FastAPI: filtered rows
    FastAPI-->>Browser: JSON

sequenceDiagram
    participant MongoDB
    participant Pipeline
    participant Parquet
    participant DuckDB
    participant FastAPI
    participant Browser

    Note over Pipeline: 3:00 AM (cron)
    Pipeline->>MongoDB: ¿Pedidos > watermark?
    MongoDB-->>Pipeline: 1,300 pedidos nuevos
    Pipeline->>Parquet: orders_historicos.parquet<br/>items_historicos.parquet
    Pipeline->>Pipeline: segmentación + MBA + temporalidad
    Pipeline->>Parquet: 7 parquets analíticos

    Note over Browser: Usuario abre dashboard
    Browser->>FastAPI: GET /dashboard/bundles
    FastAPI->>DuckDB: SELECT FROM mba_accionables...
    DuckDB->>Parquet: read columns
    Parquet-->>DuckDB: rows
    DuckDB-->>FastAPI: results
    FastAPI-->>Browser: HTML + initial_data + Plotly

    Browser->>FastAPI: GET /api/bundles?segmento=MVPs
    FastAPI->>DuckDB: filtered query
    DuckDB-->>FastAPI: filtered rows
    FastAPI-->>Browser: JSON

4. Suite de tests

El proyecto incluye 60+ tests unitarios cubriendo cada módulo analítico y los componentes del pipeline. Para correrlos:

uv run pytest

Para correr con reporte de cobertura:

uv run pytest --cov=pulse --cov-report=html

Los tests cubren especialmente los invariantes críticos:

  • El conteo de pedidos en orders coincide con el conteo en items.
  • La distribución de segmentos suma 100%.
  • Las reglas MBA cumplen los umbrales definidos.
  • El cross-check temporalidad-RFM tiene 0% de divergencia.
  • El modelo v1 produce los mismos clusters dados los mismos inputs (reproducibilidad).
Tip

Si vas a modificar mba.py, rfm.py o temporalidad.py, siempre corre los tests antes de mergear. La mayoría de los bugs históricos del proyecto se detectaron por estos tests.

Volver arriba