priority-scheduler-queue

0.1.0
    • License MIT

    Package Exports

    • priority-scheduler-queue

    Readme

    Priority Scheduler Queue


    A priority‑first FIFO scheduler for asynchronous work with bounded concurrency, pause/resume, idle detection, and backpressure. Ideal for task orchestration in CLIs, servers, and background workers.



    Features

    • Priority-first execution: Higher priority buckets are drained before lower ones; FIFO within each priority bucket.
    • Bounded concurrency: Hard cap via maxConcurrency.
    • Backpressure: maxQueueSize rejects add() when the queue is full.
    • Pause/Resume: Stop starting new tasks without cancelling running ones.
    • Idle detection: onIdle() resolves when nothing is running or queued.
    • Typed: First-class TypeScript types.

    Environment: Node.js ≥ 18.


    Install

    # npm
    npm install priority-scheduler-queue
    
    # pnpm
    pnpm add priority-scheduler-queue
    
    # yarn
    yarn add priority-scheduler-queue

    Quick Start

    import { PriorityQueueFifo } from 'priority-scheduler-queue';
    
    const queue = new PriorityQueueFifo({ maxConcurrency: 2 });
    
    const work = (id: number) => new Promise<string>((res) => setTimeout(() => res(`done ${id}`), 200));
    
    // Add tasks with optional priorities (default 0)
    const p1 = queue.add(() => work(1), { priority: 0 });
    const p2 = queue.add(() => work(2), { priority: 5 }); // higher priority: picked ahead of p1 whenever both are still queued
    const p3 = queue.add(() => work(3));
    
    await Promise.all([p1, p2, p3]);
    await queue.onIdle();

    How It Works

    • Tasks live in priority buckets (integer priorities). Higher numbers run first.
    • Within a bucket, tasks are started FIFO by insertion time.
    • The scheduler starts at most maxConcurrency tasks at once.
    • If maxQueueSize is reached, add() immediately rejects.
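
    The rules above can be sketched in a few dozen lines. This is an illustration only, not the library's source: MiniScheduler and its fields are hypothetical names, and deferring the first start to a microtask (so that tasks added in the same tick compete on priority) is an assumption about how such a scheduler might behave.

```typescript
// Illustrative sketch only. Not the library's implementation; all names are hypothetical.
type Job = {
  run: () => Promise<unknown>;
  resolve: (value: unknown) => void;
  reject: (reason: unknown) => void;
};

class MiniScheduler {
  private buckets = new Map<number, Job[]>(); // priority -> FIFO list of waiting jobs
  private running = 0;
  private queued = 0;
  private maxConcurrency: number;
  private maxQueueSize: number;

  constructor(maxConcurrency = 2, maxQueueSize = Infinity) {
    this.maxConcurrency = maxConcurrency;
    this.maxQueueSize = maxQueueSize;
  }

  add<T>(taskFn: () => Promise<T>, priority = 0): Promise<T> {
    // Backpressure: reject immediately when the queue is full.
    if (this.queued >= this.maxQueueSize) {
      return Promise.reject(new Error("queue full"));
    }
    return new Promise<T>((resolve, reject) => {
      const bucket = this.buckets.get(priority) ?? [];
      bucket.push({ run: taskFn, resolve: resolve as (value: unknown) => void, reject });
      this.buckets.set(priority, bucket);
      this.queued++;
      // Assumption: defer the start so same-tick add() calls are ordered by priority.
      Promise.resolve().then(() => this.drain());
    });
  }

  private drain(): void {
    while (this.running < this.maxConcurrency && this.queued > 0) {
      // Highest priority first, FIFO inside the bucket.
      const top = Math.max(...Array.from(this.buckets.keys()));
      const bucket = this.buckets.get(top)!;
      const job = bucket.shift()!;
      if (bucket.length === 0) this.buckets.delete(top);
      this.queued--;
      this.running++;
      // Wrapping in then() also turns a synchronous throw into a rejection.
      Promise.resolve()
        .then(() => job.run())
        .then(job.resolve, job.reject)
        .finally(() => {
          this.running--;
          this.drain();
        });
    }
  }
}
```

    With a concurrency of 1 and three tasks added in the same tick at priorities 0, 5, and 0, this sketch starts the priority-5 task first and then the two priority-0 tasks in insertion order.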

    API

    SchedulerOptions

    interface SchedulerOptions {
      /** Max number of tasks allowed to run at the same time. Default: os.cpus().length */
      maxConcurrency?: number;
    
      /** Default task priority. Higher = sooner. Default: 0 */
      defaultPriority?: number;
    
      /** Max number of tasks allowed in the queue before add() rejects. Default: Infinity */
      maxQueueSize?: number;
    }

    PriorityQueueFifo

    Constructor

    new PriorityQueueFifo(options?: SchedulerOptions)

    Methods

    • add<T>(taskFn: () => Promise<T>, opts?: { priority?: number; id?: string }): Promise<T>

      • Enqueue a task. Returns a promise for that task’s result.
    • pause(): void / resume(): void

      • Stop/allow starting new tasks. Running tasks are unaffected.
    • onIdle(): Promise<void>

      • Resolves when no tasks are running, pending, or queued.
    • clear(finalError?: unknown): void

      • Rejects and removes all queued (not yet started) tasks.
    • size: number

      • Count of queued tasks (not counting tasks about to start this tick).
    • runningCount: number

      • Number of tasks currently running plus pending starters (≤ maxConcurrency).
    • isCurrentlyPaused: boolean

      • Whether the scheduler is paused.

    Usage Patterns

    Priorities & FIFO

    Use integers. Suggested convention:

    • 10: high priority
    • 0: normal
    • -10: low priority

    queue.add(() => doWork('low'), { priority: -10 });
    queue.add(() => doWork('normal')); // 0
    queue.add(() => doWork('high'), { priority: 10 });

    High priority starts first. Within the same priority, earlier add() calls start earlier.

    Pausing and Resuming

    queue.pause();
    queue.add(() => fetchExpensive()); // queued but not started
    
    // ...later
    queue.resume(); // starts queued tasks up to maxConcurrency

    Waiting for Idle

    await queue.onIdle(); // resolves when nothing is running or queued

    Clearing the Queue

    queue.clear();
    // All not-yet-started tasks are rejected with an Error.

    You can pass a custom error:

    queue.clear(new Error('Shutting down'));
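
    The documented methods combine naturally into a graceful shutdown. The helper below is a sketch: shutdown and SchedulerLike are hypothetical names, the interface lists only methods from the API section, and it assumes onIdle() still resolves while the scheduler is paused and empty.

```typescript
// Structural type covering only the methods documented in the API section.
interface SchedulerLike {
  pause(): void;
  clear(finalError?: unknown): void;
  onIdle(): Promise<void>;
}

// Hypothetical helper: stop starting work, cancel what is queued, wait for the rest.
async function shutdown(queue: SchedulerLike, reason = "Shutting down"): Promise<void> {
  queue.pause();                  // no further tasks are started
  queue.clear(new Error(reason)); // queued tasks reject with this error
  await queue.onIdle();           // resolves once running tasks have settled
}
```

    Callers should stop calling add() before shutting down, since tasks added while paused stay queued. Promises of cleared tasks reject, so attach a handler to each of them to avoid unhandled rejections.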

    Handling Full Queues

    const q = new PriorityQueueFifo({ maxQueueSize: 100 });
    
    try {
      await q.add(() => doWork());
    } catch (e) {
      // Reached when the queue already holds 100 tasks at add() time, or when doWork() itself rejects.
    }
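
    Producers that would rather wait than handle a rejection can check the documented size property before adding. The sketch below assumes simple polling is acceptable; waitForCapacity, SizedQueue, and pollMs are hypothetical names and not part of the library.

```typescript
// Anything exposing a numeric `size`, such as the queue's documented property.
interface SizedQueue {
  readonly size: number;
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Hypothetical helper: poll until fewer than `limit` tasks are queued.
async function waitForCapacity(queue: SizedQueue, limit: number, pollMs = 10): Promise<void> {
  while (queue.size >= limit) {
    await sleep(pollMs);
  }
}
```

    A producer would call await waitForCapacity(q, 100) before q.add(...). With several concurrent producers add() can still reject, so keep the try/catch.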

    Retries (user-land)

    Retries are intentionally not built-in; compose them as needed:

    async function withRetry<T>(fn: () => Promise<T>, retries = 2) {
      let lastErr: unknown;
      for (let i = 0; i <= retries; i++) {
        try {
          return await fn();
        } catch (e) {
          lastErr = e;
        }
      }
      throw lastErr;
    }
    
    queue.add(() => withRetry(() => flakyFetch()));
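
    If immediate retries are too aggressive, the same pattern extends to a delay between attempts. This variant is a sketch under assumed defaults: withBackoff, baseMs, and the doubling schedule are illustrative choices, not library features.

```typescript
const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Hypothetical variant of withRetry that waits baseMs, 2*baseMs, 4*baseMs, ... between attempts.
async function withBackoff<T>(fn: () => Promise<T>, retries = 2, baseMs = 100): Promise<T> {
  let lastErr: unknown;
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return await fn();
    } catch (e) {
      lastErr = e;
      // Skip the wait after the final failed attempt.
      if (attempt < retries) await delay(baseMs * 2 ** attempt);
    }
  }
  throw lastErr;
}
```

    It is used like withRetry: queue.add(() => withBackoff(() => flakyFetch())). The task keeps its concurrency slot while it waits between attempts.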

    Error Handling

    • The promise returned by add() resolves/rejects with your task’s result/error.
    • If you clear the queue, queued tasks reject with the provided error (or a default "cancelled" error).
    • If the queue is full, add() rejects immediately.

    queue.add(doThing).then(handle, handleError);

    Performance Notes

    • Choosing a task uses a "highest priority first" pass and FIFO within the bucket.
    • Distinct priorities are typically few; bucket operations are O(1) amortized for enqueue/dequeue.
    • Set maxConcurrency thoughtfully. For CPU-bound tasks, matching os.cpus().length is a good default.
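
    One common way to get amortized O(1) dequeue from an array-backed FIFO is to advance a head index instead of calling Array.prototype.shift(), compacting occasionally. The README does not say how the library implements its buckets; FifoBucket below is a hypothetical sketch of the technique.

```typescript
// Illustrative FIFO bucket with amortized O(1) push and shift. Hypothetical, not library code.
class FifoBucket<T> {
  private items: T[] = [];
  private head = 0; // index of the next item to dequeue

  get length(): number {
    return this.items.length - this.head;
  }

  push(item: T): void {
    this.items.push(item);
  }

  shift(): T | undefined {
    if (this.head >= this.items.length) return undefined;
    const item = this.items[this.head++];
    // Compact once at least half the array is consumed; the copy is no larger
    // than the number of dequeues since the last compaction.
    if (this.head * 2 >= this.items.length) {
      this.items = this.items.slice(this.head);
      this.head = 0;
    }
    return item;
  }
}
```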

    FAQs

    Q: Does it run in the browser? A: It targets Node.js (uses os.cpus()). Browser builds would need a polyfill for os or a custom maxConcurrency.

    Q: What if my tasks aren’t promises? A: Wrap them: queue.add(async () => doSyncThing()).

    Q: Can I reorder tasks after enqueueing? A: No—use priorities at add() time.

    Q: Is cancellation supported? A: clear() cancels queued jobs. For running tasks, use your own cancellation (e.g., AbortController within taskFn).
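
    As a sketch of the AbortController approach, the task below stops early when its signal aborts. cancellableDelay is a hypothetical stand-in for real work; the scheduler itself is not involved in the cancellation.

```typescript
// Hypothetical task: a timer that rejects early when the signal aborts.
function cancellableDelay(ms: number, signal: AbortSignal): Promise<string> {
  return new Promise<string>((resolve, reject) => {
    if (signal.aborted) {
      reject(new Error("aborted"));
      return;
    }
    const timer = setTimeout(() => {
      signal.removeEventListener("abort", onAbort);
      resolve("finished");
    }, ms);
    function onAbort(): void {
      clearTimeout(timer); // stop the underlying work
      reject(new Error("aborted"));
    }
    signal.addEventListener("abort", onAbort, { once: true });
  });
}
```

    Create a controller, pass its signal into the task with queue.add(() => cancellableDelay(5000, controller.signal)), and call controller.abort() to cancel. The promise returned by add() then rejects with the task's error, as with any other failure.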



    API Reference

    View the Reference Docs