JSPM

@memberjunction/queue

2.100.0
  • ESM via JSPM
  • ES Module Entrypoint
  • Export Map
  • Keywords
  • License
  • Repository URL
  • TypeScript Types
  • README
  • Created
  • Published
  • Downloads 1441
  • Score
    100M100P100Q135176F
  • License ISC

MemberJunction: Queue Library for managing server side queues

Package Exports

    This package does not declare an exports field, so the exports above have been automatically detected and optimized by JSPM instead. If any package subpath is missing, it is recommended to post an issue to the original package (@memberjunction/queue) to support the "exports" field. If that is not possible, create a JSPM override to customize the exports field for this package.

    Readme

    @memberjunction/queue

    A flexible queue management system for MemberJunction applications that enables background task processing, job scheduling, and asynchronous execution with database persistence.

    Overview

    The @memberjunction/queue package provides a robust framework for implementing persistent queues in MemberJunction applications. It offers:

    • Database-backed task persistence
    • Automatic queue creation and management
    • Concurrent task processing with configurable limits
    • Heartbeat monitoring for process health
    • Type-safe task definitions
    • Extensible queue implementations

    Installation

    npm install @memberjunction/queue

    Dependencies

    This package requires the following MemberJunction packages:

    • @memberjunction/core - Core functionality and entity management
    • @memberjunction/global - Global utilities and class registration
    • @memberjunction/core-entities - Entity type definitions
    • @memberjunction/ai - AI functionality (for AI-related queues)
    • @memberjunction/aiengine - AI Engine integration

    Additional dependencies:

    • uuid - For generating unique identifiers

    Core Components

    TaskBase

    The TaskBase class represents an individual task in a queue:

    export class TaskBase {
      constructor(
        taskRecord: QueueTaskEntity,
        data: any,
        options: TaskOptions
      )
      
      // Properties
      ID: string              // Unique task identifier
      Status: TaskStatus      // Current task status
      Data: any              // Task payload data
      Options: TaskOptions   // Task configuration
      TaskRecord: QueueTaskEntity // Database entity
    }

    TaskStatus

    Available task statuses:

    export const TaskStatus = {
      Pending: 'Pending',
      InProgress: 'InProgress',
      Complete: 'Complete',
      Failed: 'Failed',
      Cancelled: 'Cancelled',
    } as const;

    QueueBase

    The abstract QueueBase class serves as the foundation for all queue implementations:

    export abstract class QueueBase {
      constructor(
        QueueRecord: QueueEntity,
        QueueTypeID: string,
        ContextUser: UserInfo
      )
      
      // Public methods
      AddTask(task: TaskBase): boolean
      FindTask(ID: string): TaskBase
      
      // Protected abstract method to implement
      protected abstract ProcessTask(
        task: TaskBase, 
        contextUser: UserInfo
      ): Promise<TaskResult>
    }

    QueueManager

    The QueueManager is a singleton that manages all active queues:

    export class QueueManager {
      // Singleton access
      static get Instance(): QueueManager
      
      // Static methods
      static async Config(contextUser: UserInfo): Promise<void>
      static async AddTask(
        QueueType: string,
        data: any,
        options: any,
        contextUser: UserInfo
      ): Promise<TaskBase | undefined>
      
      // Instance methods
      async AddTask(
        QueueTypeID: string,
        data: any,
        options: any,
        contextUser: UserInfo
      ): Promise<TaskBase | undefined>
    }

    TaskResult

    Structure returned by task processing:

    export class TaskResult {
      success: boolean      // Whether task completed successfully
      userMessage: string   // User-friendly message
      output: any          // Task output data
      exception: any       // Exception details if failed
    }

    Usage Examples

    Basic Queue Implementation

    Create a custom queue by extending QueueBase:

    import { QueueBase, TaskBase, TaskResult } from '@memberjunction/queue';
    import { RegisterClass } from '@memberjunction/global';
    import { UserInfo } from '@memberjunction/core';
    
    // Register your queue with a specific queue type name
    @RegisterClass(QueueBase, 'Email Notification')
    export class EmailNotificationQueue extends QueueBase {
      protected async ProcessTask(
        task: TaskBase, 
        contextUser: UserInfo
      ): Promise<TaskResult> {
        try {
          // Extract task data
          const { recipient, subject, body } = task.Data;
          
          // Implement your email sending logic here
          console.log(`Sending email to ${recipient}`);
          console.log(`Subject: ${subject}`);
          
          // Simulate email sending
          await this.sendEmail(recipient, subject, body);
          
          // Return success result
          return {
            success: true,
            userMessage: 'Email sent successfully',
            output: { sentAt: new Date() },
            exception: null
          };
        } catch (error) {
          // Return failure result
          return {
            success: false,
            userMessage: `Failed to send email: ${error.message}`,
            output: null,
            exception: error
          };
        }
      }
      
      private async sendEmail(to: string, subject: string, body: string) {
        // Your email service integration here
      }
    }

    Adding Tasks to Queue

    import { QueueManager } from '@memberjunction/queue';
    import { UserInfo } from '@memberjunction/core';
    
    // Initialize queue manager (typically done once at app startup)
    await QueueManager.Config(contextUser);
    
    // Add a task using queue type name
    const task = await QueueManager.AddTask(
      'Email Notification',  // Queue type name
      {                     // Task data
        recipient: 'user@example.com',
        subject: 'Welcome to MemberJunction',
        body: 'Thank you for joining!'
      },
      {                     // Task options
        priority: 1
      },
      contextUser
    );
    
    if (task) {
      console.log(`Task created with ID: ${task.ID}`);
    }

    AI Action Queue Example

    The package includes built-in queues for AI operations:

    import { AIActionQueue, EntityAIActionQueue } from '@memberjunction/queue';
    
    // These queues are automatically registered and available for use
    // Add an AI action task
    const aiTask = await QueueManager.AddTask(
      'AI Action',
      {
        actionName: 'GenerateText',
        prompt: 'Write a product description',
        parameters: { maxTokens: 100 }
      },
      {},
      contextUser
    );
    
    // Add an entity-specific AI action
    const entityAITask = await QueueManager.AddTask(
      'Entity AI Action',
      {
        entityName: 'Products',
        entityID: 123,
        actionName: 'GenerateDescription'
      },
      {},
      contextUser
    );

    Database Schema

    The queue system requires the following database tables:

    Queue Types Table (__mj.QueueType)

    Stores definitions of different queue types (e.g., "Email Notification", "AI Action")

    Queues Table (__mj.Queue)

    Tracks active queue instances with process information:

    • Queue type reference
    • Process details (PID, platform, hostname)
    • Heartbeat timestamp
    • Network information

    Queue Tasks Table (__mj.QueueTask)

    Stores individual tasks:

    • Queue reference
    • Task status
    • Task data (JSON)
    • Task options (JSON)
    • Output and error information

    Process Management

    The QueueManager automatically captures process information for monitoring:

    • Process ID (PID)
    • Platform and version
    • Working directory
    • Network interfaces
    • Operating system details
    • User information
    • Heartbeat timestamps

    This information helps track queue health and enables failover scenarios.

    Configuration

    Queue behavior can be configured through the implementation:

    export class CustomQueue extends QueueBase {
      private _maxTasks = 5;        // Maximum concurrent tasks
      private _checkInterval = 500;  // Check interval in milliseconds
      
      // Override these values in your constructor
      constructor(QueueRecord: QueueEntity, QueueTypeID: string, ContextUser: UserInfo) {
        super(QueueRecord, QueueTypeID, ContextUser);
        // Customize queue behavior
        this._maxTasks = 10;
        this._checkInterval = 1000;
      }
    }

    Best Practices

    1. Task Data Structure: Keep task data serializable as JSON
    2. Error Handling: Always return proper TaskResult with error details
    3. Queue Registration: Use @RegisterClass decorator for automatic registration
    4. Idempotency: Design tasks to be safely retryable
    5. Resource Cleanup: Clean up resources in finally blocks
    6. Monitoring: Check heartbeat timestamps for queue health

    Integration with MemberJunction

    The queue system integrates seamlessly with:

    • Entity System: Use entities for task data and processing
    • User Context: All operations respect user permissions
    • Global Registry: Automatic queue discovery via class registration
    • AI Engine: Built-in support for AI task processing

    Build & Development

    # Build the package
    npm run build
    
    # Development mode with auto-reload
    npm run start
    
    # TypeScript compilation only
    npm run build

    License

    ISC