Heka Message Conversion into a Column Based Schema
1. Functions
1.1. load_schema
Validates the schema and returns the process_message and timer_event functions to be used with the Heka sandbox API. Throws an error if the schema is invalid.
Arguments
- name (string) - derived stream/schema name
- schema (table) - schema table as described below
Return
- process_message (function)
- timer_event (function)
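The sketch below shows the calling contract. It assumes the module is loaded with `require "derived_stream"` and exposes `load_schema` as a table field. The real module only runs inside the Heka sandbox, so the example registers a hypothetical stub under `package.preload`. The stub's body and the stream name `example_stream` are illustrative only.

```lua
-- Hypothetical stand-in for derived_stream.lua so this sketch runs outside
-- the Heka sandbox; the real module also validates and buffers output.
package.preload["derived_stream"] = function()
    local M = {}
    function M.load_schema(name, schema)
        if type(name) ~= "string" or type(schema) ~= "table" then
            error("load_schema: expected (string, table)")
        end
        local function process_message()
            return 0 -- 0 signals success to the sandbox
        end
        local function timer_event(ns)
            -- a real implementation would flush buffered rows here
        end
        return process_message, timer_event
    end
    return M
end

local ds = require "derived_stream"
local schema = {
    {"Timestamp", "TIMESTAMP", nil, "SORTKEY", "Timestamp"},
}

-- The sandbox calls these globals, so they are assigned without `local`.
process_message, timer_event = ds.load_schema("example_stream", schema)
```

In a real plugin the stub is omitted. The sandbox then calls the global `process_message` for each message and `timer_event` periodically.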
2. Schema
The schema is a Lua table of column definitions. Each definition consists of five entries:
- column name - The name of the field in the output. For protobuf output, if it exactly matches a message header name, the header variable is used; otherwise the value is added to the message Fields table.
- type - Redshift data type, see http://docs.aws.amazon.com/redshift/latest/dg/c_Supported_data_types.html
- length - Maximum length for string fields (nil for everything else)
- attributes - Redshift column attributes such as SORTKEY or DISTKEY, see http://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_TABLE_NEW.html
- field/function - If this is a string, the data is retrieved with read_message(field); otherwise the provided function is invoked and its return value is used as the column data.
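The field/function rule can be sketched as follows. `read_message` is stubbed with a hypothetical in-memory message so the example runs outside the sandbox, and `column_value` and `get_version` are illustrative names. The sketch also assumes the function is called with no arguments, which the description above does not specify.

```lua
-- Hypothetical stand-in for the sandbox's read_message(); inside Heka the
-- real function reads a header or field from the current message.
local current = {
    Timestamp = 1451606400000000000,
    ["Fields[clientId]"] = "c0ffee00-0000-4000-8000-000000000000",
}
local function read_message(name)
    return current[name]
end

-- Resolve one column: a string is treated as a read_message() path,
-- anything else is called and its return value is used.
local function column_value(row)
    local source = row[5]
    if type(source) == "string" then
        return read_message(source)
    end
    return source()
end

local function get_version() return "1.2.3" end -- hypothetical helper

local ts = column_value({"Timestamp", "TIMESTAMP", nil, "SORTKEY", "Timestamp"})
local version = column_value({"version", "VARCHAR", 32, nil, get_version})
```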
2.1. Schema Example
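local function get_version() -- hypothetical helper, for illustration only
    return read_message("Fields[appVersion]")
end

local schema = {
--   column name   type          length  attributes  field/function
    {"Timestamp" , "TIMESTAMP" , nil   , "SORTKEY" , "Timestamp"},
    {"clientId"  , "CHAR"      , 36    , "DISTKEY" , "Fields[clientId]"},
    {"version"   , "VARCHAR"   , 32    , nil       , get_version}
}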
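load_schema is documented as validating the schema, but its exact rules are not listed here. The following is a sketch of plausible checks derived from the five-entry description. It assumes only CHAR and VARCHAR take a length, and `validate_schema` is a hypothetical name.

```lua
-- Column types that take a length (an assumption based on the "length" entry).
local STRING_TYPES = { CHAR = true, VARCHAR = true }

local function validate_schema(schema)
    if type(schema) ~= "table" or #schema == 0 then
        error("schema must be a non-empty table")
    end
    for i, row in ipairs(schema) do
        -- Index the five entries explicitly: nil placeholders (e.g. length)
        -- make the # operator unreliable on a row.
        local name, ctype, length, attributes, source =
            row[1], row[2], row[3], row[4], row[5]
        if type(name) ~= "string" or type(ctype) ~= "string" then
            error(string.format("row %d: name and type must be strings", i))
        end
        if STRING_TYPES[ctype] then
            if type(length) ~= "number" or length < 1 then
                error(string.format("row %d: %s needs a positive length", i, ctype))
            end
        elseif length ~= nil then
            error(string.format("row %d: length must be nil for %s", i, ctype))
        end
        if attributes ~= nil and type(attributes) ~= "string" then
            error(string.format("row %d: attributes must be a string or nil", i))
        end
        if type(source) ~= "string" and type(source) ~= "function" then
            error(string.format("row %d: field must be a string or function", i))
        end
    end
    return true
end

local function get_version() return "1.2.3" end -- hypothetical helper

local schema = {
    {"Timestamp", "TIMESTAMP", nil, "SORTKEY", "Timestamp"},
    {"clientId",  "CHAR",      36,  "DISTKEY", "Fields[clientId]"},
    {"version",   "VARCHAR",   32,  nil,       get_version},
}
assert(validate_schema(schema))
```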
3. Configuration Examples
3.1. redshift.sql
format = "redshift.sql"
buffer_dir = "/mnt/output" -- directory where the temporary buffer files are stored
buffer_size = 10000 * 1000 -- maximum buffer size before a multi-line insert is performed (max 16MB - 8KB)
ts_field = "Timestamp" -- default
db_config = {
host = "example.com",
port = 5432,
keepalives = 1,
keepalives_idle = 30,
dbname = "pipeline",
user = "user",
_password = "password",
}
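The buffer_size setting caps each multi-line insert. The sketch below shows one way such buffering could work. It assumes the stream name is used as the table name and that values are quoted with standard SQL single-quote doubling. `new_insert_buffer`, `sql_literal` and the `flush_fn` callback are hypothetical; in the real module the statement would go to the database described by db_config.

```lua
-- Render one value as a SQL literal: nil -> NULL, numbers as-is, everything
-- else single-quoted with embedded quotes doubled. Any further escaping the
-- real module may perform is not shown.
local function sql_literal(v)
    if v == nil then return "NULL" end
    if type(v) == "number" then return tostring(v) end
    return "'" .. (string.gsub(tostring(v), "'", "''")) .. "'"
end

-- Hypothetical buffer that accumulates VALUES tuples and hands a complete
-- multi-line INSERT to flush_fn before buffer_size would be exceeded.
local function new_insert_buffer(table_name, columns, buffer_size, flush_fn)
    local prefix = "INSERT INTO " .. table_name .. " ("
        .. table.concat(columns, ",") .. ") VALUES "
    local tuples, size = {}, #prefix
    local buf = {}

    function buf.flush()
        if #tuples > 0 then
            flush_fn(prefix .. table.concat(tuples, ",") .. ";")
            tuples, size = {}, #prefix
        end
    end

    function buf.add(values)
        local parts = {}
        for i = 1, #columns do parts[i] = sql_literal(values[i]) end
        local tuple = "(" .. table.concat(parts, ",") .. ")"
        -- Each tuple costs its length plus one separator (a comma, or the
        -- final semicolon), so size always equals the statement length.
        if size + #tuple + 1 > buffer_size then buf.flush() end
        tuples[#tuples + 1] = tuple
        size = size + #tuple + 1
    end

    return buf
end

local out = {}
local b = new_insert_buffer("example", {"a", "b"}, 50, function(s) out[#out + 1] = s end)
b.add({1, "it's"})
b.add({2, nil}) -- would exceed 50 bytes, so the first statement is flushed
b.flush()
```

A single tuple larger than buffer_size is still emitted on its own in this sketch. With the configured `buffer_size = 10000 * 1000` (10,000,000 bytes), each statement stays below the 16MB - 8KB ceiling.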
3.2. redshift.psv
format = "redshift.psv"
buffer_dir = "/mnt/output"
buffer_size = 100 * 1024 * 1024 -- max 1GiB
s3_uri = "s3://test"
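Here `buffer_size = 100 * 1024 * 1024` is 104,857,600 bytes (100 MiB), well under the 1GiB cap. The sketch below shows how a pipe-separated row might be produced. It assumes the files are loaded with Redshift COPY's ESCAPE option, so backslash, pipe and newline are escaped with a backslash, and that nil values are written as empty strings. `psv_escape` and `psv_line` are hypothetical names.

```lua
-- Prefix backslash, pipe and newline with a backslash (assumes COPY ... ESCAPE).
local function psv_escape(s)
    return (string.gsub(s, "[\\|\n]", "\\%0"))
end

-- Join ncols values with "|"; nil becomes an empty field (an assumption).
local function psv_line(values, ncols)
    local parts = {}
    for i = 1, ncols do
        local v = values[i]
        parts[i] = v == nil and "" or psv_escape(tostring(v))
    end
    return table.concat(parts, "|") .. "\n"
end

local line = psv_line({"a|b", nil, "c\\d"}, 3)
```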
3.3. tsv
format = "tsv"
output_dir = "/mnt/output" -- directory where the daily output files are written
ts_field = "Timestamp" -- default
nil_value = "NULL" -- defaults to an empty string
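The sketch below combines the daily output files, ts_field and nil_value. It assumes the ts_field value is a Heka nanosecond timestamp that selects the UTC day of the output file. The file naming pattern, `daily_path` and `tsv_line` are hypothetical. Values are assumed to contain no tabs or newlines, so no escaping is done.

```lua
-- Hypothetical file naming: <output_dir>/<name>.<YYYYMMDD>.<ext>, UTC day.
local function daily_path(output_dir, name, ns, ext)
    local secs = math.floor(ns / 1e9) -- Heka timestamps are in nanoseconds
    return string.format("%s/%s.%s.%s", output_dir, name, os.date("!%Y%m%d", secs), ext)
end

-- Join ncols values with tabs, writing nil_value for missing columns.
local function tsv_line(values, ncols, nil_value)
    local parts = {}
    for i = 1, ncols do
        local v = values[i]
        if v == nil then parts[i] = nil_value else parts[i] = tostring(v) end
    end
    return table.concat(parts, "\t") .. "\n"
end

local path = daily_path("/mnt/output", "example", 1451606400000000000, "tsv")
local line = tsv_line({"a", nil, 3}, 3, "NULL")
```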
3.4. protobuf
format = "protobuf"
output_dir = "/mnt/output" -- directory where the daily output files are written
ts_field = "Timestamp" -- default
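For protobuf output, the column name rule decides where each value lands. The sketch below builds a message table in the header-plus-Fields shape used by the Heka sandbox. It assumes the standard Heka header names, and `build_message` is a hypothetical name. Matching is case-sensitive because the rule requires an exact match.

```lua
-- Heka message header names; a column whose name matches one exactly is
-- stored as that header, every other column goes into Fields.
local HEADERS = {
    Uuid = true, Timestamp = true, Type = true, Logger = true,
    Severity = true, Payload = true, EnvVersion = true, Pid = true,
    Hostname = true,
}

local function build_message(schema, values)
    local msg = { Fields = {} }
    for i, row in ipairs(schema) do
        local name = row[1]
        if HEADERS[name] then
            msg[name] = values[i]
        else
            msg.Fields[name] = values[i]
        end
    end
    return msg
end

local schema = {
    {"Timestamp", "TIMESTAMP", nil, "SORTKEY", "Timestamp"},
    {"clientId",  "CHAR",      36,  "DISTKEY", "Fields[clientId]"},
}
local msg = build_message(schema, {1451606400000000000, "abc"})
```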
source code: derived_stream.lua