# Configure the Kafka Producer

The Kafka Producer exposes the following options to help fine-tune its behavior.

| Name | Default Value | Description |
| --- | --- | --- |
| `eventTopic` | `events` | Name of the topic used for events. Set this to an empty string to disable forwarding events. |
| `alarmTopic` | `alarms` | Name of the topic used for alarms. Set this to an empty string to disable forwarding alarms. |
| `alarmFeedbackTopic` | `alarmFeedback` | Name of the topic used for alarm feedback. Set this to an empty string to disable forwarding alarm feedback. |
| `nodeTopic` | `nodes` | Name of the topic used for nodes. Set this to an empty string to disable forwarding nodes and topologies. |
| `topologyVertexTopic` | `vertices` | Name of the topic used for topology vertices. |
| `topologyEdgeTopic` | `edges` | Name of the topic used for topology edges. |
| `metricTopic` | `metrics` | Name of the topic used for metrics. |
| `eventFilter` | `-` | A Spring SpEL expression (see below) used to filter events. Set this to an empty string to disable filtering and forward all events. |
| `alarmFilter` | `-` | A Spring SpEL expression (see below) used to filter alarms. Set this to an empty string to disable filtering and forward all alarms. |
| `forward.metrics` | `false` | Set this value to `true` to enable forwarding of metrics. |
| `nodeRefreshTimeoutMs` | `300000` (5 minutes) | Number of milliseconds to wait before looking up a node in the database again. Decrease this value to improve accuracy at the cost of additional database lookups. |
| `alarmSyncIntervalMs` | `300000` (5 minutes) | Interval, in milliseconds, at which the contents of the alarm topic are synchronized with the local database. Decrease this value to improve accuracy at the cost of additional database lookups. Set this value to `0` to disable alarm synchronization. |
| `suppressIncrementalAlarms` | `true` | Suppresses forwarding alarms that differ only by count or last event time. Set this to `false` to forward these alarms as well. |
| `kafkaSendQueueCapacity` | `1000` | Capacity of the queue that holds Kafka messages pushed while Kafka is unavailable. |
| `startAlarmSyncWithCleanState` | `false` | Set this to `true` to force the Kafka Streams client to start with a clean state on every boot. |
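As a rough sketch of how the options above can be changed, the session below follows the same Karaf shell pattern used in the examples later on this page. The chosen values (a one-minute node refresh and disabled alarm suppression) are purely illustrative, not recommendations.

```
$ ssh -p 8101 admin@localhost
...
admin@opennms()> config:edit org.opennms.features.kafka.producer
admin@opennms()> config:property-set nodeRefreshTimeoutMs 60000
admin@opennms()> config:property-set suppressIncrementalAlarms false
admin@opennms()> config:update
```

A shorter refresh timeout trades additional database lookups for fresher node data, as the option description notes.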

## Configure Filtering

Use filtering to selectively forward events and/or alarms to the Kafka topics.

Filtering is performed using a Spring SpEL expression, which is evaluated against each object to determine whether it should be forwarded. The expression must return a boolean value (`true` or `false`).

### Enable Event Filtering

To enable event filtering, set the value of the `eventFilter` property to a valid SpEL expression.

```
$ ssh -p 8101 admin@localhost
...
admin@opennms()> config:edit org.opennms.features.kafka.producer
admin@opennms()> config:property-set eventFilter 'getUei().equals("uei.opennms.org/internal/discovery/newSuspect")'
admin@opennms()> config:update
```

In the example above, the filter is configured such that only events with the given UEI are forwarded. Consult the source code of the `org.opennms.netmgt.xml.event.OnmsEvent` class in your distribution for a complete list of available properties.

### Enable Alarm Filtering

To enable alarm filtering, set the value of the `alarmFilter` property to a valid SpEL expression.

```
$ ssh -p 8101 admin@localhost
...
admin@opennms()> config:edit org.opennms.features.kafka.producer
admin@opennms()> config:property-set alarmFilter 'getTTicketId() != null'
admin@opennms()> config:update
```

In the example above, the filter is configured such that only alarms that are associated with a ticket ID are forwarded. Consult the source code of the `org.opennms.netmgt.model.OnmsAlarm` class in your distribution for a complete list of available properties.
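The option descriptions state that an empty string disables filtering. Assuming the same configuration PID as in the examples on this page, removing both filters again might look like this sketch:

```
$ ssh -p 8101 admin@localhost
...
admin@opennms()> config:edit org.opennms.features.kafka.producer
admin@opennms()> config:property-set eventFilter ""
admin@opennms()> config:property-set alarmFilter ""
admin@opennms()> config:update
```

With both properties empty, all events and alarms are forwarded.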

## Enable Metric Forwarding

To enable metric forwarding, set the value of the `forward.metrics` property to `true`.
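Following the pattern of the other examples on this page, a session along these lines should enable metric forwarding. It assumes the `org.opennms.features.kafka.producer` configuration PID used elsewhere in this document.

```
$ ssh -p 8101 admin@localhost
...
admin@opennms()> config:edit org.opennms.features.kafka.producer
admin@opennms()> config:property-set forward.metrics true
admin@opennms()> config:update
```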

The producer sends a full CollectionSet per message to Kafka. Large network equipment might have hundreds of thousands of metrics, which can generate messages that exceed the Kafka message size limit (1 MB by default). In that case, you must increase `max.request.size` at the producer level, and `message.max.bytes` and `replica.fetch.max.bytes` at the broker level, to avoid problems with the consumers.
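As a hedged sketch, the producer-side limit could be raised from the Karaf shell as shown below. Two things here are assumptions rather than facts taken from this page: the `org.opennms.features.kafka.producer.client` PID, assumed to be where Kafka client properties for the producer are stored, and the 5 MB value (`5242880` bytes), which is only an illustration.

```
$ ssh -p 8101 admin@localhost
...
admin@opennms()> config:edit org.opennms.features.kafka.producer.client
admin@opennms()> config:property-set max.request.size 5242880
admin@opennms()> config:update
```

The broker-side `message.max.bytes` and `replica.fetch.max.bytes` settings would need to be raised to at least the same value in the broker configuration, which is managed outside of OpenNMS.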

## Configure Topic Names

By default, five topics are created: `events`, `alarms`, `nodes`, `vertices`, and `edges`. To change these, use:

```
$ ssh -p 8101 admin@localhost
...
admin@opennms()> config:edit org.opennms.features.kafka.producer
admin@opennms()> config:property-set eventTopic ""
admin@opennms()> config:property-set nodeTopic "opennms-nodes"
admin@opennms()> config:update
```

In the example above, we disable event forwarding by setting an empty topic name and change the node topic name to `opennms-nodes`.

```
$ ssh -p 8101 admin@localhost
...