Filters

Common Filter Parameters

There are some configuration options that are universally available to all Heka filter plugins. These will be consumed by Heka itself when Heka initializes the plugin and do not need to be handled by the plugin-specific initialization code.

  • message_matcher (string, optional):

    Boolean expression; when it evaluates to true, the message is passed to the filter for processing. Defaults to matching nothing. See: Message Matcher Syntax

  • message_signer (string, optional):

    The name of the message signer. If specified, only messages with this signer are passed to the filter for processing.

  • ticker_interval (uint, optional):

    Interval (in seconds) at which a timer event is sent to the filter. Defaults to not sending timer events.
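As a minimal sketch of how the three common parameters combine, the fragment below attaches them to a sandbox filter. The section name ExampleHTTPStatus is hypothetical, and the "ops" signer name and the script path are borrowed from other examples in this document; whether messages with that signer exist depends on your deployment.

```toml
# Hypothetical section name, used for illustration only.
[ExampleHTTPStatus]
type = "SandboxFilter"
filename = "lua_filters/http_status.lua"
# Only messages for which this expression is true reach the filter.
message_matcher = "Logger == 'nginx.access'"
# Further restrict delivery to messages signed by the "ops" signer.
message_signer = "ops"
# Send the filter a timer event every 60 seconds.
ticker_interval = 60
```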

Circular Buffer Delta Aggregator

New in version 0.5.

Collects the circular buffer delta output from multiple instances of an upstream sandbox filter (the filters should all be the same version, at least with respect to their cbuf output). The purpose is to recreate the view at a larger scope at each level of the aggregation, i.e., host view -> datacenter view -> service level view.

Config:

  • enable_delta (bool, optional, default false)

    Specifies whether or not this aggregator should generate cbuf deltas.

  • anomaly_config (string) - (see Anomaly Detection Module)

    A list of anomaly detection specifications. If not specified no anomaly detection/alerting will be performed.

Example Heka Configuration

[TelemetryServerMetricsAggregator]
type = "SandboxFilter"
message_matcher = "Logger == 'TelemetryServerMetrics' && Fields[payload_type] == 'cbufd'"
ticker_interval = 60
filename = "lua_filters/cbufd_aggregator.lua"
preserve_data = true

[TelemetryServerMetricsAggregator.config]
enable_delta = false
anomaly_config = 'roc("Request Statistics", 1, 15, 0, 1.5, true, false)'

CBuf Delta Aggregator By Hostname

New in version 0.5.

Collects the circular buffer delta output from multiple instances of an upstream sandbox filter (the filters should all be the same version, at least with respect to their cbuf output). Each column from the source circular buffer becomes its own graph; e.g., ‘Error Count’ becomes a graph with each host represented in a column.

Config:

  • max_hosts (uint)

    Pre-allocates the number of host columns in the graph(s). If the number of active hosts exceeds this value, the plugin will terminate.

  • rows (uint)

    The number of rows to keep from the original circular buffer. Storing all the data from all the hosts is not practical since you will most likely run into memory and output size restrictions (adjust the view down as necessary).

  • host_expiration (uint, optional, default 120 seconds)

    The amount of time a host has to be inactive before it can be replaced by a new host.

Example Heka Configuration

[TelemetryServerMetricsHostAggregator]
type = "SandboxFilter"
message_matcher = "Logger == 'TelemetryServerMetrics' && Fields[payload_type] == 'cbufd'"
ticker_interval = 60
filename = "lua_filters/cbufd_host_aggregator.lua"
preserve_data = true

[TelemetryServerMetricsHostAggregator.config]
max_hosts = 5
rows = 60
host_expiration = 120

CounterFilter

Once per ticker interval, a CounterFilter generates a message of type heka.counter-output. The payload contains text indicating the number of messages that matched the filter’s message_matcher value during that interval (i.e., it counts the messages the plugin received). Every ten intervals an extra message (also of type heka.counter-output) goes out, containing an aggregate count and the average per-second throughput of messages received.

Config:

  • ticker_interval (int, optional):

    Interval between generated counter messages, in seconds. Defaults to 5.

Example:

[CounterFilter]
message_matcher = "Type != 'heka.counter-output'"
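
The example above relies on the default 5-second interval. A sketch with an explicit interval might look like the following; the section name MessageCounter is hypothetical, and the timing in the comments simply applies the "every ten intervals" rule described above.

```toml
# Hypothetical section name; the type is given explicitly.
[MessageCounter]
type = "CounterFilter"
# Match everything except messages of type heka.counter-output.
message_matcher = "Type != 'heka.counter-output'"
# One count message every 10 seconds; at ten intervals per aggregate,
# the aggregate message would follow roughly every 100 seconds.
ticker_interval = 10
```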

Frequent Items

New in version 0.5.

Calculates the most frequent items in a data stream.

Config:

  • message_variable (string)

    The message variable name containing the items to be counted.

  • max_items (uint, optional, default 1000)

    The maximum size of the sample set (higher will produce a more accurate list).

  • min_output_weight (uint, optional, default 100)

    Used to reduce the long tail output by only outputting the higher frequency items.

  • reset_days (uint, optional, default 1)

    Resets the list after the specified number of days (on the UTC day boundary). A value of 0 will never reset the list.

Example Heka Configuration

[FxaAuthServerFrequentIP]
type = "SandboxFilter"
filename = "lua_filters/frequent_items.lua"
ticker_interval = 60
preserve_data = true
message_matcher = "Logger == 'nginx.access' && Type == 'fxa-auth-server'"

[FxaAuthServerFrequentIP.config]
message_variable = "Fields[remote_addr]"
max_items = 10000
min_output_weight = 100
reset_days = 1

Heka Memory Statistics

New in version 0.6.

Graphs the Heka memory statistics using the heka.memstat message generated by pipeline/report.go.

Config:

  • sec_per_row (uint, optional, default 60)

    Sets the size of each bucket (resolution in seconds) in the sliding window.

  • rows (uint, optional, default 1440)

    Sets the size of the sliding window, i.e., 1440 rows at 60 seconds per row is a sliding 24-hour window with 1-minute resolution.

  • anomaly_config (string) - (see Anomaly Detection Module)

Example Heka Configuration

[HekaMemstat]
type = "SandboxFilter"
filename = "lua_filters/heka_memstat.lua"
ticker_interval = 60
preserve_data = true
message_matcher = "Type == 'heka.memstat'"
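
The configuration above relies on the defaults. A sketch of an explicit config section, using only the parameters and default values listed above, could be appended as:

```toml
[HekaMemstat.config]
# 1440 rows x 60 seconds per row = 86400 seconds, i.e. a 24-hour window.
sec_per_row = 60
rows = 1440
```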

Heka Message Schema

New in version 0.5.

Generates documentation for each unique message in a data stream. The output is a hierarchy of Logger, Type, EnvVersion, and a list of associated message field attributes including their counts (number in the brackets). This plugin is meant for data discovery/exploration and should not be left running on a production system.

Config:

<none>

Example Heka Configuration

[SyncMessageSchema]
type = "SandboxFilter"
filename = "lua_filters/heka_message_schema.lua"
ticker_interval = 60
preserve_data = false
message_matcher = "Logger =~ /^Sync/"

Example Output

Sync-1_5-Webserver [54600]
    slf [54600]
        -no version- [54600]
            upstream_response_time (mismatch)
            http_user_agent (string)
            body_bytes_sent (number)
            remote_addr (string)
            request (string)
            upstream_status (mismatch)
            status (number)
            request_time (number)
            request_length (number)
Sync-1_5-SlowQuery [37]
    mysql.slow-query [37]
        -no version- [37]
            Query_time (number)
            Rows_examined (number)
            Rows_sent (number)
            Lock_time (number)

HTTP Status Graph

New in version 0.5.

Graphs HTTP status codes using the numeric Fields[status] variable collected from web server access logs.

Config:

  • sec_per_row (uint, optional, default 60)

    Sets the size of each bucket (resolution in seconds) in the sliding window.

  • rows (uint, optional, default 1440)

    Sets the size of the sliding window, i.e., 1440 rows at 60 seconds per row is a sliding 24-hour window with 1-minute resolution.

  • anomaly_config (string) - (see Anomaly Detection Module)

Example Heka Configuration

[FxaAuthServerHTTPStatus]
type = "SandboxFilter"
filename = "lua_filters/http_status.lua"
ticker_interval = 60
preserve_data = true
message_matcher = "Logger == 'nginx.access' && Type == 'fxa-auth-server'"

[FxaAuthServerHTTPStatus.config]
sec_per_row = 60
rows = 1440
anomaly_config = 'roc("HTTP Status", 1, 15, 0, 1.5, true, false)'

MySQL Slow Query

New in version 0.6.

Graphs MySQL slow query data produced by the MySQL Slow Query Log Decoder.

Config:

  • sec_per_row (uint, optional, default 60)

    Sets the size of each bucket (resolution in seconds) in the sliding window.

  • rows (uint, optional, default 1440)

    Sets the size of the sliding window, i.e., 1440 rows at 60 seconds per row is a sliding 24-hour window with 1-minute resolution.

  • anomaly_config (string) - (see Anomaly Detection Module)

Example Heka Configuration

[Sync-1_5-SlowQueries]
type = "SandboxFilter"
message_matcher = "Logger == 'Sync-1_5-SlowQuery'"
ticker_interval = 60
filename = "lua_filters/mysql_slow_query.lua"

    [Sync-1_5-SlowQueries.config]
    anomaly_config = 'mww_nonparametric("Statistics", 5, 15, 10, 0.8)'

StatFilter

Filter plugin that accepts messages of a specified form and uses extracted message data to generate statsd-style numerical metrics in the form of Stat objects that can be consumed by a StatAccumulator.

Config:

  • Metric:

    Subsection defining a single metric to be generated:

    • type (string):

      Metric type, supports “Counter”, “Timer”, “Gauge”.

    • name (string):

      Metric name, must be unique.

    • value (string):

      Expression representing the (possibly dynamic) value that the StatFilter should emit for each received message.

  • stat_accum_name (string):

    Name of a StatAccumInput instance that this StatFilter will use as its StatAccumulator for submitting generated stat values. Defaults to “StatAccumInput”.

Example (Assuming you had TransformFilter inserting messages as above):

[StatAccumInput]
ticker_interval = 5

[StatsdInput]
address = "127.0.0.1:29301"

[Hits]
type = "StatFilter"
message_matcher = 'Type == "ApacheLogfile"'

[Hits.Metric.bandwidth]
type = "Counter"
name = "httpd.bytes.%Hostname%"
value = "%Bytes%"

[Hits.Metric.method_counts]
type = "Counter"
name = "httpd.hits.%Method%.%Hostname%"
value = "1"

Note

StatFilter requires an available StatAccumInput to be running.
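
To illustrate the other settings described above, the sketch below defines a Timer metric and sets stat_accum_name explicitly. The section name Latency, the metric name, and the %ResponseTime% field are hypothetical; the sketch assumes the incoming messages carry a numeric ResponseTime field.

```toml
# Hypothetical filter section, shown for illustration only.
[Latency]
type = "StatFilter"
message_matcher = 'Type == "ApacheLogfile"'
# Names the accumulator explicitly; "StatAccumInput" is the documented default.
stat_accum_name = "StatAccumInput"

[Latency.Metric.response_time]
type = "Timer"
# Hypothetical metric name and message field.
name = "httpd.response_time.%Hostname%"
value = "%ResponseTime%"
```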

SandboxFilter

The sandbox filter provides an isolated execution environment for data analysis. Any output generated by the sandbox is injected into the payload of a new message for further processing or to be output.

Config:

Example:

[hekabench_counter]
type = "SandboxFilter"
message_matcher = "Type == 'hekabench'"
ticker_interval = 1
filename = "counter.lua"
preserve_data = true
profile = false

    [hekabench_counter.config]
    rows = 1440
    sec_per_row = 60

SandboxManagerFilter

The SandboxManagerFilter provides dynamic control (start/stop) of sandbox filters in a secure manner without stopping the Heka daemon. Commands are sent to a SandboxManagerFilter using a signed Heka message. The intent is to have one manager per access control group each with their own message signing key. Users in each group can submit a signed control message to manage any filters running under the associated manager. A signed message is not an enforced requirement but it is highly recommended in order to restrict access to this functionality.

SandboxManagerFilter Settings

  • Common Filter Parameters

  • working_directory (string):

    The directory where the filter configurations, code, and states are preserved. The directory can be unique or shared between sandbox managers since the filter names are unique per manager. Defaults to a directory in ${BASE_DIR}/sbxmgrs with a name generated from the plugin name.

  • module_directory (string):

    The directory where ‘require’ will attempt to load the external Lua modules from. Defaults to ${SHARE_DIR}/lua_modules.

  • max_filters (uint):

    The maximum number of filters this manager can run.

New in version 0.5.

  • memory_limit (uint):

    The number of bytes managed sandboxes are allowed to consume before being terminated (max 8MiB, default max).

  • instruction_limit (uint):

    The number of instructions managed sandboxes are allowed to execute during the process_message/timer_event functions before being terminated (max 1M, default max).

  • output_limit (uint):

    The number of bytes managed sandbox output buffers can hold before the sandbox is terminated (max 63KiB, default max). Anything less than 64B is set to 64B.

Example

[OpsSandboxManager]
type = "SandboxManagerFilter"
message_signer = "ops"
# message_matcher = "Type == 'heka.control.sandbox'" # automatic default setting
max_filters = 100
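
The resource limits described above can also be set explicitly. The sketch below is illustrative only: the section name, the "dev" signer, and the chosen values are assumptions, picked to stay under the documented maximums.

```toml
# Hypothetical manager for a second access control group.
[DevSandboxManager]
type = "SandboxManagerFilter"
message_signer = "dev"
max_filters = 10
# 4 MiB = 4 * 1024 * 1024 bytes (documented maximum is 8 MiB).
memory_limit = 4194304
# Well below the documented 1M instruction maximum.
instruction_limit = 500000
# 32 KiB = 32 * 1024 bytes (documented maximum is 63 KiB).
output_limit = 32768
```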

Unique Items

New in version 0.6.

Counts the number of unique items per day, e.g., active daily users by uid.

Config:

  • message_variable (string, required)

    The Heka message variable containing the item to be counted.

  • title (string, optional, default “Estimated Unique Daily message_variable”)

    The graph title for the cbuf output.

  • enable_delta (bool, optional, default false)

    Specifies whether or not this plugin should generate cbuf deltas. Deltas should be enabled when sharding is used; see: Circular Buffer Delta Aggregator.

Example Heka Configuration

[FxaActiveDailyUsers]
type = "SandboxFilter"
filename = "lua_filters/unique_items.lua"
ticker_interval = 60
preserve_data = true
message_matcher = "Logger == 'FxaAuth' && Type == 'request.summary' && Fields[path] == '/v1/certificate/sign' && Fields[errno] == 0"

    [FxaActiveDailyUsers.config]
    message_variable = "Fields[uid]"
    title = "Estimated Active Daily Users"