Lua Kafka Module
1. Overview
A Kafka producer and consumer library for Lua, built on librdkafka.
2. Module
2.1. Example Usage
require "kafka"
local consumer = kafka.consumer("localhost:9092",
{"test"},
{["group.id"] = "example"},
{["auto.offset.reset"] = "smallest"}
)
msg, topic, partition, key = consumer:receive()
if msg then
-- consume msg
end
2.2. Functions
2.2.1. producer
Creates a Kafka producer.
local brokerlist = "localhost:9092"
local producer_conf = {
["queue.buffering.max.messages"] = 20000,
["batch.num.messages"] = 200,
["message.max.bytes"] = 1024 * 1024,
["queue.buffering.max.ms"] = 10,
["topic.metadata.refresh.interval.ms"] = -1,
}
local producer = kafka.producer(brokerlist, producer_conf)
Arguments
- brokerlist (string) - librdkafka broker string
- producer_conf (table) - librdkafka producer configuration
Return
- producer (userdata) - Kafka producer or an error is thrown
2.2.2. consumer
Creates a Kafka consumer.
local brokerlist = "localhost:9092"
local topics = {"test"}
local consumer_conf = {["group.id"] = "test_g1"}
local topic_conf = nil
local consumer = kafka.consumer(brokerlist, topics, consumer_conf, topic_conf)
Arguments
- brokerlist (string) - librdkafka broker string
- topics (array of 'topic[:partition]' strings) - In balanced consumer group mode a consumer can only subscribe to topics, not topic:partition pairs; the partition syntax is only used for manual assignment (without balanced consumer groups), as shown in the sketch below.
- consumer_conf (table) - must contain 'group.id'; see the librdkafka consumer configuration
- topic_conf (table, optional) - librdkafka topic configuration
Return
- consumer (userdata) - Kafka consumer or an error is thrown
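A minimal sketch of manual assignment of a single partition, using the 'topic[:partition]' syntax described above; the broker address, topic name, partition number, and group id are illustrative.

-- manual assignment of partition 0 of the "test" topic (no balanced consumer group);
-- a 'group.id' is still supplied because consumer_conf must contain one
local consumer = kafka.consumer("localhost:9092",
                                {"test:0"},
                                {["group.id"] = "manual_example"})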
2.2.3. version
require "kafka"
local v = kafka.version()
-- v == "1.0.0"
Returns a string with the running version of the Kafka module.
Arguments
- none
Return
- Semantic version string
2.3. Producer Methods
2.3.1. create_topic
Creates a topic to be used by the producer; this is a no-op if the topic already exists.
producer:create_topic(topic, topic_conf) -- creates the topic if it does not exist
Arguments
- topic (string) - Name of the topic
- topic_conf (table, optional) - librdkafka topic configuration
Return
- none
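For example, a minimal sketch of creating a topic with an explicit topic configuration; the topic name and the chosen librdkafka topic property ('request.required.acks') are illustrative, not required by this module.

-- assumes 'producer' was created as in section 2.2.1
producer:create_topic("example_topic", {["request.required.acks"] = 1})
producer:create_topic("example_topic") -- no-op, the topic already exists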
2.3.2. has_topic
Tests if a producer is managing a topic.
local b = producer:has_topic(topic)
Arguments
- topic (string) - Name of the topic
Return
- bool - True if the producer is managing a topic with the specified name
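A short usage sketch combining has_topic with create_topic; the topic name is illustrative.

-- create the topic only if this producer is not already managing it
if not producer:has_topic("example_topic") then
    producer:create_topic("example_topic")
end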
2.3.3. destroy_topic
Removes a topic from the producer.
producer:destroy_topic(topic)
Arguments
- topic (string) - Name of the topic
Return
- none - no-op on non-existent topic
2.3.4. send
Sends a message using the specified topic.
local ret = producer:send(topic, -1, sequence_id, message)
Arguments
- topic (string) - Name of the topic
- partition (number) - Topic partition number (-1 for automatic assignment)
- sequence_id
  - lua_sandbox (lightuserdata) - Opaque pointer for checkpointing
  - Lua 5.1 (number) - range: zero to UINTPTR_MAX
- message
  - heka_sandbox (string/userdata)
    - string - message to send
    - userdata - zero copy specifier
  - Lua 5.1 (string) - message to send
Return
- ret (number) - 0 on success or errno
  - ENOBUFS (105) - maximum number of outstanding messages has been reached
  - EMSGSIZE (90) - message is larger than the configured maximum size
  - ESRCH (3) - requested partition is unknown in the Kafka cluster
  - ENOENT (2) - topic is unknown in the Kafka cluster
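For example, a hedged Lua 5.1 sketch of retrying a send after the queue fills up; the topic, sequence id, and message are illustrative, and the ENOBUFS value (105) is assumed to match the host platform.

-- 'producer' and the "example_topic" topic are assumed to already exist
local seq = 1
local ret = producer:send("example_topic", -1, seq, "hello world")
if ret == 105 then -- ENOBUFS: queue full, poll to drain the queue and retry once
    producer:poll()
    ret = producer:send("example_topic", -1, seq, "hello world")
end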
2.3.5. poll
Polls the Kafka producer for events and invokes the callback. This should be called after every send.
local sequence_id, failures = producer:poll()
Arguments
- Lua 5.1
  - timeout (number/nil/none) - timeout in ms (default 0, non-blocking); use -1 to wait indefinitely
- heka_sandbox
  - none
Return
- Lua 5.1
  - sequence_id (number/nil) - Sequence number of the last message processed
  - failures (number) - Number of messages that failed
- heka_sandbox
  - none - the checkpoint and error counts are automatically updated
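A hedged Lua 5.1 sketch of the send/poll pattern, tracking the last processed sequence id for checkpointing; the topic and messages are illustrative.

-- 'producer' and the "example_topic" topic are assumed to already exist
local messages = {"one", "two", "three"}
local last_processed
for i, m in ipairs(messages) do
    producer:send("example_topic", -1, i, m)
    local sequence_id, failures = producer:poll()
    if sequence_id then
        last_processed = sequence_id -- safe to checkpoint up to this sequence id
    end
    if failures and failures > 0 then
        -- handle delivery failures here
    end
end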
2.4. Consumer Methods
2.4.1. receive
Receives a message from the specified Kafka topic(s).
local msg, topic, partition, key = consumer:receive()
Arguments
- none
Return
- msg (string) - Kafka message payload
- topic (string) - Topic name the message was received from
- partition (number) - Topic partition the message was received from
- key (string) - Message key (if available)
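A minimal consumption loop sketch, building on the consumer created in section 2.1; the loop runs indefinitely and the output formatting is illustrative.

-- consume messages in a loop using the consumer from section 2.1
while true do
    local msg, topic, partition, key = consumer:receive()
    if msg then
        print(string.format("topic=%s partition=%d key=%s msg=%s",
                            topic, partition, tostring(key), msg))
    end
end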