S3 Output Partitioner/Parquet output

Batches message data into Parquet files based on the specified S3 path dimensions and copies them to S3 when they reach the maximum size or maximum age. Note: For now this plugin renames the file to "*.done" and a separate process takes care of the S3 upload.

Sample Configuration

filename        = "s3_parquet.lua"
message_matcher = "Type == 'telemetry' && Fields[docType] == 'testpilot'"
ticker_interval = 60
preserve_data   = false

parquet_schema = [=[
    message testpilot {
        required binary id (UTF8);
        optional binary clientId (UTF8);
        required group metadata {
            required int64  Timestamp;
            required binary submissionDate (UTF8);
            optional binary Date (UTF8);
            optional binary normalizedChannel (UTF8);
            optional binary geoCountry (UTF8);
            optional binary geoCity (UTF8);
        }
        optional group application {
            optional binary name (UTF8);
        }
        optional group environment {
            optional group system {
                optional group os {
                    optional binary name (UTF8);
                    optional binary version (UTF8);
                }
            }
        }
        optional group payload {
            optional binary version (UTF8);
            optional binary test (UTF8);
            repeated group events {
                optional int64  timestamp;
                optional binary event (UTF8);
                optional binary object (UTF8);
            }
        }
    }
]=]

-- The name of a top level parquet group used to specify additional information
-- to be extracted from the message (using read_message). If the column name
-- matches a Heka message header name the data is extracted from 'msg.name'
-- otherwise the data is extracted from msg.Fields[name]
metadata_group = "metadata"

-- Array of Heka message variables containing JSON strings. The decoded JSON
-- objects are assembled into a record that is dissected based on the parquet
-- schema. This provides a generic way to cherry pick and re-combine the
-- segmented JSON structures like the Mozilla telemetry pings. A table can be
-- passed as the first value either empty or with some pre-seeded values.
-- If not specified the schema is applied directly to the Heka message.
json_objects = {"Fields[submission]", "Fields[environment.system]"}

s3_path_dimensions  = {
    -- access message data with using read_message()
    {name = "_submission_date", source = "Fields[submissionDate]"},
    -- access Timestamp using read_message() and then encode it using the dateformat string.
    -- scaling_factor is multiplied by source to output timestamp seconds. The default is 1e-9,
    -- which implies that source is in nanoseconds.
    {name = "_submission_date", source = "Timestamp", dateformat = "%Y-%m-%d-%H", scaling_factor = 1e-9},
    -- access the record data with a path array
    -- {name = "_submission_date", source = {"metadata", "submissionDate"}}
}

-- directory location to store the intermediate output files
batch_dir       = "/var/tmp/parquet"

-- Specifies how many parquet writers can be opened at once. If this value is
-- exceeded the least-recently used writer will have its data finalize and be
-- closed. The default is 100. A value of 0 means no maximum **warning** if
-- there are a large number of partitions this can easily run the system out of
-- file handles and/or memory.
max_writers         = 100

-- Specifies how many records to aggregate before creating a rowgroup
-- (default 10000)
max_rowgroup_size   = 10000

-- Specifies how much data (in bytes) can be written to a single file before
-- it is finalized. The file size is only checked after each rowgroup write
-- (default 300MiB).
max_file_size       = 1024 * 1024 * 300

-- Specifies how long (in seconds) to wait before the file is finalized
-- (default 1 hour).  Idle files are only checked every ticker_interval seconds.
max_file_age        = 60 * 60

-- This option causes the field name to be converted to a hive compatible column
-- name in the parquet output. The conversion snake cases the field name and
-- replaces any non [-_a-z0-9] characters with an underscore.
-- e.g. FooBar? -> foo_bar_
hive_compatible     = true -- default false

source code: s3_parquet.lua