Configure the Kafka Producer

The Kafka Producer exposes the following options to help fine-tune its behavior.

| Name | Description | Default |
| --- | --- | --- |
| eventTopic | Name of the topic used for events. Set this to an empty string to disable forwarding events. | events |
| alarmTopic | Name of the topic used for alarms. Set this to an empty string to disable forwarding alarms. | alarms |
| alarmFeedbackTopic | Name of the topic used for alarm feedback. Set this to an empty string to disable forwarding alarm feedback. | alarmFeedback |
| nodeTopic | Name of the topic used for nodes. Set this to an empty string to disable forwarding nodes. | nodes |
| topologyVertexTopic | Name of the topic used for topology vertices. | vertices |
| topologyEdgeTopic | Name of the topic used for topology edges. | edges |
| metricTopic | Name of the topic used for metrics. | metrics |
| eventFilter | A Spring SpEL expression (see below) used to filter events. Set this to an empty string to disable filtering and forward all events. | none |
| alarmFilter | A Spring SpEL expression (see below) used to filter alarms. Set this to an empty string to disable filtering and forward all alarms. | none |
| forward.metrics | Set this value to true to enable forwarding of metrics. | false |
| nodeRefreshTimeoutMs | Number of milliseconds to wait before looking up a node in the database again. Decrease this value to improve accuracy at the cost of additional database lookups. | 300000 (5 minutes) |
| suppressIncrementalAlarms | Suppresses forwarding of alarms that differ only by count or last event time. Set this to false to forward these alarms as well. | true |
| kafkaSendQueueCapacity | Capacity of the queue that holds Kafka messages pushed while Kafka is unavailable. | 1000 |
| startAlarmSyncWithCleanState | Set this to true to force the Kafka Streams client to start with a clean state on every boot. | false |
| alarmSync | Set this to false to disable synchronization of the alarms topic. Synchronization is automatically disabled when alarm forwarding is not enabled. | true |
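
As a sketch of how several of these options could be tuned together, the following session makes the producer look up nodes in the database again after one minute, enlarges the send queue, and turns off suppression of incremental alarms. The values 60000 and 5000 are illustrative assumptions, not recommendations.

```
$ ssh -p 8101 admin@localhost
...
admin@opennms()> config:edit org.opennms.features.kafka.producer
admin@opennms()> config:property-set nodeRefreshTimeoutMs 60000
admin@opennms()> config:property-set kafkaSendQueueCapacity 5000
admin@opennms()> config:property-set suppressIncrementalAlarms false
admin@opennms()> config:update
```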

Configure filtering

Use filtering to selectively forward events and/or alarms to the Kafka topics.

Filtering is performed using a Spring SpEL expression, which is evaluated against each object to determine if it should be forwarded. The expression must return a boolean value.

Enable event filtering

To enable event filtering, set the value of the eventFilter property to a valid SpEL expression.

$ ssh -p 8101 admin@localhost
...
admin@opennms()> config:edit org.opennms.features.kafka.producer
admin@opennms()> config:property-set eventFilter 'getUei().equals("uei.opennms.org/internal/discovery/newSuspect")'
admin@opennms()> config:update

In the example above, the filter is configured such that only events with the given UEI are forwarded. Consult the source code of the org.opennms.netmgt.xml.event.Event class in your distribution for a complete list of available properties.
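
Because the filter is an ordinary SpEL expression, conditions can be combined with operators such as and, or, and !. The following sketch forwards only events whose UEI starts with a given prefix. The prefix uei.opennms.org/nodes/ is an illustrative choice, and the call to startsWith assumes that getUei() returns a java.lang.String.

```
$ ssh -p 8101 admin@localhost
...
admin@opennms()> config:edit org.opennms.features.kafka.producer
admin@opennms()> config:property-set eventFilter 'getUei() != null and getUei().startsWith("uei.opennms.org/nodes/")'
admin@opennms()> config:update
```

The null check comes first so that events without a UEI evaluate to false rather than causing the method call to fail.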

Enable alarm filtering

To enable alarm filtering, set the value of the alarmFilter property to a valid SpEL expression.

$ ssh -p 8101 admin@localhost
...
admin@opennms()> config:edit org.opennms.features.kafka.producer
admin@opennms()> config:property-set alarmFilter 'getTTicketId() != null'
admin@opennms()> config:update

In the example above, the filter is configured such that only alarms that are associated with a ticket ID are forwarded. Consult the source code of the org.opennms.netmgt.model.OnmsAlarm class in your distribution for a complete list of available properties.
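
A slightly stricter variant, shown here only as a sketch, also rejects alarms whose ticket ID is an empty string. It assumes that getTTicketId() returns a java.lang.String, so that the standard isEmpty() method is available.

```
$ ssh -p 8101 admin@localhost
...
admin@opennms()> config:edit org.opennms.features.kafka.producer
admin@opennms()> config:property-set alarmFilter 'getTTicketId() != null and !getTTicketId().isEmpty()'
admin@opennms()> config:update
```

To return to forwarding all alarms, set alarmFilter back to an empty string, as described in the options table.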

Enable metric forwarding

To enable metric forwarding, set the value of the forward.metrics property to true.

The producer sends a full CollectionSet per message to Kafka. Large network equipment might have hundreds of thousands of metrics, which could generate messages over the Kafka limit (1 MB by default). In that case, you must increase max.request.size at the producer level, and message.max.bytes and replica.fetch.max.bytes at the broker level, to avoid problems with the consumers.

$ ssh -p 8101 admin@localhost
...
admin@opennms()> config:edit org.opennms.features.kafka.producer
admin@opennms()> config:property-set forward.metrics true
admin@opennms()> config:update
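
If the default message size limit turns out to be too small, the producer-side limit could be raised along the following lines. This is a sketch under two assumptions: that Kafka client properties for the producer are set in org.opennms.features.kafka.producer.client (the configuration referenced at the end of this page), and that 5242880 bytes (5 MiB) is large enough for your largest CollectionSet. Pick a value that fits your environment.

```
$ ssh -p 8101 admin@localhost
...
admin@opennms()> config:edit org.opennms.features.kafka.producer.client
admin@opennms()> config:property-set max.request.size 5242880
admin@opennms()> config:update
```

On the broker side, message.max.bytes and replica.fetch.max.bytes would need to be at least as large; these are typically set in each broker's server.properties file.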

Enable exclusive metric forwarding

Once metric forwarding is enabled, you can use this as the exclusive persistence strategy by setting the following system property:

echo 'org.opennms.timeseries.strategy=osgi' > "$OPENNMS_HOME/etc/opennms.properties.d/kafka-for-metrics.properties"
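
To confirm that the property file contains the expected setting, a small check such as the following could be used. The function name has_osgi_strategy is a hypothetical helper, and the fallback path /opt/opennms is an assumption for when OPENNMS_HOME is not set.

```shell
# Hypothetical helper: succeeds only if the given properties file
# contains the exact line that selects the osgi strategy.
# -x matches whole lines, -F treats the pattern as a fixed string,
# -q and -s suppress normal output and missing-file errors.
has_osgi_strategy() {
  grep -qsxF 'org.opennms.timeseries.strategy=osgi' "$1"
}

# Example call against the file written above.
# /opt/opennms is an assumed default; adjust it to your installation.
props="${OPENNMS_HOME:-/opt/opennms}/etc/opennms.properties.d/kafka-for-metrics.properties"
if has_osgi_strategy "$props"; then
  echo "osgi strategy is set in $props"
else
  echo "osgi strategy is NOT set in $props"
fi
```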

Configure topic names

By default, five topics are created: events, alarms, nodes, vertices, and edges. To change these, use:

$ ssh -p 8101 admin@localhost
...
admin@opennms()> config:edit org.opennms.features.kafka.producer
admin@opennms()> config:property-set eventTopic ""
admin@opennms()> config:property-set nodeTopic "opennms-nodes"
admin@opennms()> config:update

In the example above, we disable event forwarding by setting an empty topic name and change the node topic name to opennms-nodes.

$ ssh -p 8101 admin@localhost
...
admin@opennms()> config:edit org.opennms.features.kafka.producer
admin@opennms()> config:property-set topologyVertexTopic "opennms-bridge-vertex"
admin@opennms()> config:property-set topologyEdgeTopic "opennms-edge-vertex"
admin@opennms()> config:update

In the example above, we change the vertex and edge topic names from their defaults.

Configure the alarm sync streams client

When producing alarms to a topic, we also automatically enable a Kafka Streams client that helps synchronize the contents of the topic and ensure eventual consistency with the database.

The streams client takes different properties than the producer and requires a separate configuration map.

We automatically pull known properties from the producer configuration, allowing it to work without further configuration in most cases.

If your producer requires a special configuration directive, or you would like to tune the behavior of the streams client, you can set properties in org.opennms.features.kafka.producer.streams:

$ ssh -p 8101 admin@localhost
...
admin@opennms()> config:edit org.opennms.features.kafka.producer.streams
admin@opennms()> config:property-set default.dsl.store rocksDB
admin@opennms()> config:update

Any property set in org.opennms.features.kafka.producer.streams overrides the value inherited from org.opennms.features.kafka.producer.client.
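
To illustrate the override order, the sketch below sets request.timeout.ms in both configurations. With these settings, the producer would use 30000 while the streams client would use 60000, because the value set in the streams configuration takes precedence for the streams client. Both values are illustrative assumptions; request.timeout.ms is a standard Kafka client setting.

```
$ ssh -p 8101 admin@localhost
...
admin@opennms()> config:edit org.opennms.features.kafka.producer.client
admin@opennms()> config:property-set request.timeout.ms 30000
admin@opennms()> config:update
admin@opennms()> config:edit org.opennms.features.kafka.producer.streams
admin@opennms()> config:property-set request.timeout.ms 60000
admin@opennms()> config:update
```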