
node-sinek


kafka client (consumer + producer); polite out of the box

make it about them, not about you

  • Simon Sinek

info

  • promise-based API
  • builds on the kafka-node module at its core (check it out for options & tweaking)
  • uses ConsumerGroup(s), which means your Kafka broker needs to be > 0.9.x ( - 0.10.2+)
  • check out kafka-streams for a stream-based Kafka API

offers

  • provides incoming message flow control for consumers (see the sketch after this list)
  • provides a drain-once mode for consumers
  • provides an easy API for producers
  • documentation is still WIP; check out /test/int/Sinek.test.js
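
A minimal sketch of what the flow control looks like in practice, assuming a consumer set up as in the Drainer example further below: the handler receives one message at a time (given an AsyncLimit of 1) and calls done() to request the next one. The saveToDb helper is hypothetical, not part of sinek.

consumer.drain((message, done) => {
    //saveToDb is an assumed promise-returning database write
    saveToDb(message.value)
        .then(() => done()) //calling done() asks sinek for the next message
        .catch(err => {
            console.log("write failed, skipping message: " + err);
            done();
        });
});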

install

npm install --save sinek

test

# requires a local kafka broker + zookeeper @ localhost:2181
npm test

Usage

const {Kafka, Drainer, Publisher, PartitionDrainer} = require("sinek");

producer (Publisher)

const kafkaClient = new Kafka(ZK_CON_STR, LOGGER);
kafkaClient.becomeProducer([TEST_TOPIC], CLIENT_NAME, OPTIONS);

kafkaClient.on("ready", () => {
    producer = new Publisher(kafkaClient, PARTITION_COUNT); //partition count should be the default count on your brokers partiitons e.g. 30
    
    producer.send(topic, messages, partitionKey, partition, compressionType)
    producer.batch(topic, [])
    
    producer.appendBuffer(topic, identifier, object, compressionType)
    producer.flushBuffer(topic)
    
    //easy api that uses a KeyedPartitioner Type and identifies the
    //target partition for the object's identifier by itself
    //it also brings your payload (object) in perfect shape for 
    //a nicely consumeable topic
    //call producer.flushBuffer(topic) to batch send the payloads
    producer.bufferPublishMessage(topic, identifier, object, version, compressionType)
    producer.bufferUnpublishMessage(topic, identifier, object, version, compressionType)
    producer.bufferUpdatehMessage(topic, identifier, object, version, compressionType)
});

kafkaClient.on("error", err => console.log("producer error: " + err));

consumer (Drainer)

const kafkaClient = new Kafka(ZK_CON_STR, LOGGER);
kafkaClient.becomeConsumer([TEST_TOPIC], GROUP_ID, OPTIONS);

kafkaClient.on("ready", () => {
    const consumer = new Drainer(kafkaClient, 1); //1 = thread/worker/parallel count (AsyncLimit)
    
    consumer.drain((message, done) => {
        console.log(message);
        done();
    });
    
    consumer.stopDrain();
    
    consumer.drainOnce((message, done) => {
        console.log(message);
        done();
    }, DRAIN_THRESHOLD, DRAIN_TIMEOUT).then(r => {
        console.log("drain done: " + r);
    }).catch(e => {
        console.log("drain timeout: " + e);
    });
    
    consumer.resetConsumer([TEST_TOPIC]).then(_ => {});
});

kafkaClient.on("error", err => console.log("consumer error: " + err));

consumer (PartitionDrainer) [faster ~ runs a queue per topic partition]

const kafkaClient = new Kafka(ZK_CON_STR, LOGGER);
kafkaClient.becomeConsumer([TEST_TOPIC], GROUP_ID, OPTIONS);

kafkaClient.on("ready", () => {
    const consumer = new PartitionDrainer(kafkaClient, 1); //1 = thread/worker/parallel count per partition
    
    //drain requires a topic-name and returns a promise 
    consumer.drain(TEST_TOPIC, (message, done) => {
        console.log(message);
        done();
    }).then(_ => {}).catch(e => console.log(e));
    
    consumer.stopDrain();
    
    //drainOnce requires a topic-name
    consumer.drainOnce(TEST_TOPIC, (message, done) => {
        console.log(message);
        done();
    }, DRAIN_THRESHOLD, DRAIN_TIMEOUT).then(r => {
        console.log("drain done: " + r);
    }).catch(e => {
        console.log("drain timeout: " + e);
    });
    
    consumer.resetConsumer([TEST_TOPIC]).then(_ => {});
});

kafkaClient.on("error", err => console.log("consumer error: " + err));

hints

  • interesting options for tweaking consumers
const OPTIONS = {
    sessionTimeout: 12500,
    protocol: ["roundrobin"],
    fromOffset: "latest", //or "earliest"
    fetchMaxBytes: 1024 * 100,
    fetchMinBytes: 1,
    fetchMaxWaitMs: 100,
    autoCommit: true,
    autoCommitIntervalMs: 5000
};
  • the remove- and create-topic APIs require a special broker configuration; without it, these calls will simply do nothing at all
drainer.removeTopics([]).then(..)
publisher.createTopics([]).then(..)
  • using the .getStats() functions on Drainer, Publisher or PartitionDrainer you can get some valuable insights into what's currently going on in your client; a sketch follows below
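
A sketch of polling those insights periodically; the 5-second interval is an arbitrary choice, and logging the returned value directly assumes it is a plain object.

//log client insights every 5 seconds
setInterval(() => {
    console.log(consumer.getStats());
}, 5000);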

  • when using "Drainer" to consume and write upserts into a database that require ACID functionality and a build-up of models/message-payloads you must set the AsyncLimit of new Drainer(.., 1) to "1" or you will have trouble with data integrity

  • if your data is spread entity-wise across partitions, you can use the "PartitionDrainer" to drain multiple partitions at the same time

  • the "Publisher" offers a simple API to create such (keyed) partitioned topics

  • it is probably a good idea to spawn a Consumer per Topic (see the sketch below)
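
A sketch of the consumer-per-topic idea, reusing the placeholders from the Usage section; the topic names and the per-topic group-id suffix are assumptions for illustration.

["topic-a", "topic-b"].forEach(topic => {
    const client = new Kafka(ZK_CON_STR, LOGGER);
    client.becomeConsumer([topic], GROUP_ID + "-" + topic, OPTIONS);
    client.on("ready", () => {
        const drainer = new Drainer(client, 1);
        drainer.drain((message, done) => {
            console.log(topic + ": " + message.value);
            done();
        });
    });
    client.on("error", err => console.log(topic + " error: " + err));
});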