DEVOPS

Collecting Kafka Performance Metrics with OpenTelemetry

In a previous blog post, "Monitoring Kafka Performance with Splunk," we discussed key performance metrics to monitor different components in Kafka. This blog is focused on how to collect and monitor Kafka performance metrics with Splunk Infrastructure Monitoring using OpenTelemetry, a vendor-neutral and open framework to export telemetry data. In this step-by-step getting-started blog, we will:

  • Deploy Kafka in Kubernetes
  • Configure and deploy Splunk OpenTelemetry Connector to automatically discover Kafka brokers
  • (Optional) Enrich broker metrics with Prometheus exporter
  • (Optional) Monitor consumer lag
     

Step 1: Deploy Kafka in Kubernetes

Kafka exposes health and performance metrics via Java Management Extensions (JMX), so the first step is to enable JMX metrics on Kafka brokers. 

You can enable JMX metrics by setting KAFKA_JMX_OPTS by one of the following methods depending on the choice of deployment:

Edit the Kafka startup script, typically located at bin/kafka-run-class.sh, to add the following parameters to KAFKA_JMX_OPTS:

-Djava.rmi.server.hostname=
-Dcom.sun.management.jmxremote.local.only=false
-Dcom.sun.management.jmxremote.port=
-Dcom.sun.management.jmxremote.rmi.port=


The port and the rmi.port should be set to the same value, which allows the Splunk OpenTelemetry Connector to connect over a single port. Set local.only to false unless the OTel Connector runs as a sidecar or on the same instance as the broker, where the broker is reachable on 'localhost'.

Finally, set server.hostname to the IP address that will be used to connect to the JMX URL. If you set it to "localhost", an external service (like the OTel Connector) can only connect by specifying "localhost" in the agent config, i.e., the agent must run on the same machine or in the same container to reach the JMX URL.
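For a broker running outside Kubernetes, the same flags can be set as environment variables before invoking the startup script. A minimal sketch, assuming the broker host is reachable at 10.0.0.12 and port 8299 is free (both are placeholder values):

```shell
# Placeholder values: substitute your broker's IP and a free port.
export JMX_PORT=8299
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote \
-Dcom.sun.management.jmxremote.local.only=false \
-Dcom.sun.management.jmxremote.port=8299 \
-Dcom.sun.management.jmxremote.rmi.port=8299 \
-Dcom.sun.management.jmxremote.ssl=false \
-Dcom.sun.management.jmxremote.authenticate=false \
-Djava.rmi.server.hostname=10.0.0.12"

# Confirm the flags, then start the broker
# (e.g. bin/kafka-server-start.sh config/server.properties):
echo "$KAFKA_JMX_OPTS"
```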

For Kubernetes, since the OTel Connector runs as a DaemonSet, it needs to connect via the Kafka broker's pod IP address. We therefore set the pod IP as server.hostname using the Downward API. In this example, we expose the pod IP as an environment variable called 'MY_POD_IP'.

Example Kafka broker deployment (it also requires a ZooKeeper deployment and service):

kind: Deployment
apiVersion: apps/v1
metadata:
  name: kafka-broker0
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka
      id: "0"
  template:
    metadata:
      labels:
        app: kafka
        id: "0"
    spec:
      containers:
        - name: kafka
          image: wurstmeister/kafka
          ports:
          - containerPort: 9092
          env:
          - name: KAFKA_ADVERTISED_PORT
            value: "9092"
          - name: KAFKA_ZOOKEEPER_CONNECT
            value: zoo1:2181
          - name: KAFKA_BROKER_ID
            value: "0"
          - name: KAFKA_CREATE_TOPICS
            value: test-topic:1:1
          - name: MY_POD_IP
            valueFrom:
              fieldRef:
                fieldPath: status.podIP
          - name: KAFKA_ADVERTISED_HOST_NAME
            value: $(MY_POD_IP)
          - name: JMX_PORT
            value: "8299"
          - name: KAFKA_JMX_OPTS      
            value: "-Dcom.sun.management.jmxremote
                   -Dcom.sun.management.jmxremote.local.only=false
                   -Dcom.sun.management.jmxremote.rmi.port=8299
                   -Dcom.sun.management.jmxremote.ssl=false
                   -Dcom.sun.management.jmxremote.authenticate=false
                   -Djava.rmi.server.hostname=$(MY_POD_IP)"


Step 2: Deploying Splunk OpenTelemetry Connector as a DaemonSet

Splunk OpenTelemetry Connector can automatically discover Kafka deployments. To configure the connector, create a file – values.yaml – and add the following configuration:

otelAgent:
  config:
    receivers:
      receiver_creator:
        receivers:
          smartagent/kafka:
            rule: type == "pod" && name matches "kafka"
            config:
              endpoint: '`endpoint`:8299'
              clusterName: kafka-test
        watch_observers:
        - k8s_observer


Finally, deploy the Splunk OpenTelemetry Connector using Helm:

$ helm repo add splunk-otel-collector-chart https://signalfx.github.io/splunk-otel-collector-chart

$ helm repo update

$ helm install my-splunk-otel-connector --set splunkAccessToken='' \
  --set="splunkRealm=,clusterName=kafka-cluster" \
  --values values.yaml splunk-otel-collector-chart/splunk-otel-collector


That's it! Kafka performance metrics will start streaming into Splunk Infrastructure Monitoring, which automatically discovers Kafka components and provides out-of-the-box dashboards for instant visibility.

Fig 1: Performance metrics for a specific broker

Java Virtual Machine metrics are collected using MBeans via Java Management Extensions (JMX).

Fig 2: Java Virtual Machine performance metrics

Step 3: (Optional) Enriching with Additional Metrics Using Prometheus Exporter

The GenericJMX plugin does not pick up dynamic MBeans: it requires a new MBean definition for each topic, classified by broker ID and the topic partitions that broker is responsible for. Imagine adding MBean definitions every time a new broker or topic is added. The Prometheus JMX exporter provides an elegant solution, using regular expressions to dynamically pick up MBeans and expose them as Prometheus metrics.

The JVM running the Kafka brokers needs access to the Prometheus exporter jar and config files. Edit the Kafka environment variable KAFKA_JMX_OPTS:

KAFKA_JMX_OPTS: -javaagent:jmx_prometheus_javaagent-0.14.0.jar=8060:config.yml 


Here, -javaagent points to the Prometheus JMX exporter jar file, 8060 is the HTTP port on which the exporter will expose all MBeans as Prometheus metrics, and config.yml is the exporter configuration file.
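The exporter's config.yml is where the regular-expression rules live. A minimal sketch (the single rule below is an assumption; tune the pattern to the MBeans you care about) that rewrites Kafka server MBeans into Prometheus metric names:

```yaml
# Match MBeans like kafka.server:type=ReplicaFetcherManager,name=MaxLag
# and expose them as kafka_server_replicafetchermanager_maxlag
lowercaseOutputName: true
rules:
  - pattern: kafka.server<type=(.+), name=(.+)><>Value
    name: kafka_server_$1_$2
```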

For containerized deployments, expose port 8060 so that the HTTP port is reachable from outside the container.

Instead of baking the Prometheus exporter jar and config files into the image, you can alternatively mount them with a volume in Kubernetes (a hostPath volume in this example; a PersistentVolume works as well):

      containers:
        - name: kafka
          image: <insert-kafka-image>
          env:
            - name: KAFKA_JMX_OPTS
              value: "-javaagent:/jmxexporter/jmx_prometheus_javaagent-0.14.0.jar=8060:/jmxexporter/config.yml"
          ports:
            - containerPort: 9092
            - containerPort: 8060
          volumeMounts:
            - mountPath: /jmxexporter
              name: jmx-exporter
      volumes:
        - hostPath:
            path: /home/userxyz/jmx-exporter
            type: Directory
          name: jmx-exporter


Prometheus metrics will be available at:

$ curl localhost:8060/metrics
# from outside the container, use the pod IP:
$ curl <pod-ip>:8060/metrics


You will get a fairly long output, including the metrics we care about:

$ kubectl exec -it kafka-broker0-7bcf8bdd96-dhjx4 -- curl localhost:8060/metrics | grep fetcher
# HELP kafka_server_replicafetchermanager_minfetchrate Attribute exposed for management (kafka.server<>Value)
# TYPE kafka_server_replicafetchermanager_minfetchrate gauge
kafka_server_replicafetchermanager_minfetchrate{clientId="Replica",} 0.0
# HELP kafka_server_replicafetchermanager_maxlag Attribute exposed for management (kafka.server<>Value)
# TYPE kafka_server_replicafetchermanager_maxlag gauge
kafka_server_replicafetchermanager_maxlag{clientId="Replica",} 0.0
# HELP kafka_server_replicafetchermanager_failedpartitionscount Attribute exposed for management (kafka.server<>Value)
# TYPE kafka_server_replicafetchermanager_failedpartitionscount gauge
kafka_server_replicafetchermanager_failedpartitionscount{clientId="Replica",} 0.0
# HELP kafka_server_replicafetchermanager_deadthreadcount Attribute exposed for management (kafka.server<>Value)
# TYPE kafka_server_replicafetchermanager_deadthreadcount gauge
kafka_server_replicafetchermanager_deadthreadcount{clientId="Replica",} 0.0


Splunk Infrastructure Monitoring can automatically scrape Prometheus metrics. Configure the Splunk OpenTelemetry Connector with the endpoint details:

receiver_creator/kafka:
  receivers:
    prometheus_simple:
      rule: type == "pod" && name matches "kafka"
      config:
        endpoint: '`endpoint`:8060'
  watch_observers:
  - k8s_observer


You now have visibility into dynamically generated metrics from JMX for any new topics or brokers that might come online.

Step 4: (Optional) Monitor Consumer Lag

Consumer lag metrics quantify the difference between the latest data written to topics and the data read by consumer applications. Monitoring consumer lag allows us to identify slow or stuck consumers that aren't keeping up with the latest data available in a topic. Since version 0.9.x, Kafka has used an internal topic called __consumer_offsets to store binary data about the offsets consumed by each consumer group, per topic, per partition.
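The lag computation itself is simple arithmetic: the difference between a partition's log-end offset and the consumer group's committed offset. A toy illustration with made-up offsets:

```shell
# Hypothetical offsets for one partition of a topic
log_end_offset=1500      # latest offset written by producers
committed_offset=1450    # offset committed by the consumer group

lag=$((log_end_offset - committed_offset))
echo "consumer lag: $lag"   # prints "consumer lag: 50"
```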

Splunk has contributed the Kafka Metrics Receiver to OpenTelemetry in order to extract consumer offset information using the OpenTelemetry Collector.

If you have deployed splunk-otel-collector (v0.26.0+), you can simply add the following to your values.yaml configuration file:

otelK8sClusterReceiver:
  config:
    receivers:
      kafkametrics:
        brokers: kafka-service:9092
        protocol_version: 2.0.0
        scrapers:
          - brokers
          - topics
          - consumers
    service:
      pipelines:
        metrics:
          receivers:
          - prometheus
          - k8s_cluster
          - kafkametrics


By using a Kubernetes Service for the brokers, you avoid chasing IP addresses when a new broker spins up or an existing one is rescheduled with a new pod IP.
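A minimal sketch of such a Service, matching the app: kafka label and the kafka-service name used in this post's examples (both are assumptions from those examples):

```yaml
apiVersion: v1
kind: Service
metadata:
  name: kafka-service
spec:
  selector:
    app: kafka
  ports:
    - port: 9092
      targetPort: 9092
```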

Here is the set of metrics output by this OpenTelemetry receiver:

Topic-related metrics, collected by the Go Kafka client (Shopify's Sarama):

  • kafka.topic.partitions [# of partitions in topic, dims: topic]
  • kafka.partition.current_offset [Current offset of partition, dims: topic,partition]
  • kafka.partition.oldest_offset [Oldest offset of partition, dims: topic,partition]
  • kafka.partition.replicas [# of replicas for partition, dims: topic, partition]
  • kafka.partition.replicas_in_sync [# of synced replicas of partition, dims: topic,partition]
     

Consumer-group-related metrics, collected by reading from the __consumer_offsets topic:

  • kafka.consumer_group.members [Count of members in consumer group, dims: group]
  • kafka.consumer_group.offset [Current offset of consumer group at partition, dims: group, topic, partition]
  • kafka.consumer_group.lag [Current approx lag of consumer group at partition, dims: group, topic, partition]
  • kafka.consumer_group.lag_sum [Current approx sum of consumer group lag across all partitions of topic, dims: group, topic]
  • kafka.consumer_group.offset_sum [Sum of consumer group offset across partitions, dims: group, topic]
     

You can instantly start monitoring consumer lag metrics within Splunk Infrastructure Monitoring:

Start Monitoring Your Kafka Cluster

This blog covered how to collect key performance metrics from a Kafka cluster using the OpenTelemetry Collector and get instant visibility using Splunk Infrastructure Monitoring.

Don't forget to check out my next blog post, Distributed Tracing for Kafka Clients with OpenTelemetry and Splunk APM, to learn to enable distributed tracing for Kafka clients with OpenTelemetry and Splunk APM.

You can get started by signing up for a free 14-day trial of Splunk Infrastructure Monitoring, and check out our documentation for details about additional Kafka performance metrics.


This blog was co-authored with Amit Sharma

Posted by Harnit Singh

I've worked as a Sr. Software Engineer at Sun Microsystems/Oracle. Then I joined SignalFx as their first sales engineer hire.
