  • License MIT

kafka client (consumer + producer); polite out of the box

node-sinek

make it about them, not about you

  • Simon Sinek

info

  • ~ 80% test coverage + promises
  • core builds on the kafka-node module (check it out for options & tweaking)
  • uses ConsumerGroup(s), which means your kafka needs to be > 0.9.x ( - 0.10.2+)

offers

  • provides an incoming message flow control for consumers
  • provides a drain once for consumers
  • provides an easy api for producers
  • documentation is still WIP; check out /test/int/Sinek.test.js

install

npm install --save sinek

test

//requires a localhost kafka broker + zookeeper @ localhost:2181
npm test

Usage

const {Kafka, Drainer, Publisher} = require("sinek");

producer (Publisher)

const kp = new Kafka(ZK_CON_STR, LOGGER);
kp.becomeProducer([TEST_TOPIC], CLIENT_NAME, OPTIONS);

kp.on("ready", () => {
    const producer = new Publisher(kp);

    producer.send(); //arguments omitted here; see /test/int/Sinek.test.js
    producer.batch(TEST_TOPIC, []);
});

kp.on("error", err => console.log("producer error: " + err));

consumer (Drainer)

const kc = new Kafka(ZK_CON_STR, LOGGER);
kc.becomeConsumer([TEST_TOPIC], GROUP_ID, OPTIONS);

kc.on("ready", () => {
    const consumer = new Drainer(kc, 1); //1 = thread/worker/parallel count

    //continuous draining; call done() once a message has been handled
    consumer.drain((message, done) => {
        console.log(message);
        done();
    });

    //stop the continuous drain again
    consumer.stopDrain();

    //one-off drain; the promise resolves when done and rejects on timeout
    consumer.drainOnce((message, done) => {
        console.log(message);
        done();
    }, DRAIN_THRESHOLD, DRAIN_TIMEOUT).then(r => {
        console.log("drain done: " + r);
    }).catch(e => {
        console.log("drain timeout: " + e);
    });

    consumer.resetConsumer([TEST_TOPIC]).then(_ => {});
});

kc.on("error", err => console.log("consumer error: " + err));
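
flow control sketch

The worker count passed to Drainer and the done() callback in the consumer example suggest a simple model: at most N messages are handed out at once, and the next one only follows after done() is called. The snippet below is a minimal in-memory sketch of that idea, not sinek's implementation. createFlowControl, push and stats are hypothetical names used only for illustration, and no kafka broker is involved.

```javascript
// Hypothetical sketch (not part of sinek): hand at most `workers` messages
// to the handler at a time; the next message is released on done().
function createFlowControl(workers, handler) {
    const backlog = [];   // messages waiting for a free worker slot
    let inFlight = 0;     // messages handed out but not yet done()
    let draining = false; // guards against re-entering the loop on sync done()

    function next() {
        if (draining) return;
        draining = true;
        while (inFlight < workers && backlog.length > 0) {
            const message = backlog.shift();
            let called = false;
            inFlight++;
            handler(message, () => {
                if (called) return; // ignore repeated done() calls
                called = true;
                inFlight--;
                next();
            });
        }
        draining = false;
    }

    return {
        push(message) {
            backlog.push(message);
            next();
        },
        stats() {
            return { inFlight, waiting: backlog.length };
        }
    };
}

// usage: one worker, messages are handled strictly one after another
const flow = createFlowControl(1, (message, done) => {
    console.log("handling " + message);
    done();
});
["a", "b", "c"].forEach(m => flow.push(m));
```

With a handler that delays done() (for example until a database write has finished), the backlog grows while the in-flight count stays capped at the worker count.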

hints

//interesting options for tweaking consumers
const OPTIONS = {
    sessionTimeout: 12500,
    protocol: ["roundrobin"],
    fromOffset: "latest", //earliest
    fetchMaxBytes: 1024 * 100,
    fetchMinBytes: 1,
    fetchMaxWaitMs: 100,
    autoCommit: true,
    autoCommitIntervalMs: 5000
};

//remove and create topic api will require
//a special broker configuration or these
//will just result in nothing at all
drainer.removeTopics([]).then(() => {}); //drainer = a Drainer instance
publisher.createTopics([]).then(() => {}); //publisher = a Publisher instance
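
options sketch

When several consumers share most of the options from the hints above, it can help to keep one set of defaults and override single keys per consumer. The helper below is a small sketch of that pattern. consumerOptions and DEFAULT_CONSUMER_OPTIONS are hypothetical names, not part of sinek, and the values are simply the ones listed in the hints.

```javascript
// Defaults copied from the hints section of this README.
const DEFAULT_CONSUMER_OPTIONS = {
    sessionTimeout: 12500,
    protocol: ["roundrobin"],
    fromOffset: "latest",
    fetchMaxBytes: 1024 * 100,
    fetchMinBytes: 1,
    fetchMaxWaitMs: 100,
    autoCommit: true,
    autoCommitIntervalMs: 5000
};

// Hypothetical helper (not part of sinek): returns a fresh options object
// so that changing one consumer's options never touches the defaults.
function consumerOptions(overrides = {}) {
    const options = Object.assign({}, DEFAULT_CONSUMER_OPTIONS, overrides);
    options.protocol = options.protocol.slice(); // copy the array as well
    return options;
}

// usage: a consumer that reads a topic from the beginning
const replayOptions = consumerOptions({ fromOffset: "earliest" });
console.log(replayOptions.fromOffset);
```

The resulting object would be passed as OPTIONS to becomeConsumer in the same way as in the consumer example.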