Package Exports
- priority-scheduler-queue
Readme
Priority Scheduler Queue
A priority‑first FIFO scheduler for asynchronous work with bounded concurrency, pause/resume, idle detection, and backpressure. Ideal for task orchestration in CLIs, servers, and background workers.
Features
- Priority-first execution: Higher priority buckets are drained before lower ones; FIFO within each priority bucket.
- Bounded concurrency: Hard cap via
maxConcurrency
. - Backpressure:
maxQueueSize
rejectsadd()
when the queue is full. - Pause/Resume: Stop starting new tasks without cancelling running ones.
- Idle detection:
onIdle()
resolves when nothing is running or queued. - Typed: First-class TypeScript types.
Environment: Node.js ≥ 18.
Install
# npm
npm install priority-scheduler-queue
# pnpm
pnpm add priority-scheduler-queue
# yarn
yarn add priority-scheduler-queue
Quick Start
import { PriorityQueueFifo } from 'priority-scheduler-queue';
const queue = new PriorityQueueFifo({ maxConcurrency: 2 });
const work = (id: number) => new Promise<string>((res) => setTimeout(() => res(`done ${id}`), 200));
// Add tasks with optional priorities (default 0)
const p1 = queue.add(() => work(1), { priority: 0 });
const p2 = queue.add(() => work(2), { priority: 5 }); // runs before p1
const p3 = queue.add(() => work(3));
await Promise.all([p1, p2, p3]);
await queue.onIdle();
How It Works
- Tasks live in priority buckets (integer priorities). Higher numbers run first.
- Within a bucket, tasks are started FIFO by insertion time.
- The scheduler starts at most
maxConcurrency
tasks at once. - If
maxQueueSize
is reached,add()
immediately rejects.
API
SchedulerOptions
interface SchedulerOptions {
/** Max number of tasks allowed to run at the same time. Default: os.cpus().length */
maxConcurrency?: number;
/** Default task priority. Higher = sooner. Default: 0 */
defaultPriority?: number;
/** Max number of tasks allowed in the queue before add() rejects. Default: Infinity */
maxQueueSize?: number;
}
PriorityQueueFifo
Constructor
new PriorityQueueFifo(options?: SchedulerOptions)
Methods
add<T>(taskFn: () => Promise<T>, opts?: { priority?: number; id?: string }): Promise<T>
- Enqueue a task. Returns a promise for that task’s result.
pause(): void
/resume(): void
- Stop/allow starting new tasks. Running tasks are unaffected.
onIdle(): Promise<void>
- Resolves when no tasks are running, pending, or queued.
clear(finalError?: unknown): void
- Rejects and removes all queued (not yet started) tasks.
size: number
- Count of queued tasks (not counting tasks about to start this tick).
runningCount: number
- Number of tasks currently running plus pending starters (≤
maxConcurrency
).
- Number of tasks currently running plus pending starters (≤
isCurrentlyPaused: boolean
- Whether the scheduler is paused.
Usage Patterns
Priorities & FIFO
Use integers. Suggested convention:
10
— high priority0
— normal-10
— low priority
queue.add(() => doWork('low'), { priority: -10 });
queue.add(() => doWork('normal')); // 0
queue.add(() => doWork('high'), { priority: 10 });
High priority starts first. Within the same priority, earlier add()
calls start earlier.
Pausing and Resuming
queue.pause();
queue.add(() => fetchExpensive()); // queued but not started
// ...later
queue.resume(); // starts queued tasks up to maxConcurrency
Waiting for Idle
await queue.onIdle(); // resolves when nothing is running or queued
Clearing the Queue
queue.clear();
// All not-yet-started tasks are rejected with an Error.
You can pass a custom error:
queue.clear(new Error('Shutting down'));
Handling Full Queues
const q = new PriorityQueueFifo({ maxQueueSize: 100 });
try {
await q.add(() => doWork());
} catch (e) {
// If size >= 100 at the time of add(), you land here.
}
Retries (user-land)
Retries are intentionally not built-in; compose them as needed:
async function withRetry<T>(fn: () => Promise<T>, retries = 2) {
let lastErr: unknown;
for (let i = 0; i <= retries; i++) {
try {
return await fn();
} catch (e) {
lastErr = e;
}
}
throw lastErr;
}
queue.add(() => withRetry(() => flakyFetch()));
Error Handling
- The promise returned by
add()
resolves/rejects with your task’s result/error. - If you clear the queue, queued tasks reject with the provided error (or a default "cancelled" error).
- If the queue is full,
add()
rejects immediately.
queue.add(doThing).then(handle, handleError);
Performance Notes
- Choosing a task uses a "highest priority first" pass and FIFO within the bucket.
- Distinct priorities are typically few; bucket operations are O(1) amortized for enqueue/dequeue.
- Set
maxConcurrency
thoughtfully. For CPU-bound tasks, matchingos.cpus().length
is a good default.
FAQs
Q: Does it run in the browser?
A: It targets Node.js (uses os.cpus()
). Browser builds would need a polyfill for os
or a custom maxConcurrency
.
Q: What if my tasks aren’t promises?
A: Wrap them: queue.add(async () => doSyncThing())
.
Q: Can I reorder tasks after enqueueing?
A: No—use priorities at add()
time.
Q: Is cancellation supported?
A: clear()
cancels queued jobs. For running tasks, use your own cancellation (e.g., AbortController
within taskFn
).