Lua Kafka Module

1. Overview

A Kafka producer/consumer library for Lua

2. Module

2.1. Example Usage

require "kafka"
local consumer = kafka.consumer("localhost:9092",
                                {"test"},
                                {["group.id"] = "example"},
                                {["auto.offset.reset"] = "smallest"}
                               )
msg, topic, partition, key = consumer:receive()
if msg then
    -- consume msg
end

2.2. Functions

2.2.1. producer

Creates a Kafka producer.

local brokerlist    = "localhost:9092"
local producer_conf = {
    ["queue.buffering.max.messages"] = 20000,
    ["batch.num.messages"] = 200,
    ["message.max.bytes"] = 1024 * 1024,
    ["queue.buffering.max.ms"] = 10,
    ["topic.metadata.refresh.interval.ms"] = -1,
}
local producer = kafka.producer(brokerlist, producer_conf)

Arguments

Return

  • producer (userdata) - Kafka producer or an error is thrown

2.2.2. consumer

Creates a Kafka consumer.

local brokerlist    = "localhost:9092"
local topics        = {"test"}
local consumer_conf = {["group.id"] = "test_g1"})
local topic_conf    = nil
local consumer   = kafka.consumer(brokerlist, topics, consumer_conf, topic_conf)

Arguments

Return

  • consumer (userdata) - Kafka consumer or an error is thrown

2.2.3. version

require "kafka"
local v = kafka.version()
-- v == "1.0.0"

Returns a string with the running version of Kafka module.

Arguments

  • none

Return

  • Semantic version string

2.3. Producer Methods

2.3.1. create_topic

Creates a topic to be used by a producer, no-op if the topic already exists.

producer:create_topic(topic, topic_conf) -- creates the topic if it does not exist

Arguments

Return

  • none

2.3.2. has_topic

Tests if a producer is managing a topic.

local b = producer:has_topic(topic)

Arguments

  • topic (string) - Name of the topic

Return

  • bool - True if the producer is managing a topic with the specificed name

2.3.3. destroy_topic

Removes a topic from the producer.

producer:destroy_topic(topic)

Arguments

  • topic (string) - Name of the topic

Return

  • none - no-op on non-existent topic

2.3.4. send

Sends a message using the specified topic.

local ret = producer:send(topic, -1, sequence_id, message)

Arguments

  • topic (string) - Name of the topic
  • partition (number) - Topic partition number (-1 for automatic assignment)
  • sequence_id
    • lua_sandbox (lightuserdata) - Opaque pointer for checkpointing
    • Lua 5.1 (number) - range: zero to UINTPTR_MAX
  • message
    • heka_sandbox (string/userdata)
      • string - message to send
      • userdata - zero copy specifier
    • Lua 5.1 (string) - Message to send

Return

  • ret (number) - 0 on success or errno
    • ENOBUFS (105) maximum number of outstanding messages has been reached
    • EMSGSIZE (90) message is larger than configured max size
    • ESRCH (2) requested partition is unknown in the Kafka cluster
    • ENOENT (3) topic is unknown in the Kafka cluster

2.3.5. poll

Polls the provided Kafka producer for events and invokes callback. This should be called after every send.

local sequence_id, failures = producer:poll()

Arguments

* Lua 5.1
    * timeout (number/nil/none) - timeout in ms (default 0 non-blocking).
      Use -1 to wait indefinitely.
* heka_sandbox
    * none

Return

* Lua 5.1
    * sequence_id (number/nil) - Sequence number of the last message
      processed
    * failures (number) - number of messages that failed
* heka_sandbox
    * none - the checkpoint and error counts are automatically updated

2.4. Consumer Methods

2.4.1. receive

Receives a message from the specified Kafka topic(s).

local msg, topic, partition, key = consumer:receive()

Arguments

  • none

Return

  • msg (string) - Kafka message payload
  • topic (string) - Topic name the message was received from
  • partition (number) - Topic partition the message was received from
  • key (string) - Message key (if available)

results matching ""

    No results matching ""