JSPM

  • Created
  • Published
  • Downloads 652
  • Score
    100M100P100Q81373F
  • License ISC

An applicaiton execution model using RxJS

Package Exports

  • rx-txjs

This package does not declare an exports field, so the exports above have been automatically detected and optimized by JSPM instead. If any package subpath is missing, it is recommended to post an issue to the original package (rx-txjs) to support the "exports" field. If that is not possible, create a JSPM override to customize the exports field for this package.

Readme

NPM dependencies Status

Application Execution Model

Latest Update

    • Remove Application specific code. rx-txjs is a library enable your classes to communicate one with another with another but without a regular API call. This communicate can be done locally within the process very similar to rxjs public / subscribe or between process / services / containers by some method of communication like express (http), system queue (Kafka, RanbbitMQ, Bull) or any other method. rx-txjs id agnostic to what method you prefer to use as long as you "present and object that honor TxConnector interface". once you present that object to rx-txjs by for example
TxQueuePointRegistry.instance.setDriver(my-object-that-handle-queue-communication);
TxRoutePointRegistry.instance.setDriver(my-object-that-handle-express);

rx-txjs will able to communicate directly between components with in different processes and run a Job cross services. see the doc for more info.

This change to move a bunch of predefine adapters to a new package rx-txjs-adapters anable you to use express for example out of the box.

    • Add distribute component execution. rx-txjs will run your components (your business logic) as a job one after the other. This feature enable you to run those components distrute using other instance of your service / container. See below for more info.

Documentation

Full document is now here

##Introduction This package implement an execution model based on decoupling objects (a Components) in a Nodejs environment.

A Components are regular class (better it to be TypeScript class) which implement a small piece of business unit. But the unique is that there is NO API running it implementation.

The way to interact with a Component is by sending it a message

To interact with a Component you are using several version a * MountPoints * objects.

MountPoint

    • TxMountPoint * - implement two ways 1:N traditional public / subscribe model. With TxMountPoint you can send a message to a Component and having multiple subscribe listen to it's reply (see the like above for full documentation).
    • TxSinglePoint * - implement 1:1 one way communication channel with the Component. Use this MountPoint to send a data but in order to get it's reply you have to provide your own (other) TxSinglePoint object.
    • TxDoublePoint * a kind of wrapper that include two different SinglePoint to implement bi-directional communication channels with a Component.
    • TxRoutePoint * a kind of MountPoint that implement Class-2-Class direct communication over express (HTTP).
    • TxQueuePoint * a kind of MountPoint that implement Class-2-Class direct communication over queue system.

Featute List

  • Job Execution - Able to run an series of Components implement one more complex job.
  • Job Continue / Step / Undo - features of job executions (see link above for full documentation)
  • Persistence - able to save each step of execution. This feature is well fit with 'Job Continue' so you can run a job until certain point, stop it and resume it later on.
  • Error Handling - in case of error, have the chance for each object to clean up.
  • Job Events - job has many events you ca n listen to during it's execution.
  • Recording - record the data passing between objects during the execution of a Job.
  • C2C - Class-2-Class communication enable to communication directory between to Components over some communication channel. this could be over HTTP (node express) or some kind of queue system (like Kafka, RabbitMQ).
  • S2S - Cross Service Job, this enable to run a job spreading on several services.
  • Monitor - a full monitor solution.
  • Distribution - run a job's components in different service / container instance.

What's new 0.2.3

  1. adding TxSubscribe - an implement of RxJS Subject API.
  2. adding TxSubject - a wrapper around RxJS Subject to support a type of RPC on a Componet.
  3. adding TxConnectorExpress - Coomponent-2-Component communication using subscribe / next API.

DOTO List

  1. Complete the work on S2S using node express (HTTP)
  2. Run Componets in paralle.
  3. Adding retry when execute a component (during job exection)
  4. High availablity - on component level.

Quick Start

Using a Component

Add the following code line to your class

mountpoint = TxMountPointRegistry.instance.create('MOUNTPOINT::NAME');

Note: you can any of kind of mountpoint version depend on your specfic case.

For example your component may looks like that:

class C1Component {
  mountpoint = TxMountPointRegistry.instance.create('GITHUB::GIST::C1');    

  constructor() {
    this.mountpoint.tasks().subscribe(
      (task) => {
        logger.info('[C1Component:task] got task = ' + JSON.stringify(task, undefined, 2));

         // Do some stuff here ...

        // send reply back to all the subcribers
        mountpoint.reply().next(new TxTask('get', '', task['data']));
      }
    )  
  }
  // the rest of the class ...
}

To send a message to your component use the code line:

let mountpoint = TxMountPointRegistry.instance.get('MOUNTPOINT::NAME');

mountpoint.task().next(new TxTask({method: 'doit'}, {data: 'this here'}));

Using a Job

Create the components you want them to run as a Job, sat C1Component, C2Component and C3Component.

Note: When using a component as part of the a job you must use TxSinglePoint. So in you Component define:

let mountpoint = TxSinglePointRegistry.instance.create('GITHUB::GIST::C1');

Define a Job as follow:

  let job = new TxJob();

  job.add(TxSinglePointRegistry.instance.get('GITHUB::GIST::C1'));
  job.add(TxSinglePointRegistry.instance.get('GITHUB::GIST::C2'));
  job.add(TxSinglePointRegistry.instance.get('GITHUB::GIST::C3'));

  job.execute(new TxTask(
    'create',
    '',
    {something: 'more data here'})
  ); 

This will run the components in the order they defined. This is a very simple verision of the exection options. job.execute include many more options like persist, recording, run-until and so on.

Distribure Job

First define your components, for example S1Component, S2Component and S3Component like this

export class S1Component {
  singlepoint = TxSinglePointRegistry.instance.create('GITHUB::S1');

  constructor() {
    this.singlepoint.tasks().subscribe(
      (task: TxTask<any>) => {
        logger.info('[S1Component:tasks] got task = ' + JSON.stringify(task.get(), undefined, 2));                  

        // just send the reply to whom is 'setting' on this reply subject
        task.reply().next(new TxTask({method: 'from S1', status: 'ok'}, task['data']))
      }
    );    
  }

}  

Now you need to preset to rx-txjs an object which implement TxDistribute interface.

This object has to do two things:

  1. implement send method of TxDistribute which send data to queue, express or other method of distribution.
  2. On receiving and the data to TxDistributeComponent.

You can use the builtin TxDistributeBull from rx-txjs-adapters package which will do all the work using bull library.

// import TxDistributeBull from rx-txjs-adapters package
import { TxDistributeBull } from 'rx-txjs-adapters';

// set the distributer so the job will know to where the send the data 
TxJobRegistry.instance.setDistribute(new TxDistributeBull('redis://localhost:6379'));

Now you can define the job and run it in distributed manner.

// create the job and add it's components
let job = new TxJob('job-1'); 

job.add(TxSinglePointRegistry.instance.get('GITHUB::S1'));
job.add(TxSinglePointRegistry.instance.get('GITHUB::S2'));
job.add(TxSinglePointRegistry.instance.get('GITHUB::S3'));
// define and callback when job is completed
TxJobRegistry.instance.once('job: ' + job.getUuid(), (data: TxJobEventType) => {      
  console.log('[job-execute-test] job.getIsCompleted: complete running all tasks - data:' + JSON.stringify(data, undefined, 2));
});

// now execute the job with *publish* flag turn on. 
job.execute(new TxTask({
    method: 'create',
    status: ''
  },
  {something: 'more data here'}
  ),
  {
    publish: 'distribute'
  } as TxJobExecutionOptions
);        
  • Here the complete example
import createLogger from 'logging';
const logger = createLogger('Job-Execute-Test');

import { 
  TxSinglePointRegistry,
  TxJobExecutionOptions,
  TxTask,
  TxJob,
  TxJobRegistry,  
  TxJobEventType,
} from 'rx-txjs';

import { TxDistributeBull } from 'rx-txjs-adapters';
import { S1Component } from '../components/S1.component';
import { S2Component } from '../components/S2.component';
import { S3Component } from '../components/S3.component';

new S1Component();
new S2Component();
new S3Component();

TxJobRegistry.instance.setDistribute(new TxDistributeBull('redis://localhost:6379'));

logger.info('tx-job-distribute.spec: check running S1-S2-S3 through distribute');    

let job = new TxJob('job-1');  

job.add(TxSinglePointRegistry.instance.get('GITHUB::S1'));
job.add(TxSinglePointRegistry.instance.get('GITHUB::S2'));
job.add(TxSinglePointRegistry.instance.get('GITHUB::S3'));

TxJobRegistry.instance.once('job: ' + job.getUuid(), (data: TxJobEventType) => {      
  console.log('[job-execute-test] job.getIsCompleted: complete running all tasks - data:' + JSON.stringify(data, undefined, 2));
});

job.execute(new TxTask({
    method: 'create',
    status: ''
  },
  {something: 'more data here'}
  ),
  {
    publish: 'distribute'
  } as TxJobExecutionOptions
);