hekad

Description

Available hekad plugins compiled with this version of hekad.

Inputs

Common Input Parameters

New in version 0.9.

There are some configuration options that are universally available to all Heka input plugins. These will be consumed by Heka itself when Heka initializes the plugin and do not need to be handled by the plugin-specific initialization code.

  • decoder (string, optional):

    Decoder to be used by the input. This should refer to the name of a registered decoder plugin configuration. If supplied, messages will be decoded before being passed on to the router when the InputRunner’s Deliver method is called.

  • synchronous_decode (bool, optional):

    If synchronous_decode is false, then any specified decoder plugin will be run by a DecoderRunner in its own goroutine and messages will be passed in to the decoder over a channel, freeing the input to start processing the next chunk of incoming or available data. If true, then any decoding will happen synchronously and message delivery will not return control to the input until after decoding has completed. Defaults to false.

  • send_decode_failures (bool, optional):

    If true, then when an attempt to decode a message fails, the original, undecoded message will be tagged with a decode_failure field (set to true) and delivered to the router for possible further processing. Defaults to false. See also log_decode_failures.

  • can_exit (bool, optional):

    If false, the input plugin exiting will trigger a Heka shutdown. If set to true, Heka will continue processing other plugins. Defaults to false on most inputs.

  • splitter (string, optional)

    Splitter to be used by the input. This should refer to the name of a registered splitter plugin configuration. It specifies how the input should split the incoming data stream into individual records prior to decoding and/or injection to the router. Typically defaults to “NullSplitter”, although certain inputs override this with a different default value.

New in version 0.10.

  • log_decode_failures (bool, optional):

    If true, Heka will log an error message whenever an attempt to decode a message fails. Defaults to true. See also send_decode_failures.
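
These common parameters can be combined with any input's own settings in a single TOML section. A minimal sketch, assuming a UdpInput instance and a decoder configuration named "json_decoder" defined elsewhere:

[json-udp-input]
type = "UdpInput"
address = "127.0.0.1:4881"
splitter = "TokenSplitter"
decoder = "json_decoder"
send_decode_failures = true
can_exit = false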

AMQP Input

Plugin Name: AMQPInput

Connects to a remote AMQP broker (RabbitMQ) and retrieves messages from the specified queue. As AMQP is dynamically programmable, the broker topology needs to be specified in the plugin configuration.

Config:

  • url (string):

    An AMQP connection string formatted per the RabbitMQ URI Spec.

  • exchange (string):

    AMQP exchange name

  • exchange_type (string):

    AMQP exchange type (fanout, direct, topic, or headers).

  • exchange_durability (bool):

    Whether the exchange should be configured as a durable exchange. Defaults to non-durable.

  • exchange_auto_delete (bool):

    Whether the exchange is deleted when all queues have finished and there is no publishing. Defaults to auto-delete.

  • routing_key (string):

    The message routing key used to bind the queue to the exchange. Defaults to empty string.

  • prefetch_count (int):

    How many messages to fetch at once before message acks are sent. See RabbitMQ performance measurements for help in tuning this number. Defaults to 2.

  • queue (string):

    Name of the queue to consume from, an empty string will have the broker generate a name for the queue. Defaults to empty string.

  • bind_queue (bool):

    Whether the queue should be explicitly bound to the exchange. Not all exchanges require the consumer to define and bind their own queue. Defaults to true.

  • queue_durability (bool):

    Whether the queue is durable or not. Defaults to non-durable.

  • queue_exclusive (bool):

    Whether the queue is exclusive (only one consumer allowed) or not. Defaults to non-exclusive.

  • queue_auto_delete (bool):

    Whether the queue is deleted when the last consumer un-subscribes. Defaults to auto-delete.

  • queue_ttl (int):

    Allows specifying a TTL in milliseconds on the queue declaration, for expiring messages. Defaults to undefined/infinite.

  • retries (RetryOptions, optional):

    A sub-section that specifies the settings to be used for restart behavior. See Configuring Restarting Behavior

New in version 0.6.

  • tls (TlsConfig):

    An optional sub-section that specifies the settings to be used for any SSL/TLS encryption. This will only have any impact if URL uses the AMQPS URI scheme. See Configuring TLS.

New in version 0.9.

  • read_only (bool):

    Whether the AMQP user is read-only. If this is true the exchange, queue and binding must be declared before starting Heka. Defaults to false.

Since many of these parameters have sane defaults, a minimal configuration to consume serialized messages would look like:

[AMQPInput]
url = "amqp://guest:guest@rabbitmq/"
exchange = "testout"
exchange_type = "fanout"

Or you might use a PayloadRegexDecoder to parse OSX syslog messages with the following:

[AMQPInput]
url = "amqp://guest:guest@rabbitmq/"
exchange = "testout"
exchange_type = "fanout"
decoder = "logparser"

[logparser]
type = "MultiDecoder"
subs = ["logline", "leftovers"]

[logline]
type = "PayloadRegexDecoder"
MatchRegex = '\w+ \d+ \d+:\d+:\d+ \S+ (?P<Reporter>[^\[]+)\[(?P<Pid>\d+)](?P<Sandbox>[^:]+)?: (?P<Remaining>.*)'

    [logline.MessageFields]
    Type = "amqplogline"
    Hostname = "myhost"
    Reporter = "%Reporter%"
    Remaining = "%Remaining%"
    Logger = "%Logger%"
    Payload = "%Remaining%"

[leftovers]
type = "PayloadRegexDecoder"
MatchRegex = '.*'

    [leftovers.MessageFields]
    Type = "drop"
    Payload = ""

If the downstream heka messages are delimited by Heka’s Stream Framing, you will need to specify “HekaFramingSplitter” as the AMQPInput splitter. An example would look like:

[rsyslog-mq-input]
type = "AMQPInput"
url = "amqp://guest:guest@rabbitmq/"
exchange = "system-logs"
exchange_type = "topic"
exchange_durability = true
exchange_auto_delete = false
routing_key = "system.rsyslog"
queue = "rsyslog-logs"
queue_durability = true
queue_auto_delete = false
prefetch_count = 20
decoder = "rsyslog-multidecoder"
splitter = "HekaFramingSplitter"

    [rsyslog-mq-input.retries]
    max_delay = "180s"
    delay = "30s"
    max_retries = -1

[rsyslog-multidecoder]
type = "MultiDecoder"
subs = ["ProtobufDecoder", "rsyslog-decoder"]
cascade_strategy = "all"
log_sub_errors = true

[ProtobufDecoder]

[rsyslog-decoder]
type = "SandboxDecoder"
filename = "/usr/share/heka/lua_decoders/rsyslog.lua"

    [rsyslog-decoder.config]
    hostname_keep = true
    template = '%TIMESTAMP% %HOSTNAME% %syslogtag%%msg:::sp-if-no-1st-sp%%msg:::drop-last-lf%\n'

Docker Event Input

New in version 0.10.0.

Plugin Name: DockerEventInput

The DockerEventInput plugin connects to the Docker daemon and watches the Docker events API, sending all events to the Heka pipeline (see the Docker Events API documentation). Messages will be populated as follows:

  • Uuid: Type 4 (random) UUID generated by Heka.
  • Timestamp: Time when the event was received by the plugin.
  • Type: DockerEvent.
  • Hostname: Hostname of the machine on which Heka is running.
  • Payload: The event id, status, from and time. Example: id:47e08ded0abb57ca263136291f14ed7689de8b6ec519f01ea76958fe512abff9 status:create from:gliderlabs/alpine:3.1 time:1429555298
  • Logger: The id provided in the event.
  • Fields[“ID”] (string): The ID provided in the docker event.
  • Fields[“Status”] (string): The Status in the docker event.
  • Fields[“From”] (string): The From in the docker event.
  • Fields[“Time”] (string): The timestamp in the docker event.

Config:

  • endpoint (string):

    A Docker endpoint. Defaults to “unix:///var/run/docker.sock”.

  • cert_path (string, optional):

    Path to directory containing client certificate and keys. This value works in the same way as DOCKER_CERT_PATH.

Example:

[DockerEventInput]
type = "DockerEventInput"

[PayloadEncoder]
append_newlines = false

[LogOutput]
type = "LogOutput"
message_matcher = "Type == 'DockerEvent'"
encoder = "PayloadEncoder"

Docker Log Input

New in version 0.8.

Plugin Name: DockerLogInput

The DockerLogInput plugin attaches to all containers running on a host and sends their log messages into the Heka pipeline. The plugin is based on Logspout by Jeff Lindsay. Messages will be populated as follows:

  • Uuid: Type 4 (random) UUID generated by Heka.
  • Timestamp: Time when the log line was received by the plugin.
  • Type: DockerLog.
  • Hostname: Hostname of the machine on which Heka is running.
  • Payload: The log line received from a Docker container.
  • Logger: stdout or stderr, depending on source.
  • Fields[“ContainerID”] (string): The container ID.
  • Fields[“ContainerName”] (string): The container name.

Note

Logspout expects to be dealing exclusively with textual log file data, and always assumes that the file data is newline delimited, i.e. one line in the log file equals one logical unit of data. For this reason, the DockerLogInput currently does not support the use of alternate splitter plugins. Any splitter setting specified in a DockerLogInput’s configuration will be ignored.

Config:

  • endpoint (string):

    A Docker endpoint. Defaults to “unix:///var/run/docker.sock”.

  • decoder (string):

    The name of the decoder used to further transform the message into a structured hekad message. No default decoder is specified.

New in version 0.9.

  • cert_path (string, optional):

    Path to directory containing client certificate and keys. This value works in the same way as DOCKER_CERT_PATH.

New in version 0.10.

  • name_from_env_var (string, optional):

    Overwrite the ContainerName with the value of this environment variable on the container, if it exists. If left empty the container name will still be used.

  • fields_from_env (array[string], optional):

    A list of environment variables to extract from the container and add as fields.

New in version 0.11.

  • since_path (string, optional):

    Path to a file where the input will write a record of the “since” time for each container, so that log records are not missed while Heka is down (see Docker's “Get container logs” API). Relative paths will be relative to Heka’s configured base_dir. Defaults to ${BASE_DIR}/docker/logs_since.txt

  • since_interval (string, optional):

    Time interval (as supported by Go’s time.ParseDuration API) that specifies how often the DockerLogInput will write out the “since” file containing the most recently retrieved log times for each container. Defaults to “5s”. If set to zero (e.g. “0s”) then the file will only be written out when Heka cleanly shuts down, meaning that if Heka crashes all container logs written since Heka has started will be re-fetched.

Example:

[nginx_log_decoder]
type = "SandboxDecoder"
filename = "lua_decoders/nginx_access.lua"

[nginx_log_decoder.config]
type = "nginx.access"
user_agent_transform = true
log_format = '$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"'

[DockerLogInput]
decoder = "nginx_log_decoder"
fields_from_env = [ "MESOS_TASK_ID" ]

Docker Stats Input

New in version 0.11.

Plugin Name: DockerStatsInput

The DockerStatsInput plugin attaches to all containers running on a host and sends their stats into the Heka pipeline. The plugin is based on Logspout by Jeff Lindsay and the existing DockerLogInput. Messages will be populated as follows:

  • Uuid: Type 4 (random) UUID generated by Heka.
  • Timestamp: Time when the stat was received by the plugin.
  • Type: DockerStats.
  • Hostname: Hostname of the machine on which Heka is running.
  • Payload: The JSON formatted Stats from the Docker API.
  • Logger: The container name
  • Fields[“ContainerID”] (string): The container ID.
  • Fields[“ContainerName”] (string): The container name.
  • Fields: Optional fields specified in the fields_from_env and fields_from_labels config parameters.

Config:

  • endpoint (string):

    A Docker endpoint. Defaults to “unix:///var/run/docker.sock”.

  • decoder (string):

    The name of the decoder used to further transform the message into a structured hekad message. No default decoder is specified.

  • cert_path (string, optional):

    Path to directory containing client certificate and keys. This value works in the same way as DOCKER_CERT_PATH.

  • name_from_env_var (string, optional):

    Overwrite the ContainerName with the value of this environment variable on the container, if it exists. If left empty the container name will still be used.

  • fields_from_env (array[string], optional):

    A list of environment variables to extract from the container and add as fields.

  • fields_from_labels (array[string], optional):

    A list of values to extract from the container’s labels and add as fields.

Example:

[DockerStatsInput]
endpoint = "unix:///var/run/docker2.sock"
fields_from_env = [ "MESOS_TASK_ID" ]

File Polling Input

New in version 0.7.

Plugin Name: FilePollingInput

The FilePollingInput periodically reads (unbuffered) the contents of the specified file and creates a Heka message with the file contents as the payload.

Config:

  • file_path (string):

    The absolute path to the file which the input should read.

  • ticker_interval (uint):

    How often, in seconds, the input should read the contents of the file.

Example:

[MemStats]
type = "FilePollingInput"
ticker_interval = 1
file_path = "/proc/meminfo"
decoder = "MemStatsDecoder"

HTTP Input

Plugin Name: HttpInput

HttpInput plugins intermittently poll remote HTTP URLs for data and populate message objects based on the results of the HTTP interactions. Messages will be populated as follows:

  • Uuid: Type 4 (random) UUID generated by Heka.

  • Timestamp: Time HTTP request is completed.

  • Type: heka.httpinput.data or heka.httpinput.error depending on whether or not the request completed. (Note that a response returned with an HTTP error code is still considered complete and will generate type heka.httpinput.data.)

  • Hostname: Hostname of the machine on which Heka is running.

  • Payload: Entire contents of the HTTP response body.

  • Severity: HTTP response 200 uses success_severity config value, all other results use error_severity config value.

  • Logger: Fetched URL.

  • Fields[“Status”] (string): HTTP status string value (e.g. “200 OK”).

  • Fields[“StatusCode”] (int): HTTP status code integer value.

  • Fields[“ResponseSize”] (int): Value of HTTP Content-Length header.

  • Fields[“ResponseTime”] (float64): Clock time elapsed for HTTP request, in seconds.

  • Fields[“Protocol”] (string): HTTP protocol used for the request (e.g. “HTTP/1.0”).

The Fields values above will only be populated in the event of a completed HTTP request. Also, it is possible to specify a decoder to further process the results of the HTTP response before injecting the message into the router.

Config:

  • url (string):

    A HTTP URL which this plugin will regularly poll for data. This option cannot be used with the urls option. No default URL is specified.

  • urls (array):

    New in version 0.5.

    An array of HTTP URLs which this plugin will regularly poll for data. This option cannot be used with the url option. No default URLs are specified.

  • method (string):

    New in version 0.5.

    The HTTP method to use for the request. Defaults to “GET”.

  • body (string):

    New in version 0.5.

    The request body (e.g. for an HTTP POST request). No default body is specified.

  • user (string):

    New in version 0.5.

    The username for HTTP Basic Authentication. No default username is specified.

  • password (string):

    New in version 0.5.

    The password for HTTP Basic Authentication. No default password is specified.

  • ticker_interval (uint):

    Time interval (in seconds) between attempts to poll for new data. Defaults to 10.

  • success_severity (uint):

    New in version 0.5.

    Severity level of successful HTTP request. Defaults to 6 (information).

  • error_severity (uint):

    New in version 0.5.

    Severity level of errors, unreachable connections, and non-200 responses of successful HTTP requests. Defaults to 1 (alert).

  • headers (subsection):

    New in version 0.5.

    Subsection defining headers for the request. By default the User-Agent header is set to “Heka”

Example:

[HttpInput]
url = "http://localhost:9876/"
ticker_interval = 5
success_severity = 6
error_severity = 1
decoder = "MyCustomJsonDecoder"
    [HttpInput.headers]
    user-agent = "MyCustomUserAgent"

HTTP Listen Input

New in version 0.5.

Plugin Name: HttpListenInput

HttpListenInput plugins start a webserver listening on the specified address and port. If no decoder is specified data in the request body will be populated as the message payload. Messages will be populated as follows:

  • Uuid: Type 4 (random) UUID generated by Heka.

  • Timestamp: Time HTTP request is handled.

  • Type: heka.httpdata.request

  • Hostname: The remote network address of the requester.

  • Payload: Entire contents of the HTTP request body.

  • Severity: 6

  • Logger: HttpListenInput

  • Fields[“UserAgent”] (string): Request User-Agent header (e.g. “GitHub Hookshot dd0772a”).

  • Fields[“ContentType”] (string): Request Content-Type header (e.g. “application/x-www-form-urlencoded”).

  • Fields[“Protocol”] (string): HTTP protocol used for the request (e.g. “HTTP/1.0”).

New in version 0.6.

All query parameters are added as fields. For example, a request to “127.0.0.1:8325?user=bob” will create a field “user” with the value “bob”.

Config:

  • address (string):

    An IP address:port on which this plugin will expose a HTTP server. Defaults to “127.0.0.1:8325”.

New in version 0.7.

  • headers (subsection, optional):

    It is possible to inject arbitrary HTTP headers into each outgoing response by adding a TOML subsection entitled “headers” to your HttpListenInput config section. All entries in the subsection must be a list of string values.

New in version 0.9.

  • request_headers ([]string):

    Add additional request headers as message fields. Defaults to empty list.

New in version 0.10.

  • auth_type (string, optional):

    If authentication is required, specify “Basic” or “API”. To use “API”, requests must include a header called “X-API-KEY” set to the value of the api_key config.

  • username (string, optional):

    Username to check against if auth_type = “Basic”.

  • password (string, optional):

    Password to check against if auth_type = “Basic”.

  • api_key (string, optional):

    String to validate the “X-API-KEY” header against when using auth_type = “API”

  • use_tls (bool):

    Specifies whether or not SSL/TLS encryption should be used for the TCP connections. Defaults to false.

  • tls (TlsConfig):

    A sub-section that specifies the settings to be used for any SSL/TLS encryption. This will only have any impact if use_tls is set to true. See Configuring TLS.

Example:

[HttpListenInput]
address = "0.0.0.0:8325"

With Basic Auth:

[HttpListenInput]
address = "0.0.0.0:8325"
auth_type = "Basic"
username = "foo"
password = "bar"

With API Key Auth:

[HttpListenInput]
address = "0.0.0.0:8325"
auth_type = "API"
api_key = "1234567"

Kafka Input

Plugin Name: KafkaInput

Connects to a Kafka broker and subscribes to messages from the specified topic and partition.

Config:

  • id (string)

    Client ID string. Default is the hostname.

  • addrs ([]string)

    List of broker addresses.

  • metadata_retries (int)

    How many times to retry a metadata request when a partition is in the middle of leader election. Default is 3.

  • wait_for_election (uint32)

    How long to wait for leader election to finish between retries (in milliseconds). Default is 250.

  • background_refresh_frequency (uint32)

    How frequently the client will refresh the cluster metadata in the background (in milliseconds). Default is 600000 (10 minutes). Set to 0 to disable.

  • max_open_requests (int)

    How many outstanding requests the broker is allowed to have before blocking attempts to send. Default is 4.

  • dial_timeout (uint32)

    How long to wait for the initial connection to succeed before timing out and returning an error (in milliseconds). Default is 60000 (1 minute).

  • read_timeout (uint32)

    How long to wait for a response before timing out and returning an error (in milliseconds). Default is 60000 (1 minute).

  • write_timeout (uint32)

    How long to wait for a transmit to succeed before timing out and returning an error (in milliseconds). Default is 60000 (1 minute).

  • topic (string)

    Kafka topic (must be set).

  • partition (int32)

    Kafka topic partition. Default is 0.

  • group (string)

    A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group id, multiple processes indicate that they are all part of the same consumer group. Default is the id.

  • default_fetch_size (int32)

    The default (maximum) amount of data to fetch from the broker in each request. The default is 32768 bytes.

  • min_fetch_size (int32)

    The minimum amount of data to fetch in a request - the broker will wait until at least this many bytes are available. The default is 1, as 0 causes the consumer to spin when no messages are available.

  • max_message_size (int32)

    The maximum permissible message size - messages larger than this will return MessageTooLarge. The default of 0 is treated as no limit.

  • max_wait_time (uint32)

    The maximum amount of time the broker will wait for min_fetch_size bytes to become available before it returns fewer than that anyway. The default is 250ms, since 0 causes the consumer to spin when no events are available. 100-500ms is a reasonable range for most cases.

  • offset_method (string)

    The method used to determine at which offset to begin consuming messages. The valid values are:

    • Manual Heka will track the offset and resume from where it last left off (default).
    • Newest Heka will start reading from the most recent available offset.
    • Oldest Heka will start reading from the oldest available offset.
  • event_buffer_size (int)

    The number of events to buffer in the Events channel. Having this non-zero permits the consumer to continue fetching messages in the background while client code consumes events, greatly improving throughput. The default is 16.

New in version 0.11.

  • use_tls (bool, optional):

    Specifies whether or not SSL/TLS encryption should be used for the TCP connections. Defaults to false.

  • tls (TlsConfig, optional):

    A sub-section that specifies the settings to be used for any SSL/TLS encryption. This will only have any impact if use_tls is set to true. See Configuring TLS.

Example 1: Read Fxa messages from partition 0.

[FxaKafkaInputTest]
type = "KafkaInput"
topic = "Fxa"
addrs = ["localhost:9092"]

Example 2: Send messages between two Heka instances via a Kafka broker.

# On the producing instance
[KafkaOutputExample]
type = "KafkaOutput"
message_matcher = "TRUE"
topic = "heka"
addrs = ["kafka-broker:9092"]
encoder = "ProtobufEncoder"
# On the consuming instance
[KafkaInputExample]
type = "KafkaInput"
topic = "heka"
addrs = ["kafka-broker:9092"]
splitter = "KafkaSplitter"
decoder = "ProtobufDecoder"

[KafkaSplitter]
type = "NullSplitter"
use_message_bytes = true

Logstreamer Input

New in version 0.5.

Plugin Name: LogstreamerInput

Tails a single log file, a sequential single log source, or multiple log sources of either a single logstream or multiple logstreams.

Config:

  • hostname (string):

    The hostname to use for the messages, by default this will be the machine’s qualified hostname. This can be set explicitly to ensure it’s the correct name in the event the machine has multiple interfaces/hostnames.

  • oldest_duration (string):

    A time duration string (e.g. “2s”, “2m”, “2h”). Logfiles with a last modified time older than oldest_duration ago will not be included for parsing. Defaults to “720h” (720 hours, i.e. 30 days).

  • journal_directory (string):

    The directory to store the journal files in for tracking the location that has been read to thus far. By default this is stored under heka’s base directory.

  • log_directory (string):

    The root directory to scan files from. This scan is recursive so it should be suitably restricted to the most specific directory this selection of logfiles will be matched under. The log_directory path will be prepended to the file_match.

  • rescan_interval (int):

    During logfile rotation, or if the logfile is not originally present on the system, this interval is how often the existence of the logfile will be checked for. The default of 5 seconds is usually fine. This interval is in milliseconds.

  • file_match (string):

    Regular expression used to match files located under the log_directory. This regular expression has $ added to the end automatically if not already present, and log_directory as the prefix. WARNING: file_match should typically be delimited with single quotes, indicating use of a raw string, rather than double quotes, which require all backslashes to be escaped. For example, ‘access\.log’ will work as expected, but “access\.log” will not; you would need “access\\.log” to achieve the same result.

  • priority (list of strings):

    When using sequential logstreams, the priority is how to sort the logfiles in order from oldest to newest.

  • differentiator (list of strings):

    When using multiple logstreams, the differentiator is a set of strings that will be used in the naming of the logger, and portions that match a captured group from the file_match will have their matched value substituted in. Only the last (according to priority) file per differentiator is kept open.

  • translation (hash map of hash maps of ints):

    A set of translation mappings for matched groupings to the ints to use for sorting purposes.

  • splitter (string, optional):

    Defaults to “TokenSplitter”, which will split the log stream into one Heka message per line.

New in version 0.10.

  • check_data_interval (string)

    A time duration string. This interval is how often streams will be checked for new data. Defaults to “250ms”. If the plugin processes many logstreams, you may increase this value to reduce the CPU load.

New in version 0.11.

  • initial_tail (bool, optional, default: false):

    If this setting is true, when there is no cursor file for a given stream (which is always the case when reading a stream for the first time) then the input will start from the end of the stream instead of the beginning. If a cursor file exists, the input will attempt to continue from the specified cursor location, as always.
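
Example (a minimal sketch; the directory, file_match expression, and the "nginx_access_decoder" decoder name are illustrative):

[nginx-access-logstream]
type = "LogstreamerInput"
log_directory = "/var/log/nginx"
file_match = 'access\.log\.?(?P<Index>\d+)?'
priority = ["^Index"]
decoder = "nginx_access_decoder"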

Process Input

Plugin Name: ProcessInput

Executes one or more external programs on an interval, creating messages from the output. Supports a chain of commands, where stdout from each process will be piped into the stdin for the next process in the chain. ProcessInput creates Fields[ExitStatus] and Fields[SubcmdErrors]. Fields[ExitStatus] represents the platform dependent exit status of the last command in the command chain. Fields[SubcmdErrors] represents errors from each sub command, in the format of “Subcommand[<subcommand ID>] returned an error: <error message>”.

Config:

  • command (map[uint]cmd_config):

    The command is a structure that contains the full path to the binary, command line arguments, optional environment variables and an optional working directory (see below). ProcessInput expects the commands to be indexed by integers starting with 0, where 0 is the first process in the chain.

  • ticker_interval (uint):

    The number of seconds to wait between each run of command. Defaults to 15. A ticker_interval of 0 indicates that the command is run only once, and should only be used for long running processes that do not exit. If ticker_interval is set to 0 and the process exits, then the ProcessInput will exit, invoking the restart behavior (see Configuring Restarting Behavior). Ignored when used in conjunction with Process Directory Input, where ticker_interval value is instead parsed from the directory path.

  • immediate_start (bool):

    If true, Heka starts the process immediately instead of waiting for the first interval defined by ticker_interval to pass. Defaults to false.

  • stdout (bool):

    If true, for each run of the process chain a message will be generated with the last command in the chain’s stdout as the payload. Defaults to true.

  • stderr (bool):

    If true, for each run of the process chain a message will be generated with the last command in the chain’s stderr as the payload. Defaults to false.

  • timeout (uint):

    Timeout in seconds before any one of the commands in the chain is terminated.

  • retries (RetryOptions, optional):

    A sub-section that specifies the settings to be used for restart behavior. See Configuring Restarting Behavior

cmd_config structure:

  • bin (string):

    The full path to the binary that will be executed.

  • args ([]string):

    Command line arguments to pass into the executable.

  • env ([]string):

    Used to set environment variables before command is run. Default is nil, which uses the heka process’s environment.

  • directory (string):

    Used to set the working directory of bin. Default is “”, which uses the heka process’s working directory.

Example:

[on_space]
type = "TokenSplitter"
delimiter = " "

[DemoProcessInput]
type = "ProcessInput"
ticker_interval = 2
splitter = "on_space"
stdout = true
stderr = false

    [DemoProcessInput.command.0]
    bin = "/bin/cat"
    args = ["../testsupport/process_input_pipes_test.txt"]

    [DemoProcessInput.command.1]
    bin = "/usr/bin/grep"
    args = ["ignore"]

Process Directory Input

Plugin Name: ProcessDirectoryInput

New in version 0.5.

The ProcessDirectoryInput periodically scans a filesystem directory looking for ProcessInput configuration files. The ProcessDirectoryInput will maintain a pool of running ProcessInputs based on the contents of this directory, refreshing the set of running inputs as needed with every rescan. This allows Heka administrators to manage a set of data collection processes for a running hekad server without restarting the server.

Each ProcessDirectoryInput has a process_dir configuration setting, which is the root folder of the tree where scheduled jobs are defined. It should contain exactly one nested level of subfolders, named with ASCII numeric characters indicating the interval, in seconds, between each process run. These numeric folders must contain TOML files which specify the details regarding which processes to run.

For example, a process_dir might look like this:

-/usr/share/heka/processes/
 |-5
   |- check_myserver_running.toml
 |-61
   |- cat_proc_mounts.toml
   |- get_running_processes.toml
 |-302
   |- some_custom_query.toml

This indicates one process to be run every five seconds, two processes to be run every 61 seconds, and one process to be run every 302 seconds.

Note that ProcessDirectoryInput will ignore any files that are not nested exactly one level deep, that are not in a folder named for an integer 0 or greater, or that do not end with ‘.toml’. Each file which meets these criteria, such as those shown in the example above, should contain the TOML configuration for exactly one Process Input, matching that of a standalone ProcessInput with the following restrictions:

  • The section name must be ProcessInput. Any TOML sections named anything other than ProcessInput will be ignored.
  • Any specified ticker_interval value will be ignored. The ticker interval value to use will be parsed from the directory path.
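
For example, the cat_proc_mounts.toml file from the tree above might contain something like the following (a sketch; the command details are illustrative):

[ProcessInput]
stdout = true

    [ProcessInput.command.0]
    bin = "/bin/cat"
    args = ["/proc/mounts"]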

By default, if the specified process fails to run or the ProcessInput config fails for any other reason, ProcessDirectoryInput will log an error message and continue, as if the ProcessInput’s can_exit flag has been set to true. If the managed ProcessInput’s can_exit flag is manually set to false, it will trigger a Heka shutdown.

Config:

  • ticker_interval (int, optional):

    Amount of time, in seconds, between scans of the process_dir. Defaults to 300 (i.e. 5 minutes).

  • process_dir (string, optional):

    This is the root folder of the tree where the scheduled jobs are defined. Absolute paths will be honored, relative paths will be computed relative to Heka’s globally specified share_dir. Defaults to “processes” (i.e. “$share_dir/processes”).

  • retries (RetryOptions, optional):

    A sub-section that specifies the settings to be used for restart behavior of the ProcessDirectoryInput (not the individual ProcessInputs, which are configured independently). See Configuring Restarting Behavior

Example:

[ProcessDirectoryInput]
process_dir = "/etc/hekad/processes.d"
ticker_interval = 120

Sandbox Input

New in version 0.9.

Plugin Name: SandboxInput

The SandboxInput provides a flexible execution environment for data ingestion and transformation without the need to recompile Heka. Like all other sandboxes it needs to implement a process_message function. However, it doesn’t have to return until shutdown. If you would like to implement a polling interface, process_message can return zero when complete and it will be called again the next time TickerInterval fires (if ticker_interval was set to zero it would simply exit after running once). See Sandbox.

Config:

  • All of the common input configuration parameters are ignored since the data processing (splitting and decoding) should happen in the plugin.

  • Common Sandbox Parameters
    • instruction_limit is always set to zero for SandboxInputs

Example

[MemInfo]
type = "SandboxInput"
filename = "meminfo.lua"

[MemInfo.config]
path = "/proc/meminfo"

Stat Accumulator Input

Plugin Name: StatAccumInput

Provides an implementation of the StatAccumulator interface which other plugins can use to submit Stat objects for aggregation and roll-up. Accumulates these stats and then periodically emits a “stat metric” type message containing aggregated information about the stats received since the last generated message.

Config:

  • emit_in_payload (bool):

    Specifies whether or not the aggregated stat information should be emitted in the payload of the generated messages, in the format accepted by the carbon portion of the graphite graphing software. Defaults to true.

  • emit_in_fields (bool):

    Specifies whether or not the aggregated stat information should be emitted in the message fields of the generated messages. Defaults to false. NOTE: At least one of ‘emit_in_payload’ or ‘emit_in_fields’ must be true or it will be considered a configuration error and the input won’t start.

  • percent_threshold (slice):

    Percent threshold to use for computing “upper_N%” type stat values. Defaults to [90].

  • ticker_interval (uint):

    Time interval (in seconds) between generated output messages. Defaults to 10.

  • message_type (string):

    String value to use for the Type value of the emitted stat messages. Defaults to “heka.statmetric”.

  • legacy_namespaces (bool):

    If set to true, then use the older format for namespacing counter stats, with rates recorded under stats.<counter_name> and absolute count recorded under stats_counts.<counter_name>. See statsd metric namespacing. Defaults to false.

  • global_prefix (string):

    Global prefix to use for sending stats to graphite. Defaults to “stats”.

  • counter_prefix (string):

    Secondary prefix to use for namespacing counter metrics. Has no impact unless legacy_namespaces is set to false. Defaults to “counters”.

  • timer_prefix (string):

    Secondary prefix to use for namespacing timer metrics. Defaults to “timers”.

  • gauge_prefix (string):

    Secondary prefix to use for namespacing gauge metrics. Defaults to “gauges”.

  • statsd_prefix (string):

    Prefix to use for the statsd numStats metric. Defaults to “statsd”.

  • delete_idle_stats (bool):

    Don’t emit values for inactive stats, instead of sending 0 (or, in the case of gauges, the previous value). Defaults to false.

Example:

[StatAccumInput]
emit_in_fields = true
delete_idle_stats = true
ticker_interval = 5

Statsd Input

Plugin Name: StatsdInput

Listens for statsd protocol counter, timer, or gauge messages on a UDP port, and generates Stat objects that are handed to a StatAccumulator for aggregation and processing.

Config:

  • address (string):

    An IP address:port on which this plugin will expose a statsd server. Defaults to “127.0.0.1:8125”.

  • stat_accum_name (string):

    Name of a StatAccumInput instance that this StatsdInput will use as its StatAccumulator for submitting received stat values. Defaults to “StatAccumInput”.

  • max_msg_size (uint):

    Size of the buffer used for reading messages from statsd. In some cases, when statsd sends a lot of stats in a single message, it may be necessary to increase this value. All over-length data will be truncated without raising an error. Defaults to 512.

Example:

[StatsdInput]
address = ":8125"
stat_accum_name = "custom_stat_accumulator"

TCP Input

Plugin Name: TcpInput

Listens on a specific TCP address and port for messages. If the message is signed it is verified against the signer name and specified key version. If the signature is not valid the message is discarded; otherwise the signer name is added to the pipeline pack and can be used to accept messages using the message_signer configuration option.

Config:

  • address (string):

    An IP address:port on which this plugin will listen.

New in version 0.4.

  • decoder (string):

    Defaults to “ProtobufDecoder”.

New in version 0.5.

  • use_tls (bool):

    Specifies whether or not SSL/TLS encryption should be used for the TCP connections. Defaults to false.

  • tls (TlsConfig):

    A sub-section that specifies the settings to be used for any SSL/TLS encryption. This will only have any impact if use_tls is set to true. See Configuring TLS.

  • net (string, optional, default: “tcp”)

    Network value must be one of: “tcp”, “tcp4”, “tcp6”, “unix” or “unixpacket”.

New in version 0.6.

  • keep_alive (bool):

    Specifies whether or not TCP keepalive should be used for established TCP connections. Defaults to false.

  • keep_alive_period (int):

    Time duration in seconds that a TCP connection will be maintained before keepalive probes start being sent. Defaults to 7200 (i.e. 2 hours).

New in version 0.9.

  • splitter (string):

    Defaults to “HekaFramingSplitter”.

Example:

[TcpInput]
address = ":5565"

UDP Input

Plugin Name: UdpInput

Listens on a specific UDP address and port for messages. If the message is signed it is verified against the signer name and specified key version. If the signature is not valid the message is discarded; otherwise the signer name is added to the pipeline pack and can be used to accept messages using the message_signer configuration option.

Note

The UDP payload is not restricted to a single message; since the stream parser is being used multiple messages can be sent in a single payload.

Config:

  • address (string):

    An IP address:port or Unix datagram socket file path on which this plugin will listen.

  • signer:

    Optional TOML subsection. Section name consists of a signer name, underscore, and numeric version of the key.

    • hmac_key (string):

      The hash key used to sign the message.

New in version 0.5.

  • net (string, optional, default: “udp”)

    Network value must be one of: “udp”, “udp4”, “udp6”, or “unixgram”.

New in version 0.10.

  • set_hostname (boolean, default: false)

    Set Hostname field from remote address.

Example:

[UdpInput]
address = "127.0.0.1:4880"
splitter = "HekaFramingSplitter"
decoder = "ProtobufDecoder"

[UdpInput.signer.ops_0]
hmac_key = "4865ey9urgkidls xtb0[7lf9rzcivthkm"
[UdpInput.signer.ops_1]
hmac_key = "xdd908lfcgikauexdi8elogusridaxoalf"

[UdpInput.signer.dev_1]
hmac_key = "haeoufyaiofeugdsnzaogpi.ua,dp.804u"

Splitters

New in version 0.9.

Common Splitter Parameters

There are some configuration options that are universally available to all Heka splitter plugins. These will be consumed by Heka itself when Heka initializes the plugin and do not need to be handled by the plugin-specific initialization code.

  • keep_truncated (bool, optional):

    If true, then any records that exceed the capacity of the input buffer will still be delivered in their truncated form. If false, then these records will be dropped. Defaults to false.

  • use_message_bytes (bool, optional):

    Most decoders expect to find the raw, undecoded input data stored as the payload of the received Heka Message struct. Some decoders, however, such as the ProtobufDecoder, expect to receive a blob of bytes representing an entire Message struct, not just the payload. In this case, the data is expected to be found on the MsgBytes attribute of the Message’s PipelinePack. If use_message_bytes is true, then the data will be written as a byte slice to the MsgBytes attribute, otherwise it will be written as a string to the Message payload. Defaults to false in most cases, but defaults to true for the HekaFramingSplitter, which is almost always used with the ProtobufDecoder.

  • min_buffer_size (uint, optional):

    The initial size, in bytes, of the internal buffer that the SplitterRunner will use for buffering data streams. Must not be greater than the globally configured max_message_size. Defaults to 8KiB, although certain splitters may specify a different default.

  • deliver_incomplete_final (bool, optional):

    When a splitter is used to split a stream, that stream can end part way through a record. It’s sometimes appropriate to drop that data, but in other cases the incomplete data can still be useful. If ‘deliver_incomplete_final’ is set to true, then when the SplitterRunner’s SplitStream method is used a delivery attempt will be made with any partial record data that may come through immediately before an EOF. Defaults to false.
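
For example, a splitter instance can combine these common parameters with its own settings in one TOML section (a minimal sketch; the delimiter and buffer size are illustrative):

[long_line_splitter]
type = "RegexSplitter"
delimiter = '\n'
min_buffer_size = 65536
keep_truncated = true
deliver_incomplete_final = true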

Heka Framing Splitter

Plugin Name: HekaFramingSplitter

A HekaFramingSplitter is used to split streams of data that use Heka’s built-in Stream Framing, with a protocol buffers encoded message header supporting HMAC key authentication.

A default configuration of the HekaFramingSplitter is automatically registered as an available splitter plugin as “HekaFramingSplitter”, so it is only necessary to add an additional TOML section if you want to use an instance of the splitter with settings other than the default.

Config:

  • signer:

    Optional TOML subsection. Section name consists of a signer name, underscore, and numeric version of the key.

    • hmac_key (string):

      The hash key used to sign the message.

  • use_message_bytes (bool, optional):

    The HekaFramingSplitter is almost always used in concert with an instance of ProtobufDecoder, which expects the protocol buffer message data to be available in the PipelinePack’s MsgBytes attribute, so use_message_bytes defaults to true.

  • skip_authentication (bool, optional):

    Usually if a HekaFramingSplitter identifies an incorrectly signed message, that message will be silently dropped. In some cases, however, such as when loading a stream of protobuf encoded Heka messages from a file system file, it may be desirable to skip authentication altogether. Setting this to true will do so. Defaults to false.

Example:

[acl_splitter]
type = "HekaFramingSplitter"

  [acl_splitter.signer.ops_0]
  hmac_key = "4865ey9urgkidls xtb0[7lf9rzcivthkm"
  [acl_splitter.signer.ops_1]
  hmac_key = "xdd908lfcgikauexdi8elogusridaxoalf"

  [acl_splitter.signer.dev_1]
  hmac_key = "haeoufyaiofeugdsnzaogpi.ua,dp.804u"

[tcp_control]
type = "TcpInput"
address = ":5566"
splitter = "acl_splitter"

Null Splitter

Plugin Name: NullSplitter

The NullSplitter is used in cases where the incoming data is already naturally divided into logical messages, such that Heka doesn’t need to do any further splitting. For instance, when used in conjunction with a UdpInput, the contents of each UDP packet will be made into a separate message.

Note that this means generally the NullSplitter should not be used with a stream oriented input transport, such as with TcpInput or LogstreamerInput. If this is done then the splitting will be arbitrary; each message will contain whatever happens to be the contents of a particular read operation.

The NullSplitter has no configuration options, and is automatically registered as an available splitter plugin of the name “NullSplitter”, so it doesn’t require a separate TOML configuration section.

New in version 0.11.

PatternGrouping Splitter

Plugin Name: PatternGroupingSplitter

A PatternGroupingSplitter is an extension of the RegexSplitter for use cases where a single log record may sometimes span multiple delimiter patterns. The delimiter pattern is first used to break the stream into records as it is in the RegexSplitter. But then a second pass is taken over the split records and the grouping pattern is used to rejoin matching, contiguous records into a single record.

This splitter is useful for things like Stacktraces intermixed with other logs in the same stream. The normal record delimiter pattern might be a line feed and so the first pass by the splitter splits the stream into lines. In the second pass, the grouping pattern is used to identify the lines of a stacktrace that should be grouped back together to keep the trace in a single log record. In this way you can get a single record for each stacktrace, but all other lines are split and recorded separately.

Another example use case is where an application dumps configuration values to the log on a timed basis and these values should all be grouped into a single record, rather than one line at a time.

The performance characteristics of the PatternGroupingSplitter are noticeably worse than a RegexSplitter. The RegexSplitter is optimized to break one record off at a time, matching only the first occurrence of the record delimiter before processing the record. The PatternGroupingSplitter must break apart all of the records in a single read from the stream, then re-test each line against the grouping pattern. This can result in the same line being processed by the delimiter pattern many times. Careful consideration should be made as to whether or not this performance tradeoff is tolerable.

Finally, the PatternGroupingSplitter, unlike the RegexSplitter, will always include the delimiter pattern for each line and also at the end of the record.

Config:

  • delimiter (string)

    Regular expression to be used as the record boundary. May contain zero or one specified capture groups.

  • grouping (string)

    Regular expression to be used to regroup matching records into a single final record. Any contiguous lines matching this expression will become a single record.

  • max_lines (int, optional)

    The maximum number of records to process in a single splitting operation with the delimiter pattern. This is used to tune for performance by helping to limit the number of times a single line will be re-parsed with the delimiter expression. The knock-on effect is that this will also set the upper bound on the number of lines that can be grouped into a single record. Defaults to 99.

Example:

[stacktrace_grouping_splitter]
type = "PatternGroupingSplitter"
delimiter = '\n'
grouping = '(\] FATAL )|(\A\s*.+Exception: .)|(at \S+\(\S+\))|(\A\s+... \d+ more)|(\A\s*Caused by:.)|(\A\s*Grave:)'

Regex Splitter

Plugin Name: RegexSplitter

A RegexSplitter considers any text that matches a specified regular expression to represent a boundary on which records should be split. The regular expression may contain at most one capture group. If a capture group is specified, then the captured text will be included in the returned record. If not, then the returned record will not include the text that caused the regular expression match.

Config:

  • delimiter (string)

    Regular expression to be used as the record boundary. May contain zero or one specified capture groups.

  • delimiter_eol (bool, optional):

    Specifies whether the contents of a delimiter capture group should be appended to the end of a record (true) or prepended to the beginning (false). Defaults to true. If the delimiter expression does not specify a capture group, this will have no effect.

Example:

[mysql_slow_query_splitter]
type = "RegexSplitter"
delimiter = '\n(# User@Host:)'
delimiter_eol = false

Token Splitter

Plugin Name: TokenSplitter

A TokenSplitter is used to split an incoming data stream on every occurrence (or every Nth occurrence) of a single, one byte token character. The token will be included as the final character in the returned record.

A default configuration of the TokenSplitter (i.e. splitting on every newline) is automatically registered as an available splitter plugin as “TokenSplitter”, so additional TOML sections don’t need to be added unless you want to use different settings.

Config:

  • delimiter (string, optional):

    String representation of the byte token to be used as message delimiter. Defaults to “\n”.

  • count (uint, optional):

    Number of instances of the delimiter that should be encountered before returning a record. Defaults to 1. Setting to 0 has no effect, 0 and 1 will be treated identically. Often used in conjunction with the deliver_incomplete_final option set to true, to ensure trailing partial records are still delivered.

Example:

[split_on_space]
type = "TokenSplitter"
delimiter = " "

[split_every_50th_newline_keep_partial]
type = "TokenSplitter"
count = 50
deliver_incomplete_final = true

Decoders

Apache Access Log Decoder

New in version 0.6.

Plugin Name: SandboxDecoder
File Name: lua_decoders/apache_access.lua

Parses the Apache access logs based on the Apache ‘LogFormat’ configuration directive. The Apache format specifiers are mapped onto the Nginx variable names where applicable e.g. %a -> remote_addr. This allows generic web filters and outputs to work with any HTTP server input.

Config:

  • log_format (string)

    The ‘LogFormat’ configuration directive from the apache2.conf. %t variables are converted to the number of nanoseconds since the Unix epoch and used to set the Timestamp on the message. http://httpd.apache.org/docs/2.4/mod/mod_log_config.html

  • type (string, optional, default nil):

    Sets the message ‘Type’ header to the specified value

  • user_agent_transform (bool, optional, default false)

    Transform the http_user_agent into user_agent_browser, user_agent_version, user_agent_os.

  • user_agent_keep (bool, optional, default false)

    Always preserve the http_user_agent value if transform is enabled.

  • user_agent_conditional (bool, optional, default false)

    Only preserve the http_user_agent value if transform is enabled and fails.

  • payload_keep (bool, optional, default false)

    Always preserve the original log line in the message payload.

Example Heka Configuration

[TestWebserver]
type = "LogstreamerInput"
log_directory = "/var/log/apache"
file_match = 'access\.log'
decoder = "CombinedLogDecoder"

[CombinedLogDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/apache_access.lua"

[CombinedLogDecoder.config]
type = "combined"
user_agent_transform = true
# combined log format
log_format = '%h %l %u %t \"%r\" %>s %O \"%{Referer}i\" \"%{User-Agent}i\"'

# common log format
# log_format = '%h %l %u %t \"%r\" %>s %O'

# vhost_combined log format
# log_format = '%v:%p %h %l %u %t \"%r\" %>s %O \"%{Referer}i\" \"%{User-Agent}i\"'

# referer log format
# log_format = '%{Referer}i -> %U'

Example Heka Message

Timestamp:2014-01-10 07:04:56 -0800 PST
Type:combined
Hostname:test.example.com
Pid:0
UUID:8e414f01-9d7f-4a48-a5e1-ae92e5954df5
Logger:TestWebserver
Payload:
EnvVersion:
Severity:7
Fields:
name:”remote_user” value_string:”-“
name:”http_x_forwarded_for” value_string:”-“
name:”http_referer” value_string:”-“
name:”body_bytes_sent” value_type:DOUBLE representation:”B” value_double:82
name:”remote_addr” value_string:”62.195.113.219” representation:”ipv4”
name:”status” value_type:DOUBLE value_double:200
name:”request” value_string:”GET /v1/recovery_email/status HTTP/1.1”
name:”user_agent_os” value_string:”FirefoxOS”
name:”user_agent_browser” value_string:”Firefox”
name:”user_agent_version” value_type:DOUBLE value_double:29

Graylog Extended Log Format Decoder

New in version 0.8.

Plugin Name: SandboxDecoder
File Name: lua_decoders/graylog_extended.lua

Parses a payload containing JSON in the Graylog2 Extended Format specification. http://graylog2.org/resources/gelf/specification

Config:

  • type (string, optional, default nil):

    Sets the message ‘Type’ header to the specified value

  • payload_keep (bool, optional, default false)

    Always preserve the original log line in the message payload.

Example of Graylog2 Extended Format Log

{
  "version": "1.1",
  "host": "rogueethic.com",
  "short_message": "This is a short message to identify what is going on.",
  "full_message": "An entire backtrace\ncould\ngo\nhere",
  "timestamp": 1385053862.3072,
  "level": 1,
  "_user_id": 9001,
  "_some_info": "foo",
  "_some_env_var": "bar"
}

Example Heka Configuration

[GELFLogInput]
type = "LogstreamerInput"
log_directory = "/var/log"
file_match = 'application\.gelf'
decoder = "GraylogDecoder"

[GraylogDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/graylog_decoder.lua"

    [GraylogDecoder.config]
    type = "gelf"
    payload_keep = true

Geo IP Decoder

New in version 0.6.

Plugin Name: GeoIpDecoder

Decoder plugin that generates GeoIP data based on the IP address of a specified field. It uses the GeoIP Go project as a wrapper around MaxMind’s geoip-api-c library, and thus assumes you have the library downloaded and installed. Currently, only the GeoLiteCity database is supported, which you must also download and install yourself into a location to be referenced by the db_file config option. By default the database file is opened using “GEOIP_MEMORY_CACHE” mode. This setting is hard-coded into the wrapper’s geoip.go file. You will need to manually override that code if you want to specify one of the other modes listed here.

Note

Due to external dependencies, this plugin is not compiled in to the released Heka binaries. It will automatically be included in a source build if GeoIP.h is available in the include path during build time. The generated binary will then only work on machines with the appropriate GeoIP shared library (e.g. libGeoIP.so.1) installed.

Note

If you are using this with the ES output you will likely need to specify the raw_bytes_fields option for the target_field specified. This is required to preserve the formatting of the JSON object.

Config:

  • db_file:

    The location of the GeoLiteCity.dat database. Defaults to “/var/cache/hekad/GeoLiteCity.dat”

  • source_ip_field:

    The name of the field containing the IP address you want to derive the location for.

  • target_field:

    The name of the new field created by the decoder. The decoder will output a JSON object with the following elements:

    • latitude: string,
    • longitude: string,
    • location: [ float64, float64 ],
    • coordinates: [ string, string ],
    • countrycode: string,
    • countrycode3: string,
    • region: string,
    • city: string,
    • postalcode: string,
    • areacode: int,
    • charset: int,
    • continentalcode: string

Example Heka Configuration

[apache_geoip_decoder]
type = "GeoIpDecoder"
db_file="/etc/geoip/GeoLiteCity.dat"
source_ip_field="remote_host"
target_field="geoip"

JSON Decoder

New in version 0.10.

Plugin Name: SandboxDecoder
File Name: lua_decoders/json.lua

Parses a payload containing JSON.

Config:

  • type (string, optional, default “json”):

    Sets the message ‘Type’ header to the specified value; this will be overridden if the Type config option is specified.

  • payload_keep (bool, optional, default false)

    Whether to preserve the original log line in the message payload.

  • map_fields (bool, optional, default false)

    Enables mapping of json fields to heka message fields.

  • Payload (string, optional, default nil)

    String specifying json field to map to message Payload, expects field value to be a string. Overrides the payload_keep config option.

  • Uuid (string, optional, default nil)

    String specifying json field to map to message Uuid, expects field value to be a string.

  • Type (string, optional, default nil)

    String specifying json field to map to message Type, expects field value to be a string. Overrides the type config option.

  • Logger (string, optional, default nil)

    String specifying json field to map to message Logger, expects field value to be a string.

  • Hostname (string, optional, default nil)

    String specifying json field to map to message Hostname, expects field value to be a string.

  • Severity (string, optional, default nil)

    String specifying json field to map to message Severity, expects field value to be numeric.

  • EnvVersion (string, optional, default nil)

    String specifying json field to map to message EnvVersion, expects field value to be numeric.

  • Pid (string, optional, default nil)

    String specifying json field to map to message Pid, expects field value to be numeric.

  • Timestamp (string, optional, default nil)

    String specifying json field to map to message Timestamp; if the field value is not in ns-since-epoch format, provide the timestamp_format config option.

  • timestamp_format (string, optional, default nil)

    String specifying the format used to parse extracted JSON values for the Timestamp fields, in standard strftime format. If left blank, timestamp values will be assumed to be in nanoseconds-since-epoch.

  • maximum_depth (uint, optional, default nil)

    Maximum depth to flatten nested keys to. Additional nesting below maximum_depth will be stringified via JSON encoding. For example, specifying a maximum_depth of 1 for the JSON string '{"top":{"nested":1}}' would decode to '{"top":"{\"nested\":1}"}'.

  • separator (string, optional, default ”.”)

    String specifying the character to use between keys during flattening. For example: '{"top":{"nested":1}}' would decode to '{"top.nested":1}'.

Example of a JSON payload:

{
  "msg": "Start Request",
  "event": "artemis.web.ensure-running",
  "extra": {
    "workspace-id": "cN907xLngi"
  },
  "time": "2015-05-06T20:40:05.509926234Z",
  "severity": 1
}

Example Heka Configuration

[ArtemisLogInput]
type = "LogstreamerInput"
log_directory = "/srv/artemis/current/logs"
file_match = 'artemis\.log'
decoder = "JsonDecoder"

[JsonDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/json.lua"

    [JsonDecoder.config]
    type = "artemis"
    payload_keep = true
    map_fields = true
    Severity = "severity"

Example Heka Message

Timestamp:2015-05-06 20:40:05 -0800 PST
Type:artemis
Hostname:test.example.com
Pid:0
UUID:8e414f01-9d7f-4a48-a5e1-ae92e5954df5
Payload:
EnvVersion:
Severity:1
Fields:
name:”msg” value_type:STRING value_string:”Start Request”
name:”event” value_type:STRING value_string:”artemis.web.ensure-running”
name:”extra.workspace-id” value_type:STRING value_string:”cN907xLngi”
name:”time” value_type:STRING value_string:”2015-05-06T20:40:05.509926234Z”

MultiDecoder

Plugin Name: MultiDecoder

This decoder plugin allows you to specify an ordered list of delegate decoders. The MultiDecoder will pass the PipelinePack to be decoded to each of the delegate decoders in turn until decode succeeds. In the case of failure to decode, MultiDecoder will return an error and recycle the message.

Config:

  • subs ([]string):

    An ordered list of subdecoders to which the MultiDecoder will delegate. Each item in the list should specify another decoder configuration section by section name. Must contain at least one entry.

  • log_sub_errors (bool):

    If true, the DecoderRunner will log the errors returned whenever a delegate decoder fails to decode a message. Defaults to false.

  • cascade_strategy (string):

    Specifies behavior the MultiDecoder should exhibit with regard to cascading through the listed decoders. Supports only two valid values: “first-wins” and “all”. With “first-wins”, each decoder will be tried in turn until there is a successful decoding, after which decoding will be stopped. With “all”, all listed decoders will be applied whether or not they succeed. In each case, decoding will only be considered to have failed if none of the sub-decoders succeed.

Here is a slightly contrived example where we have protocol buffer encoded messages coming in over a TCP connection, with each message containing a single nginx log line. Our MultiDecoder will run each message through two decoders, the first to deserialize the protocol buffer and the second to parse the log text:

[TcpInput]
address = ":5565"
parser_type = "message.proto"
decoder = "shipped-nginx-decoder"

[shipped-nginx-decoder]
type = "MultiDecoder"
subs = ['ProtobufDecoder', 'nginx-access-decoder']
cascade_strategy = "all"
log_sub_errors = true

[ProtobufDecoder]

[nginx-access-decoder]
type = "SandboxDecoder"
filename = "lua_decoders/nginx_access.lua"

    [nginx-access-decoder.config]
    type = "combined"
    user_agent_transform = true
    log_format = '$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"'

Linux CPU Stats Decoder

New in version 0.10.

Plugin Name: SandboxDecoder
File Name: lua_decoders/linux_procstat.lua

Parses a payload containing the contents of a /proc/stat file into a Heka message.

Config:

  • payload_keep (bool, optional, default false)

    Always preserve the original log line in the message payload.

Example Heka Configuration

[ProcStats]
type = "FilePollingInput"
ticker_interval = 1
file_path = "/proc/stat"
decoder = "ProcStatDecoder"

[ProcStatDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/linux_procstat.lua"

Example Heka Message

Timestamp:

2014-12-10 22:38:24 +0000 UTC

Type:

stats.proc

Hostname:

yourhost.net

Pid:

0

Uuid:

d2546942-7c36-4042-ad2e-f6bfdac11cdb

Logger:
Payload:
EnvVersion:
Severity:

7

Fields:
name:”cpu” type:double value:[14384,125,3330,946000,333,0,356,0,0,0]
name:”cpu[1-#]” type:double value:[14384,125,3330,946000,333,0,356,0,0,0]
name:”ctxt” type:double value:2808304
name:”btime” type:double value:1423004780
name:”intr” type:double value:[14384,125,3330,0,0,0,0,0,0,0...0]
name:”processes” type:double value:3811
name:”procs_running” type:double value:1
name:”procs_blocked” type:double value:0
name:”softirq” type:double value:[288977,23,101952,19,13046,19217,7,...]
Cpu fields:

 1  user
 2  nice
 3  system
 4  idle
 5  [iowait]
 6  [irq]
 7  [softirq]
 8  [steal]
 9  [guest]
10  [guestnice]

Note: all systems provide user, nice, system, and idle; the bracketed fields depend on the kernel version.

intr

This line shows counts of interrupts serviced since boot time, for each of the possible system interrupts. The first column is the total of all interrupts serviced including unnumbered architecture specific interrupts; each subsequent column is the total for that particular numbered interrupt. Unnumbered interrupts are not shown, only summed into the total.

Linux Disk Stats Decoder

New in version 0.7.

Plugin Name: SandboxDecoder
File Name: lua_decoders/linux_diskstats.lua

Parses a payload containing the contents of a /sys/block/$DISK/stat file (where $DISK is a disk identifier such as sda) into a Heka message struct. It also tries to obtain the TickerInterval of the input it received the data from, by extracting it from a message field named TickerInterval.

Config:

  • payload_keep (bool, optional, default false)

    Always preserve the original log line in the message payload.

Example Heka Configuration

[DiskStats]
type = "FilePollingInput"
ticker_interval = 1
file_path = "/sys/block/sda1/stat"
decoder = "DiskStatsDecoder"

[DiskStatsDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/linux_diskstats.lua"

Example Heka Message

Timestamp:2014-01-10 07:04:56 -0800 PST
Type:stats.diskstats
Hostname:test.example.com
Pid:0
UUID:8e414f01-9d7f-4a48-a5e1-ae92e5954df5
Payload:
EnvVersion:
Severity:7
Fields:
name:”ReadsCompleted” value_type:DOUBLE value_double:”20123”
name:”ReadsMerged” value_type:DOUBLE value_double:”11267”
name:”SectorsRead” value_type:DOUBLE value_double:”1.094968e+06”
name:”TimeReading” value_type:DOUBLE value_double:”45148”
name:”WritesCompleted” value_type:DOUBLE value_double:”1278”
name:”WritesMerged” value_type:DOUBLE value_double:”1278”
name:”SectorsWritten” value_type:DOUBLE value_double:”206504”
name:”TimeWriting” value_type:DOUBLE value_double:”3348”
name:”TimeDoingIO” value_type:DOUBLE value_double:”4876”
name:”WeightedTimeDoingIO” value_type:DOUBLE value_double:”48356”
name:”NumIOInProgress” value_type:DOUBLE value_double:”3”
name:”TickerInterval” value_type:DOUBLE value_double:”2”
name:”FilePath” value_string:”/sys/block/sda/stat”

Linux Load Average Decoder

New in version 0.7.

Plugin Name: SandboxDecoder
File Name: lua_decoders/linux_loadavg.lua

Parses a payload containing the contents of a /proc/loadavg file into a Heka message.
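
For reference, /proc/loadavg is a single line of five whitespace-separated values: the 1, 5, and 15 minute load averages, the running/total process counts, and the most recently allocated PID. A payload matching the example message below would look something like this (the last two fields are hypothetical):

3.05 1.21 0.44 1/11 9938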

Config:

  • payload_keep (bool, optional, default false)

    Always preserve the original log line in the message payload.

Example Heka Configuration

[LoadAvg]
type = "FilePollingInput"
ticker_interval = 1
file_path = "/proc/loadavg"
decoder = "LoadAvgDecoder"

[LoadAvgDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/linux_loadavg.lua"

Example Heka Message

Timestamp:2014-01-10 07:04:56 -0800 PST
Type:stats.loadavg
Hostname:test.example.com
Pid:0
UUID:8e414f01-9d7f-4a48-a5e1-ae92e5954df5
Payload:
EnvVersion:
Severity:7
Fields:
name:”1MinAvg” value_type:DOUBLE value_double:”3.05”
name:”5MinAvg” value_type:DOUBLE value_double:”1.21”
name:”15MinAvg” value_type:DOUBLE value_double:”0.44”
name:”NumProcesses” value_type:DOUBLE value_double:”11”
name:”FilePath” value_string:”/proc/loadavg”

Linux Memory Stats Decoder

New in version 0.7.

Plugin Name: SandboxDecoder
File Name: lua_decoders/linux_memstats.lua

Parses a payload containing the contents of a /proc/meminfo file into a Heka message.

Config:

  • payload_keep (bool, optional, default false)

    Always preserve the original log line in the message payload.

Example Heka Configuration

[MemStats]
type = "FilePollingInput"
ticker_interval = 1
file_path = "/proc/meminfo"
decoder = "MemStatsDecoder"

[MemStatsDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/linux_memstats.lua"

Example Heka Message

Timestamp:2014-01-10 07:04:56 -0800 PST
Type:stats.memstats
Hostname:test.example.com
Pid:0
UUID:8e414f01-9d7f-4a48-a5e1-ae92e5954df5
Payload:
EnvVersion:
Severity:7
Fields:
name:”MemTotal” value_type:DOUBLE representation:”kB” value_double:”4047616”
name:”MemFree” value_type:DOUBLE representation:”kB” value_double:”3432216”
name:”Buffers” value_type:DOUBLE representation:”kB” value_double:”82028”
name:”Cached” value_type:DOUBLE representation:”kB” value_double:”368636”
name:”FilePath” value_string:”/proc/meminfo”

The total available fields can be found in the proc(5) man page. All fields are of type double, and the representation is in kB (except for the HugePages fields). Here is a full list of fields available:

MemTotal, MemFree, Buffers, Cached, SwapCached, Active, Inactive, Active(anon), Inactive(anon), Active(file), Inactive(file), Unevictable, Mlocked, SwapTotal, SwapFree, Dirty, Writeback, AnonPages, Mapped, Shmem, Slab, SReclaimable, SUnreclaim, KernelStack, PageTables, NFS_Unstable, Bounce, WritebackTmp, CommitLimit, Committed_AS, VmallocTotal, VmallocUsed, VmallocChunk, HardwareCorrupted, AnonHugePages, HugePages_Total, HugePages_Free, HugePages_Rsvd, HugePages_Surp, Hugepagesize, DirectMap4k, DirectMap2M, DirectMap1G.

Note that your available fields may have a slight variance depending on the system’s kernel version.

Linux netdev Decoder

New in version 0.10.

Plugin Name: SandboxDecoder
File Name: lua_decoders/linux_netdev.lua

Parses a payload containing the contents of a /proc/net/dev file into a Heka message.

Config:

  • payload_keep (bool, optional, default false)

    Always preserve the original log line in the message payload.

Example Heka Configuration

[Netdev]
type = "FilePollingInput"
ticker_interval = 1
file_path = "/proc/net/dev"
decoder = "NetdevDecoder"

[NetdevDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/linux_netdev.lua"

Example Heka Message

Timestamp:2015-10-16 13:31:07 +0000 UTC
Type:stats.netdev
Hostname:example.com
Pid:0
Uuid:505561dc-81f6-4856-abe5-077c24457010
Logger:NetdevInput
Payload:
EnvVersion:
Severity:7
Fields:
name:”receive_multicast” type:double value:0
name:”transmit_errs” type:double value:0
name:”receive_drop” type:double value:0
name:”netdevice” type:string value:”eth0”
name:”transmit_drop” type:double value:0
name:”transmit_carrier” type:double value:0
name:”receive_packets” type:double value:1.18443194e+08
name:”receive_compressed” type:double value:0
name:”transmit_colls” type:double value:0
name:”transmit_compressed” type:double value:0
name:”receive_frame” type:double value:0
name:”transmit_packets” type:double value:1.07330545e+08
name:”receive_fifo” type:double value:0
name:”receive_bytes” type:double value:1.3915983085e+11
name:”transmit_bytes” type:double value:1.78516842512e+11
name:”receive_errs” type:double value:0
name:”transmit_fifo” type:double value:0

Linux netstat Decoder

New in version 0.10.

Plugin Name: SandboxDecoder
File Name: lua_decoders/linux_netstat.lua

Parses a payload containing the contents of a /proc/net/netstat or /proc/net/snmp file into a Heka message.
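
Both files consist of paired header and value lines per protocol section, and each output field is named by joining the section name and column name with an underscore (e.g. Ip_InReceives), as can be seen in the example message below. An abbreviated /proc/net/snmp excerpt (columns truncated, values matching that example) looks like:

Ip: Forwarding DefaultTTL InReceives InHdrErrors InAddrErrors ...
Ip: 1 64 718979 0 31 ...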

Config:

  • payload_keep (bool, optional, default false)

    Always preserve the original log line in the message payload.

Example Heka Configuration

[NetNetstat]
type = "FilePollingInput"
ticker_interval = 1
file_path = "/proc/net/netstat"
decoder = "NetstatDecoder"

[NetSnmp]
type = "FilePollingInput"
ticker_interval = 1
file_path = "/proc/net/snmp"
decoder = "NetstatDecoder"

[NetstatDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/linux_netstat.lua"

Example Heka Message

Timestamp:2015-08-28 15:52:00 +0000 UTC
Type:stats.netstat
Hostname:test.example.com
Pid:0
Uuid:90c202d1-1375-4ec2-ac8c-eb53b2850d19
Logger:NetSnmp
Payload:
EnvVersion:
Severity:7
Fields:
name:”Ip_FragCreates” type:integer value:0
name:”Ip_FragOKs” type:integer value:0
name:”Icmp_InTimestamps” type:integer value:0
name:”Ip_InUnknownProtos” type:integer value:0
name:”Ip_ReasmFails” type:integer value:0
name:”Icmp_OutErrors” type:integer value:0
name:”Icmp_InDestUnreachs” type:integer value:19812
name:”Ip_InReceives” type:integer value:718979
name:”Ip_ReasmTimeout” type:integer value:0
name:”Ip_InHdrErrors” type:integer value:0
name:”Ip_ReasmOKs” type:integer value:0
name:”Icmp_OutSrcQuenchs” type:integer value:0
name:”Icmp_InAddrMaskReps” type:integer value:0
name:”Ip_OutNoRoutes” type:integer value:1788
name:”IcmpMsg_OutType0” type:integer value:81
name:”Ip_FragFails” type:integer value:0
name:”Icmp_OutTimeExcds” type:integer value:0
name:”Ip_ReasmReqds” type:integer value:0
name:”IcmpMsg_InType3” type:integer value:19812
name:”Ip_InDiscards” type:integer value:0
name:”Icmp_InTimestampReps” type:integer value:0
name:”Icmp_InEchoReps” type:integer value:0
name:”Icmp_OutAddrMasks” type:integer value:0
name:”Icmp_InMsgs” type:integer value:19893
name:”Icmp_OutMsgs” type:integer value:19892
name:”Icmp_OutTimestampReps” type:integer value:0
name:”Icmp_InSrcQuenchs” type:integer value:0
name:”IcmpMsg_OutType3” type:integer value:19811
name:”Icmp_OutEchoReps” type:integer value:81
name:”Icmp_OutParmProbs” type:integer value:0
name:”Icmp_OutRedirects” type:integer value:0
name:”Icmp_OutEchos” type:integer value:0
name:”Ip_DefaultTTL” type:integer value:64
name:”Icmp_InCsumErrors” type:integer value:0
name:”IcmpMsg_InType8” type:integer value:81
name:”Icmp_InRedirects” type:integer value:0
name:”Ip_OutDiscards” type:integer value:9272
name:”FilePath” type:string value:”/proc/net/snmp”
name:”Icmp_InErrors” type:integer value:0
name:”Ip_Forwarding” type:integer value:1
name:”Icmp_OutTimestamps” type:integer value:0
name:”Icmp_InEchos” type:integer value:81
name:”Icmp_InAddrMasks” type:integer value:0
name:”Icmp_InTimeExcds” type:integer value:0
name:”Ip_OutRequests” type:integer value:544286
name:”Ip_InDelivers” type:integer value:718236
name:”Ip_InAddrErrors” type:integer value:31
name:”Icmp_OutAddrMaskReps” type:integer value:0
name:”Ip_ForwDatagrams” type:integer value:0
name:”Icmp_InParmProbs” type:integer value:0
name:”Icmp_OutDestUnreachs” type:integer value:19811

MySQL Slow Query Log Decoder

New in version 0.6.

Plugin Name: SandboxDecoder
File Name: lua_decoders/mysql_slow_query.lua

Parses and transforms the MySQL slow query logs. Use mariadb_slow_query.lua to parse the MariaDB variant of the MySQL slow query logs.

Config:

  • truncate_sql (int, optional, default nil)

    Truncates the SQL payload to the specified number of bytes (not UTF-8 aware) and appends ”...”. If the value is nil no truncation is performed. A negative value will truncate the specified number of bytes from the end.

Example Heka Configuration

[Sync-1_5-SlowQuery]
type = "LogstreamerInput"
log_directory = "/var/log/mysql"
file_match = 'mysql-slow\.log'
parser_type = "regexp"
delimiter = "\n(# User@Host:)"
delimiter_location = "start"
decoder = "MySqlSlowQueryDecoder"

[MySqlSlowQueryDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/mysql_slow_query.lua"

    [MySqlSlowQueryDecoder.config]
    truncate_sql = 64

Example Heka Message

Timestamp:2014-05-07 15:51:28 -0700 PDT
Type:mysql.slow-query
Hostname:127.0.0.1
Pid:0
UUID:5324dd93-47df-485b-a88e-429f0fcd57d6
Logger:Sync-1_5-SlowQuery
Payload:/* [queryName=FIND_ITEMS] */ SELECT bso.userid, bso.collection, ...
EnvVersion:
Severity:7
Fields:
name:”Rows_examined” value_type:DOUBLE value_double:16458
name:”Query_time” value_type:DOUBLE representation:”s” value_double:7.24966
name:”Rows_sent” value_type:DOUBLE value_double:5001
name:”Lock_time” value_type:DOUBLE representation:”s” value_double:0.047038

Nginx Access Log Decoder

New in version 0.5.

Plugin Name: SandboxDecoder
File Name: lua_decoders/nginx_access.lua

Parses the Nginx access logs based on the Nginx ‘log_format’ configuration directive.

Config:

  • log_format (string)

    The ‘log_format’ configuration directive from the nginx.conf. The $time_local or $time_iso8601 variable is converted to the number of nanoseconds since the Unix epoch and used to set the Timestamp on the message. http://nginx.org/en/docs/http/ngx_http_log_module.html

  • type (string, optional, default nil):

    Sets the message ‘Type’ header to the specified value

  • user_agent_transform (bool, optional, default false)

    Transform the http_user_agent into user_agent_browser, user_agent_version, user_agent_os.

  • user_agent_keep (bool, optional, default false)

    Always preserve the http_user_agent value if transform is enabled.

  • user_agent_conditional (bool, optional, default false)

    Only preserve the http_user_agent value if transform is enabled and fails.

  • payload_keep (bool, optional, default false)

    Always preserve the original log line in the message payload.

Example Heka Configuration

[TestWebserver]
type = "LogstreamerInput"
log_directory = "/var/log/nginx"
file_match = 'access\.log'
decoder = "CombinedLogDecoder"

[CombinedLogDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/nginx_access.lua"

[CombinedLogDecoder.config]
type = "combined"
user_agent_transform = true
# combined log format
log_format = '$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"'

Example Heka Message

Timestamp:2014-01-10 07:04:56 -0800 PST
Type:combined
Hostname:test.example.com
Pid:0
UUID:8e414f01-9d7f-4a48-a5e1-ae92e5954df5
Logger:TestWebserver
Payload:
EnvVersion:
Severity:7
Fields:
name:”remote_user” value_string:”-“
name:”http_x_forwarded_for” value_string:”-“
name:”http_referer” value_string:”-“
name:”body_bytes_sent” value_type:DOUBLE representation:”B” value_double:82
name:”remote_addr” value_string:”62.195.113.219” representation:”ipv4”
name:”status” value_type:DOUBLE value_double:200
name:”request” value_string:”GET /v1/recovery_email/status HTTP/1.1”
name:”user_agent_os” value_string:”FirefoxOS”
name:”user_agent_browser” value_string:”Firefox”
name:”user_agent_version” value_type:DOUBLE value_double:29

Nginx Error Log Decoder

New in version 0.6.

Plugin Name: SandboxDecoder
File Name: lua_decoders/nginx_error.lua

Parses the Nginx error logs based on the Nginx hard-coded internal format.

Config:

  • tz (string, optional, defaults to UTC)

    The time zone to assume for log timestamps that do not include zone offset information. The conversion actually happens on the Go side, since there isn’t good TZ support in the Lua sandbox.

  • type (string, optional, defaults to “nginx.error”):

    Sets the message ‘Type’ header to the specified value

  • payload_keep (bool, optional, default false)

    Always preserve the original log line in the message payload.

Example Heka Configuration

[TestWebserverError]
type = "LogstreamerInput"
log_directory = "/var/log/nginx"
file_match = 'error\.log'
decoder = "NginxErrorDecoder"

[NginxErrorDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/nginx_error.lua"

[NginxErrorDecoder.config]
tz = "America/Los_Angeles"

Example Heka Message

Timestamp:2014-01-10 07:04:56 -0800 PST
Type:nginx.error
Hostname:trink-x230
Pid:16842
UUID:8e414f01-9d7f-4a48-a5e1-ae92e5954df5
Logger:TestWebserverError
Payload:using inherited sockets from “6;”
EnvVersion:
Severity:5
Fields:
name:”tid” value_type:DOUBLE value_double:0
name:”connection” value_type:DOUBLE value_double:8878

Nginx Stub Status Decoder

New in version 0.10.

Plugin Name: SandboxDecoder
File Name: lua_decoders/nginx_stub_status.lua

Parses a payload containing the output of nginx’s stub status module: http://nginx.org/en/docs/http/ngx_http_stub_status_module.html
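
For reference, the stub status module emits a small plain text report like the following (the figures here match the example message below):

Active connections: 291
server accepts handled requests
 16630948 16630948 31070465
Reading: 6 Writing: 179 Waiting: 106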

Config:

  • type (string, optional, default ‘nginx_stub_status’)

    Set the message type.

  • payload_keep (bool, optional, default false)

    Always preserve the original log line in the message payload.

Example Heka Configuration

[NginxStubStatusInput]
type = "HttpInput"
url = "http://localhost:8090/nginx_status"
ticker_interval = 1
success_severity = 6
error_severity = 1
decoder = "NginxStubStatusDecoder"

[NginxStubStatusDecoder]
filename = "lua_decoders/nginx_stub_status.lua"
type = "SandboxDecoder"

[NginxStubStatusDecoder.config]
payload_keep = false

Example Heka Message

Timestamp:2014-01-10 07:04:56 -0800 PST
Type:nginx_stub_status
Hostname:test.example.com
Pid:0
UUID:8e414f01-9d7f-4a48-a5e1-ae92e5954df5
Payload:
EnvVersion:
Severity:7
Fields:
name:”connections” value_type:INTEGER value_integer:”291”
name:”accepts” value_type:INTEGER value_integer:”16630948”
name:”handled” value_type:INTEGER value_integer:”16630948”
name:”requests” value_type:INTEGER value_integer:”31070465”
name:”reading” value_type:INTEGER value_integer:”6”
name:”writing” value_type:INTEGER value_integer:”179”
name:”waiting” value_type:INTEGER value_integer:”106”

Payload Regex Decoder

Plugin Name: PayloadRegexDecoder

Decoder plugin that accepts messages of a specified form and generates new outgoing messages from extracted data, effectively transforming one message format into another.

Note

The Go regular expression tester is an invaluable tool for constructing and debugging regular expressions to be used for parsing your input data.

Config:

  • match_regex:

    Regular expression that must match for the decoder to process the message.

  • severity_map:

    Subsection defining severity strings and the numerical value they should be translated to. hekad uses numerical severity codes, so a severity of WARNING can be translated to 3 by settings in this section. See Heka Message.

  • message_fields:

    Subsection defining message fields to populate and the interpolated values that should be used. Valid interpolated values are any capture names from the match_regex and any other field that exists in the message. In the event that a captured name overlaps with a message field, the captured name’s value will be used. Optional representation metadata can be added at the end of the field name using a pipe delimiter i.e. ResponseSize|B = “%ResponseSize%” will create Fields[ResponseSize] representing the number of bytes. Adding a representation string to a standard message header name will cause it to be added as a user defined field i.e., Payload|json will create Fields[Payload] with a json representation (see Field Variables).

    Interpolated values should be surrounded with % signs, for example:

    [my_decoder.message_fields]
    Type = "%Type%Decoded"
    

    This will result in the new message’s Type being set to the old message’s Type with “Decoded” appended.

  • timestamp_layout (string):

    A formatting string instructing hekad how to turn a time string into the actual time representation used internally. Example timestamp layouts can be seen in Go’s time documentation. In addition to the Go time formatting, special timestamp_layout values of “Epoch”, “EpochMilli”, “EpochMicro”, and “EpochNano” are supported for Unix style timestamps represented in seconds, milliseconds, microseconds, and nanoseconds since the Epoch, respectively.

  • timestamp_location (string):

    Time zone in which the timestamps in the text are presumed to be in. Should be a location name corresponding to a file in the IANA Time Zone database (e.g. “America/Los_Angeles”), as parsed by Go’s time.LoadLocation() function (see http://golang.org/pkg/time/#LoadLocation). Defaults to “UTC”. Not required if valid time zone info is embedded in every parsed timestamp, since those can be parsed as specified in the timestamp_layout. This setting will have no impact if one of the supported “Epoch*” values is used as the timestamp_layout setting.

  • log_errors (bool):

    New in version 0.5.

    If set to false, payloads that can not be matched against the regex will not be logged as errors. Defaults to true.

Example (Parsing Apache Combined Log Format):

[apache_transform_decoder]
type = "PayloadRegexDecoder"
match_regex = '^(?P<RemoteIP>\S+) \S+ \S+ \[(?P<Timestamp>[^\]]+)\] "(?P<Method>[A-Z]+) (?P<Url>[^\s]+)[^"]*" (?P<StatusCode>\d+) (?P<RequestSize>\d+) "(?P<Referer>[^"]*)" "(?P<Browser>[^"]*)"'
timestamp_layout = "02/Jan/2006:15:04:05 -0700"

# severities in this case would work only if a (?P<Severity>...) matching
# group was present in the regex, and the log file contained this information.
[apache_transform_decoder.severity_map]
DEBUG = 7
INFO = 6
WARNING = 4

[apache_transform_decoder.message_fields]
Type = "ApacheLogfile"
Logger = "apache"
Url|uri = "%Url%"
Method = "%Method%"
Status = "%StatusCode%"
RequestSize|B = "%RequestSize%"
Referer = "%Referer%"
Browser = "%Browser%"

Payload XML Decoder

Plugin Name: PayloadXmlDecoder

This decoder plugin accepts XML blobs in the message payload and allows you to map parts of the XML into Field attributes of the pipeline pack message using XPath syntax, via the xmlpath library.

Config:

  • xpath_map:

    A subsection defining a capture name that maps to an XPath expression. Each expression can fetch a single value; if the expression does not resolve to a valid node in the XML blob, the capture group will be assigned an empty string value.

  • severity_map:

    Subsection defining severity strings and the numerical value they should be translated to. hekad uses numerical severity codes, so a severity of WARNING can be translated to 3 by settings in this section. See Heka Message.

  • message_fields:

    Subsection defining message fields to populate and the interpolated values that should be used. Valid interpolated values are any capture names defined in the xpath_map and any other field that exists in the message. In the event that a captured name overlaps with a message field, the captured name’s value will be used. Optional representation metadata can be added at the end of the field name using a pipe delimiter i.e. ResponseSize|B = “%ResponseSize%” will create Fields[ResponseSize] representing the number of bytes. Adding a representation string to a standard message header name will cause it to be added as a user defined field i.e., Payload|json will create Fields[Payload] with a json representation (see Field Variables).

    Interpolated values should be surrounded with % signs, for example:

    [my_decoder.message_fields]
    Type = "%Type%Decoded"
    

    This will result in the new message’s Type being set to the old message’s Type with “Decoded” appended.

  • timestamp_layout (string):

    A formatting string instructing hekad how to turn a time string into the actual time representation used internally. Example timestamp layouts can be seen in Go’s time documentation. The default layout is ISO8601 - the same as Javascript. In addition to the Go time formatting, special timestamp_layout values of “Epoch”, “EpochMilli”, “EpochMicro”, and “EpochNano” are supported for Unix style timestamps represented in seconds, milliseconds, microseconds, and nanoseconds since the Epoch, respectively.

  • timestamp_location (string):

    Time zone in which the timestamps in the text are presumed to be in. Should be a location name corresponding to a file in the IANA Time Zone database (e.g. “America/Los_Angeles”), as parsed by Go’s time.LoadLocation() function (see http://golang.org/pkg/time/#LoadLocation). Defaults to “UTC”. Not required if valid time zone info is embedded in every parsed timestamp, since those can be parsed as specified in the timestamp_layout. This setting will have no impact if one of the supported “Epoch*” values is used as the timestamp_layout setting.

Example:

[myxml_decoder]
type = "PayloadXmlDecoder"

[myxml_decoder.xpath_map]
Count = "/some/path/count"
Name = "/some/path/name"
Pid = "//pid"
Timestamp = "//timestamp"
Severity = "//severity"

[myxml_decoder.severity_map]
DEBUG = 7
INFO = 6
WARNING = 4

[myxml_decoder.message_fields]
Pid = "%Pid%"
StatCount = "%Count%"
StatName =  "%Name%"
Timestamp = "%Timestamp%"

PayloadXmlDecoder’s xpath_map config subsection supports XPath as implemented by the xmlpath library.

  • All axes are supported (“child”, “following-sibling”, etc)
  • All abbreviated forms are supported (”.”, “//”, etc)
  • All node types except for namespace are supported
  • Predicates are restricted to [N], [path], and [path=literal] forms
  • Only a single predicate is supported per path step
  • Richer expressions and namespaces are not supported

Protobuf Decoder

Plugin Name: ProtobufDecoder

The ProtobufDecoder is used for Heka message objects that have been serialized into protocol buffers format. This is the format that Heka uses to communicate with other Heka instances, so one will always be included in your Heka configuration under the name “ProtobufDecoder”, whether specified or not. The ProtobufDecoder has no configuration options.

The hekad protocol buffers message schema is defined in the message.proto file in the message package.

Example:

[ProtobufDecoder]

Rsyslog Decoder

New in version 0.5.

Plugin Name: SandboxDecoder
File Name: lua_decoders/rsyslog.lua

Parses the rsyslog output using the string based configuration template.

Config:

  • hostname_keep (boolean, defaults to false)

    Always preserve the original ‘Hostname’ field set by Logstreamer’s ‘hostname’ configuration setting.

  • template (string)

    The ‘template’ configuration string from rsyslog.conf. http://rsyslog-5-8-6-doc.neocities.org/rsyslog_conf_templates.html

  • tz (string, optional, defaults to UTC)

    If your rsyslog timestamp field in the template does not carry zone offset information, you may set an offset to be applied to your events here. Typically this would be used with the “Traditional” rsyslog formats.

    Parsing is done by Go, supports values of “UTC”, “Local”, or a location name corresponding to a file in the IANA Time Zone database, e.g. “America/New_York”.

Example Heka Configuration

[RsyslogDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/rsyslog.lua"

[RsyslogDecoder.config]
type = "RSYSLOG_TraditionalFileFormat"
template = '%TIMESTAMP% %HOSTNAME% %syslogtag%%msg:::sp-if-no-1st-sp%%msg:::drop-last-lf%\n'
tz = "America/Los_Angeles"

Example Heka Message

Timestamp:2014-02-10 12:58:58 -0800 PST
Type:RSYSLOG_TraditionalFileFormat
Hostname:trink-x230
Pid:0
UUID:e0eef205-0b64-41e8-a307-5772b05e16c1
Logger:RsyslogInput
Payload:“imklog 5.8.6, log source = /proc/kmsg started.”
EnvVersion:
Severity:7
Fields:
name:”programname” value_string:”kernel”

Sandbox Decoder

Plugin Name: SandboxDecoder

The SandboxDecoder provides an isolated execution environment for data parsing and complex transformations without the need to recompile Heka. See Sandbox.

Config:

Example

[sql_decoder]
type = "SandboxDecoder"
filename = "sql_decoder.lua"

Scribble Decoder

New in version 0.5.

Plugin Name: ScribbleDecoder

The ScribbleDecoder is a trivial decoder that makes it possible to set one or more static field values on every decoded message. It is often used in conjunction with another decoder (i.e. in a MultiDecoder w/ cascade_strategy set to “all”) to, for example, set the message type of every message to a specific custom value after the messages have been decoded from Protocol Buffers format. Note that this only supports setting the exact same value on every message. If any dynamic computation is required to determine what the value should be, or whether it should be applied to a specific message, a Sandbox Decoder using the provided write_message API call should be used instead.

Config:

  • message_fields:

    Subsection defining message fields to populate. Optional representation metadata can be added at the end of the field name using a pipe delimiter i.e. host|ipv4 = “192.168.55.55” will create Fields[Host] containing an IPv4 address. Adding a representation string to a standard message header name will cause it to be added as a user defined field, i.e. Payload|json will create Fields[Payload] with a json representation (see Field Variables). Does not support Timestamp or Uuid.

Example (in MultiDecoder context)

[AccesslogDecoder]
type = "MultiDecoder"
subs = ["AccesslogApacheDecoder", "EnvironmentScribbler"]
cascade_strategy = "all"
log_sub_errors = true

[AccesslogApacheDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/apache_access.lua"

    [AccesslogApacheDecoder.config]
    type = "combinedio"
    user_agent_transform = true
    user_agent_conditional = true
    log_format = '%h %l %u %t %m \"%U\" \"%q\" \"%H\" %>s %b \"%{Referer}i\" \"%{User-Agent}i\" %T %I %O'

[EnvironmentScribbler]
type = "ScribbleDecoder"

    [EnvironmentScribbler.message_fields]
    Environment = "production"

You can also use this on a downstream Heka instance to add static fields to messages forwarded to it by an upstream Heka for further processing.

[mytypedecoder]
type = "MultiDecoder"
subs = ["ProtobufDecoder", "mytype"]
cascade_strategy = "all"
log_sub_errors = true

[ProtobufDecoder]

[mytype]
type = "ScribbleDecoder"

    [mytype.message_fields]
    Type = "MyType"

Message scribbling is commonly performed after the lua sandboxed decoders have been applied. Otherwise the scribbled field may get discarded, depending on the decoder implementation.

Stats To Fields Decoder

New in version 0.4.

Plugin Name: StatsToFieldsDecoder

The StatsToFieldsDecoder will parse time series statistics data in the graphite message format and encode the data into the message fields, in the same format produced by a Stat Accumulator Input plugin with the emit_in_fields value set to true. This is useful if you have externally generated graphite string data flowing through Heka that you’d like to process without having to roll your own string parsing code.

This decoder has no configuration options. It simply expects to be passed messages with statsd string data in the payload. Incorrect or malformed content will cause a decoding error, dropping the message.

The fields format only contains a single “timestamp” field, so any payloads containing multiple timestamps will end up generating a separate message for each timestamp. Extra messages will be a copy of the original message except a) the payload will be empty and b) the unique timestamp and related stats will be the only message fields.
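
For reference, graphite string data carries one metric per line in “name value timestamp” form. A payload such as the following (hypothetical metric names and values) would therefore produce one output message per distinct timestamp, with the stats for that timestamp encoded as message fields:

stats.counters.webapp.hits.count 421 1443446400
stats.timers.webapp.request_time.mean 12.5 1443446400
stats.counters.webapp.hits.count 380 1443446460
stats.timers.webapp.request_time.mean 11.9 1443446460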

Example:

[StatsToFieldsDecoder]

Filters

Common Filter Parameters

There are some configuration options that are universally available to all Heka filter plugins. These will be consumed by Heka itself when Heka initializes the plugin and do not need to be handled by the plugin-specific initialization code.

  • message_matcher (string, optional):

    Boolean expression, when evaluated to true passes the message to the filter for processing. Defaults to matching nothing. See: Message Matcher Syntax

  • message_signer (string, optional):

    The name of the message signer. If specified only messages with this signer are passed to the filter for processing.

  • ticker_interval (uint, optional):

    Frequency (in seconds) that a timer event will be sent to the filter. Defaults to not sending timer events.

New in version 0.7.

  • can_exit (bool, optional)

    Whether or not this plugin can exit without causing Heka to shutdown. Defaults to false for non-sandbox filters, and true for sandbox filters.

New in version 0.10.

  • use_buffering (bool, optional)

    If true, all messages delivered to this filter will be buffered to disk before delivery, preventing back pressure and allowing retries in cases of message processing failure. Defaults to false, unless otherwise specified by the individual filter’s documentation.

  • buffering (QueueBufferConfig, optional)

    A sub-section that specifies the settings to be used for the buffering behavior. This will only have any impact if use_buffering is set to true. See Configuring Buffering.
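
As an illustrative sketch only (the plugin name, filename, and matcher are hypothetical, and the buffering settings shown are assumptions taken from Configuring Buffering; consult that section for the authoritative list), enabling buffering on a filter might look like:

[ImportantFilter]
type = "SandboxFilter"
filename = "lua_filters/some_filter.lua"
message_matcher = "Type == 'important'"
use_buffering = true

    [ImportantFilter.buffering]
    max_file_size = 134217728      # 128MiB per queue file (assumed option name)
    max_buffer_size = 1073741824   # 1GiB total on-disk buffer (assumed option name)
    full_action = "drop"           # behavior when the buffer fills (assumed option name)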

Circular Buffer Delta Aggregator

New in version 0.5.

Plugin Name: SandboxFilter
File Name: lua_filters/cbufd_aggregator.lua

Collects the circular buffer delta output from multiple instances of an upstream sandbox filter (the filters should all be the same version at least with respect to their cbuf output). The purpose is to recreate the view at a larger scope in each level of the aggregation i.e., host view -> datacenter view -> service level view.

Config:

  • enable_delta (bool, optional, default false)

    Specifies whether or not this aggregator should generate cbuf deltas.

  • anomaly_config (string) - (see Anomaly Detection Module)

    A list of anomaly detection specifications. If not specified no anomaly detection/alerting will be performed.

  • preservation_version (uint, optional, default 0)

    If preserve_data = true is set in the SandboxFilter configuration, then this value should be incremented every time the enable_delta configuration is changed to prevent the plugin from failing to start during data restoration.

Example Heka Configuration

[TelemetryServerMetricsAggregator]
type = "SandboxFilter"
message_matcher = "Logger == 'TelemetryServerMetrics' && Fields[payload_type] == 'cbufd'"
ticker_interval = 60
filename = "lua_filters/cbufd_aggregator.lua"
preserve_data = true

[TelemetryServerMetricsAggregator.config]
enable_delta = false
anomaly_config = 'roc("Request Statistics", 1, 15, 0, 1.5, true, false)'
preservation_version = 0

CBuf Delta Aggregator By Hostname

New in version 0.5.

Plugin Name: SandboxFilter
File Name: lua_filters/cbufd_host_aggregator.lua

Collects the circular buffer delta output from multiple instances of an upstream sandbox filter (the filters should all be the same version at least with respect to their cbuf output). Each column from the source circular buffer will become its own graph. i.e., ‘Error Count’ will become a graph with each host being represented in a column.

Config:

  • max_hosts (uint)

    Pre-allocates the number of host columns in the graph(s). If the number of active hosts exceeds this value, the plugin will terminate.

  • rows (uint)

    The number of rows to keep from the original circular buffer. Storing all the data from all the hosts is not practical since you will most likely run into memory and output size restrictions (adjust the view down as necessary).

  • host_expiration (uint, optional, default 120 seconds)

    The amount of time a host has to be inactive before it can be replaced by a new host.

  • preservation_version (uint, optional, default 0)

    If preserve_data = true is set in the SandboxFilter configuration, then this value should be incremented every time the max_hosts or rows configuration is changed to prevent the plugin from failing to start during data restoration.

Example Heka Configuration

[TelemetryServerMetricsHostAggregator]
type = "SandboxFilter"
message_matcher = "Logger == 'TelemetryServerMetrics' && Fields[payload_type] == 'cbufd'"
ticker_interval = 60
filename = "lua_filters/cbufd_host_aggregator.lua"
preserve_data = true

[TelemetryServerMetricsHostAggregator.config]
max_hosts = 5
rows = 60
host_expiration = 120
preservation_version = 0

Counter Filter

Plugin Name: CounterFilter

Once per ticker interval a CounterFilter will generate a message of type heka.counter-output. The payload will contain text indicating the number of messages that matched the filter’s message_matcher value during that interval (i.e. it counts the messages the plugin received). Every ten intervals an extra message (also of type heka.counter-output) goes out, containing an aggregate count and average per second throughput of messages received.

Config:

  • ticker_interval (int, optional):

    Interval between generated counter messages, in seconds. Defaults to 5.

Example:

[CounterFilter]
message_matcher = "Type != 'heka.counter-output'"

CPU Stats Filter

New in version 0.10.

Plugin Name: SandboxFilter
File Name: lua_filters/procstat.lua

Calculates deltas in /proc/stat data. Also emits CPU percentage utilization information.

Config:

  • whitelist (string, optional, default “”)

    Only process fields that fit the pattern, defaults to match all.

  • extras (boolean, optional, default false)

    Process extra fields like ctxt, softirq, cpu fields.

  • percent_integer (boolean, optional, default true)

    Process percentage as whole number.

Example Heka Configuration

[ProcStats]
type = "FilePollingInput"
ticker_interval = 1
file_path = "/proc/stat"
decoder = "ProcStatDecoder"

[ProcStatDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/linux_procstat.lua"

[ProcStatFilter]
type = "SandboxFilter"
filename = "lua_filters/procstat.lua"
preserve_data = true
message_matcher = "Type == 'stats.procstat'"
[ProcStatFilter.config]
    whitelist = "cpu$"
    extras = false
    percent_integer = true

Cpu fields:

 1  user        Time spent executing user applications (user mode).
 2  nice        Time spent executing user applications with low priority (nice).
 3  system      Time spent executing system calls (system mode).
 4  idle        Idle time.
 5  [iowait]    Time waiting for I/O operations to complete.
 6  [irq]       Time spent servicing interrupts.
 7  [softirq]   Time spent servicing soft-interrupts.
 8  [steal]     Ticks spent executing other virtual hosts (virtualization setups).
 9  [guest]     Used in virtualization setups.
10  [guestnice] Time spent running a niced guest.

Note: all systems provide user, nice, system, and idle; the bracketed fields depend on the kernel version.

intr
This line shows counts of interrupts serviced since boot time, for each of the possible system interrupts. The first column is the total of all interrupts serviced including unnumbered architecture specific interrupts; each subsequent column is the total for that particular numbered interrupt. Unnumbered interrupts are not shown, only summed into the total.
ctxt 115315
The number of context switches that the system underwent.
btime 769041601
Boot time, in seconds since the Epoch, 1970-01-01 00:00:00 +0000 (UTC).
processes 86031
Number of forks since boot.
procs_running 6
Number of process in runnable state. (Linux 2.5.45 onward.)
procs_blocked 2
Number of process blocked waiting for I/O to complete. (Linux 2.5.45 onward.)
softirq 288977 23 101952 19 13046 19217 7 19125 92077 389 43122
Time spent servicing soft-interrupts.

Disk Stats Filter

New in version 0.7.

Plugin Name: SandboxFilter
File Name: lua_filters/diskstats.lua

Graphs disk IO stats. It automatically converts the running totals of Writes and Reads into rates of the values. The time based fields are left as running totals of the amount of time doing IO. Expects to receive messages with disk IO data embedded in a particular set of message fields which matches what is generated by Linux Disk Stats Decoder: WritesCompleted, ReadsCompleted, SectorsWritten, SectorsRead, WritesMerged, ReadsMerged, TimeWriting, TimeReading, TimeDoingIO, WeightedTimeDoingIO, TickerInterval.

Config:

  • rows (uint, optional, default 1440)

    Sets the size of the sliding window, i.e., 1440 rows at 60 seconds per row is a 24 hour sliding window with 1 minute resolution.

  • anomaly_config (string) - (see Anomaly Detection Module)

Example Heka Configuration

[DiskStatsFilter]
type = "SandboxFilter"
filename = "lua_filters/diskstats.lua"
preserve_data = true
message_matcher = "Type == 'stats.diskstats'"
ticker_interval = 10

Frequent Items

New in version 0.5.

Plugin Name: SandboxFilter
File Name: lua_filters/frequent_items.lua

Calculates the most frequent items in a data stream.

Config:

  • message_variable (string)

    The message variable name containing the items to be counted.

  • max_items (uint, optional, default 1000)

    The maximum size of the sample set (higher will produce a more accurate list).

  • min_output_weight (uint, optional, default 100)

    Used to reduce the long tail output by only outputting the higher frequency items.

  • reset_days (uint, optional, default 1)

    Resets the list after the specified number of days (on the UTC day boundary). A value of 0 will never reset the list.

Example Heka Configuration

[FxaAuthServerFrequentIP]
type = "SandboxFilter"
filename = "lua_filters/frequent_items.lua"
ticker_interval = 60
preserve_data = true
message_matcher = "Logger == 'nginx.access' && Type == 'fxa-auth-server'"

[FxaAuthServerFrequentIP.config]
message_variable = "Fields[remote_addr]"
max_items = 10000
min_output_weight = 100
reset_days = 1

Heka Memory Statistics

New in version 0.6.

Plugin Name: SandboxFilter
File Name: lua_filters/heka_memstat.lua

Graphs the Heka memory statistics using the heka.memstat message generated by pipeline/report.go.

Config:

  • rows (uint, optional, default 1440)

    Sets the size of the sliding window, i.e., 1440 rows at 60 seconds per row is a 24 hour sliding window with 1 minute resolution.

  • sec_per_row (uint, optional, default 60)

    Sets the size of each bucket (resolution in seconds) in the sliding window.

  • anomaly_config (string, optional)

    See Anomaly Detection Module.

  • preservation_version (uint, optional, default 0)

    If preserve_data = true is set in the SandboxFilter configuration, then this value should be incremented every time the rows or sec_per_row configuration is changed to prevent the plugin from failing to start during data restoration.

Example Heka Configuration

[HekaMemstat]
type = "SandboxFilter"
filename = "lua_filters/heka_memstat.lua"
ticker_interval = 60
preserve_data = true
message_matcher = "Type == 'heka.memstat'"

Heka Message Schema

New in version 0.5.

Plugin Name: SandboxFilter
File Name: lua_filters/heka_message_schema.lua

Generates documentation for each unique message in a data stream. The output is a hierarchy of Logger, Type, EnvVersion, and a list of associated message field attributes including their counts (number in the brackets). This plugin is meant for data discovery/exploration and should not be left running on a production system.

Config:

<none>

Example Heka Configuration

[SyncMessageSchema]
type = "SandboxFilter"
filename = "lua_filters/heka_message_schema.lua"
ticker_interval = 60
preserve_data = false
message_matcher = "Logger != 'SyncMessageSchema' && Logger =~ /^Sync/"

Example Output

Sync-1_5-Webserver [54600]
    slf [54600]
        -no version- [54600]
            upstream_response_time (mismatch)
            http_user_agent (string)
            body_bytes_sent (number)
            remote_addr (string)
            request (string)
            upstream_status (mismatch)
            status (number)
            request_time (number)
            request_length (number)
Sync-1_5-SlowQuery [37]
    mysql.slow-query [37]
        -no version- [37]
            Query_time (number)
            Rows_examined (number)
            Rows_sent (number)
            Lock_time (number)

Heartbeat monitoring per host

New in version 0.9.

Plugin Name: SandboxFilter
File Name: lua_filters/heartbeat.lua

Heartbeat monitoring per host.

Generates a JSON structure that can be used to create a custom heartbeat dashboard. The output consists of a row per host which includes the host’s last_heartbeat, last_alert and status.

This plugin also sends an alert when the heartbeat_timeout is exceeded and supports alert throttling to reduce noise.

Config:

  • heartbeat_timeout (uint, optional, default 30)

    Sets the maximum duration (in seconds) between heartbeats before an alert is sent.

  • alert_throttle (uint, optional, default 300)

    Sets the minimum duration (in seconds) between alert event outputs.

Example Heka Configuration

[heartbeat]
type = "SandboxFilter"
filename = "lua_filters/heartbeat.lua"
ticker_interval = 30
preserve_data = true
message_matcher = "Type == 'heartbeat'"

  [heartbeat.config]
  heartbeat_timeout = 30
  alert_throttle = 300

[alert-encoder]
type = "SandboxEncoder"
filename = "lua_encoders/alert.lua"

[email-alert]
type = "SmtpOutput"
message_matcher = "Type == 'heka.sandbox-output' && Fields[payload_type] == 'alert'"
send_from = "acme-alert@example.com"
send_to = ["admin@example.com"]
auth = "Plain"
user = "smtp-user"
password = "smtp-pass"
host = "mail.example.com:25"
encoder = "alert"

Example Output

{"ip-10-0-0-11":{"last_heartbeat":1415311858257,"status":"up","last_alert":1415310603648},"ip-10-0-0-187":{"last_heartbeat":1415311856214,"status":"down","last_alert":1415310603648}
Timestamp:2014-05-14T14:20:18Z
Hostname:ip-10-226-204-51
Plugin:heartbeat
Alert:Missing Heartbeat - ip-10-0-0-187

HTTP Status Graph

New in version 0.5.

Plugin Name: SandboxFilter
File Name: lua_filters/http_status.lua

Graphs HTTP status codes using the numeric status code collected from web server access logs.

Config:

  • sec_per_row (uint, optional, default 60)

    Sets the size of each bucket (resolution in seconds) in the sliding window.

  • rows (uint, optional, default 1440)

    Sets the size of the sliding window, i.e., 1440 rows at 60 seconds per row is a 24 hour sliding window with 1 minute resolution.

  • status_field (string, optional, default “status”)

    Sets the message field containing the numeric HTTP status code.

  • anomaly_config (string, optional)

    See Anomaly Detection Module.

  • alert_throttle (uint, optional, default 3600)

    Sets the throttle for the anomaly alert, in seconds.

  • preservation_version (uint, optional, default 0)

    If preserve_data = true is set in the SandboxFilter configuration, then this value should be incremented every time the sec_per_row or rows configuration is changed to prevent the plugin from failing to start during data restoration.

Example Heka Configuration

[FxaAuthServerHTTPStatus]
type = "SandboxFilter"
filename = "lua_filters/http_status.lua"
ticker_interval = 60
preserve_data = true
message_matcher = "Logger == 'nginx.access' && Type == 'fxa-auth-server'"

[FxaAuthServerHTTPStatus.config]
sec_per_row = 60
rows = 1440
anomaly_config = 'roc("HTTP Status", 2, 15, 0, 1.5, true, false) roc("HTTP Status", 4, 15, 0, 1.5, true, false) mww_nonparametric("HTTP Status", 5, 15, 10, 0.8)'
alert_throttle = 300
preservation_version = 0

InfluxDB Batch Filter

New in version 0.10.

Plugin Name: SandboxFilter
File Name: lua_filters/influx_batch.lua

Converts Heka message contents to InfluxDB v0.9.0 API format and periodically emits batch messages with a payload ready for delivery via HTTP.

Optionally includes all standard message fields as tags or fields and iterates through all of the dynamic fields to add as points (series), skipping any fields explicitly omitted using the skip_fields config option. It can also map any Heka message fields as tags in the request sent to the InfluxDB write API, using the tag_fields config option. All dynamic fields in the Heka message are converted to separate points separated by newlines that are submitted to InfluxDB.

Note

This filter is intended for use with InfluxDB versions 0.9 or greater.

Config:

  • decimal_precision (string, optional, default “6”)

    String that is used in the string.format function to define the number of digits printed after the decimal in number values. The string formatting of numbers is forced to print with floating points because InfluxDB will reject values that change from integers to floats and vice-versa. By forcing all numbers to floats, we ensure that InfluxDB will always accept our numerical values, regardless of the initial format.

  • name_prefix (string, optional, default nil)

    String to use as the name key’s prefix value in the generated line. Supports message field interpolation, e.g. %{fieldname}. Any fieldname values of “Type”, “Payload”, “Hostname”, “Pid”, “Logger”, “Severity”, or “EnvVersion” will be extracted from the base message schema; any other values will be assumed to refer to a dynamic message field. Only the first value of the first instance of a dynamic message field can be used for name interpolation. If the dynamic field doesn’t exist, the uninterpolated value will be left in the name. Note that it is not possible to interpolate either the “Timestamp” or the “Uuid” message fields into the name, those values will be interpreted as referring to dynamic message fields.

  • name_prefix_delimiter (string, optional, default nil)

    String to use as the delimiter between the name_prefix and the field name. This defaults to a blank string but can be anything else instead (such as ”.” to use Graphite-like naming).

  • skip_fields (string, optional, default nil)

    Space delimited set of fields that should not be included in the InfluxDB measurements being generated. Any fieldname values of “Type”, “Payload”, “Hostname”, “Pid”, “Logger”, “Severity”, or “EnvVersion” will be assumed to refer to the corresponding field from the base message schema. Any other values will be assumed to refer to a dynamic message field. The magic value “all_base” can be used to exclude base fields from being mapped to the event altogether (useful if you don’t want to use tags and embed them in the name_prefix instead).

  • source_value_field (string, optional, default nil)

    If the desired behavior of this encoder is to extract one field from the Heka message and feed it as a single line to InfluxDB, then use this option to define which field to find the value from. Be careful to set the name_prefix field if this option is present or no measurement name will be present when trying to send to InfluxDB. When this option is present, no other fields besides this one will be sent to InfluxDB as a measurement whatsoever.

  • tag_fields (string, optional, default “all_base”)

    Take fields defined and add them as tags of the measurement(s) sent to InfluxDB for the message. The magic values “all” and “all_base” are used to map all fields (including taggable base fields) to tags and only base fields to tags, respectively. If those magic values aren’t used, then only those fields defined will map to tags of the measurement sent to InfluxDB. The tag_fields values are independent of the skip_fields values and have no effect on each other. You can skip fields from being sent to InfluxDB as measurements, but still include them as tags.

  • timestamp_precision (string, optional, default “ms”)

    Specify the timestamp precision that you want the event sent with. The default is to use milliseconds by dividing the Heka message timestamp by 1e6, but this math can be altered by specifying one of the precision values supported by the InfluxDB write API (ms, s, m, h). The n and u precisions supported by InfluxDB are not yet supported here.

  • value_field_key (string, optional, default “value”)

    This defines the name of the field key that carries the value in each point sent to InfluxDB. We default this to “value” to match the examples in the InfluxDB documentation, but you can replace that with anything else that you prefer.

  • flush_count (string, optional, default 0)

    Specifies a number of messages that will trigger a batch flush, if received before a timer tick. Values of zero or lower mean to never flush on message count, only on ticker intervals.

Example Heka Configuration

[LoadAvgPoller]
type = "FilePollingInput"
ticker_interval = 5
file_path = "/proc/loadavg"
decoder = "LinuxStatsDecoder"

[LoadAvgDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/linux_loadavg.lua"

[LinuxStatsDecoder]
type = "MultiDecoder"
subs = ["LoadAvgDecoder", "AddStaticFields"]
cascade_strategy = "all"
log_sub_errors = false

[AddStaticFields]
type = "ScribbleDecoder"

    [AddStaticFields.message_fields]
    Environment = "dev"

[InfluxdbLineFilter]
type = "SandboxFilter"
message_matcher = "Type =~ /stats.*/"
filename = "lua_filters/influx_batch.lua"

    [InfluxdbLineFilter.config]
    skip_fields = "**all_base** FilePath NumProcesses Environment TickerInterval"
    tag_fields = "Hostname Environment"
    timestamp_precision= "s"
    flush_count = 10000

[PayloadEncoder]

[InfluxdbOutput]
type = "HttpOutput"
message_matcher = "Fields[payload_name] == 'influx_line'"
encoder = "PayloadEncoder"
address = "http://influxdbserver.example.com:8086/write?db=mydb&rp=mypolicy&precision=s"
username = "influx_username"
password = "influx_password"

Load Average Filter

New in version 0.7.

Plugin Name: SandboxFilter
File Name: lua_filters/loadavg.lua

Graphs the load average and process count data. Expects to receive messages containing fields entitled 1MinAvg, 5MinAvg, 15MinAvg, and NumProcesses, such as those generated by the Linux Load Average Decoder.

Config:

  • sec_per_row (uint, optional, default 60)

    Sets the size of each bucket (resolution in seconds) in the sliding window.

  • rows (uint, optional, default 1440)

    Sets the size of the sliding window, i.e. 1440 rows at 60 seconds per row is a 24 hour sliding window with 1 minute resolution.

  • anomaly_config (string, optional)

    See Anomaly Detection Module.

  • preservation_version (uint, optional, default 0)

    If preserve_data = true is set in the SandboxFilter configuration, then this value should be incremented every time the sec_per_row or rows configuration is changed to prevent the plugin from failing to start during data restoration.

Example Heka Configuration

[LoadAvgFilter]
type = "SandboxFilter"
filename = "lua_filters/loadavg.lua"
ticker_interval = 60
preserve_data = true
message_matcher = "Type == 'stats.loadavg'"
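
The sec_per_row, rows, and preservation_version settings described above go in a config sub-section of the plugin, following the same pattern as the other sandbox filter examples in this document. A minimal sketch extending the example above (the specific values are illustrative):

    [LoadAvgFilter.config]
    sec_per_row = 30
    rows = 2880
    preservation_version = 1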

Memory Stats Filter

New in version 0.7.

Plugin Name: SandboxFilter
File Name: lua_filters/memstats.lua

Graphs memory usage statistics. Expects to receive messages with memory usage data embedded in a specific set of message fields, which matches the messages generated by Linux Memory Stats Decoder: MemFree, Cached, Active, Inactive, VmallocUsed, Shmem, SwapCached.

Config:

  • sec_per_row (uint, optional, default 60)

    Sets the size of each bucket (resolution in seconds) in the sliding window.

  • rows (uint, optional, default 1440)

    Sets the size of the sliding window, i.e. 1440 rows at 60 seconds per row is a 24 hour sliding window with 1 minute resolution.

  • anomaly_config (string, optional)

    See Anomaly Detection Module.

  • preservation_version (uint, optional, default 0)

    If preserve_data = true is set in the SandboxFilter configuration, then this value should be incremented every time the sec_per_row or rows configuration is changed to prevent the plugin from failing to start during data restoration.

Example Heka Configuration

[MemoryStatsFilter]
type = "SandboxFilter"
filename = "lua_filters/memstats.lua"
ticker_interval = 60
preserve_data = true
message_matcher = "Type == 'stats.memstats'"

MySQL Slow Query

New in version 0.6.

Plugin Name: SandboxFilter
File Name: lua_filters/mysql_slow_query.lua

Graphs MySQL slow query data produced by the MySQL Slow Query Log Decoder.

Config:

  • sec_per_row (uint, optional, default 60)

    Sets the size of each bucket (resolution in seconds) in the sliding window.

  • rows (uint, optional, default 1440)

    Sets the size of the sliding window, i.e. 1440 rows at 60 seconds per row is a 24 hour sliding window with 1 minute resolution.

  • anomaly_config (string, optional)

    See Anomaly Detection Module.

  • preservation_version (uint, optional, default 0)

    If preserve_data = true is set in the SandboxFilter configuration, then this value should be incremented every time the sec_per_row or rows configuration is changed to prevent the plugin from failing to start during data restoration.

Example Heka Configuration

[Sync-1_5-SlowQueries]
type = "SandboxFilter"
message_matcher = "Logger == 'Sync-1_5-SlowQuery'"
ticker_interval = 60
filename = "lua_filters/mysql_slow_query.lua"

    [Sync-1_5-SlowQueries.config]
    anomaly_config = 'mww_nonparametric("Statistics", 5, 15, 10, 0.8)'
    preservation_version = 0

Sandbox Filter

Plugin Name: SandboxFilter

The sandbox filter provides an isolated execution environment for data analysis. Any output generated by the sandbox is injected into the payload of a new message for further processing or to be output.

Config:

Example:

[hekabench_counter]
type = "SandboxFilter"
message_matcher = "Type == 'hekabench'"
ticker_interval = 1
filename = "counter.lua"
preserve_data = true
profile = false

    [hekabench_counter.config]
    rows = 1440
    sec_per_row = 60

Sandbox Manager Filter

Plugin Name: SandboxManagerFilter

The SandboxManagerFilter provides dynamic control (start/stop) of sandbox filters in a secure manner without stopping the Heka daemon. Commands are sent to a SandboxManagerFilter using a signed Heka message. The intent is to have one manager per access control group, each with its own message signing key. Users in each group can submit a signed control message to manage any filters running under the associated manager. A signed message is not an enforced requirement, but it is highly recommended in order to restrict access to this functionality.

SandboxManagerFilter Settings

  • Common Filter Parameters

  • working_directory (string):

    The directory where the filter configurations, code, and states are preserved. The directory can be unique or shared between sandbox managers since the filter names are unique per manager. Defaults to a directory in ${BASE_DIR}/sbxmgrs with a name generated from the plugin name.

  • module_directory (string):

    The directory where ‘require’ will attempt to load the external Lua modules from. Defaults to ${SHARE_DIR}/lua_modules.

  • max_filters (uint):

    The maximum number of filters this manager can run.

New in version 0.5.

  • memory_limit (uint):

    The number of bytes managed sandboxes are allowed to consume before being terminated (default 8MiB).

  • instruction_limit (uint):

    The number of instructions managed sandboxes are allowed to execute during the process_message/timer_event functions before being terminated (default 1M).

  • output_limit (uint):

    The number of bytes managed sandbox output buffers can hold before being terminated (default 63KiB). Warning: messages exceeding 64KiB will generate an error and be discarded by the standard output plugins (File, TCP, UDP) since they exceed the maximum message size.

Example

[OpsSandboxManager]
type = "SandboxManagerFilter"
message_signer = "ops"
# message_matcher = "Type == 'heka.control.sandbox'" # automatic default setting
max_filters = 100

Stat Filter

Plugin Name: StatFilter

Filter plugin that accepts messages of a specified form and uses extracted message data to feed statsd-style numerical metrics in the form of Stat objects to a StatAccumulator.

Config:

  • Metric:

    Subsection defining a single metric to be generated. Both the name and value fields for each metric support interpolation of message field values (from ‘Type’, ‘Hostname’, ‘Logger’, ‘Payload’, or any dynamic field name) with the use of % delimiters, so %Hostname% would be replaced by the message’s Hostname field, and %Foo% would be replaced by the first value of a dynamic field called “Foo”:

    • type (string):

      Metric type, supports “Counter”, “Timer”, “Gauge”.

    • name (string):

      Metric name, must be unique.

    • value (string):

      Expression representing the (possibly dynamic) value that the StatFilter should emit for each received message.

    • replace_dot (boolean):

      Replace all dots (.) with underscores (_) during string interpolation. This is useful if the resulting stats are sent to a Graphite instance.

  • stat_accum_name (string):

    Name of a StatAccumInput instance that this StatFilter will use as its StatAccumulator for submitting generated stat values. Defaults to “StatAccumInput”.

Example:

[StatAccumInput]
ticker_interval = 5

[StatsdInput]
address = "127.0.0.1:29301"

[Hits]
type = "StatFilter"
message_matcher = 'Type == "ApacheLogfile"'

[Hits.Metric.bandwidth]
type = "Counter"
name = "httpd.bytes.%Hostname%"
value = "%Bytes%"

[Hits.Metric.method_counts]
type = "Counter"
name = "httpd.hits.%Method%.%Hostname%"
value = "1"

Note

StatFilter requires an available StatAccumInput to be running.
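
As a sketch of the optional settings described above, replace_dot belongs inside a Metric sub-section while stat_accum_name is set on the filter itself. The metric below and its %Url% dynamic field are hypothetical:

[UrlHits]
type = "StatFilter"
message_matcher = 'Type == "ApacheLogfile"'
stat_accum_name = "StatAccumInput"

[UrlHits.Metric.url_hits]
type = "Counter"
name = "httpd.hits.%Url%"
value = "1"
replace_dot = true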

Stats Graph

New in version 0.7.

Plugin Name: SandboxFilter
File Name: lua_filters/stat_graph.lua

Converts stat values extracted from statmetric messages (see Stat Accumulator Input) to circular buffer data and periodically emits messages containing this data to be graphed by a DashboardOutput. Note that this filter expects the stats data to be available in the message fields, so the StatAccumInput must be configured with emit_in_fields set to true for this filter to work correctly.

Config:

  • title (string, optional, default “Stats”):

    Title for the graph output generated by this filter.

  • rows (uint, optional, default 300):

    The number of rows to store in our circular buffer. Each row represents one time interval.

  • sec_per_row (uint, optional, default 1):

    The number of seconds in each circular buffer time interval.

  • stats (string):

    Space separated list of stat names. Each specified stat will be expected to be found in the fields of the received statmetric messages, and will be extracted and inserted into its own column in the accumulated circular buffer.

  • stat_labels (string):

    Space separated list of header label names to use for the extracted stats. Must be in the same order as the specified stats. Any label longer than 15 characters will be truncated.

  • anomaly_config (string, optional):

    Anomaly detection configuration, see Anomaly Detection Module.

  • preservation_version (uint, optional, default 0):

    If preserve_data = true is set in the SandboxFilter configuration, then this value should be incremented every time any edits are made to your rows, sec_per_row, stats, or stat_labels values, or else Heka will fail to start because the preserved data will no longer match the filter’s data structure.

  • stat_aggregation (string, optional, default “sum”):

    Controls how the column data is aggregated when combining multiple circular buffers.

    “sum” - The total is computed for the time/column (default).
    “min” - The smallest value is retained for the time/column.
    “max” - The largest value is retained for the time/column.
    “none” - No aggregation will be performed on the column.

  • stat_unit (string, optional, default “count”):

    The unit of measure (maximum 7 characters). Alphanumeric, ‘/’, and ‘*’ characters are allowed; everything else will be converted to underscores, e.g. KiB, Hz, m/s (default: count).

Example Heka Configuration

[stat-graph]
type = "SandboxFilter"
filename = "lua_filters/stat_graph.lua"
ticker_interval = 10
preserve_data = true
message_matcher = "Type == 'heka.statmetric'"

  [stat-graph.config]
  title = "Hits and Misses"
  rows = 1440
  stat_aggregation = "none"
  stat_unit = "count"
  sec_per_row = 10
  stats = "stats.counters.hits.count stats.counters.misses.count"
  stat_labels = "hits misses"
  anomaly_config = 'roc("Hits and Misses", 1, 15, 0, 1.5, true, false) roc("Hits and Misses", 2, 15, 0, 1.5, true, false)'
  preservation_version = 0

Unique Items

New in version 0.6.

Plugin Name: SandboxFilter
File Name: lua_filters/unique_items.lua

Counts the number of unique items per configurable time period, e.g. active daily users by uid.

Config:

  • message_variable (string, required)

    The Heka message variable containing the item to be counted.

  • title (string, optional, default “Estimated Unique Daily message_variable”)

    The graph title for the cbuf output.

  • enable_delta (bool, optional, default false)

    Specifies whether or not this plugin should generate cbuf deltas. Deltas should be enabled when sharding is used; see: Circular Buffer Delta Aggregator.

  • rows (uint, optional, default 365)

    The number of rows or time periods to keep in history.

  • seconds_per_row (uint, optional, default 86400)

    The number of seconds per row or time period, before switching to the next row.

  • preservation_version (uint, optional, default 0)

    If preserve_data = true is set in the SandboxFilter configuration, then this value should be incremented every time the enable_delta configuration is changed to prevent the plugin from failing to start during data restoration.

Example Heka Configuration

[FxaActiveDailyUsers]
type = "SandboxFilter"
filename = "lua_filters/unique_items.lua"
ticker_interval = 60
preserve_data = true
message_matcher = "Logger == 'FxaAuth' && Type == 'request.summary' && Fields[path] == '/v1/certificate/sign' && Fields[errno] == 0"

    [FxaActiveDailyUsers.config]
    message_variable = "Fields[uid]"
    title = "Estimated Active Users Per Day"
    preservation_version = 0

[FxaActiveHourlyUsers]
type = "SandboxFilter"
filename = "lua_filters/unique_items.lua"
ticker_interval = 60
rows = 24
seconds_per_row = 3600
preserve_data = true
message_matcher = "Logger == 'FxaAuth' && Type == 'request.summary' && Fields[path] == '/v1/certificate/sign' && Fields[errno] == 0"

    [FxaActiveHourlyUsers.config]
    message_variable = "Fields[uid]"
    title = "Estimated Active Users Per Hour"
    preservation_version = 0

Encoders

Alert Encoder

Plugin Name: SandboxEncoder
File Name: lua_encoders/alert.lua

Produces more human readable alert messages.

Config:

<none>

Example Heka Configuration

[FxaAlert]
type = "SmtpOutput"
message_matcher = "(Type == 'heka.sandbox-output' && Fields[payload_type] == 'alert' && Logger =~ /^Fxa/) || (Type == 'heka.sandbox-terminated' && Fields[plugin] =~ /^Fxa/)"
send_from = "heka@example.com"
send_to = ["alert@example.com"]
auth = "Plain"
user = "test"
password = "testpw"
host = "localhost:25"
encoder = "AlertEncoder"

[AlertEncoder]
type = "SandboxEncoder"
filename = "lua_encoders/alert.lua"

Example Output

Timestamp:2014-05-14T14:20:18Z
Hostname:ip-10-226-204-51
Plugin:FxaBrowserIdHTTPStatus
Alert:HTTP Status - algorithm: roc col: 1 msg: detected anomaly, standard deviation exceeds 1.5

CBUF Librato Encoder

New in version 0.8.

Plugin Name: SandboxEncoder
File Name: lua_encoders/cbuf_librato.lua

Extracts data from SandboxFilter circular buffer output messages and uses it to generate time series JSON structures that will be accepted by Librato’s POST API. It will keep track of the last time it’s seen a particular message, keyed by filter name and output name. The first time it sees a new message, it will send data from all of the rows except the last one, which is possibly incomplete. For subsequent messages, the encoder will automatically extract data from all of the rows that have elapsed since the last message was received.

The SandboxEncoder preserve_data setting should be set to true when using this encoder, or else the list of received messages will be lost whenever Heka is restarted, possibly causing the same data rows to be sent to Librato multiple times.

Config:

  • message_key (string, optional, default “%{Logger}:%{payload_name}”)

    String to use as the key to differentiate separate cbuf messages from each other. Supports message field interpolation.

Example Heka Configuration

[cbuf_librato_encoder]
type = "SandboxEncoder"
filename = "lua_encoders/cbuf_librato.lua"
preserve_data = true
  [cbuf_librato_encoder.config]
  message_key = "%{Logger}:%{Hostname}:%{payload_name}"

[librato]
type = "HttpOutput"
message_matcher = "Type == 'heka.sandbox-output' && Fields[payload_type] == 'cbuf'"
encoder = "cbuf_librato_encoder"
address = "https://metrics-api.librato.com/v1/metrics"
username = "username@example.com"
password = "SECRET"
    [librato.headers]
    Content-Type = ["application/json"]

Example Output

{"gauges":[{"value":12,"measure_time":1410824950,"name":"HTTP_200","source":"thor"},{"value":1,"measure_time":1410824950,"name":"HTTP_300","source":"thor"},{"value":1,"measure_time":1410824950,"name":"HTTP_400","source":"thor"}]}

ElasticSearch JSON Encoder

Plugin Name: ESJsonEncoder

This encoder serializes a Heka message into a clean JSON format, preceded by a separate JSON structure containing information required for ElasticSearch BulkAPI indexing. The JSON serialization is done by hand, without the use of Go’s stdlib JSON marshalling. This is so serialization can succeed even if the message contains invalid UTF-8 characters, which will be encoded as U+FFFD.

Config:

  • index (string):

    Name of the ES index into which the messages will be inserted. Supports interpolation of message field values (from ‘Type’, ‘Hostname’, ‘Pid’, ‘UUID’, ‘Logger’, ‘EnvVersion’, ‘Severity’, a field name, or a timestamp format) with the use of ‘%{}’ chars, so ‘%{Hostname}-%{Logger}-data’ would add the records to an ES index called ‘some.example.com-processname-data’. strftime format codes may also be used. Defaults to ‘heka-%{%Y.%m.%d}’.

  • type_name (string):

    Name of ES record type to create. Supports interpolation of message field values (from ‘Type’, ‘Hostname’, ‘Pid’, ‘UUID’, ‘Logger’, ‘EnvVersion’, ‘Severity’, field name, or a timestamp format) with the use of ‘%{}’ chars, so ‘%{Hostname}-stat’ would create an ES record with a type of ‘some.example.com-stat’. Defaults to ‘message’.

  • fields ([]string):

    The ‘fields’ parameter specifies that only specific message data should be indexed into ElasticSearch. Available fields to choose are “Uuid”, “Timestamp”, “Type”, “Logger”, “Severity”, “Payload”, “EnvVersion”, “Pid”, “Hostname”, and “DynamicFields” (where “DynamicFields” causes the inclusion of dynamically specified message fields, see dynamic_fields). Defaults to including all of the supported message fields.

  • timestamp (string):

    Format to use for timestamps in generated ES documents. strftime format codes may be used. Defaults to “%Y-%m-%dT%H:%M:%S”.

  • es_index_from_timestamp (bool):

    When generating the index name use the timestamp from the message instead of the current time. Defaults to false.

  • id (string):

    Allows you to optionally specify the document id for ES to use. Useful for overwriting existing ES documents. If the value specified is placed within %{}, it will be interpolated to its Field value. Default is to allow ES to auto-generate the id.

  • raw_bytes_fields ([]string):

    This specifies a set of fields which will be passed through to the encoded JSON output without any processing or escaping. This is useful for fields which contain embedded JSON objects to prevent the embedded JSON from being escaped as normal strings. Only supports dynamically specified message fields.

  • field_mappings (map[string]string):

    Maps Heka message fields to custom ES keys. Can be used to implement a custom format in ES or implement Logstash V1. The available fields are “Timestamp”, “Uuid”, “Type”, “Logger”, “Severity”, “Payload”, “EnvVersion”, “Pid” and “Hostname”.

  • dynamic_fields ([]string):

    This specifies which of the message’s dynamic fields should be included in the JSON output. Defaults to including all of the message’s dynamic fields. If dynamic_fields is non-empty, then the fields list must contain “DynamicFields” or an error will be raised.

Example

[ESJsonEncoder]
index = "%{Type}-%{%Y.%m.%d}"
es_index_from_timestamp = true
type_name = "%{Type}"
    [ESJsonEncoder.field_mappings]
    Timestamp = "@timestamp"
    Severity = "level"

[ElasticSearchOutput]
message_matcher = "Type == 'nginx.access'"
encoder = "ESJsonEncoder"
flush_interval = 50
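
To index only a subset of the message, the fields and dynamic_fields options described above can be combined; note that when dynamic_fields is non-empty the fields list must include “DynamicFields”. A sketch (the section name and dynamic field names are hypothetical):

[ESJsonEncoderSubset]
type = "ESJsonEncoder"
index = "heka-%{%Y.%m.%d}"
fields = ["Timestamp", "Hostname", "Payload", "DynamicFields"]
dynamic_fields = ["status", "request_time"]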

ElasticSearch Logstash V0 Encoder

Plugin Name: ESLogstashV0Encoder

This encoder serializes a Heka message into a JSON format, preceded by a separate JSON structure containing information required for ElasticSearch BulkAPI indexing. The message JSON structure uses the original (i.e. “v0”) schema popularized by Logstash. Using this schema can aid integration with existing Logstash deployments. This schema also plays nicely with the default Logstash dashboard provided by Kibana.

The JSON serialization is done by hand, without using Go’s stdlib JSON marshalling. This is so serialization can succeed even if the message contains invalid UTF-8 characters, which will be encoded as U+FFFD.

Config:

  • index (string):

    Name of the ES index into which the messages will be inserted. Supports interpolation of message field values (from ‘Type’, ‘Hostname’, ‘Pid’, ‘UUID’, ‘Logger’, ‘EnvVersion’, ‘Severity’, a field name, or a timestamp format) with the use of ‘%{}’ chars, so ‘%{Hostname}-%{Logger}-data’ would add the records to an ES index called ‘some.example.com-processname-data’. Defaults to ‘logstash-%{2006.01.02}’.

  • type_name (string):

    Name of ES record type to create. Supports interpolation of message field values (from ‘Type’, ‘Hostname’, ‘Pid’, ‘UUID’, ‘Logger’, ‘EnvVersion’, ‘Severity’, field name, or a timestamp format) with the use of ‘%{}’ chars, so ‘%{Hostname}-stat’ would create an ES record with a type of ‘some.example.com-stat’. Defaults to ‘message’.

  • use_message_type (bool):

    If false, the generated JSON’s @type value will match the ES record type specified in the type_name setting. If true, the message’s Type value will be used as the @type value instead. Defaults to false.

  • fields ([]string):

    The ‘fields’ parameter specifies that only specific message data should be indexed into ElasticSearch. Available fields to choose are “Uuid”, “Timestamp”, “Type”, “Logger”, “Severity”, “Payload”, “EnvVersion”, “Pid”, “Hostname”, and “DynamicFields” (where “DynamicFields” causes the inclusion of dynamically specified message fields, see dynamic_fields). Defaults to including all of the supported message fields. The “Payload” field is sent to ElasticSearch as “@message”.

  • timestamp (string):

    Format to use for timestamps in generated ES documents. strftime format codes may be used. Defaults to “%Y-%m-%dT%H:%M:%S”.

  • es_index_from_timestamp (bool):

    When generating the index name use the timestamp from the message instead of the current time. Defaults to false.

  • id (string):

    Allows you to optionally specify the document id for ES to use. Useful for overwriting existing ES documents. If the value specified is placed within %{}, it will be interpolated to its Field value. Default is to allow ES to auto-generate the id.

  • raw_bytes_fields ([]string):

    This specifies a set of fields which will be passed through to the encoded JSON output without any processing or escaping. This is useful for fields which contain embedded JSON objects to prevent the embedded JSON from being escaped as normal strings. Only supports dynamically specified message fields.

  • dynamic_fields ([]string):

    This specifies which of the message’s dynamic fields should be included in the JSON output. Defaults to including all of the message’s dynamic fields. If dynamic_fields is non-empty, then the fields list must contain “DynamicFields” or an error will be raised.

Example

[ESLogstashV0Encoder]
es_index_from_timestamp = true
type_name = "%{Type}"

[ElasticSearchOutput]
message_matcher = "Type == 'nginx.access'"
encoder = "ESLogstashV0Encoder"
flush_interval = 50

ElasticSearch Payload Encoder

Plugin Name: SandboxEncoder
File Name: lua_encoders/es_payload.lua

Prepends ElasticSearch BulkAPI index JSON to a message payload.

Config:

  • index (string, optional, default “heka-%{%Y.%m.%d}”)

    String to use as the _index key’s value in the generated JSON. Supports field interpolation as described below.

  • type_name (string, optional, default “message”)

    String to use as the _type key’s value in the generated JSON. Supports field interpolation as described below.

  • id (string, optional)

    String to use as the _id key’s value in the generated JSON. Supports field interpolation as described below.

  • es_index_from_timestamp (boolean, optional)

    If true, then any time interpolation (often used to generate the ElasticSearch index) will use the timestamp from the processed message rather than the system time.

Field interpolation:

All of the string config settings listed above support message field interpolation.

Example Heka Configuration

[es_payload]
type = "SandboxEncoder"
filename = "lua_encoders/es_payload.lua"
    [es_payload.config]
    es_index_from_timestamp = true
    index = "%{Logger}-%{%Y.%m.%d}"
    type_name = "%{Type}-%{Hostname}"

[ElasticSearchOutput]
message_matcher = "Type == 'mytype'"
encoder = "es_payload"

Example Output

{"index":{"_index":"mylogger-2014.06.05","_type":"mytype-host.domain.com"}}
{"json":"data","extracted":"from","message":"payload"}

Payload Encoder

Plugin Name: PayloadEncoder

The PayloadEncoder simply extracts the payload from the provided Heka message and converts it into a byte stream for delivery to an external resource.

Config:

  • append_newlines (bool, optional):

    Specifies whether or not a newline character (i.e. \n) will be appended to the captured message payload before serialization. Defaults to true.

  • prefix_ts (bool, optional):

    Specifies whether a timestamp will be prepended to the captured message payload before serialization. Defaults to false.

  • ts_from_message (bool, optional):

    If true, the prepended timestamp will be extracted from the message that is being processed. If false, the prepended timestamp will be generated by the system clock at the time of message processing. Defaults to true. This setting has no impact if prefix_ts is set to false.

  • ts_format (string, optional):

    Specifies the format that should be used for prepended timestamps, using the standard strftime string format. Defaults to [%Y/%b/%d:%H:%M:%S %z]. If the specified format string does not end with a space character, then a space will be inserted between the formatted timestamp and the payload.

Example

[PayloadEncoder]
append_newlines = false
prefix_ts = true
ts_format = "%Y/%m/%d %l:%M:%S%p %Z"

Protobuf Encoder

Plugin Name: ProtobufEncoder

The ProtobufEncoder is used to serialize Heka message objects back into Heka’s standard protocol buffers format. This is the format that Heka uses to communicate with other Heka instances, so one will always be included in your Heka configuration using the default “ProtobufEncoder” name whether specified or not.

The hekad protocol buffers message schema is defined in the message.proto file in the message package.

Config:

<none>

Example:

[ProtobufEncoder]

Restructured Text Encoder

Plugin Name: RstEncoder

The RstEncoder generates a reStructuredText rendering of a Heka message, including all fields and attributes. It is useful for debugging, especially when coupled with a Log Output.

Config:

<none>

Example:

[RstEncoder]

[LogOutput]
message_matcher = "TRUE"
encoder = "RstEncoder"

Sandbox Encoder

Plugin Name: SandboxEncoder

The SandboxEncoder provides an isolated execution environment for converting messages into binary data without the need to recompile Heka. See Sandbox.

Config:

Example

[custom_json_encoder]
type = "SandboxEncoder"
filename = "path/to/custom_json_encoder.lua"

    [custom_json_encoder.config]
    msg_fields = ["field1", "field2"]

Schema Carbon Line Encoder

New in version 0.10.

Plugin Name: SandboxEncoder
File Name: lua_encoders/schema_carbon_line.lua

Converts full Heka message contents to line protocol for the Carbon Plaintext API. Iterates through all of the dynamic fields to add as points (series), skipping any fields explicitly omitted using the skip_fields config option. All dynamic fields in the Heka message are converted to separate points separated by newlines that are submitted to Carbon.

Config:

  • name_prefix (string, optional, default nil)

    String to use as the name key’s prefix value in the generated line. Supports message field interpolation, e.g. %{fieldname}. Any fieldname values of “Type”, “Payload”, “Hostname”, “Pid”, “Logger”, “Severity”, or “EnvVersion” will be extracted from the base message schema; any other values will be assumed to refer to a dynamic message field. Only the first value of the first instance of a dynamic message field can be used for name interpolation. If the dynamic field doesn’t exist, the uninterpolated value will be left in the name. Note that it is not possible to interpolate either the “Timestamp” or the “Uuid” message fields into the name; those values will be interpreted as referring to dynamic message fields.

  • name_prefix_delimiter (string, optional, default ”.”)

    String to use as the delimiter between the name_prefix and the field name. This defaults to ”.”, following the Graphite naming convention.

  • skip_fields (string, optional, default nil)

    Space delimited set of fields that should not be included in the Carbon records being generated. Any fieldname values of “Type”, “Payload”, “Hostname”, “Pid”, “Logger”, “Severity”, or “EnvVersion” will be assumed to refer to the corresponding field from the base message schema. Any other values will be assumed to refer to a dynamic message field. The magic value “all_base” can be used to exclude base fields from being mapped to the event altogether.

  • source_value_field (string, optional, default nil)

    If the desired behavior of this encoder is to extract one field from the Heka message and feed it as a single line to Carbon, then use this option to define which field to find the value from. Make sure to set name_prefix using message field interpolation so the full metric path in Graphite is populated. When this option is present, no fields other than this one will be sent to Carbon. (A sketch of this usage appears after the example output below.)

Example Heka Configuration

[LinuxStatsDecoder]
type = "MultiDecoder"
subs = ["LoadAvgDecoder", "AddStaticFields"]
cascade_strategy = "all"
log_sub_errors = false

[LoadAvgPoller]
type = "FilePollingInput"
ticker_interval = 5
file_path = "/proc/loadavg"
decoder = "LinuxStatsDecoder"

[LoadAvgDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/linux_loadavg.lua"

[AddStaticFields]
type = "ScribbleDecoder"

    [AddStaticFields.message_fields]
    Environment = "dev"

[CarbonLineEncoder]
type = "SandboxEncoder"
filename = "lua_encoders/schema_carbon_line.lua"

    [CarbonLineEncoder.config]
    name_prefix = "%{Environment}.%{Hostname}.%{Type}"
    skip_fields = "**all_base** FilePath NumProcesses Environment TickerInterval"

[CarbonOutput]
type = "TcpOutput"
message_matcher = "Type =~ /stats.*/"
encoder = "CarbonLineEncoder"
address = "127.0.0.1:2003"

Example Output

dev.myhost.stats.loadavg.1MinAvg 0.12 1434932023
dev.myhost.stats.loadavg.15MinAvg 0.18 1434932023
dev.myhost.stats.loadavg.5MinAvg 0.11 1434932023
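
If only a single field from each message should be emitted, source_value_field can be combined with a fully interpolated name_prefix, as noted in the config options above. A sketch (the AvailMemory field and encoder name are hypothetical):

[CarbonMemEncoder]
type = "SandboxEncoder"
filename = "lua_encoders/schema_carbon_line.lua"

    [CarbonMemEncoder.config]
    source_value_field = "AvailMemory"
    name_prefix = "%{Environment}.%{Hostname}.%{Type}.available_memory"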

Schema InfluxDB Encoder

New in version 0.8.

Plugin Name: SandboxEncoder
File Name: lua_encoders/schema_influx.lua

Converts full Heka message contents to JSON for InfluxDB HTTP API. Includes all standard message fields and iterates through all of the dynamically specified fields, skipping any bytes fields or any fields explicitly omitted using the skip_fields config option.

Note

This encoder is intended for use with InfluxDB versions prior to 0.9. If you’re working with InfluxDB v0.9 or greater, you’ll want to use the Schema InfluxDB Line Encoder instead.

Config:

  • series (string, optional, default “series”)

    String to use as the series key’s value in the generated JSON. Supports interpolation of field values from the processed message, using %{fieldname}. Any fieldname values of “Type”, “Payload”, “Hostname”, “Pid”, “Logger”, “Severity”, or “EnvVersion” will be extracted from the base message schema; any other values will be assumed to refer to a dynamic message field. Only the first value of the first instance of a dynamic message field can be used for series name interpolation. If the dynamic field doesn’t exist, the uninterpolated value will be left in the series name. Note that it is not possible to interpolate either the “Timestamp” or the “Uuid” message fields into the series name; those values will be interpreted as referring to dynamic message fields.

  • skip_fields (string, optional, default “”)

    Space delimited set of fields that should not be included in the InfluxDB records being generated. Any fieldname values of “Type”, “Payload”, “Hostname”, “Pid”, “Logger”, “Severity”, or “EnvVersion” will be assumed to refer to the corresponding field from the base message schema. Any other values will be assumed to refer to a dynamic message field.

  • multi_series (boolean, optional, default false)

    Instead of submitting all fields to InfluxDB as attributes of a single series, submit a series for each field that sets a “value” attribute to the value of the field. This also sets the name attribute to the series value with the field name appended to it by a ”.”. This is recommended by InfluxDB for v0.9 onwards, as it has been found to provide better performance when querying and aggregating across multiple series.

  • exclude_base_fields (boolean, optional, default false)

    Don’t send the base fields to InfluxDB. This saves storage space by not including base fields that are mostly redundant and unused data. If skip_fields includes base fields, this setting overrides it and skip_fields will then only be relevant for skipping dynamic fields.

Example Heka Configuration

[influxdb]
type = "SandboxEncoder"
filename = "lua_encoders/schema_influx.lua"
    [influxdb.config]
    series = "heka.%{Logger}"
    skip_fields = "Pid EnvVersion"

[InfluxOutput]
message_matcher = "Type == 'influxdb'"
encoder = "influxdb"
type = "HttpOutput"
address = "http://influxdbserver.example.com:8086/db/databasename/series"
username = "influx_username"
password = "influx_password"

Example Output

[{"points":[[1.409378221e+21,"log","test","systemName","TcpInput",5,1,"test"]],"name":"heka.MyLogger","columns":["Time","Type","Payload","Hostname","Logger","Severity","syslogfacility","programname"]}]

Schema InfluxDB Line Encoder

New in version 0.10.

Plugin Name: SandboxEncoder
File Name: lua_encoders/schema_influx_line.lua

Converts full Heka message contents to line protocol for InfluxDB HTTP write API (new in InfluxDB v0.9.0). Optionally includes all standard message fields as tags or fields and iterates through all of the dynamic fields to add as points (series), skipping any fields explicitly omitted using the skip_fields config option. It can also map any Heka message fields as tags in the request sent to the InfluxDB write API, using the tag_fields config option. All dynamic fields in the Heka message are converted to separate points separated by newlines that are submitted to InfluxDB.

Note

This encoder is intended for use with InfluxDB versions 0.9 or greater. If you’re working with InfluxDB versions prior to 0.9, you’ll want to use the Schema InfluxDB Encoder instead.

Config:

  • decimal_precision (string, optional, default “6”)

    String that is used in the string.format function to define the number of digits printed after the decimal in number values. The string formatting of numbers is forced to print with floating points because InfluxDB will reject values that change from integers to floats and vice-versa. By forcing all numbers to floats, we ensure that InfluxDB will always accept our numerical values, regardless of the initial format.

  • name_prefix (string, optional, default nil)

    String to use as the name key’s prefix value in the generated line. Supports message field interpolation, e.g. %{fieldname}. Any fieldname values of “Type”, “Payload”, “Hostname”, “Pid”, “Logger”, “Severity”, or “EnvVersion” will be extracted from the base message schema; any other values will be assumed to refer to a dynamic message field. Only the first value of the first instance of a dynamic message field can be used for name interpolation. If the dynamic field doesn’t exist, the uninterpolated value will be left in the name. Note that it is not possible to interpolate either the “Timestamp” or the “Uuid” message fields into the name; those values will be interpreted as referring to dynamic message fields.

  • name_prefix_delimiter (string, optional, default nil)

    String to use as the delimiter between the name_prefix and the field name. This defaults to a blank string but can be anything else instead (such as ”.” to use Graphite-like naming).

  • skip_fields (string, optional, default nil)

    Space delimited set of fields that should not be included in the InfluxDB measurements being generated. Any fieldname values of “Type”, “Payload”, “Hostname”, “Pid”, “Logger”, “Severity”, or “EnvVersion” will be assumed to refer to the corresponding field from the base message schema. Any other values will be assumed to refer to a dynamic message field. The magic value “all_base” can be used to exclude base fields from being mapped to the event altogether (useful if you don’t want to use tags and embed them in the name_prefix instead).

  • source_value_field (string, optional, default nil)

    If the desired behavior of this encoder is to extract one field from the Heka message and feed it as a single line to InfluxDB, then use this option to define which field to find the value from. If this option is used, be sure to also set name_prefix, or the data sent to InfluxDB will have no measurement name. When this option is present, no fields other than this one will be sent to InfluxDB as measurements.

  • tag_fields (string, optional, default “all_base”)

    Take fields defined and add them as tags of the measurement(s) sent to InfluxDB for the message. The magic values “all” and “all_base” are used to map all fields (including taggable base fields) to tags, or only base fields to tags, respectively. If those magic values aren’t used, then only the fields defined will map to tags of the measurement sent to InfluxDB. The tag_fields values are independent of the skip_fields values and have no effect on each other; you can skip fields from being sent to InfluxDB as measurements, but still include them as tags.

  • timestamp_precision (string, optional, default “ms”)

    Specify the timestamp precision that you want the event sent with. The default is to use milliseconds, obtained by dividing the Heka message timestamp (which is in nanoseconds) by 1e6; this math is adjusted according to whichever precision value supported by the InfluxDB write API is specified (ms, s, m, h). The other precisions supported by InfluxDB (n and u) are not yet supported.

  • value_field_key (string, optional, default “value”)

    This defines the name of the field key that holds the value in the generated line (the “value” in value=0.110000). We default this to “value” to match the examples in the InfluxDB documentation, but you can replace that with anything else that you prefer.

Example Heka Configuration

[LoadAvgPoller]
type = "FilePollingInput"
ticker_interval = 5
file_path = "/proc/loadavg"
decoder = "LinuxStatsDecoder"

[LoadAvgDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/linux_loadavg.lua"

[LinuxStatsDecoder]
type = "MultiDecoder"
subs = ["LoadAvgDecoder", "AddStaticFields"]
cascade_strategy = "all"
log_sub_errors = false

[AddStaticFields]
type = "ScribbleDecoder"

    [AddStaticFields.message_fields]
    Environment = "dev"

[InfluxdbLineEncoder]
type = "SandboxEncoder"
filename = "lua_encoders/schema_influx_line.lua"

    [InfluxdbLineEncoder.config]
    skip_fields = "**all_base** FilePath NumProcesses Environment TickerInterval"
    tag_fields = "Hostname Environment"
    timestamp_precision= "s"

[InfluxdbOutput]
type = "HttpOutput"
message_matcher = "Type =~ /stats.*/"
encoder = "InfluxdbLineEncoder"
address = "http://influxdbserver.example.com:8086/write?db=mydb&rp=mypolicy&precision=s"
username = "influx_username"
password = "influx_password"

Example Output

5MinAvg,Hostname=myhost,Environment=dev value=0.110000 1434932024
1MinAvg,Hostname=myhost,Environment=dev value=0.110000 1434932024
15MinAvg,Hostname=myhost,Environment=dev value=0.170000 1434932024

StatMetric InfluxDB Encoder

New in version 0.7.

Plugin Name: SandboxEncoder
File Name: lua_encoders/statmetric_influx.lua

Extracts data from message fields in heka.statmetric messages generated by a Stat Accumulator Input and generates JSON suitable for use with InfluxDB’s HTTP API. StatAccumInput must be configured with emit_in_fields = true for this encoder to work correctly.

Config:

<none>

Example Heka Configuration

[statmetric-influx-encoder]
type = "SandboxEncoder"
filename = "lua_encoders/statmetric_influx.lua"

[influx]
type = "HttpOutput"
message_matcher = "Type == 'heka.statmetric'"
address = "http://myinfluxserver.example.com:8086/db/stats/series"
encoder = "statmetric-influx-encoder"
username = "influx_username"
password = "influx_password"

Example Output

[{"points":[[1408404848,78271]],"name":"stats.counters.000000.rate","columns":["time","value"]},{"points":[[1408404848,78271]],"name":"stats.counters.000000.count","columns":["time","value"]},{"points":[[1408404848,17420]],"name":"stats.timers.000001.count","columns":["time","value"]},{"points":[[1408404848,17420]],"name":"stats.timers.000001.count_ps","columns":["time","value"]},{"points":[[1408404848,1]],"name":"stats.timers.000001.lower","columns":["time","value"]},{"points":[[1408404848,1024]],"name":"stats.timers.000001.upper","columns":["time","value"]},{"points":[[1408404848,8937851]],"name":"stats.timers.000001.sum","columns":["time","value"]},{"points":[[1408404848,513.07985074627]],"name":"stats.timers.000001.mean","columns":["time","value"]},{"points":[[1408404848,461.72356167879]],"name":"stats.timers.000001.mean_90","columns":["time","value"]},{"points":[[1408404848,925]],"name":"stats.timers.000001.upper_90","columns":["time","value"]},{"points":[[1408404848,2]],"name":"stats.statsd.numStats","columns":["time","value"]}]

Outputs

Common Output Parameters

There are some configuration options that are universally available to all Heka output plugins. These will be consumed by Heka itself when Heka initializes the plugin and do not need to be handled by the plugin-specific initialization code.

  • message_matcher (string, optional):

    Boolean expression, when evaluated to true passes the message to the output for processing. Defaults to matching nothing. See: Message Matcher Syntax

  • message_signer (string, optional):

    The name of the message signer. If specified only messages with this signer are passed to the output for processing.

  • ticker_interval (uint, optional):

    Frequency (in seconds) that a timer event will be sent to the output. Defaults to not sending timer events.

New in version 0.6.

  • encoder (string, optional):

    Encoder to be used by the output. This should refer to the name of an encoder plugin section that is specified elsewhere in the TOML configuration. Messages can be encoded using the specified encoder by calling the OutputRunner’s Encode() method.

  • use_framing (bool, optional):

    Specifies whether or not Heka’s Stream Framing should be applied to the binary data returned from the OutputRunner’s Encode() method.

New in version 0.7.

  • can_exit (bool, optional)

    Whether or not this plugin can exit without causing Heka to shutdown. Defaults to false.

New in version 0.10.

  • use_buffering (bool, optional)

    If true, all messages delivered to this output will be buffered to disk before delivery, preventing back pressure and allowing retries in cases of message processing failure. Defaults to false, unless otherwise specified by the individual output’s documentation.

  • buffering (QueueBufferConfig, optional)

    A sub-section that specifies the settings to be used for the buffering behavior. This will only have any impact if use_buffering is set to true. See Configuring Buffering.
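
For illustration, several of these common parameters might be combined in an output section like the following sketch. The plugin name and matcher value are placeholders, and plugin-specific settings (such as the output address) are omitted:

[ExampleOutput]
type = "TcpOutput"
message_matcher = "Type == 'example.log'"
encoder = "ProtobufEncoder"
use_framing = true
use_buffering = true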

AMQP Output

Plugin Name: AMQPOutput

Connects to a remote AMQP broker (RabbitMQ) and sends messages to the specified queue. The message is serialized if specified, otherwise only the raw payload of the message will be sent. As AMQP is dynamically programmable, the broker topology needs to be specified.

Config:

  • url (string):

    An AMQP connection string formatted per the RabbitMQ URI Spec.

  • exchange (string):

    AMQP exchange name

  • exchange_type (string):

    AMQP exchange type (fanout, direct, topic, or headers).

  • exchange_durability (bool):

    Whether the exchange should be configured as a durable exchange. Defaults to non-durable.

  • exchange_auto_delete (bool):

    Whether the exchange is deleted when all queues have finished and there is no publishing. Defaults to auto-delete.

  • routing_key (string):

    The message routing key used to bind the queue to the exchange. Defaults to empty string.

  • persistent (bool):

    Whether published messages should be marked as persistent or transient. Defaults to non-persistent.

  • retries (RetryOptions, optional):

    A sub-section that specifies the settings to be used for restart behavior. See Configuring Restarting Behavior

New in version 0.6.

  • content_type (string):

    MIME content type of the payload used in the AMQP header. Defaults to “application/hekad”.

  • encoder (string, optional)

    Specifies which of the registered encoders should be used for converting Heka messages to binary data that is sent out over the AMQP connection. Defaults to the always available “ProtobufEncoder”.

  • use_framing (bool, optional):

    Specifies whether or not the encoded data sent out over the AMQP connection should be delimited by Heka’s Stream Framing. Defaults to true.

New in version 0.6.

  • tls (TlsConfig):

    An optional sub-section that specifies the settings to be used for any SSL/TLS encryption. This will only have any impact if URL uses the AMQPS URI scheme. See Configuring TLS.

Example (that sends log lines from the logger):

[AMQPOutput]
url = "amqp://guest:guest@rabbitmq/"
exchange = "testout"
exchange_type = "fanout"
message_matcher = 'Logger == "TestWebserver"'

Carbon Output

Plugin Name: CarbonOutput

CarbonOutput plugins parse the “stat metric” messages generated by a StatAccumulator and write the extracted counter, timer, and gauge data out to a graphite compatible carbon daemon. Output is written over a TCP or UDP socket using the plaintext protocol.

Config:

  • address (string):

    An IP address:port to which this plugin will write. (default: “localhost:2003”)

New in version 0.5.

  • protocol (string):

    “tcp” or “udp” (default: “tcp”)

  • tcp_keep_alive (bool)

    If set, keep the TCP connection open and reuse it until a failure; then retry. (default: false)

Example:

[CarbonOutput]
message_matcher = "Type == 'heka.statmetric'"
address = "localhost:2003"
protocol = "udp"

Dashboard Output

Plugin Name: DashboardOutput

Specialized output plugin that listens for certain Heka reporting message types and generates JSON data which is made available via HTTP for use in web based dashboards and health reports.

Config:

  • ticker_interval (uint):

    Specifies how often, in seconds, the dashboard files should be updated. Defaults to 5.

  • message_matcher (string):

    Defaults to “Type == ‘heka.all-report’ || Type == ‘heka.sandbox-output’ || Type == ‘heka.sandbox-terminated’”. Not recommended to change this unless you know what you’re doing.

  • address (string):

    An IP address:port on which we will serve output via HTTP. Defaults to “0.0.0.0:4352”.

  • working_directory (string):

    File system directory into which the plugin will write data files and from which it will serve HTTP. The Heka process must have read / write access to this directory. Relative paths will be evaluated relative to the Heka base directory. Defaults to ${BASE_DIR}/dashboard.

  • static_directory (string):

    File system directory where the Heka dashboard source code can be found. The Heka process must have read access to this directory. Relative paths will be evaluated relative to the Heka base directory. Defaults to ${SHARE_DIR}/dasher.

New in version 0.7.

  • headers (subsection, optional):

    It is possible to inject arbitrary HTTP headers into each outgoing response by adding a TOML subsection entitled “headers” to your DashboardOutput config section. All entries in the subsection must be a list of string values.

Example:

[DashboardOutput]
ticker_interval = 30
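
Headers specified in the headers sub-section described above are injected into every dashboard response; for example (the header name and value are purely illustrative):

    [DashboardOutput.headers]
    X-Frame-Options = ["DENY"]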

ElasticSearch Output

Plugin Name: ElasticSearchOutput

Output plugin that uses HTTP or UDP to insert records into an ElasticSearch database. Note that it is up to the specified encoder to both serialize the message into a JSON structure and to prepend that with the appropriate ElasticSearch BulkAPI indexing JSON. Usually this output is used in conjunction with an ElasticSearch-specific encoder plugin, such as ElasticSearch JSON Encoder, ElasticSearch Logstash V0 Encoder, or ElasticSearch Payload Encoder.

Config:

  • flush_interval (int):

    Interval at which accumulated messages should be bulk indexed into ElasticSearch, in milliseconds. Defaults to 1000 (i.e. one second).

  • flush_count (int):

    Number of messages that, if processed, will trigger them to be bulk indexed into ElasticSearch. Defaults to 10.

  • server (string):

    ElasticSearch server URL. Supports http://, https:// and udp:// urls. Defaults to “http://localhost:9200”.

  • connect_timeout (int):

    Time in milliseconds to wait for server name resolution and connection to ES. It is included in the overall time (see the ‘http_timeout’ option) if both are set. Default is 0 (no timeout).

  • http_timeout (int):

    Time in milliseconds to wait for a response for each http post to ES. This may drop data as there is currently no retry. Default is 0 (no timeout).

  • http_disable_keepalives (bool):

    Specifies whether or not re-use of established TCP connections to ElasticSearch should be disabled. Defaults to false, which means both HTTP keep-alive mode and TCP keep-alives are used. Set it to true to close each TCP connection after ‘flushing’ messages to ElasticSearch.

  • username (string):

    The username to use for HTTP authentication against the ElasticSearch host. Defaults to “” (i.e. no authentication).

  • password (string):

    The password to use for HTTP authentication against the ElasticSearch host. Defaults to “” (i.e. no authentication).

New in version 0.9.

  • tls (TlsConfig):

    An optional sub-section that specifies the settings to be used for any SSL/TLS encryption. This will only have any impact if URL uses the HTTPS URI scheme. See Configuring TLS.

  • use_buffering (bool, optional):

    Buffer records to a disk-backed buffer on the Heka server before writing them to ElasticSearch. Defaults to true.

  • buffering (QueueBufferConfig, optional):

    All of the buffering config options are set to the standard default options.

Example:

[ElasticSearchOutput]
message_matcher = "Type == 'sync.log'"
server = "http://es-server:9200"
flush_interval = 5000
flush_count = 10
encoder = "ESJsonEncoder"

File Output

Plugin Name: FileOutput

Writes message data out to a file system.

Config:

  • path (string):

    Full path to the output file. If date rotation is in use, then the output file path can support strftime syntax to embed timestamps in the file path: http://strftime.org

  • perm (string, optional):

    File permission for writing. A string of the octal digit representation. Defaults to “644”.

  • folder_perm (string, optional):

    Permissions to apply to the output file’s parent directory if it doesn’t exist and must be created. Must be a string representation of an octal integer. Defaults to “700”.

  • flush_interval (uint32, optional):

    Interval at which accumulated file data should be written to disk, in milliseconds (default 1000, i.e. 1 second). Set to 0 to disable.

  • flush_count (uint32, optional):

    Number of messages to accumulate until file data should be written to disk (default 1, minimum 1).

  • flush_operator (string, optional):

    Operator describing how the two parameters “flush_interval” and “flush_count” are combined. Allowed values are “AND” or “OR” (default is “AND”).

New in version 0.6.

  • use_framing (bool, optional):

    Specifies whether or not Heka’s Stream Framing should be applied to the encoded data written out to the file. Defaults to true if a ProtobufEncoder is used, false otherwise.

New in version 0.9.

  • rotation_interval (uint32, optional):

    Interval at which the output file should be rotated, in hours. Only the following values are allowed: 0, 1, 4, 12, 24 (set to 0 to disable). The files will be named relative to midnight of the day. Defaults to 0, i.e. disabled.

Example:

[counter_file]
type = "FileOutput"
message_matcher = "Type == 'heka.counter-output'"
path = "/var/log/heka/counter-output.log"
perm = "666"
flush_count = 100
flush_operator = "OR"
encoder = "PayloadEncoder"
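
When date rotation is in use, the strftime support in path combines with rotation_interval. A sketch (the path and plugin name are illustrative):

[rotated_log]
type = "FileOutput"
message_matcher = "Type == 'heka.counter-output'"
path = "/var/log/heka/counter-output-%Y-%m-%d.log"
rotation_interval = 24
encoder = "PayloadEncoder"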

New in version 0.6.

HTTP Output

Plugin Name: HttpOutput

A very simple output plugin that uses HTTP GET, POST, or PUT requests to deliver data to an HTTP endpoint. When using POST or PUT request methods the encoded output will be uploaded as the request body. When using GET the encoded output will be ignored.

This output doesn’t support any request batching; each received message will generate an HTTP request. Batching can be achieved by use of a filter plugin that accumulates message data, periodically emitting a single message containing the batched, encoded HTTP request data in the payload. An HttpOutput can then be configured to capture these batch messages, using a Payload Encoder to extract the message payload.

For now the HttpOutput only supports statically defined request parameters (URL, headers, auth, etc.). Future iterations will provide a mechanism for dynamically specifying these values on a per-message basis.

Config:

  • address (string):

    URL address of HTTP server to which requests should be sent. Must begin with “http://” or “https://”.

  • method (string, optional):

    HTTP request method to use, must be one of GET, POST, or PUT. Defaults to POST.

  • username (string, optional):

    If specified, HTTP Basic Auth will be used with the provided user name.

  • password (string, optional):

    If specified, HTTP Basic Auth will be used with the provided password.

  • headers (subsection, optional):

    It is possible to inject arbitrary HTTP headers into each outgoing request by adding a TOML subsection entitled “headers” to your HttpOutput config section. All entries in the subsection must be a list of string values.

  • http_timeout (uint, optional):

    Time in milliseconds to wait for a response for each http request. This may drop data as there is currently no retry. Default is 0 (no timeout).

  • tls (subsection, optional):

    A sub-section that specifies the settings to be used for any SSL/TLS encryption. This will only have any impact if an “https://” address is used. See Configuring TLS.

Example:

[PayloadEncoder]

[influxdb]
type = "HttpOutput"
message_matcher = "Type == 'influx.formatted'"
address = "http://influxdb.example.com:8086/db/stats/series"
encoder = "PayloadEncoder"
username = "MyUserName"
password = "MyPassword"
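
The headers sub-section described above can be added to this configuration to set arbitrary request headers; for example:

    [influxdb.headers]
    Content-Type = ["application/json"]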

IRC Output

Plugin Name: IrcOutput

Connects to an Irc server and sends messages to the specified Irc channels. Output is encoded using the specified encoder, and is expected to already be truncated to fit within the bounds of an Irc message before it reaches the output.

Config:

  • server (string):

    A host:port of the irc server that Heka will connect to for sending output.

  • nick (string):

    Irc nick used by Heka.

  • ident (string):

    The Irc identity Heka will use to log in.

  • password (string, optional):

    The password used to connect to the Irc server.

  • channels (list of strings):

    A list of Irc channels which every matching Heka message is sent to. If there is a space in the channel string, then the part after the space is expected to be a password for a protected irc channel.

  • timeout (uint, optional):

    The maximum amount of time (in seconds) to wait before timing out when connecting, reading, or writing to the Irc server. Defaults to 10.

  • tls (TlsConfig, optional):

    A sub-section that specifies the settings to be used for any SSL/TLS encryption. This will only have any impact if use_tls is set to true. See Configuring TLS.

  • queue_size (uint, optional):

    This is the maximum number of messages Heka will queue per Irc channel before discarding messages. There is also a queue of the same size used if all per-channel queues are full. This is used when Heka is unable to send a message to an Irc channel, such as when it hasn’t joined or has been disconnected. Defaults to 100.

  • rejoin_on_kick (bool, optional):

    Set this if you want Heka to automatically re-join an Irc channel after being kicked. If not set and Heka is kicked, it will never attempt to rejoin. Defaults to false.

  • ticker_interval (uint, optional):

    How often (in seconds) Heka should send a message to the server. This is on a per-message basis, not per channel. Defaults to 2.

  • time_before_reconnect (uint, optional):

    How long to wait (in seconds) before reconnecting to the Irc server after being disconnected. Defaults to 3.

  • time_before_rejoin (uint, optional):

    How long to wait (in seconds) before attempting to rejoin an Irc channel which is full. Defaults to 3.

  • max_join_retries (uint, optional):

    The maximum number of times Heka will attempt to join an Irc channel before giving up. After attempts are exhausted, Heka will no longer attempt to join the channel. Defaults to 3.

  • verbose_irc_logging (bool, optional):

    Enable to see raw internal message events Heka is receiving from the server. Defaults to false.

  • encoder (string):

    Specifies which of the registered encoders should be used for converting Heka messages into what is sent to the irc channels.

  • retries (RetryOptions, optional):

    A sub-section that specifies the settings to be used for restart behavior. See Configuring Restarting Behavior.

Example:

[IrcOutput]
message_matcher = 'Type == "alert"'
encoder = "PayloadEncoder"
server = "irc.mozilla.org:6667"
nick = "heka_bot"
ident = "heka_ident"
channels = [ "#heka_bot_irc testkeypassword" ]
rejoin_on_kick = true
queue_size = 200
ticker_interval = 1

Kafka Output

Plugin Name: KafkaOutput

Connects to a Kafka broker and sends messages to the specified topic.

Config:

  • id (string)

    Client ID string. Default is the hostname.

  • addrs ([]string)

    List of broker addresses.

  • metadata_retries (int)

    How many times to retry a metadata request when a partition is in the middle of leader election. Default is 3.

  • wait_for_election (uint32)

    How long to wait for leader election to finish between retries (in milliseconds). Default is 250.

  • background_refresh_frequency (uint32)

    How frequently the client will refresh the cluster metadata in the background (in milliseconds). Default is 600000 (10 minutes). Set to 0 to disable.

  • max_message_bytes (uint32)

    Maximum size of a single message. Defaults to Heka’s maximum record size.

  • max_open_reqests (int)

    How many outstanding requests the broker is allowed to have before blocking attempts to send. Default is 4.

  • dial_timeout (uint32)

    How long to wait for the initial connection to succeed before timing out and returning an error (in milliseconds). Default is 60000 (1 minute).

  • read_timeout (uint32)

    How long to wait for a response before timing out and returning an error (in milliseconds). Default is 60000 (1 minute).

  • write_timeout (uint32)

    How long to wait for a transmit to succeed before timing out and returning an error (in milliseconds). Default is 60000 (1 minute).

  • partitioner (string)

    Chooses the partition to send messages to. The valid values are Random, RoundRobin, Hash. Default is Random.

  • hash_variable (string)

    The message variable used for the Hash partitioner only. The variables are restricted to Type, Logger, Hostname, Payload or any of the message’s dynamic field values. All dynamic field values will be converted to a string representation. Field specifications are the same as with the Message Matcher Syntax e.g. Fields[foo][0][0].

  • topic_variable (string)

    The message variable used as the Kafka topic (cannot be used in conjunction with the ‘topic’ configuration). The variable restrictions are the same as the hash_variable.

  • topic (string)

    A static Kafka topic (cannot be used in conjunction with the ‘topic_variable’ configuration).

  • required_acks (string)

    The level of acknowledgement reliability needed from the broker. The valid values are NoResponse, WaitForLocal, WaitForAll. Default is WaitForLocal.

  • timeout (uint32)

    The maximum duration the broker will wait for the receipt of the number of RequiredAcks (in milliseconds). This is only relevant when RequiredAcks is set to WaitForAll. Default is no timeout.

  • compression_codec (string)

    The type of compression to use on messages. The valid values are None, GZIP, Snappy. Default is None.

  • max_buffer_time (uint32)

    The maximum duration to buffer messages before triggering a flush to the broker (in milliseconds). Default is 1.

  • max_buffered_bytes (uint32)

    The threshold number of bytes buffered before triggering a flush to the broker. Default is 1.

  • back_pressure_threshold_bytes (uint32)

    The maximum number of bytes allowed to accumulate in the buffer before back-pressure is applied to QueueMessage. Without this, queueing messages too fast will cause the producer to construct requests larger than the MaxRequestSize (100 MiB). Default is 50 * 1024 * 1024 (50 MiB), cannot be more than (MaxRequestSize - 10 KiB).

New in version 0.11.

  • use_tls (bool, optional):

    Specifies whether or not SSL/TLS encryption should be used for the TCP connections. Defaults to false.

  • tls (TlsConfig, optional):

    A sub-section that specifies the settings to be used for any SSL/TLS encryption. This will only have any impact if use_tls is set to true. See Configuring TLS.

Example (send various Fxa messages to a static Fxa topic):

[FxaKafkaOutput]
type = "KafkaOutput"
message_matcher = "Logger == 'FxaAuthWebserver' || Logger == 'FxaAuthServer'"
topic = "Fxa"
addrs = ["localhost:9092"]
encoder = "ProtobufEncoder"

Log Output

Plugin Name: LogOutput

Logs messages to stdout using Go’s log package. Honors hekad’s global log_flags setting.

Config:

<none>

Example:

[counter_output]
type = "LogOutput"
message_matcher = "Type == 'heka.counter-output'"
encoder = "PayloadEncoder"

Nagios Output

Plugin Name: NagiosOutput

Specialized output plugin that listens for Nagios external command message types and delivers passive service check results to Nagios using either HTTP requests made to the Nagios cmd.cgi API or the use of the send_nsca binary. The message payload must consist of a state followed by a colon and then the message e.g., “OK:Service is functioning properly”. The valid states are: OK|WARNING|CRITICAL|UNKNOWN. Nagios must be configured with a service name that matches the Heka plugin instance name and the hostname where the plugin is running.

Config:

  • url (string, optional):

    An HTTP URL to the Nagios cmd.cgi. Defaults to http://localhost/nagios/cgi-bin/cmd.cgi.

  • username (string, optional):

    Username used to authenticate with the Nagios web interface. Defaults to empty string.

  • password (string, optional):

    Password used to authenticate with the Nagios web interface. Defaults to empty string.

  • response_header_timeout (uint, optional):

    Specifies the amount of time, in seconds, to wait for a server’s response headers after fully writing the request. Defaults to 2.

  • nagios_service_description (string, optional):

    Must match Nagios service’s service_description attribute. Defaults to the name of the output.

  • nagios_host (string, optional):

    Must match the hostname of the server in nagios. Defaults to the Hostname attribute of the message.

  • send_nsca_bin (string, optional):

    New in version 0.5.

    Path to the send_nsca program. If provided, send_nsca will be used to deliver check results rather than HTTP requests. If not supplied, HTTP will be used and any other send_nsca_* settings will be ignored.

  • send_nsca_args ([]string, optional):

    New in version 0.5.

    Arguments to use with send_nsca, usually at least the nagios hostname, e.g. [“-H”, “nagios.somehost.com”]. Defaults to an empty list.

  • send_nsca_timeout (int, optional):

    New in version 0.5.

    Timeout for the send_nsca command, in seconds. Defaults to 5.

  • use_tls (bool, optional):

    New in version 0.5.

    Specifies whether or not SSL/TLS encryption should be used for the TCP connections. Defaults to false.

  • tls (TlsConfig, optional):

    New in version 0.5.

    A sub-section that specifies the settings to be used for any SSL/TLS encryption. This will only have any impact if use_tls is set to true. See Configuring TLS.

Example configuration to output alerts from SandboxFilter plugins:

[NagiosOutput]
url = "http://localhost/nagios/cgi-bin/cmd.cgi"
username = "nagiosadmin"
password = "nagiospw"
message_matcher = "Type == 'heka.sandbox-output' && Fields[payload_type] == 'nagios-external-command' && Fields[payload_name] == 'PROCESS_SERVICE_CHECK_RESULT'"

Example Lua code to generate a Nagios alert:

inject_payload("nagios-external-command", "PROCESS_SERVICE_CHECK_RESULT", "OK:Alerts are working!")
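
When delivering results via send_nsca instead of HTTP, a configuration along the following lines could be used (the binary path and Nagios hostname are placeholders):

[NagiosNscaOutput]
type = "NagiosOutput"
message_matcher = "Type == 'heka.sandbox-output' && Fields[payload_type] == 'nagios-external-command' && Fields[payload_name] == 'PROCESS_SERVICE_CHECK_RESULT'"
send_nsca_bin = "/usr/sbin/send_nsca"           # placeholder path to the send_nsca binary
send_nsca_args = ["-H", "nagios.example.com"]   # placeholder Nagios host
send_nsca_timeout = 5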

Sandbox Output

New in version 0.9.

Plugin Name: SandboxOutput

The SandboxOutput provides a flexible execution environment for data encoding and transmission without the need to recompile Heka. See Sandbox.

Config:

  • The common output configuration parameter ‘encoder’ is ignored since all data transformation should happen in the plugin.

  • Common Sandbox Parameters

  • timer_event_on_shutdown (bool):

    True if the sandbox should have its timer_event function called on shutdown.

Example:

[SandboxFileOutput]
type = "SandboxOutput"
filename = "fileoutput.lua"

[SandboxFileOutput.config]
path = "mylog.txt"

SMTP Output

Plugin Name: SmtpOutput

New in version 0.5.

Outputs a Heka message in an email. The message subject is the plugin name and the message content is controlled by the payload_only setting. The primary purpose is for email alert notifications e.g., PagerDuty.

Config:

  • send_from (string)

    The email address of the sender. (default: “heka@localhost.localdomain”)

  • send_to (array of strings)

    An array of email addresses to which the output will be sent.

  • subject (string)

    Custom subject line of email. (default: “Heka [SmtpOutput]”)

  • host (string)

    SMTP host to send the email to (default: “127.0.0.1:25”)

  • auth (string)

    SMTP authentication type: “none”, “Plain”, “CRAMMD5” (default: “none”)

  • user (string, optional)

    SMTP user name

  • password (string, optional)

    SMTP user password

New in version 0.9.

  • send_interval (uint, optional)

    Minimum time interval between each email, in seconds. First email in an interval goes out immediately, subsequent messages in the same interval are concatenated and all sent when the interval expires. Defaults to 0, meaning all emails are sent immediately.

Example:

[FxaAlert]
type = "SmtpOutput"
message_matcher = "((Type == 'heka.sandbox-output' && Fields[payload_type] == 'alert') || Type == 'heka.sandbox-terminated') && Logger =~ /^Fxa/"
send_from = "heka@example.com"
send_to = ["alert@example.com"]
auth = "Plain"
user = "test"
password = "testpw"
host = "localhost:25"
encoder = "AlertEncoder"

TCP Output

Plugin Name: TcpOutput

Output plugin that delivers Heka message data to a listening TCP connection. Can be used to deliver messages from a local running Heka agent to a remote Heka instance set up as an aggregator and/or router, or to any other arbitrary listening TCP server that knows how to process the encoded data.

Config:

  • address (string):

    An IP address:port to which we will send our output data.

  • use_tls (bool, optional):

    Specifies whether or not SSL/TLS encryption should be used for the TCP connections. Defaults to false.

New in version 0.5.

  • tls (TlsConfig, optional):

    A sub-section that specifies the settings to be used for any SSL/TLS encryption. This will only have any impact if use_tls is set to true. See Configuring TLS.

New in version 0.6.

  • local_address (string, optional):

    A local IP address to use as the source address for outgoing traffic to this destination. Cannot currently be combined with TLS connections.

  • encoder (string, optional):

    Specifies which of the registered encoders should be used for converting Heka messages to binary data that is sent out over the TCP connection. Defaults to the always available “ProtobufEncoder”.

  • use_framing (bool, optional):

    Specifies whether or not the encoded data sent out over the TCP connection should be delimited by Heka’s Stream Framing. Defaults to true if a ProtobufEncoder is used, false otherwise.

  • keep_alive (bool):

    Specifies whether or not TCP keepalive should be used for established TCP connections. Defaults to false.

  • keep_alive_period (int):

    Time duration in seconds that a TCP connection will be maintained before keepalive probes start being sent. Defaults to 7200 (i.e. 2 hours).

New in version 0.10.

  • use_buffering (bool, optional):

    Buffer records to a disk-backed buffer on the Heka server before sending them out over the TCP connection. Defaults to true.

  • buffering (QueueBufferConfig, optional):

    All of the buffering config options are set to the standard default options, except for cursor_update_count, which is set to 50 instead of the standard default of 1.

  • reconnect_after (int, optional):

    Re-establish the TCP connection after the specified number of successfully delivered messages. Defaults to 0 (no reconnection).

Example:

[aggregator_output]
type = "TcpOutput"
address = "heka-aggregator.mydomain.com:55"
local_address = "127.0.0.1"
message_matcher = "Type != 'logfile' && Type !~ /^heka\./'"

UDP Output

New in version 0.7.

Plugin Name: UdpOutput

Output plugin that delivers Heka message data to a specified UDP or Unix datagram socket location.

Config:

  • net (string, optional):

    Network type to use for communication. Must be one of “udp”, “udp4”, “udp6”, or “unixgram”. “unixgram” option only available on systems that support Unix datagram sockets. Defaults to “udp”.

  • address (string):

    Address to which we will be sending the data. Must be IP:port for net types of “udp”, “udp4”, or “udp6”. Must be a path to a Unix datagram socket file for net type “unixgram”.

  • local_address (string, optional):

    Local address to use on the datagram packets being generated. Must be IP:port for net types of “udp”, “udp4”, or “udp6”. Must be a path to a Unix datagram socket file for net type “unixgram”.

  • encoder (string):

    Name of registered encoder plugin that will extract and/or serialize data from the Heka message.

  • max_message_size (int):

    Maximum size of message that is allowed to be sent via UdpOutput. Messages which exceed this limit will be dropped. Defaults to 65507 (the limit for UDP packets in IPv4).

Example:

[PayloadEncoder]

[UdpOutput]
address = "myserver.example.com:34567"
encoder = "PayloadEncoder"

Whisper Output

Plugin Name: WhisperOutput

WhisperOutput plugins parse the “statmetric” messages generated by a StatAccumulator and write the extracted counter, timer, and gauge data out to a graphite-compatible whisper database file tree structure.

Config:

  • base_path (string):

    Path to the base directory where the whisper file tree will be written. Absolute paths will be honored, relative paths will be calculated relative to the Heka base directory. Defaults to “whisper” (i.e. “$(BASE_DIR)/whisper”).

  • default_agg_method (int):

    Default aggregation method to use for each whisper output file. Supports the following values:

    0. Unknown aggregation method.
    1. Aggregate using averaging. (default)
    2. Aggregate using summation.
    3. Aggregate using last received value.
    4. Aggregate using maximum value.
    5. Aggregate using minimum value.
  • default_archive_info ([][]int):

    Default specification for new whisper db archives. Should be a sequence of 3-tuples, where each tuple describes a time interval’s storage policy: [<offset> <# of secs per datapoint> <# of datapoints>] (see whisper docs for more info). Defaults to:

    [ [0, 60, 1440], [0, 900, 8], [0, 3600, 168], [0, 43200, 1456]]
    

    The above defines four archive sections. The first uses 60 seconds for each of 1440 data points, which equals one day of retention. The second uses 15 minutes for each of 8 data points, for two hours of retention. The third uses one hour for each of 168 data points, or 7 days of retention. Finally, the fourth uses 12 hours for each of 1456 data points, representing two years of data.

  • folder_perm (string):

    Permission mask to be applied to folders created in the whisper database file tree. Must be a string representation of an octal integer. Defaults to “700”.

Example:

[WhisperOutput]
message_matcher = "Type == 'heka.statmetric'"
default_agg_method = 3
default_archive_info = [ [0, 30, 1440], [0, 900, 192], [0, 3600, 168], [0, 43200, 1456] ]
folder_perm = "755"

See Also

hekad(1), hekad.config(5)