Heka Message Conversion into a Column Based Schema

1. Functions

1.1. load_schema

Validates the schema and returns the process_message and timer_event functions to be used with the Heka sandbox API; throws an error if validation fails.

Arguments

  • name (string) - derived stream/schema name
  • schema (table) - schema table as described below

Return

  • process_message (function)
  • timer_event (function)
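A minimal sketch of how a plugin might call load_schema. The module name passed to require is an assumption based on the source file name, and the stream name "example" is hypothetical. The require is wrapped in pcall only so the snippet also loads outside the sandbox; inside Heka a plain require is the usual form.

```lua
-- Assumed module name, taken from the source file derived_stream.lua;
-- adjust it to match your installation.
local ok, ds = pcall(require, "derived_stream")

local name = "example"  -- hypothetical derived stream name
local schema = {
--  column name  type         length  attributes  field/function
    {"Timestamp" ,"TIMESTAMP" ,nil    ,"SORTKEY"  ,"Timestamp"},
    {"clientId"  ,"CHAR"      ,36     ,"DISTKEY"  ,"Fields[clientId]"},
}

if ok then
    -- load_schema validates the schema (raising an error on failure) and
    -- returns the two entry points, assigned here as globals for the sandbox.
    process_message, timer_event = ds.load_schema(name, schema)
end
```

Because load_schema itself is not wrapped in pcall, a schema error still stops the plugin from loading, which matches the behaviour described above.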

2. Schema

The schema is a Lua table in which each entry describes one output column using five values:

  1. column name - The name of the field in the output. For protobuf output, if it exactly matches a message header name the header variable is used; otherwise it is added to the message Fields table.
  2. type - Redshift data type; see http://docs.aws.amazon.com/redshift/latest/dg/c_Supported_data_types.html
  3. length - Maximum length for string fields (nil for everything else)
  4. attributes - Redshift column attributes; see http://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_TABLE_NEW.html
  5. field/function - If this is a string, the data is retrieved from read_message(field); otherwise the provided function is invoked and its return value is used for the column data.
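The rule for the fifth value can be illustrated with a short sketch. This is not the module's own code: column_value is a hypothetical helper, and read_message is replaced by a stand-in with made-up data when the snippet runs outside the sandbox.

```lua
-- Stand-in for the sandbox's read_message global so the sketch runs outside
-- Heka; inside the sandbox the real function is used instead.
if not read_message then
    local sample = {["Fields[clientId]"] = "abc123"}  -- made-up message data
    function read_message(field) return sample[field] end
end

-- Illustrative helper (not part of the module): resolve one schema entry.
local function column_value(column)
    local source = column[5]
    if type(source) == "string" then
        return read_message(source)  -- string: read directly from the message
    end
    return source()                  -- otherwise: call the provided function
end

local by_field    = {"clientId", "CHAR", 36, "DISTKEY", "Fields[clientId]"}
local by_function = {"version", "VARCHAR", 32, nil, function() return "1.0" end}
```

With the stand-in data, column_value(by_field) reads the clientId field and column_value(by_function) returns whatever the supplied function returns.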

2.1. Schema Example

local schema = {
--  column name  type                length  attributes  field/function
    {"Timestamp" ,"TIMESTAMP"        ,nil    ,"SORTKEY"  ,"Timestamp"},
    {"clientId"  ,"CHAR"             ,36     ,"DISTKEY"  ,"Fields[clientId]"},
    {"version"   ,"VARCHAR"          ,32     ,nil        ,get_version}
}
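The example references get_version, which has to be defined before the schema table is built. A minimal sketch of such a function follows, assuming the version is carried in a field named Fields[appVersion] (a hypothetical name). Whether the module truncates strings itself, and how it treats a nil return, is not stated here, so the truncation and the nil fallback are both assumptions.

```lua
-- Stand-in for the sandbox's read_message global so the sketch also runs
-- outside Heka; the sample value is made up.
if not read_message then
    function read_message(field)
        if field == "Fields[appVersion]" then return "38.0.1" end
        return nil
    end
end

-- Hypothetical column function: the field name is an assumption.
local function get_version()
    local v = read_message("Fields[appVersion]")
    if type(v) ~= "string" then return nil end
    return v:sub(1, 32)  -- keep within the VARCHAR(32) declared in the schema
end
```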

3. Configuration Examples

format      = "redshift.sql"
buffer_dir = "/mnt/output" -- directory where the temporary buffer files are stored
buffer_size = 10000 * 1000  -- size of the largest buffer before performing a multi-line insert (max 16MB - 8KB)
ts_field    = "Timestamp"   -- default

db_config = {
    host = "example.com",
    port = 5432,
    keepalives = 1,
    keepalives_idle = 30,
    dbname = "pipeline",
    user = "user",
    _password = "password",
}

format      = "redshift.psv"
buffer_dir = "/mnt/output"
buffer_size = 100 * 1024 * 1024 -- max 1GiB
s3_uri     = "s3://test"

format = "tsv"
output_dir = "/mnt/output" -- directory where the daily output files are written
ts_field    = "Timestamp"   -- default
nil_value   = "NULL"        -- defaults to an empty string

format = "protobuf"
output_dir = "/mnt/output" -- directory where the daily output files are written
ts_field    = "Timestamp"   -- default

source code: derived_stream.lua
