JSPM

  • ESM via JSPM
  • ES Module Entrypoint
  • Export Map
  • Keywords
  • License
  • Repository URL
  • TypeScript Types
  • README
  • Created
  • Published
  • Downloads 49362
  • Score
    100M100P100Q154778F
  • License MIT

An iterable asynchronous stream demultiplexer.

Package Exports

  • stream-demux

This package does not declare an exports field, so the exports above have been automatically detected and optimized by JSPM instead. If any package subpath is missing, it is recommended to post an issue to the original package (stream-demux) to support the "exports" field. If that is not possible, create a JSPM override to customize the exports field for this package.

Readme

stream-demux

An asynchronous iterable stream demultiplexer.

Lets you write data to multiple async iterable streams from a central place without keeping any references to those streams. The StreamDemux class returns streams of type DemuxedAsyncIterableStream (base type AsyncIterableStream).
See https://github.com/SocketCluster/async-iterable-stream

Installation

npm install stream-demux

Usage

let demux = new StreamDemux();

(async () => {
  // Consume data from 'abc' stream.
  let substream = demux.stream('abc');
  for await (let packet of substream) {
    console.log('ABC:', packet);
  }
})();

(async () => {
  // Consume data from 'def' stream.
  let substream = demux.stream('def');
  for await (let packet of substream) {
    console.log('DEF:', packet);
  }
})();

(async () => {
  // Consume data from 'def' stream.
  // Can also work with a while loop for
  // older environments.
  // Can have multiple loops consuming the same
  // stream at the same time.
  let asyncIterator = demux.stream('def').getAsyncIterator();
  while (true) {
    let packet = await asyncIterator.next();
    if (packet.done) break;
    console.log('DEF (while loop):', packet.value);
  }
})();

(async () => {
  for (let i = 0; i < 10; i++) {
    await wait(10);
    demux.write('abc', 'message-abc-' + i);
    demux.write('def', 'message-def-' + i);
  }
  demux.end('abc');
  demux.end('def');
})();

// Utility function for using setTimeout() with async/await.
function wait(duration) {
  return new Promise((resolve) => {
    setTimeout(() => {
      resolve();
    }, duration);
  });
}

Goal

The goal of this module is to efficiently distribute data to a large number of named asynchronous streams while facilitating functional programming patterns which decrease the probability of memory leaks.

Each stream returned by this module is responsible for picking up its own data from a shared source stream - This means that the stream-demux module doesn't hold any references to streams which it produces via its stream() method; this reduces the likelihood of programming mistakes which would lead to memory leaks because streams don't need to be destroyed or cleaned up explicitly.

The downside to making each stream responsible for consuming its own data is that having a lot of concurrent streams can have a negative impact on performance (especially if there are a lot of idle streams). A goal of stream-demux is to keep that overhead to a minimum.