Plugin Name: AMQPInput
Connects to a remote AMQP broker (RabbitMQ) and retrieves messages from the specified queue. As AMQP is dynamically programmable, the broker topology needs to be specified in the plugin configuration.
Config:
url (string): An AMQP connection string formatted per the RabbitMQ URI Spec.
exchange (string): AMQP exchange name.
exchange_type (string): AMQP exchange type (fanout, direct, topic, or headers).
exchange_durability (bool): Whether the exchange should be configured as a durable exchange. Defaults to non-durable.
exchange_auto_delete (bool): Whether the exchange is deleted when all queues have finished and there is no publishing. Defaults to auto-delete.
routing_key (string): The message routing key used to bind the queue to the exchange. Defaults to empty string.
prefetch_count (int): How many messages to fetch at once before message acks are sent. See RabbitMQ performance measurements for help in tuning this number. Defaults to 2.
queue (string): Name of the queue to consume from; an empty string will have the broker generate a name for the queue. Defaults to empty string.
bind_queue (bool): Whether the queue should be explicitly bound to the exchange. Not all exchanges require the consumer to define and bind their own queue. Defaults to true.
queue_durability (bool): Whether the queue is durable or not. Defaults to non-durable.
queue_exclusive (bool): Whether the queue is exclusive (only one consumer allowed) or not. Defaults to non-exclusive.
queue_auto_delete (bool): Whether the queue is deleted when the last consumer unsubscribes. Defaults to auto-delete.
queue_ttl (int): Optional TTL, in milliseconds, set on the queue declaration so that unconsumed messages expire. Defaults to undefined/infinite.
retries (RetryOptions, optional): A sub-section that specifies the settings to be used for restart behavior. See Configuring Restarting Behavior.

New in version 0.6.

tls (TlsConfig, optional): A sub-section that specifies the settings to be used for any SSL/TLS encryption. This will only have any impact if the URL uses the AMQPS URI scheme. See Configuring TLS.

New in version 0.9.

read_only (bool): Whether the AMQP user is read-only. If this is true, the exchange, queue, and binding must be declared before starting Heka. Defaults to false.
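Taken together, these options simply parameterize the standard AMQP exchange.declare, queue.declare, queue.bind, and basic.qos calls the input issues against the broker. The following Python sketch is illustrative only (it is not Heka's Go implementation, and topology_args is a made-up helper); it shows how each option could map onto those method arguments, with the queue TTL becoming RabbitMQ's standard per-queue x-message-ttl argument:

```python
# Illustrative mapping (NOT Heka's actual code): how AMQPInput's options
# translate into the standard AMQP method arguments a consumer would send.
# Keyword defaults mirror the plugin's documented defaults.
def topology_args(exchange, exchange_type,
                  exchange_durability=False, exchange_auto_delete=True,
                  routing_key="", prefetch_count=2,
                  queue="", queue_durability=False, queue_exclusive=False,
                  queue_auto_delete=True, queue_ttl=None):
    queue_args = {}
    if queue_ttl is not None:
        # queue_ttl maps to RabbitMQ's per-queue message TTL argument.
        queue_args["x-message-ttl"] = int(queue_ttl)
    return {
        "exchange_declare": {"exchange": exchange, "type": exchange_type,
                             "durable": exchange_durability,
                             "auto_delete": exchange_auto_delete},
        "queue_declare": {"queue": queue, "durable": queue_durability,
                          "exclusive": queue_exclusive,
                          "auto_delete": queue_auto_delete,
                          "arguments": queue_args},
        "queue_bind": {"queue": queue, "exchange": exchange,
                       "routing_key": routing_key},
        "basic_qos": {"prefetch_count": prefetch_count},
    }
```

With no overrides, the result reflects the plugin defaults: a non-durable, auto-delete exchange and queue, an empty routing key, and a prefetch count of 2.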
Since many of these parameters have sane defaults, a minimal configuration to consume serialized messages would look like:
[AMQPInput]
url = "amqp://guest:guest@rabbitmq/"
exchange = "testout"
exchange_type = "fanout"
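The url value follows the RabbitMQ URI Spec. As a rough, stdlib-only illustration (not Heka's code; parse_amqp_url is a made-up helper), the connection parameters can be pulled apart like this. One caveat: a strict reading of the spec treats a bare trailing slash as an empty vhost name, while many clients, as this sketch does, treat it as the default vhost "/":

```python
# Hypothetical helper: split an AMQP URL into connection parameters,
# applying the RabbitMQ URI Spec defaults (port 5672 for amqp://,
# 5671 for amqps://, guest/guest credentials, vhost "/").
from urllib.parse import urlsplit, unquote

def parse_amqp_url(url):
    parts = urlsplit(url)
    if parts.scheme not in ("amqp", "amqps"):
        raise ValueError("expected an amqp:// or amqps:// URL")
    default_port = 5671 if parts.scheme == "amqps" else 5672
    # Treat both an absent path and a bare "/" as the default vhost;
    # otherwise the vhost is the percent-decoded path minus its slash.
    vhost = "/" if parts.path in ("", "/") else unquote(parts.path[1:])
    return {
        "scheme": parts.scheme,
        "host": parts.hostname or "localhost",
        "port": parts.port or default_port,
        "user": unquote(parts.username) if parts.username else "guest",
        "password": unquote(parts.password) if parts.password else "guest",
        "vhost": vhost,
    }
```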
Or you might use a PayloadRegexDecoder to parse OSX syslog messages with the following:
[AMQPInput]
url = "amqp://guest:guest@rabbitmq/"
exchange = "testout"
exchange_type = "fanout"
decoder = "logparser"
[logparser]
type = "MultiDecoder"
subs = ["logline", "leftovers"]
[logline]
type = "PayloadRegexDecoder"
MatchRegex = '\w+ \d+ \d+:\d+:\d+ \S+ (?P<Reporter>[^\[]+)\[(?P<Pid>\d+)\](?P<Sandbox>[^:]+)?: (?P<Remaining>.*)'
[logline.MessageFields]
Type = "amqplogline"
Hostname = "myhost"
Reporter = "%Reporter%"
Remaining = "%Remaining%"
Logger = "%Logger%"
Payload = "%Remaining%"
[leftovers]
type = "PayloadRegexDecoder"
MatchRegex = '.*'
[leftovers.MessageFields]
Type = "drop"
Payload = ""
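To sanity-check the MatchRegex above before deploying it, you can exercise the same pattern with Python's re module (the sample syslog line below is invented for illustration):

```python
# Exercise the PayloadRegexDecoder pattern against a sample OS X syslog line
# and inspect the named capture groups it produces.
import re

MATCH_REGEX = (r'\w+ \d+ \d+:\d+:\d+ \S+ (?P<Reporter>[^\[]+)'
               r'\[(?P<Pid>\d+)\](?P<Sandbox>[^:]+)?: (?P<Remaining>.*)')

line = "Apr 12 10:14:02 myhost com.apple.launchd[1]: some message"
m = re.match(MATCH_REGEX, line)
print(m.group("Reporter"))   # reporter name before the bracketed pid
print(m.group("Pid"))        # pid captured from inside the brackets
print(m.group("Remaining"))  # everything after ": " becomes the payload
```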
If the downstream Heka messages are delimited by Heka's Stream Framing, you will need to specify "HekaFramingSplitter" as the AMQPInput splitter. An example would look like:
[rsyslog-mq-input]
type = "AMQPInput"
url = "amqp://guest:guest@rabbitmq/"
exchange = "system-logs"
exchange_type = "topic"
exchange_durability = true
exchange_auto_delete = false
routing_key = "system.rsyslog"
queue = "rsyslog-logs"
queue_durability = true
queue_auto_delete = false
prefetch_count = 20
decoder = "rsyslog-multidecoder"
splitter = "HekaFramingSplitter"
[rsyslog-mq-input.retries]
max_delay = "180s"
delay = "30s"
max_retries = -1
[rsyslog-multidecoder]
type = "MultiDecoder"
subs = ["ProtobufDecoder", "rsyslog-decoder"]
cascade_strategy = "all"
log_sub_errors = true
[ProtobufDecoder]
[rsyslog-decoder]
type = "SandboxDecoder"
filename = "/usr/share/heka/lua_decoders/rsyslog.lua"
[rsyslog-decoder.config]
hostname_keep = true
template = '%TIMESTAMP% %HOSTNAME% %syslogtag%%msg:::sp-if-no-1st-sp%%msg:::drop-last-lf%\n'
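Heka's Stream Framing, which the HekaFramingSplitter consumes, delimits each record with a 0x1E record separator, a one-byte header length, a protobuf-encoded header whose first field is the varint-encoded message length, and a 0x1F unit separator before the message bytes. The round-trip sketch below is written against that assumed layout and is not the splitter's actual code; it hand-encodes the single header field rather than using a protobuf library:

```python
# Minimal sketch of Heka-style stream framing (assumed layout, not Heka's
# code): 0x1E | header length | protobuf header (field 1 = message length,
# varint) | 0x1F | message bytes.
RECORD_SEP = 0x1E
UNIT_SEP = 0x1F

def _encode_varint(n):
    out = bytearray()
    while True:
        b = n & 0x7F
        n >>= 7
        out.append(b | 0x80 if n else b)
        if not n:
            return bytes(out)

def _decode_varint(buf, pos):
    result = shift = 0
    while True:
        b = buf[pos]
        pos += 1
        result |= (b & 0x7F) << shift
        if not b & 0x80:
            return result, pos
        shift += 7

def frame(message):
    # 0x08 is the protobuf tag for field 1 with varint wire type.
    header = b"\x08" + _encode_varint(len(message))
    assert len(header) < 256  # header length must fit in one byte
    return bytes([RECORD_SEP, len(header)]) + header + bytes([UNIT_SEP]) + message

def unframe(record):
    assert record[0] == RECORD_SEP
    header_len = record[1]
    header = record[2:2 + header_len]
    assert header[0] == 0x08
    msg_len, _ = _decode_varint(header, 1)
    assert record[2 + header_len] == UNIT_SEP
    body_start = 2 + header_len + 1
    return record[body_start:body_start + msg_len]
```

A splitter built this way can scan a byte stream for the 0x1E separator and recover each message's exact length from the header, which is what lets framed records carry arbitrary binary payloads.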