Output Sandbox Interface

1. Recommendations

Since the sandbox does not run in isolation, there are some expectations about how the host infrastructure behaves. The current recommendations are based on the Hindsight reference implementation.

2. Disabled Functionality

3. Required Lua Functions (called by the host)

3.1. process_message

Called when the host has a message available for analysis. Usually used in combination with a message matcher expression.

Recommendation: specify this as a message_matcher configuration option.

Arguments

  • sequence_id (optional: lightuserdata) - passed in when async_buffer_size is configured

Return

  • status_code (number) - see the Modes of Operation for a detailed explanation of the return codes
    • fatal error (greater than zero)
    • success (0)
    • non fatal failure (-1)
    • skip (-2)
    • retry (-3)
    • batching (-4)
    • async output (-5)
  • status_message (optional: string) - logged when the status code is -1

3.2. timer_event

Called when the host timer expires or on shutdown.

Recommendation: specify this as a ticker_interval configuration option.

Arguments

  • ns (number) - nanosecond timestamp of the function call (it is actually time_t * 1e9 to keep the timestamp units consistent so it will only have a one second resolution)
  • shutdown (bool) - true if timer_event is being called due to a shutdown

Return

  • none

4. Available C Functions (called from the plugin)

4.1. read_config

Provides access to the sandbox configuration variables.

Arguments

  • key (string) - configuration key name

Return

  • value (string, number, bool, table)
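
For example (the key names are illustrative; they mirror the examples later in this document):

local output_dir = read_config("output_dir") or "/tmp"                        -- string
local buffer_max = read_config("buffer_max") or 1000                          -- number
local db_config  = read_config("db_config") or error("db_config must be set") -- table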

4.2. read_message

Provides access to the Heka message data. See read_message for details.
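
For example, headers and dynamic fields are read by name (the field names mirror the payload example later in this document):

local ts      = read_message("Timestamp")            -- nanosecond timestamp (number)
local logger  = read_message("Logger")               -- string or nil
local payload = read_message("Payload")              -- string or nil
local ptype   = read_message("Fields[payload_type]") -- dynamic field access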

4.3. decode_message

Converts a Heka protobuf encoded message string into a Lua table. See decode_message for details.
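
A common pattern in output plugins is to decode the current message for structured access; the sketch below assumes the raw message accessor in read_message:

local msg    = decode_message(read_message("raw")) -- full message as a Lua table
local fields = msg.Fields                          -- Fields is an array of field tables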

4.4. encode_message

Returns a Heka protocol buffer message using the contents of the specified Lua table. Timestamp, Logger, Hostname and Pid are restricted header values. An override configuration option, restricted_headers, is provided: when true the headers are always set to the configuration values; when false (default) the headers are set to the values provided in the message table, and any missing value defaults to the appropriate configuration value.

Note: this operation uses the internal output buffer so it is governed by the output_limit configuration setting.

Arguments

  • msg (Heka message table)
  • framed (bool, default: false) - a value of true includes the framing header

Return

  • heka_pb (string) - Heka protobuf binary string, framed as specified (an error is thrown on failure)
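
A minimal sketch (the table content is illustrative); in an output plugin this would typically run inside process_message:

function process_message()
    local msg = {
        Type    = "example",               -- illustrative values
        Payload = read_message("Payload"),
        Fields  = {count = 1},
    }

    -- encode_message throws on error; pcall converts that into a
    -- non fatal failure return
    local ok, framed = pcall(encode_message, msg, true) -- true = include framing header
    if not ok then return -1, framed end

    -- ... write the framed protobuf string to the destination ...
    return 0
end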

4.5. inject_message

Creates a new Heka protocol buffer message, in the input queue, using the contents of the specified Lua table. The restricted_headers configuration defaults to false (see encode_message above for a full description).

Arguments

  • msg (Heka message table)

Return

  • none (throws an error if the table does not match the Heka message schema)
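
A minimal sketch (the Type value is illustrative), called from within process_message to re-inject a transformed copy of the current message:

local ok, err = pcall(inject_message, {
    Type    = "transformed",              -- illustrative
    Payload = read_message("Payload"),
})
if not ok then return -1, err end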

4.6. create_message_matcher

Returns a Heka protocol buffer message matcher; used to dynamically filter messages sent to the output plugin.

Arguments

  • matcher (string) - message matcher expression

Return

  • message_matcher (userdata) - message matcher object, or an error is thrown

The message matcher object has a single method, eval, which returns true if the current message matches and false if it does not.

4.6.1. Example

See: heka_tcp_matcher.lua
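
A minimal sketch of the basic pattern (the matcher expression is illustrative, and method-call syntax is assumed for eval):

local matcher = create_message_matcher("Type == 'logfile' && Severity <= 4")

function process_message()
    if not matcher:eval() then
        return -2 -- skip: the current message does not match
    end
    -- ... output the matching message ...
    return 0
end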

4.7. update_checkpoint

4.7.1. Batch Mode

Advances the output checkpoint when in batching mode. The standard use case is to call it from timer_event after successfully flushing a batch on timeout/shutdown.

Arguments

  • none

4.7.2. Asynchronous Mode

Advances the output checkpoint and optionally reports the number of failures that occurred.

Arguments

  • sequence_id (lightuserdata) - sequence_id for the message that was just successfully delivered/acknowledged
  • failures (optional: integer) - number of failures that occurred in the asynchronous processing (added to the failure count)

Return

  • none (throws an error on invalid arg types)

4.8. Modes of Operation

4.8.1. Lock Step

  • process_message operates on the message and returns one of the following values:
    • success (0) - the message was successfully processed and the output checkpoint is advanced
    • failure (-1) - the message was not successfully processed
      • the failure count is incremented
      • any optional error message is written to the log
      • the message is skipped
      • the checkpoint is advanced
    • skip (-2) - the message was intentionally not processed and the checkpoint is advanced
    • retry (-3) - the message was not successfully processed and the host will call process_message again, with the same message, after a one second delay

4.8.2. Example Payload Output

-- cfg
message_matcher = "Type == 'inject_payload'"
ticker_interval = 0

--location where the payload is written
output_dir      = "/tmp"

-- This Source Code Form is subject to the terms of the Mozilla Public
-- License, v. 2.0. If a copy of the MPL was not distributed with this
-- file, You can obtain one at http://mozilla.org/MPL/2.0/.

require "io"
require "string"

local output_dir = read_config("output_dir") or "/tmp"

function process_message()
    local pt = read_message("Fields[payload_type]")
    if type(pt) ~= "string" then return -1, "invalid payload_type" end

    local pn = read_message("Fields[payload_name]") or ""
    if type(pn) ~= "string" then return -1, "invalid payload_name" end

    local logger = read_message("Logger") or ""

    pn = string.gsub(pn, "%W", "_")
    pt = string.gsub(pt, "%W", "_")
    logger = string.gsub(logger, "%W", "_")

    local fn = string.format("%s/%s.%s.%s", output_dir, logger, pn, pt)
    local fh, err = io.open(fn, "w")
    if err then return -1, err end

    local payload = read_message("Payload") or ""
    fh:write(payload)
    fh:close()
    return 0
end

function timer_event(ns)
    -- no op
end

4.8.3. Batching

  • process_message batches the message/transformation in memory or on disk and returns one of the following values:
    • batching (-4) - the message was successfully added to the batch
    • failure (-1) - the message cannot be batched
      • the failure count is incremented
      • any optional error message is written to the log
      • the message is skipped
    • skip (-2) - the message was intentionally not added to the batch
    • retry (-3) - the message was not successfully added to the batch and the host will call process_message again, with the same message, after a one second delay
    • success (0) - the batch has been successfully committed and the output checkpoint is advanced to the most recent message

4.8.4. Example Postgres Output

-- cfg
message_matcher = "Type == 'logfile'"
memory_limit = 0
ticker_interval = 60

buffer_max = 1000
db_config = {
    host = "example.com",
    port = 5432,
    name = "dev",
    user = "test",
    _password = "testpw",
}

-- This Source Code Form is subject to the terms of the Mozilla Public
-- License, v. 2.0. If a copy of the MPL was not distributed with this
-- file, You can obtain one at http://mozilla.org/MPL/2.0/.

require "os"
require "string"
require "table"

local driver = require "luasql.postgres"

local ticker_interval = read_config("ticker_interval")
local db_config       = read_config("db_config") or error("db_config must be set")
local buffer_max      = read_config("buffer_max") or 1000
assert(buffer_max > 0, "buffer_max must be greater than zero")

local env = assert(driver.postgres())
local con, err = env:connect(db_config.name, db_config.user, db_config._password, db_config.host, db_config.port)
assert(con, err)

local buffer = {}
local buffer_len = 0
local table_name = "test_table"
local MAX_LENGTH = 65535
local columns = {
--   column name        field name                  field type      field length
    {"msg_Timestamp"    ,"Timestamp"                ,"TIMESTAMP"    ,nil},
    {"payload"          ,"Payload"                  ,"VARCHAR"      ,MAX_LENGTH},
    {"sourceName"       ,"Fields[sourceName]"       ,"VARCHAR"      ,30},
    {"sourceVersion"    ,"Fields[sourceVersion]"    ,"VARCHAR"      ,12},
    {"submissionDate"   ,"Fields[submissionDate]"   ,"DATE"         ,nil},
    {"sampleId"         ,"Fields[sampleId]"         ,"SMALLINT"     ,nil},
}

local function make_create_table()
    local pieces = {"CREATE TABLE IF NOT EXISTS ", table_name, " ("}
    for i, c in ipairs(columns) do
        table.insert(pieces, c[1])
        table.insert(pieces, " ")
        table.insert(pieces, c[3])
        if c[4] ~= nil then
            table.insert(pieces, "(")
            table.insert(pieces, c[4])
            table.insert(pieces, ")")
        end
        if c[4] == MAX_LENGTH then
            table.insert(pieces, " ENCODE LZO")
        end
        if i < #columns then
            table.insert(pieces, ", ")
        end
    end
    table.insert(pieces, ")")
    return table.concat(pieces)
end
assert(con:execute(make_create_table()))

local function bulk_load()
    local cnt, err = con:execute(table.concat(buffer))
    if not err then
        buffer = {}
        buffer_len = 0
    else
        return err
    end
end

local function esc_str(v)
    if v == nil then return "NULL" end

    if type(v) ~= "string" then v = tostring(v) end
    if string.len(v) > MAX_LENGTH then
        v = "TRUNCATED:" .. string.sub(v, 1, MAX_LENGTH - 10)
    end

    local escd = con:escape(v)
    if not escd then return "NULL" end

    return string.format("'%s'", escd)
end

local function esc_num(v)
    if v == nil then return "NULL" end
    if type(v) ~= "number" then return esc_str(v) end
    return tostring(v)
end

local function esc_ts(v)
    if v == nil then return "NULL" end
    if type(v) ~= "number" then return esc_str(v) end
    local seconds = v / 1e9
    return table.concat({"(TIMESTAMP 'epoch' + ", seconds, " * INTERVAL '1 seconds')"})
end

local function make_insert(sep)
    local pieces = {sep, "("}
    for i=1,#columns do
        if i > 1 then
            table.insert(pieces, ",")
        end
        local col = columns[i]
        if col[3] == "TIMESTAMP" then
            table.insert(pieces, esc_ts(read_message(col[2])))
        elseif col[3] == "SMALLINT" then
            table.insert(pieces, esc_num(read_message(col[2])))
        else
            table.insert(pieces, esc_str(read_message(col[2])))
        end
    end
    table.insert(pieces, ")")
    return table.concat(pieces)
end

local last_load = 0
function process_message()
    local sep = ","
    if buffer_len == 0 then
        buffer_len = buffer_len + 1
        buffer[buffer_len] = string.format("INSERT INTO %s VALUES", table_name)
        sep = " "
    end
    buffer_len = buffer_len + 1
    buffer[buffer_len] = make_insert(sep)

    if buffer_len - 1 >= buffer_max then
        local err = bulk_load()
        if err then
            buffer[buffer_len] = nil
            buffer_len = buffer_len - 1
            return -3, err
        else
            last_load = os.time()
            return 0
        end
    end
    return -4
end

function timer_event(ns, shutdown)
    if buffer_len > 1 and (shutdown or last_load + ticker_interval <= ns / 1e9) then
        local err = bulk_load()
        if not err then
            update_checkpoint()
        end
    end
end

4.8.5. Asynchronous

  • Recommendation: the async_buffer_size configuration variable should be set and consumed by the host
  • process_message is called with a sequence_id parameter and asynchronously sends the message/transformation to the destination and returns one of the following values:
    • asynchronous (-5) - the message was successfully queued
    • failure (-1) - the message cannot be queued
      • the failure count is incremented
      • any optional error message is written to the log
      • the message is skipped
    • skip (-2) - the message was intentionally not queued
    • retry (-3) - the message was not successfully queued and the host will call process_message again, with the same message, after a one second delay
  • When an asynchronously sent message is acknowledged, update_checkpoint MUST be called to advance the checkpoint to that specific message

4.8.6. Example Kafka Output

kafka.lua
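
The linked kafka.lua is the full reference example. Below is a minimal sketch of the asynchronous pattern using a hypothetical client module (the module name and the new, send, set_delivery_callback and poll APIs are assumptions, not a real library):

local client   = require "some.async.client"            -- hypothetical module
local producer = client.new(read_config("broker_list")) -- hypothetical constructor

-- delivery callback invoked by the client when the message identified by
-- sequence_id is acknowledged (or permanently fails)
producer:set_delivery_callback(function(sequence_id, failed)
    if failed then
        update_checkpoint(sequence_id, 1) -- advance past the message and add one failure
    else
        update_checkpoint(sequence_id)    -- advance the checkpoint
    end
end)

function process_message(sequence_id)
    local ok, err = producer:send(read_message("Payload"), sequence_id)
    if not ok then return -3, err end -- retry the same message after a one second delay
    return -5                         -- async output: queued, checkpoint advances on ack
end

function timer_event(ns, shutdown)
    producer:poll() -- service outstanding delivery callbacks
end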
