# ravenflow
Type-safe job queue framework for TypeScript, built on BullMQ and Redis.
Define jobs as plain functions; ravenflow handles scheduling, retries, rate limiting, distributed processing, and crash recovery.
## Install
```sh
npm install ravenflow
# or
pnpm add ravenflow
```

Requirements: Redis 6+, Node.js 18+. MongoDB is required for stateful-etl and token-bucket jobs.
## Quick Start
```ts
import {
  createRavenflowApp,
  defineConfig,
  defineSimpleJob,
} from "ravenflow";

// 1. Define your jobs
const config = defineConfig({
  defaultQueue: "default",
  jobs: {
    sendEmail: defineSimpleJob({
      handler: async (job, params: { to: string; subject: string }) => {
        await emailService.send(params.to, params.subject); // your own mailer
        return { sent: true };
      },
    }),
  },
});

// 2. Create the app
const app = createRavenflowApp(config, {
  redis: { host: "localhost", port: 6379 },
});

// 3. Start the worker
await app.startWorker("default", { concurrency: 10 });

// 4. Send jobs
await app.jobs.sendEmail.send({ to: "user@example.com", subject: "Hello" });

// Or send and wait for the result
const result = await app.jobs.sendEmail.sendAndWait({ to: "user@example.com", subject: "Hello" });
console.log(result); // { sent: true }
```

## Job Types
### Simple Job
One-shot function execution. Ideal for notifications, cleanup tasks, webhooks.
```ts
import { defineSimpleJob } from "ravenflow";

const sendEmail = defineSimpleJob({
  cron: "*/5 * * * *", // optional: run every 5 minutes
  handler: async (job, params: { to: string; text: string }) => {
    const result = await emailService.send(params.to, params.text);
    return { messageId: result.id };
  },
});
```

### Memory ETL
Extract-Transform-Load pipeline that runs entirely in memory, with cursor-based pagination and rate-limited parallel transforms.
```ts
import { defineMemoryEtlJob } from "ravenflow";

const syncUsers = defineMemoryEtlJob({
  cron: "0 2 * * *", // daily at 2 AM
  extract: async (cursor: number | null) => {
    const offset = cursor ?? 0;
    const users = await api.getUsers({ offset, limit: 100 });
    return {
      data: users,
      cursor: offset + 100,
      hasMore: users.length === 100,
    };
  },
  transform: async (user) => ({
    id: user.id,
    email: user.email,
    normalizedName: user.name.trim().toLowerCase(),
  }),
  load: async (users) => {
    await db.users.insertMany(users);
  },
  steps: {
    transform: {
      rateLimit: { perSecond: 50 }, // even spacing: 1 every 20ms
    },
  },
});
```

### Distributed ETL
Same extract/transform/load pattern, but transforms are dispatched as independent BullMQ sub-jobs for parallel processing across workers.
```ts
import { defineDistributedEtlJob } from "ravenflow";

const processOrders = defineDistributedEtlJob({
  cron: "*/10 * * * *",
  extract: async (cursor: number | null) => {
    const orders = await db.orders
      .find({ status: "pending" })
      .skip(cursor ?? 0)
      .limit(50)
      .toArray();
    return {
      data: orders,
      cursor: (cursor ?? 0) + 50,
      hasMore: orders.length === 50,
    };
  },
  transform: async (order) => {
    const enriched = await paymentGateway.verify(order.paymentId);
    return { ...order, verified: enriched.status === "ok" };
  },
  load: async (orders) => {
    await db.processedOrders.insertMany(orders);
  },
  steps: {
    transform: {
      rateLimit: { perSecond: 10 },
      attempts: 3,
      backoff: { type: "exponential", delay: 1000 },
    },
  },
});
```

### Stateful ETL
Distributed ETL with MongoDB-backed checkpoints and automatic crash recovery. If a worker dies mid-processing, the Stall Watcher detects it and resumes from the last checkpoint.
```ts
import { defineStatefulEtlJob } from "ravenflow";

const migrateLegacyData = defineStatefulEtlJob({
  cron: "0 3 * * *",
  singletonRun: true, // only one run at a time
  stallTimeoutMs: 60_000, // detect stall after 60s without heartbeat
  maxStallRetries: 3,
  extract: async (cursor: string | null) => {
    const page = await legacyApi.fetch({ after: cursor, limit: 100 });
    return {
      data: page.items,
      cursor: page.nextCursor,
      hasMore: page.hasMore,
    };
  },
  transform: async (item) => {
    return mapToNewSchema(item);
  },
  load: async (items) => {
    await newDb.collection("records").insertMany(items);
  },
  steps: {
    transform: {
      rateLimit: { perSecond: 5 },
      attempts: 3,
      backoff: { type: "exponential", delay: 2000 },
    },
  },
});
```

How crash recovery works:
- During processing, the worker sends heartbeats to MongoDB
- A `__stall_check` cron job monitors all active runs
- If a heartbeat goes stale (exceeds `stallTimeoutMs`), the run is claimed as "stalled"
- A `__stall_resume` job re-enqueues the original job with the last checkpoint
- Processing resumes from where it left off (no duplicate work)
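Conceptually, the stall check is a periodic sweep over run state. Here is a minimal in-memory sketch of that sweep (illustrative only: the `RunState` shape, field names, and `resume` helper are assumptions, not ravenflow's internal schema, and the real watcher reads and claims runs atomically in MongoDB):

```ts
// Conceptual shape of the run state tracked in MongoDB (field names assumed).
interface RunState {
  runId: string;
  cursor: unknown; // last committed checkpoint
  lastHeartbeatAt: number; // epoch ms, bumped by the worker while processing
  stallRetries: number;
}

// Rough body of a __stall_check sweep: find stale runs and resume them.
async function checkForStalls(
  activeRuns: RunState[],
  stallTimeoutMs: number,
  maxStallRetries: number,
  resume: (run: RunState) => Promise<void>, // would enqueue a __stall_resume job
): Promise<void> {
  const now = Date.now();
  for (const run of activeRuns) {
    const stale = now - run.lastHeartbeatAt > stallTimeoutMs;
    if (stale && run.stallRetries < maxStallRetries) {
      run.stallRetries += 1; // count the recovery attempt
      // Re-enqueue the original job starting from run.cursor, so no work repeats.
      await resume(run);
    }
  }
}
```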
## Configuration

### App Options
```ts
const app = createRavenflowApp(config, {
  // Required: Redis connection
  redis: { host: "localhost", port: 6379, password: "secret", db: 0 },

  // Optional: MongoDB (required for stateful-etl and token-bucket)
  // String form:
  mongo: "mongodb://localhost:27017/myapp",
  // Or object form:
  // mongo: { host: "localhost", port: 27017, username: "admin", password: "secret", database: "myapp" },

  // Optional: job retention policy
  retention: {
    completed: 1000, // keep last 1000 completed jobs per queue
    failed: 5000, // keep last 5000 failed jobs per queue
  },

  // Optional: logger config (pino)
  logger: { level: "info", pretty: true },
});
```

### Rate Limiting
ravenflow uses an even-spacing rate limiter: `perSecond: 10` means one request every 100ms, not a burst of 10 followed by a pause.
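For intuition, even spacing means each call is assigned the next free slot on a fixed time grid rather than draining a bucket. A sketch of the concept (not ravenflow's implementation):

```ts
// Even-spacing limiter: at most one call per interval, never a burst.
function createEvenSpacingLimiter(perSecond: number) {
  const intervalMs = 1000 / perSecond;
  let nextFreeAt = 0; // earliest timestamp the next call may run at

  return async function acquire(): Promise<void> {
    const now = Date.now();
    const runAt = Math.max(nextFreeAt, now);
    nextFreeAt = runAt + intervalMs; // reserve the following slot
    const wait = runAt - now;
    if (wait > 0) await new Promise((resolve) => setTimeout(resolve, wait));
  };
}

const acquire = createEvenSpacingLimiter(10); // one slot every 100ms
```

Per-step limits are declared in the job definition: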
```ts
steps: {
  extract: { rateLimit: { perMinute: 60 } }, // 1 per second
  transform: { rateLimit: { perSecond: 20 } }, // 1 every 50ms
  load: { rateLimit: { perSecond: 5 } }, // 1 every 200ms
}
```

### Step Config (Distributed & Stateful ETL)
Each step can have its own retry strategy, rate limit, and target queue:
```ts
steps: {
  transform: {
    rateLimit: { perSecond: 10 },
    attempts: 3,
    backoff: { type: "exponential", delay: 1000 },
    queue: "heavy-transforms", // route to a different queue
  },
}
```

### Cron Scheduling
Jobs with a `cron` field are automatically scheduled when the worker starts:
```ts
defineSimpleJob({
  cron: "*/5 * * * *", // every 5 minutes
  handler: async () => { /* ... */ },
});
```

## Sending Jobs
```ts
// Fire and forget — returns the BullMQ Job object
const job = await app.jobs.sendEmail.send({ to: "user@example.com", subject: "Hi" });

// Wait for the result
const result = await app.jobs.sendEmail.sendAndWait({ to: "user@example.com", subject: "Hi" });

// ETL jobs don't take parameters
await app.jobs.syncUsers.send();

// ETL sendAndWait returns counts
const counts = await app.jobs.syncUsers.sendAndWait();
// { extractedCount: 500, transformedCount: 500, loadedCount: 500 }
```

## Worker
```ts
// Start processing jobs from a queue
await app.startWorker("default", { concurrency: 20 });

// Graceful shutdown
await app.close();
```

When `startWorker` is called:
- Cron schedules are synced to Redis
- The Stall Watcher starts automatically if stateful-etl jobs are registered
- The worker begins processing jobs
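In a long-running worker process, you will typically tie `app.close()` to process signals so that deploys drain in-flight jobs cleanly. A minimal wiring (a sketch; adapt to your runtime):

```ts
// Drain the worker before exit, e.g. on a Kubernetes SIGTERM during deploys.
const shutdown = async (signal: string) => {
  console.log(`received ${signal}, shutting down worker...`);
  await app.close(); // graceful shutdown, as above
  process.exit(0);
};

process.once("SIGTERM", () => void shutdown("SIGTERM"));
process.once("SIGINT", () => void shutdown("SIGINT"));
```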
## Config Validation
Ravenflow validates your config at startup and throws clear errors:
```
[ravenflow] Config validation failed:
  - "migrateLegacy" (stateful-etl) requires a mongo connection.
  - "rateLimiter" (token-bucket) requires a cron schedule.
```

## Companion Packages
- `@ravenflow/mongo` — Typed MongoDB collections, base repository, Zod schema integration
- `@ravenflow/http` — HTTP client utilities built on undici
## Worker metrics (pulse / throughput observability)
`createWorker` enables BullMQ's native per-minute metrics collection by default. Every `Worker` constructed through ravenflow records a monotonic completed-job count plus a rolling histogram of up to 10,080 one-minute buckets (a 7-day window) under the `bull:<queue>:metrics:completed` Redis keys. The `@ravenflow/dashboard` Jobs-per-Second live indicator reads those keys via `queue.getMetrics('completed')`.
```ts
import { METRICS_TIME } from "ravenflow";

// Default — metrics enabled, 7-day window (~10-40 KB per queue in Redis).
const worker = createWorker(queueName, connection, concurrency, processJob);

// Narrow the window to 1 hour.
const hourlyWorker = createWorker(queueName, connection, concurrency, processJob, {
  metrics: { maxDataPoints: METRICS_TIME.ONE_HOUR },
});

// Fully opt out — no metrics collection, no Redis footprint.
const noMetricsWorker = createWorker(queueName, connection, concurrency, processJob, {
  metrics: undefined,
});
```

See CHANGELOG.md for the rationale and Redis cost breakdown.
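Because the counts live in standard BullMQ metrics keys, they can also be read with a plain BullMQ `Queue`, independent of the dashboard. A sketch using `queue.getMetrics('completed')` (the 60-bucket slice and most-recent-first bucket ordering are assumptions about BullMQ's metrics layout):

```ts
import { Queue } from "bullmq";

const queue = new Queue("default", {
  connection: { host: "localhost", port: 6379 },
});

// Fetch the 60 most recent one-minute buckets of completed-job counts.
const metrics = await queue.getMetrics("completed", 0, 59);
const lastHour = metrics.data.reduce((sum, n) => sum + Number(n), 0);

console.log(`completed in the last hour: ${lastHour}`);
console.log(`avg jobs/minute: ${(lastHour / 60).toFixed(1)}`);

await queue.close();
```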
## License
MIT