  • License MIT

streaming-iterables

A collection of utilities that work with sync and async iterables and iterators. Designed to replace your streams. Think of it as a combination of bluestream and ramda, but for a much simpler construct: async iterators. The goal is to make it dead easy to replace your stream-based processes with async iterators, which in general should make your code smaller and faster, with fewer bugs.

Contributors welcome!

Overview

Every function is curryable: you can call it with all of its arguments at once, or with only the leading ones to get back a function that takes the rest. For example:

import { map } from 'streaming-iterables'

for await (const str of map(String, [1,2,3])) {
  console.log(str)
}
// "1", "2", "3"

const stringable = map(String)
for await (const str of stringable([1,2,3])) {
  console.log(str)
}
// "1", "2", "3"

Because the library works with async iterators, it polyfills Symbol.asyncIterator if it doesn't exist. (This has not been an issue since Node 10.)

if ((Symbol as any).asyncIterator === undefined) {
  ;(Symbol as any).asyncIterator = Symbol.for('asyncIterator')
}

Types

Iterableish

type Iterableish<T> = Iterable<T> | Iterator<T> | AsyncIterable<T> | AsyncIterator<T>

Any iterable or iterator.

AnyIterable

type AnyIterable<T> = Iterable<T> | AsyncIterable<T>

Literally any Iterable (async or regular).

API

batch

function batch<T>(size: number, iterable: AnyIterable<T>): AsyncIterableIterator<T[]>

Batch objects from iterable into arrays of size length. The final array may be shorter than size if there are not enough items.

import { batch } from 'streaming-iterables'
import { getPokemon } from './util'

// Group the pokemon into arrays of 10 as they load
for await (const monsters of batch(10, getPokemon())) {
  console.log(monsters) // 10 pokemon at a time!
}

buffer

function buffer<T>(size: number, iterable: AnyIterable<T>): AsyncIterableIterator<T>

Buffer keeps a number of objects in reserve available for immediate reading. This is helpful with async iterators as it will prefetch results so you don't have to wait for them to load.

import { buffer } from 'streaming-iterables'
import { getPokemon, trainMonster } from './util'

// Instantly load 10 monsters while we process them
for await (const monster of buffer(10, getPokemon())) {
  await trainMonster(monster) // got to do some pokéwork
}

collect

function collect<T>(iterable: AnyIterable<T>): Promise<T[]>

Collect all the values from an iterable into an array.

import { collect } from 'streaming-iterables'
import { getPokemon } from './util'

console.log(await collect(getPokemon()))
// [bulbasaur, ivysaur, venusaur, charmander, ...]

concat

function concat(...iterables: Array<AnyIterable<any>>): AsyncIterableIterator<any>

Combine multiple iterators into a single iterable. Reads each iterable completely, one at a time, in the order given.

import { concat } from 'streaming-iterables'
import { getPokemon, getTransformers } from './util'

for await (const hero of concat(getPokemon(2), getTransformers(2))) {
  console.log(hero)
}
// charmander
// bulbasaur <- end of pokemon
// megatron
// bumblebee <- end of transformers

consume

function consume<T>(iterator: AnyIterable<T>): Promise<void>

Returns a promise that resolves after the iterable has been drained of all data. Useful for processing a pipeline of data.

import { consume, map } from 'streaming-iterables'
import { getPokemon, trainMonster } from './util'

const train = map(trainMonster)
await consume(train(getPokemon())) // load all the pokemon and train them!

flatten

function flatten(iterable: AnyIterable<any>): AsyncIterableIterator<any>

Returns a new iterator by pulling every item out of iterable (and all its sub iterables) and yielding them depth-first. Each item is checked for the iterable interfaces and is iterated if it implements one.

Note: TypeScript doesn't have recursive types, so we use any.

import { flatten } from 'streaming-iterables'

for await (const item of flatten([1, 2, [3, [4, 5], 6], 7, 8])) {
  console.log(item)
}
// 1
// 2
// 3
// 4
// 5
// 6
// 7
// 8

filter

function filter<T>(filterFunc: (data: T) => boolean | Promise<boolean>, iterable: AnyIterable<T>): AsyncIterableIterator<T>

Takes a filterFunc and an iterable, and returns a new async iterator of the same type containing the members of the given iterable for which filterFunc returns true.

import { filter } from 'streaming-iterables'
import { getPokemon } from './util'

const filterWater = filter(pokemon => pokemon.elements.includes('water'))

for await (const pokemon of filterWater(getPokemon())) {
  console.log(pokemon)
}
// squirtle
// vaporeon
// magikarp

getIterator

function getIterator<T>(values: Iterableish<T>): Iterator<T> | AsyncIterator<T>

Get the iterator from any iterable or just return an iterator itself.
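To make the description concrete, here is a minimal sketch of the idea. The function getIteratorSketch is a hypothetical stand-in written for this example, not the library's implementation; it assumes that anything without Symbol.iterator or Symbol.asyncIterator is already an iterator.

```javascript
// Hypothetical stand-in for getIterator, for illustration only.
function getIteratorSketch(values) {
  // Sync iterables (arrays, Sets, generators) expose Symbol.iterator.
  if (typeof values[Symbol.iterator] === 'function') {
    return values[Symbol.iterator]()
  }
  // Async iterables (async generators, streams) expose Symbol.asyncIterator.
  if (typeof values[Symbol.asyncIterator] === 'function') {
    return values[Symbol.asyncIterator]()
  }
  // Assumption: anything else is already an iterator with a next() method.
  return values
}

const fromArray = getIteratorSketch([1, 2])
console.log(fromArray.next()) // { value: 1, done: false }

const bareIterator = { next: () => ({ value: 'x', done: false }) }
console.log(getIteratorSketch(bareIterator) === bareIterator) // true
```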

map

function map<T, B>(func: (data: T) => B | Promise<B>, iterable: AnyIterable<T>): AsyncIterableIterator<B>

Map a function or async function over all the values of an iterable.

import { map } from 'streaming-iterables'
import got from 'got'

const urls = ['https://http.cat/200', 'https://http.cat/201', 'https://http.cat/202']
const download = map(got)

for await (const page of download(urls)) {
  console.log(page)
}

merge

function merge(...iterables: Array<AnyIterable<any>>): AsyncIterableIterator<any>

Combine multiple iterators into a single iterable. Reads one item off each iterable in order repeatedly until they are all exhausted.
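The round-robin order described above can be sketched as follows. mergeSketch is a hypothetical local function that only illustrates the ordering; it is not the library's code, and the real merge may differ in details such as error handling.

```javascript
// Hypothetical stand-in for merge: one item from each input per round.
async function* mergeSketch(...iterables) {
  // Get an iterator for each input, whether it is sync or async.
  let iterators = iterables.map(it =>
    it[Symbol.asyncIterator] ? it[Symbol.asyncIterator]() : it[Symbol.iterator]()
  )
  while (iterators.length > 0) {
    const stillOpen = []
    for (const iterator of iterators) {
      // await handles both plain results and promises from async iterators.
      const { done, value } = await iterator.next()
      if (!done) {
        yield value
        stillOpen.push(iterator)
      }
    }
    iterators = stillOpen // drop the inputs that are exhausted
  }
}

async function main() {
  const merged = []
  for await (const item of mergeSketch([1, 2, 3], ['a'], ['x', 'y'])) {
    merged.push(item)
  }
  console.log(merged) // [ 1, 'a', 'x', 2, 'y', 3 ]
}
main()
```

Compare this with concat, which would finish the first input before touching the second.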

parallelMap

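function parallelMap<T, R>(concurrency: number, func: (data: T) => R | Promise<R>, iterable: AnyIterable<T>): AsyncIterableIterator<R>

Map a function or async function over all the values of an iterable, running up to concurrency invocations of func at a time.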
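As a rough illustration of what a concurrency limit means for a mapping function with this signature, here is a minimal sketch. parallelMapSketch and delay are hypothetical names used only here. The sketch keeps at most concurrency calls in flight and yields results in input order; that ordering is a choice made for this example and is an assumption, not something the library is known to guarantee.

```javascript
// Hypothetical stand-in for parallelMap, for illustration only.
async function* parallelMapSketch(concurrency, func, iterable) {
  const pending = [] // promises for calls that have started, oldest first
  for await (const value of iterable) {
    // Wrapping in an async function turns a sync throw into a rejection.
    pending.push((async () => func(value))())
    if (pending.length >= concurrency) {
      // The window is full: wait for the oldest call before reading more input.
      yield await pending.shift()
    }
  }
  // Input is exhausted: drain whatever is still in flight.
  while (pending.length > 0) {
    yield await pending.shift()
  }
}

// Hypothetical helper that resolves after ms milliseconds.
const delay = ms => new Promise(resolve => setTimeout(resolve, ms))

async function main() {
  const slowDouble = async n => {
    await delay(10)
    return n * 2
  }
  const results = []
  for await (const result of parallelMapSketch(2, slowDouble, [1, 2, 3, 4, 5])) {
    results.push(result)
  }
  console.log(results) // [ 2, 4, 6, 8, 10 ]
}
main()
```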

parallelMerge

function parallelMerge<T>(...iterables: Array<AnyIterable<T>>): AsyncIterableIterator<T>

Combine multiple iterators into a single iterable. Reads one item off of every iterable and yields them as they resolve. This is useful for pulling items out of an array of iterables as soon as they're available.

import { parallelMerge } from 'streaming-iterables'
import { getPokemon, getTransformers } from './util'

// pokemon are much faster to load btw
const heros = parallelMerge(getPokemon(), getTransformers())
for await (const hero of heros) {
  console.log(hero)
}
// charmander
// bulbasaur
// megatron
// pikachu
// eevee
// bumblebee
// jazz

reduce

function reduce<T, B>(func: (acc: B, value: T) => B, start: B, iterable: AnyIterable<T>): Promise<B>

An async function that takes a reducer function, an initial value and an iterable.

Reduces an iterable to a value which is the accumulated result of running each value from the iterable through func, where each successive invocation is supplied the return value of the previous.
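The accumulation described above can be sketched in a few lines. reduceSketch and countdown are hypothetical names for this example, and the sketch is not the library's implementation; it follows the signature in treating func as a synchronous reducer.

```javascript
// Hypothetical stand-in for reduce, for illustration only.
async function reduceSketch(func, start, iterable) {
  let acc = start
  // for await accepts both sync and async iterables.
  for await (const value of iterable) {
    acc = func(acc, value) // each call receives the previous return value
  }
  return acc
}

// Hypothetical async source used only for the demo.
async function* countdown() {
  yield 3
  yield 2
  yield 1
}

async function main() {
  console.log(await reduceSketch((sum, n) => sum + n, 0, [1, 2, 3, 4])) // 10
  console.log(await reduceSketch((text, n) => text + n, '', countdown())) // 321
}
main()
```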

take

function take<T>(count: number, iterable: AnyIterable<T>): AsyncIterableIterator<T>

A passthrough iterator that reads a specific number of items from an iterator.
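One way to picture this is a generator that stops after count items, which also makes it safe to use on an endless source. takeSketch and naturals are hypothetical names for this example; this is a sketch of the behavior described, not the library's code.

```javascript
// Hypothetical stand-in for take, for illustration only.
async function* takeSketch(count, iterable) {
  if (count <= 0) return
  let taken = 0
  for await (const value of iterable) {
    yield value
    taken++
    // Returning here also closes the source, so nothing further is read.
    if (taken >= count) return
  }
}

// Hypothetical endless source used only for the demo.
function* naturals() {
  let n = 0
  while (true) yield n++
}

async function main() {
  const firstThree = []
  for await (const n of takeSketch(3, naturals())) {
    firstThree.push(n)
  }
  console.log(firstThree) // [ 0, 1, 2 ]
}
main()
```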

tap

function tap<T>(func: (data: T) => any, iterable: AnyIterable<T>): AsyncIterableIterator<T>

A passthrough iterator that yields the data it consumes, passing each value to a function along the way. If you provide an async function, the iterator will wait for the promise to resolve before yielding the value. This is useful for logging, or processing information and passing it along.
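The wait-then-yield behavior can be sketched like this. tapSketch is a hypothetical local function for illustration and not the library's implementation.

```javascript
// Hypothetical stand-in for tap, for illustration only.
async function* tapSketch(func, iterable) {
  for await (const value of iterable) {
    // If func returns a promise, wait for it before passing the value on.
    await func(value)
    yield value // the value itself is passed through unchanged
  }
}

async function main() {
  const seen = []
  const passedThrough = []
  for await (const value of tapSketch(v => seen.push(v), ['a', 'b', 'c'])) {
    passedThrough.push(value)
  }
  console.log(seen) // [ 'a', 'b', 'c' ]
  console.log(passedThrough) // [ 'a', 'b', 'c' ]
}
main()
```

The return value of func is ignored, so a logging function can be dropped into a pipeline without changing what flows through it.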

Contributors needed!

Writing docs and code is a lot of work! Thank you in advance for helping out.