JSPM

@jamx-framework/queue

1.0.0
  • ESM via JSPM
  • ES Module Entrypoint
  • Export Map
  • Keywords
  • License
  • Repository URL
  • TypeScript Types
  • README
  • Created
  • Published
  • Downloads 6
  • Score
    100M100P100Q64876F
  • License MIT

JAMX Framework — Async job queue

Package Exports

  • @jamx-framework/queue

Readme

@jamx-framework/queue

Descripción

Módulo de colas (queues) y workers para JAMX Framework. Proporciona una API para gestionar tareas asíncronas, procesamiento en background y ejecución de jobs con soporte para múltiples backends (Redis, RabbitMQ, AWS SQS). Incluye características como prioridades, retries, dead letter queues, y scheduling de tareas.

Cómo funciona

El módulo implementa un sistema de colas basado en productor-consumidor:

  1. Queue: Clase principal que encola jobs y gestiona workers
  2. Worker: Procesa jobs de una cola específica, con concurrencia y retry configurable
  3. Drivers: Implementaciones para diferentes backends (Redis, RabbitMQ, SQS)
  4. Job: Representa una tarea con payload, prioridad, y metadata

Componentes principales

  • src/queue.ts: Clase Queue que gestiona encolado y workers
  • src/worker.ts: Clase Worker que procesa jobs
  • src/types.ts: Tipos (Job, QueueOptions, WorkerOptions, etc.)
  • src/index.ts: Punto de exportación

Uso básico

import { Queue, RedisDriver } from '@jamx-framework/queue';

// Crear cola con driver Redis
const queue = new Queue(new RedisDriver({ url: 'redis://localhost:6379' }));

// Añadir job
await queue.add('send-email', {
  to: 'user@example.com',
  subject: 'Hello',
  body: 'Message body',
});

// Procesar jobs con worker
const worker = queue.process('send-email', async (job) => {
  await mailer.send(job.data);
});

// O con opciones de concurrencia y retry
const worker = queue.process('process-image', async (job) => {
  await processImage(job.data);
}, {
  concurrency: 5,
  maxRetries: 3,
  retryDelay: 5000,
});

Ejemplos

Job con prioridad

// Añadir job con prioridad (menor número = mayor prioridad)
await queue.add('critical-task', { data: 'urgent' }, { priority: 1 });
await queue.add('normal-task', { data: 'normal' }, { priority: 5 });
await queue.add('low-task', { data: 'low' }, { priority: 10 });

// Los workers procesarán en orden de prioridad

Job con delay (programado)

// Ejecutar job en 5 minutos
await queue.add('reminder', { userId: 123 }, { delay: 300_000 });

// Ejecutar en fecha específica
const runAt = new Date(Date.now() + 3600_000);
await queue.add('scheduled-report', { reportId: 'abc' }, { runAt });

Retry con backoff

const worker = queue.process('api-call', async (job) => {
  const result = await callExternalApi(job.data);
  if (!result.success) throw new Error('API failed');
  return result;
}, {
  maxRetries: 5,
  retryDelay: 1000,
  backoff: 'exponential', // 1s, 2s, 4s, 8s, 16s
});

Dead Letter Queue (DLQ)

const worker = queue.process('payment-processing', async (job) => {
  await processPayment(job.data);
}, {
  maxRetries: 3,
  deadLetterQueue: 'payment-failed', // jobs fallidos van aquí
});

// Procesar DLQ separadamente
queue.process('payment-failed', async (job) => {
  await notifyAdmin(job.data);
  await logFailure(job);
});

Job batches

// Añadir múltiples jobs en una operación atómica
await queue.addBulk([
  { name: 'email-1', data: { to: 'user1@example.com' } },
  { name: 'email-2', data: { to: 'user2@example.com' } },
  { name: 'email-3', data: { to: 'user3@example.com' } },
]);

// Procesar en batch (misma función para todos)
await queue.processBatch('send-emails', async (jobs) => {
  for (const job of jobs) {
    await mailer.send(job.data);
  }
}, { batchSize: 10 });

Pausar/reanudar cola

// Pausar procesamiento (no acepta nuevos jobs)
await queue.pause('email-queue');

// Reanudar
await queue.resume('email-queue');

// Ver estado
const stats = await queue.getStats('email-queue');
console.log('Jobs waiting:', stats.waiting);
console.log('Jobs active:', stats.active);

Job con progreso

const worker = queue.process('large-import', async (job, update) => {
  const total = job.data.items.length;
  
  for (let i = 0; i < total; i++) {
    await processItem(job.data.items[i]);
    
    // Actualizar progreso
    await update({ progress: (i + 1) / total });
  }
});

// Monitorear progreso desde otro lugar
const progress = await queue.getJobProgress(jobId);
console.log(`Progreso: ${(progress * 100).toFixed(1)}%`);

Uso con inyección de dependencias

import { Container } from '@jamx-framework/core';
import { Queue } from '@jamx-framework/queue';

Container.registerSingleton('queue', () => {
  return new Queue(new RedisDriver({ url: process.env.REDIS_URL }));
});

const queue = Container.resolve<Queue>('queue');
await queue.add('task', { data: 'value' });

Flujo interno

  1. Encolado: queue.add(name, data, options) serializa el job y lo envía al driver
  2. Almacenamiento: El driver guarda el job en Redis/RabbitMQ/SQS con metadata (priority, delay, etc.)
  3. Workers: queue.process(name, handler, options) crea workers que escuchan la cola
  4. Desencolado: Los workers obtienen jobs (respetando prioridad y delay)
  5. Ejecución: Se ejecuta el handler con el job como argumento
  6. Retry: Si falla, se reintenta según política (maxRetries, delay, backoff)
  7. DLQ: Después de maxRetries, el job va a dead letter queue
  8. Completado: Si tiene éxito, se elimina de la cola

API Reference (Resumen)

Queue

  • constructor(driver: QueueDriver, options?: QueueOptions)
  • async add(name: string, data: any, options?: JobOptions): Promise<string> (retorna jobId)
  • async addBulk(jobs: BulkJob[]): Promise<string[]>
  • async process(name: string, handler: JobHandler, options?: WorkerOptions): Promise<Worker>
  • async pause(name: string): Promise<void>
  • async resume(name: string): Promise<void>
  • async getJob(jobId: string): Promise<Job | null>
  • async getJobProgress(jobId: string): Promise<number>
  • async getStats(name?: string): Promise<QueueStats>
  • async removeQueue(name: string): Promise<void>
  • async clearQueue(name: string): Promise<void>

Worker

  • id: string
  • name: string
  • isRunning(): boolean
  • stop(): Promise<void>

Job

  • id: string
  • name: string
  • data: any
  • priority: number
  • status: 'pending' | 'active' | 'completed' | 'failed'
  • attempts: number
  • failedReason?: string
  • createdAt: Date
  • processedAt?: Date

QueueDriver (interface)

  • async connect(): Promise<void>
  • async disconnect(): Promise<void>
  • async add(queue: string, job: JobData): Promise<string>
  • async addBulk(queue: string, jobs: JobData[]): Promise<string[]>
  • async getNext(queue: string): Promise<Job | null>
  • async ack(queue: string, jobId: string): Promise<void>
  • async nack(queue: string, jobId: string, requeue: boolean): Promise<void>
  • async getStats(queue?: string): Promise<QueueStats>
  • async pause(queue: string): Promise<void>
  • async resume(queue: string): Promise<void>

Performance Considerations

  • Concurrency: Los workers pueden procesar múltiples jobs concurrentemente
  • Batching: addBulk reduce roundtrips al backend
  • Connection pooling: Los drivers reutilizan conexiones
  • Visibility timeout: En Redis, los jobs activos se ocultan temporalmente
  • Memory usage: Los jobs grandes deben evitarse (usar referencias a almacenamiento)

Configuration Options

// Redis Driver
const redisDriver = new RedisDriver({
  url: 'redis://localhost:6379',
  namespace: 'jamx:queue',
  maxRetries: 3,
  retryDelay: 5000,
});

// Queue options
const queue = new Queue(redisDriver, {
  defaultConcurrency: 10,
  defaultPriority: 5,
  defaultDelay: 0,
});

// Worker options
queue.process('my-queue', handler, {
  concurrency: 5,        // jobs simultáneos
  maxRetries: 3,         // reintentos máximos
  retryDelay: 1000,      // delay entre reintentos (ms)
  backoff: 'linear',     // 'linear' | 'exponential'
  deadLetterQueue: 'dlq', // cola de fallidos
  batchSize: 1,          // tamaño de batch (1 = no batch)
});

Testing

Tests en packages/queue/tests/unit/:

pnpm test

Cubre:

  • Encolado y desencolado
  • Prioridades
  • Retry y backoff
  • Dead letter queues
  • Concurrencia
  • Pausa/resume

Compatibility

  • Compatible con Node.js 18+
  • Drivers: Redis, RabbitMQ, AWS SQS
  • Funciona en Windows, macOS, Linux
  • No requiere dependencias nativas

CLI Integration

  • jamx queue:list: Lista todas las colas
  • jamx queue:stats <queue>: Muestra estadísticas de una cola
  • jamx queue:jobs <queue>: Lista jobs en una cola
  • jamx queue:retry <jobId>: Reintenta un job fallido
  • jamx queue:delete <jobId>: Elimina un job
  • jamx queue:purge <queue>: Limpia toda una cola
  • jamx queue:dlq: Muestra y gestiona dead letter queues

Best Practices

  1. Idempotency: Los jobs deben ser idempotentes (ejecutar múltiples veces no daña)
  2. Small payloads: No guardar datos grandes en el job; usar referencias (IDs)
  3. Timeouts: Configurar timeouts apropiados para evitar jobs colgados
  4. Monitoring: Monitorear longitud de cola, tasa de fallos, latencia
  5. Dead letter queues: Siempre configurar DLQ para investigar fallos
  6. Priorities: Usar prioridades para jobs críticos vs best-effort
  7. Graceful shutdown: Los workers deben manejar SIGTERM y terminar jobs en curso

Troubleshooting

Cola llena

Aumentar concurrency o añadir más workers.

Jobs stuck en "active"

Verificar que los workers no estén caídos; revisar timeouts.

Alta tasa de fallos

Revisar DLQ, ajustar retry policy, verificar dependencias externas.

Memory leaks

Asegurarse de que los handlers no retengan referencias a jobs completados.

This queue module provides a robust, production-ready job processing system for JAMX applications, enabling asynchronous task execution, background processing, and reliable delivery with retries and dead letter queues.