S3 Output Partitioner/Parquet output
Batches message data into Parquet files partitioned by the specified S3 path dimensions and copies them to S3 when they reach the maximum size or maximum age. Note: for now this plugin renames each finalized file to "*.done" and a separate process takes care of the S3 upload.
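The S3 copy itself happens outside the plugin, so a companion process has to pick up the "*.done" files. A minimal sketch of what that could look like, in Lua for consistency with the configuration below (the bucket name, polling interval, and use of the aws CLI are illustrative assumptions, not part of this repository):

-- hypothetical companion uploader: copy finalized "*.done" files to S3
local batch_dir = "/var/tmp/parquet"            -- matches batch_dir below
local bucket    = "s3://example-bucket/parquet" -- hypothetical destination
while true do
    local p = io.popen("ls " .. batch_dir .. "/*.done 2>/dev/null")
    for file in p:lines() do
        -- os.execute returns a number in Lua 5.1 and a boolean in 5.2+
        local ok = os.execute(string.format("aws s3 cp %q %s/", file, bucket))
        if ok == 0 or ok == true then os.remove(file) end
    end
    p:close()
    os.execute("sleep 60") -- poll once a minute
end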
Sample Configuration
filename = "s3_parquet.lua"
message_matcher = "Type == 'telemetry' && Fields[docType] == 'testpilot'"
ticker_interval = 60
preserve_data = false
parquet_schema = [=[
message testpilot {
    required binary id (UTF8);
    optional binary clientId (UTF8);
    required group metadata {
        required int64 Timestamp;
        required binary submissionDate (UTF8);
        optional binary Date (UTF8);
        optional binary normalizedChannel (UTF8);
        optional binary geoCountry (UTF8);
        optional binary geoCity (UTF8);
    }
    optional group application {
        optional binary name (UTF8);
    }
    optional group environment {
        optional group system {
            optional group os {
                optional binary name (UTF8);
                optional binary version (UTF8);
            }
        }
    }
    optional group payload {
        optional binary version (UTF8);
        optional binary test (UTF8);
        repeated group events {
            optional int64 timestamp;
            optional binary event (UTF8);
            optional binary object (UTF8);
        }
    }
}
]=]
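-- (required/optional/repeated above are the standard Parquet repetition
-- levels: a value that must appear exactly once, at most once, or any number
-- of times, respectively)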
-- optionally load the schema from disk instead of specifying `parquet_schema` (not allowed from the admin UI)
-- parquet_schema_file = "/usr/share/mozilla-pipeline-schemas/telemetry/new-profile/new-profile.4.parquetmr.txt"
-- The name of a top-level parquet group used to specify additional information
-- to be extracted from the message (using read_message). If a column name
-- matches a Heka message header name, the data is extracted from 'msg.name';
-- otherwise it is extracted from msg.Fields[name].
metadata_group = "metadata"
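-- With the schema above, the "metadata" group therefore resolves as
-- (illustrative reading of the rule above):
--   Timestamp      -> read_message("Timestamp")              (header name)
--   submissionDate -> read_message("Fields[submissionDate]") (field name)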
-- metadata_prefix = nil -- if set, any root-level schema object whose name
-- contains this prefix is treated as metadata
-- Array of Heka message variables containing JSON strings. The decoded JSON
-- objects are assembled into a record that is dissected based on the parquet
-- schema. This provides a generic way to cherry-pick and re-combine segmented
-- JSON structures such as the Mozilla telemetry pings. A table, either empty
-- or pre-seeded with some values, can be passed as the first entry.
-- If not specified, the schema is applied directly to the Heka message.
json_objects = {"Fields[submission]", "Fields[environment.system]"}
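-- A sketch of a pre-seeded first entry ("id" is a root-level column of the
-- schema above; the value is made up for illustration):
-- json_objects = {{id = "constant-id"}, "Fields[submission]", "Fields[environment.system]"}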
s3_path_dimensions = {
    -- access message data using read_message()
    {name = "_submission_date", source = "Fields[submissionDate]"},
    -- an alternative to the entry above: access Timestamp using read_message()
    -- and then encode it with the dateformat string. scaling_factor is
    -- multiplied by source to produce timestamp seconds; the default is 1e-9,
    -- which implies the source is in nanoseconds.
    -- {name = "_submission_date", source = "Timestamp", dateformat = "%Y-%m-%d-%H", scaling_factor = 1e-9},
    -- access the record data with a path array
    -- {name = "_submission_date", source = {"metadata", "submissionDate"}},
}
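-- Worked example for the dateformat entry above: with Timestamp =
-- 1500000000000000000 ns, 1500000000000000000 * 1e-9 = 1500000000 s, which
-- "%Y-%m-%d-%H" renders as "2017-07-14-02" (assuming UTC).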
-- directory location to store the intermediate output files
batch_dir = "/var/tmp/parquet"
-- Specifies how many parquet writers can be open at once. If this value is
-- exceeded, the least-recently used writer has its data finalized and is
-- closed. The default is 100. A value of 0 means no maximum. **Warning:** if
-- there are a large number of partitions this can easily run the system out
-- of file handles and/or memory.
max_writers = 100
-- Specifies how many records to aggregate before creating a rowgroup
-- (default 10000)
max_rowgroup_size = 10000
-- Specifies how much data (in bytes) can be written to a single file before
-- it is finalized. The file size is only checked after each rowgroup write
-- (default 300 MiB).
max_file_size = 1024 * 1024 * 300
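-- (1024 * 1024 * 300 = 314,572,800 bytes)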
-- Specifies how long (in seconds) to wait before the file is finalized
-- (default 1 hour). Idle files are only checked every ticker_interval seconds.
max_file_age = 60 * 60
-- This option causes each field name to be converted to a Hive-compatible
-- column name in the parquet output. The conversion snake-cases the field
-- name and replaces any character outside [-_a-z0-9] with an underscore,
-- e.g. FooBar? -> foo_bar_
hive_compatible = true -- default false
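-- further illustrative conversions under the same rule:
--   clientId -> client_id, normalizedChannel -> normalized_channel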
source code: s3_parquet.lua