GCP Pub/Sub Module
1. Overview
Lua wrapper for the GCP gRPC Pub/Sub APIs. The synchronous APIs are included for test purposes, unless the traffic volume is tiny you would not want to use them in production.
2. Module
2.1. Example Usage
require "gcp.pubsub"
2.2. Functions
2.2.1. subscriber
Creates a GCP pub/sub subscriber.
local sub = gcp.pubsub.subscriber(channel, topic, subscription_name, max_async_requests)
Arguments
- channel (string) e.g. "pubsub.googleapis.com"
- topic (string) e.g. "projects/MyProject/topics/MyTopic" -- used to validate the subscription topic or create the subscription if necessary
- subscription_name (string) e.g. "MySubscription"
- max_async_requests (integer) Defaults to 20 (0 synchronous only)
Return
- subscriber (userdata) or an error is thrown
2.2.2. publisher
Creates a GCP pub/sub publisher.
local publisher = gcp.pubsub.publisher(channel, topic, max_async_requests)
Arguments
- channel (string) e.g. "pubsub.googleapis.com"
- topic (string) e.g. "projects/MyProject/topics/MyTopic"
- max_async_requests (integer) Defaults to 20 (0 synchronous only)
- batch_size (integer) Defaults to 1000
Return
- publisher (userdata) or an error is thrown
2.3. subscriber Methods
2.3.1. pull/pull_sync
Reads a set of messages from the pub/sub topic.
local msgs, cnt = subscriber:pull(batch_size)
Arguments
- batch_size (integer) Number of items in a single request (1-1000)
Returns
- msgs (array/nil) One or more messages (can throw on error)
The array contains an array for each message, column one is the data
payload and column two is nil or the attribute table
msgs = { {data, attribute_table}, ...}
- cnt (string/nil) Number of messsages returned
2.3.2. ack
Send the ack for the last set of messages received (always invoked by pull*).
subscriber:ack()
Arguments
- none
Returns
- none
2.4. publisher Methods
2.4.1. publish/publish_sync
Writes a message to the pub/sub topic.
local ret = publisher:publish(sequence_id, msg, attributes)
local ret = publisher:publish_sync(msg, attributes)
Arguments
- sequence_id Used with publish() only
- lua_sandbox (lightuserdata) Opaque pointer for checkpointing
- Lua 5.1 (number) range: zero to UINTPTR_MAX
- msg (string/(userdata/nil lua_sandbox only)) Message to send
- nil uses msg.Payload as the msg and coverts everything else to string attributes (headers overwrite fields if there is a naming conflict)
- attributes (nil/table) Lua 5.1 only, map of string = tostring(val)
Return
- status_code (integer) or throws an error
- sent (0)
- retry (-3)
- batched (-4)
- async (-5)
- err (nil/string) error message
2.4.2. flush
Flushes the batched messages over the network.
publisher.flush()
Arguments none
Return
- none or throws an error
2.4.3. poll
Polls the CompletionQueue to process the asynchronous publish responses. This should be called after every send.
producer:poll()
Arguments
- none
Return
- Lua 5.1
- sequence_id (number/nil) - Sequence number of the last message processed
- failures (number) - number of batches that failed
- lua_sandbox
- none - the checkpoint and error counts are automatically updated