Disclaimer: While I usually reword several pieces of the content I take as notes from books like this one, many other pieces are just literal texts extracted from the book(s). The authors of those literal texts extracted from the books are the only owners of such content. If you're the co/author, publisher, or, in general, a copyright owner of this book or related material and consider the content of this website (https://lealceldeiro.com/) doesn't comply with your work's copyright policy, please get in touch .


Main notes taken from the book Kafka: The Definitive Guide.


Chapter 1: Meet Kafka

Data within Kafka is stored durably, in order, and can be read deterministically.

In addition, the data can be distributed within the system to provide additional protections against failures, as well as significant opportunities for scaling performance.

The unit of data within Kafka is the message.

A message can have an optional piece of metadata, which is referred to as a key.

The message and the key are byte arrays and have no specific meaning to Kafka.

The offset, an integer value that continually increases, is another piece of metadata that Kafka adds to each message as it is produced.

A single Kafka server is called a broker.

Kafka brokers are designed to operate as part of a cluster.

Within a cluster of brokers, one broker will also function as the cluster controller.

A partition is owned by a single broker in the cluster, and that broker is called the leader of the partition.

A replicated partition is assigned to additional brokers, called followers of the partition.

Some pros

Use Cases

Chapter 2: Installing Kafka

My own experience running Kafka locally with Docker

For this to work, Docker must be installed.

The book starts by mentioning the installation of Kafka and ZooKeeper, but after KRaft was marked as Production Ready , only the Kafka image is actually needed when the version 2.8 or later of Apache Kafka is used.

The following example uses bitnami/kafka and is based in that official documentation examples.

General Broker Parameters

Selecting Hardware

Factors that can contribute to overall performance bottlenecks

Disk throughput and capacity

Faster disk writes will equal lower produce latency.

Generally, observations show that HDD drives are typically more useful for clusters with very high storage needs but aren't accessed as often, while SSDs are better options if there is a very large number of client connections.

The total traffic for a cluster can be balanced across the cluster by having multiple partitions per topic, which will allow additional brokers to augment the available capacity if the density on a single broker will not suffice.

Memory

The normal mode of operation for a Kafka consumer is reading from the end of the partitions, where the consumer is caught up and lagging behind the producers very little, if at all. In this situation, the messages the consumer is reading are optimally stored in the system's page cache, resulting in faster reads than if the broker has to reread the messages from disk. Therefore, having more memory available to the system for page cache will improve the performance of consumer clients.

Kafka itself does not need much heap memory configured for the JVM. Even a broker that is handling 150000 messages per second and a data rate of 200 megabits per second can run with a 5GB heap.

It is not recommended to have Kafka running on a system with any other significant application, as it will have to share the use of the page cache. This will decrease the consumer performance for Kafka.

Networking

The available network throughput will specify the maximum amount of traffic that Kafka can handle. This can be a governing factor, combined with disk storage, for cluster sizing. To prevent the network from being a major governing factor, it is recommended to run with at least 10 Gb NICs (Network Interface Cards). Older machines with 1 Gb NICs are easily saturated and aren't recommended.

CPU

Processing power is not as important as disk and memory until you begin to scale Kafka very large, but it will affect overall performance of the broker to some extent.

Configuring Kafka Clusters

Typically, the size of a cluster will be bound on the following key areas:

Currently —2023—, in a well-configured environment, it is recommended to not have more than 14000 partition replicas per broker and 1 million replicas per cluster.

Requirements in the broker configuration to allow multiple Kafka brokers to join a single cluster:

  1. All brokers must have the same configuration for the zookeeper.connect parameter
  2. All brokers in the cluster must have a unique value for the broker.id parameter

The current number of dirty pages can be determined by checking:


cat /proc/vmstat | egrep "dirty|writeback"

The most common choices for local filesystems are either Ext4 (fourth extended filesystem) or Extents File System (XFS), being XFS the preferred option because it outperforms Ext4 for most workloads with minimal tuning required.

Production Concerns

JVM Garbage Collector Options

As of this writing, the Garbage-First garbage collector (G1GC) is the recommended one, but it's worth checking the official Kafka documentation related to this topic to get the latest updates; and a best practice is to use if (G1GC) for anything for Java 1.8 and later.

Configuration options for G1GC used to adjust its performance:

Datacenter Layout

A datacenter environment that has a concept of fault zones is preferable.

It is recommended to use tools that keep your cluster balanced properly to maintain rack awareness, such as Cruise Control .

Overall, the best practice is to have each Kafka broker in a cluster installed in a different rack, or at the very least not share single points of failure for infrastructure services such as power and network.

Chapter 3: Kafka Producers: Writing Messages to Kafka

Source code

The source code of a sample Java Kafka Producer, based on the content from this chapter, can be found here.

Producer Overview

Messages to Kafka are produced by creating a ProducerRecord, which includes

When the ProducerRecord is sent, the first thing the producer will do is serialize the key and value objects to byte arrays, so they can be sent over the network.

If a partition isn't explicitly specified, the data is sent to a partitioner (which chooses a partition for the message, usually, based on the key).

Then, the record is added to a batch of records that will also be sent to the same topic and partition. A separate thread is responsible for sending those batches of records to the appropriate Kafka brokers.

Constructing a Kafka Producer

Mandatory properties of a Kafka producer:

All the configuration options are available in the official docs .

Primary methods of sending messages:

Configuring Producers

Some of the parameters that have a significant impact on memory use, performance, and reliability of the producers:

Serializers

Apache Avro

Apache Avro is a language-neutral data serialization format.

Generating Avro classes can be done either using the `avro-tools.jar` or the Avro Maven plugin, both part of Apache Avro (see docs).

Partitions

When partitioning keys is important, the easiest solution is to create topics with sufficient partitions, see How to Choose the Number of Topics/Partitions in a Kafka Cluster?

Headers

Records can also include headers. Record headers give us the ability to add some metadata about the Kafka record, without adding any extra information to the key/value pair of the record itself.

Interceptors

Interceptors can help us to modify the behavior of our Kafka client application without modifying its code.

Common use cases for producer interceptors include capturing monitoring and tracing information; enhancing the message with standard headers, especially for lineage tracking purposes; and redacting sensitive information.

Quotas and Throttling

Kafka brokers have the ability to limit the rate at which messages are produced and consumed. This is done via the quota mechanism. Kafka has three quota types: produce, consume, and request.

Produce and consume quotas limit the rate at which clients can send and receive data, measured in bytes per second. Request quotas limit the percentage of time the broker spends processing client requests.

When a client reaches its quota, the broker will start throttling the client's requests to prevent it from exceeding the quota. This means that the broker will delay responses to client requests; in most clients this will automatically reduce the request rate (since the number of in-flight requests is limited) and bring the client traffic down to a level allowed by the quota. To protect the broker from misbehaved clients sending additional requests while being throttled, the broker will also mute the communication channel with the client for the period of time needed to achieve compliance with the quota.

Chapter 4: Kafka Consumers: Reading Data from Kafka

The source code of a sample Java Kafka Consumer, based on the content from this chapter, can be found here.

Applications that need to read data from Kafka use a KafkaConsumer to subscribe to Kafka topics and receive messages from these topics.

Consumers and Consumer Groups

A consumer group must be created for each application that needs all the messages from one or more topics.

Consumers are added to an existing consumer group to scale the reading and processing of messages from the topics, so each additional consumer in a group will only get a subset of the messages.

Consumer Groups and Partition Rebalance

Moving partition ownership from one consumer to another is called a rebalance.

Types of rebalances:

Consumers maintain membership in a consumer group and ownership of the partitions assigned to them by sending heartbeats to a Kafka broker designated as the group coordinator.

The first consumer to join the group becomes the group leader.

By default, the identity of a consumer as a member of its consumer group is transient, unless it's configured with a unique group.instance.id, which makes the consumer a static member of the group.

If two consumers join the same group with the same group.instance.id, the second consumer will get an error saying that a consumer with this ID already exists.

To consume records from a Kafka broker: create a KafkaConsumer (create a Java Properties instance with the properties to be passed to the consumer; three mandatory properties: bootstrap.servers, key.deserializer, and value.deserializer).

Another common property is group.id, and it specifies the consumer group the KafkaConsumer instance belongs to.

Creating a Kafka Consumer

The Poll Loop

At the heart of the Consumer API is a simple loop for polling the server for more data.

Thread Safety

Multiple consumers that belong to the same group cannot coexist in the same thread, and there cannot be multiple threads safely use the same consumer. One consumer per thread is the rule.

To run multiple consumers in the same group in one application, each of them needs to run in its own thread. It is useful to wrap the consumer logic in its own object and then use Java's ExecutorService to start multiple threads, each with its own consumer. See an example in this Tutorial .

Configuring Consumers

All the consumer configuration is documented in the Apache Kafka documentation .

Assignment strategies (used to configure `partition.assignment.strategy` with one of the org.apache.kafka.clients.consumer.*Assignor values):

Standalone Consumer: Why and How to Use a Consumer Without a Group

When it's known exactly which partitions the consumer should read, the consumer can be not subscribed to a topic, instead, it is assigned a few partitions. A consumer can either subscribe to topics (and be part of a consumer group) or assign itself partitions, but not both at the same time.

Example:


Duration timeout = Duration.ofMillis(100);
List<PartitionInfo> partitionInfos = null;
partitionInfos = consumer.partitionsFor("topic");

if (partitionInfos != null) {
    for (PartitionInfo partition : partitionInfos) {
        partitions.add(new TopicPartition(partition.topic(), partition.partition()));
    }
    consumer.assign(partitions);

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(timeout);

        for (ConsumerRecord<String, String> record: records) {
            // do some work with record
        }
        consumer.commitSync();
    }
}

Chapter 5: Managing Apache Kafka Programmatically

Kafka's AdminClient is asynchronous. It is useful for application developers who want to create topics on the fly and validate that the topics they are using are configured correctly for their application. It is also useful for operators and SREs who want to create tooling and automation around Kafka or need to recover from an incident.

AdminClient Lifecycle: Creating, Configuring, and Closing

Example:


Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient admin = AdminClient.create(props);
// Do something with the AdminClient instance
admin.close(Duration.ofSeconds(30));

All AdminClient configuration can be found at the Apache Kafka officialdocumentation .

Some noteworthy ones are:

AdminClient result objects throw ExecutionException when Kafka responds with an error. This is because AdminClient results are wrapped Future objects, and those wrap exceptions. The cause of ExecutionException needs to be examined always to get the error that Kafka returned.

Chapter 6: Kafka Internals

Kafka uses ZooKeeper's ephemeral node feature to elect a controller and to notify the controller when nodes join and leave the cluster. The controller is responsible for electing leaders among the partitions and replicas whenever it notices nodes join and leave the cluster. The controller uses the epoch number to prevent a "split brain" scenario where two nodes believe each is the current controller.

From the list of replicas for a partition displayed by the kafka-topics.sh tool, the first replica in the list is always the preferred leader.

This is true no matter who is the current leader and even if the replicas were reassigned to different brokers using the replica reassignment tool.

When replicas are manually reassigned, the first replica specified will be the preferred one.

Common types of client requests:

New brokers know how to handle old requests, but old brokers don't know how to handle new requests. Now, in release 0.10.0, ApiVersionRequest was added, which allows clients to ask the broker which versions of each request are supported and to use the correct version accordingly. Clients that use this new capability correctly will be able to talk to older brokers by using a version of the protocol that is supported by the broker they are connecting to.

Chapter 7: Reliable Data Delivery

Reliability is not just a matter of specific Kafka features. An entire reliable system needs to be built, including the application architecture, the way applications use the producer and consumer APIs, producer and consumer configuration, topic configuration, and broker configuration. Making the system more reliable always has trade-offs in application complexity, performance, availability, or disk-space usage. By understanding all the options and common patterns and understanding requirements for each use case, more informed decisions can be made regarding how reliable the application and Kafka deployment need to be and which trade-offs make sense.

Guarantees provided by Kafka:

These guarantees can then be "tweaked" by changing Kafka's configuration, depending on the business needs and trade-offs made taking into account other important considerations, such as availability, high throughput, low latency, and hardware costs.

If we allow out-of-sync replicas to become leaders, we risk data loss and inconsistencies. If we don't allow them to become leaders, we face lower availability as we must wait for the original leader to become available before the partition is back online.

There are two important things that everyone who writes applications that produce to Kafka must pay attention to:

Some failure conditions under which integration tests can be run:

Trogdor, the test framework for Apache Kafka could be used to simulate systems faults.

Burrow, the Kafka Consumer Lag Checking could be useful in helping monitoring of consumer lags.

Chapter 8: Exactly-Once Semantics

Mechanisms that provide exactly-once guarantees:

  1. idempotent producer - which avoids duplicates that are caused by the retry mechanism
  2. transactions - form the basis of exactly-once semantics in Kafka Streams

When the idempotent producer is enabled, each message will include a unique identified producer ID (PID) and a sequence number, which together with the target topic and partition, uniquely identify each message.

The idempotent producer will only prevent duplicates caused by the retry mechanism of the producer itself, whether the retry is caused by producer, network, or broker errors. But nothing else.

Exactly-once processing means that consuming, processing, and producing are done atomically. Either the offset of the original message is committed and the result is successfully produced or neither of these things happen. We need to make sure that partial results—where the offset is committed but the result isn't produced, or vice versa—can't happen. To support this behavior, Kafka transactions introduce the idea of atomic multi-partition writes.

A transactional producer is simply a Kafka producer that is configured with a transactional.id and has been initialized using initTransactions().

Transactions provide exactly-once guarantees when used within chains of consume-process-produce stream processing tasks. In other contexts, transactions will either straight-out not work or will require additional effort in order to achieve the guarantees we want.

Two mistakes are assuming that exactly-once guarantees apply on actions other than producing to Kafka, and that consumers always read entire transactions and have information about transaction boundaries.

The most common and most recommended way to use transactions is to enable exactly-once guarantees in Kafka Streams.

To enable exactly-once guarantees for a Kafka Streams application, we simply set the processing.guarantee configuration to either exactly_once or exactly_once_beta.

Chapter 9: Building Data Pipelines

The main value Kafka provides to data pipelines is its ability to serve as a very large, reliable buffer between various stages in the pipeline.

Run Kafka Connect


bin/connect-distributed.sh config/connect-distributed.properties

The Debezium Project provides a collection of high-quality, open source, change capture connectors for a variety of databases.

Kafka can be looked at as a platform that can handle data integration (with Connect), application integration (with producers and consumers), and stream processing. Kafka could be a viable replacement for an ETL tool that only integrates data stores.

Chapter 10: Cross-Cluster Data Mirroring

Apache Kafka's built-in cross-cluster replicator is called MirrorMaker.

Use Cases of Cross-Cluster Mirroring

Multicluster Architectures

Some principles that should guide these architectures:

Hub-and-Spoke Architecture

The main benefit of this architecture is that data is always produced to the local datacenter and events from each datacenter are only mirrored once—to the central datacenter.

The main drawback of this architecture is the direct result of its benefits and simplicity. Processors in one regional datacenter can't access data in another.

Active-Active Architecture

One benefit of this architecture is the ability to serve users from a nearby datacenter, which typically has performance benefits, without sacrificing functionality due to limited availability of data.

Another benefit is redundancy and resilience. Since every datacenter has all the functionality, if one datacenter is unavailable, you can direct users to a remaining datacenter. This type of failover only requires network redirects of users, typically the easiest and most transparent type of failover.

The main drawback of this architecture is the challenge in avoiding conflicts when data is read and updated asynchronously in multiple locations.

Active-Standby Architecture

The benefits of this setup are simplicity in setup and the fact that it can be used in pretty much any use case.

The disadvantages are waste of a good cluster and the fact that failover between Kafka clusters is, in fact, much harder than it looks.

Stretch Clusters

One advantage of this architecture is in the synchronous replication—some types of business simply require that their DR site is always 100% synchronized with the primary site.

Other advantage is that both datacenters and all brokers in the cluster are used. There is no waste like in active-standby architectures.

This architecture is limited in the type of disasters it protects against. It only protects from datacenter failures, not any kind of application or Kafka failures. The operational complexity is also limited. This architecture demands physical infrastructure that not all companies can provide.

Apache Kafka's MirrorMaker

MirrorMaker is highly configurable. In addition to the cluster settings to define the topology, Kafka Connect, and connector settings, every configuration property of the underlying producer, consumers, and admin client used by MirrorMaker can be customized.

Example: start MirrorMaker with the configuration options specified in the properties file:


bin/connect-mirror-maker.sh etc/kafka/connect-mirror-maker.properties

Any number of these processes can be started to form a dedicated MirrorMaker cluster that is scalable and fault-tolerant. The processes mirroring to the same cluster will find each other and balance load between them automatically. Usually when running MirrorMaker in a production environment, we'd want to run MirrorMaker as a service, running in the background with `nohup` and redirecting its console output to a log file. The tool also has -daemon as a command-line option that should do that for us.

Production deployment systems like Ansible, Puppet, Chef, and Salt are often used to automate deployment and manage the many configuration options. MirrorMaker may also be run inside a Docker container. MirrorMaker is completely stateless and doesn't require any disk storage (all the data and state are stored in Kafka itself).

When deploying MirrorMaker in production, it is important to monitor it as follows:

Chapter 11: Securing Kafka

Some security procedures applied in Kafka to establish and maintain confidentiality/integrity/availability of data

A secure deployment must guarantee

Security Protocols

Each Kafka security protocol combines a transport layer (PLAINTEXT or SSL) with an optional authentication layer (SSL or SASL):

Example: Configure SSL for the inter-broker and internal listeners, and SASL_SSL for the external listener:


listeners=EXTERNAL://:9092,INTERNAL://10.0.0.2:9093,BROKER://10.0.0.2:9094
advertised.listeners=EXTERNAL://broker1.example.com:9092,INTERNAL://broker1.local:9093,BROKER://broker1.local:9094
listener.security.protocol.map=EXTERNAL:SASL_SSL,INTERNAL:SSL,BROKER:SSL
inter.broker.listener.name=BROKER

Then clients are configured with a security protocol and bootstrap servers that determine the broker listener. Metadata returned to clients contains only the endpoints corresponding to the same listener as the bootstrap servers:


security.protocol=SASL_SSL
bootstrap.servers=broker1.example.com:9092,broker2.example.com:9092

SASL

Kafka brokers support the following SASL mechanisms out of the box:

Kafka uses the Java Authentication and Authorization Service (JAAS) for configuring SASL.

SASL/GSSAPI

Generic Security Service Application Program Interface (GSS-API) is a framework for providing security services to applications using different authentication mechanisms.

SASL/SCRAM

SCRAM applies a one-way cryptographic hash function on the password combined with a random salt to avoid the actual password being transmitted over the wire or stored in a database.

Kafka provides safeguards by supporting only the strong hashing algorithms SHA-256 and SHA-512 and avoiding weaker algorithms like SHA-1. This is combined with a high default iteration count of 4,096 and unique random salts for every stored key to limit the impact if ZooKeeper security is compromised.

SASL/OAUTHBEARER

Kafka supports SASL/OAUTHBEARER for client authentication, enabling integration with third-party OAuth servers. The built-in implementation of OAUTHBEARER uses unsecured JSON Web Tokens (JWTs) and is not suitable for production use. Custom callbacks can be added to integrate with standard OAuth servers to provide secure authentication using the OAUTHBEARER mechanism in production deployments.

Because the built-in implementation of SASL/OAUTHBEARER in Kafka does not validate tokens, it only requires the login module to be specified in the JAAS configuration. If the listener is used for inter-broker communication, details of the token used for client connections initiated by brokers must also be provided.

The option unsecuredLoginStringClaim_sub is the subject claim that determines the KafkaPrincipal for the connection by default. Example:


sasl.enabled.mechanisms=OAUTHBEARER
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
listener.name.external.oauthbearer.sasl.jaas.config=\
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule \
required unsecuredLoginStringClaim_sub="kafka";

Clients must be configured with the subject claim option `unsecuredLoginStringClaim_sub`. Other claims and token lifetime may also be configured:


sasl.mechanism=OAUTHBEARER
sasl.jaas.config=\
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule \
required unsecuredLoginStringClaim_sub="John";

To integrate Kafka with third-party OAuth servers for using bearer tokens in production, Kafka clients must be configured with sasl.login.callback.handler.class to acquire tokens from the OAuth server using the long-term password or a refresh token.

If OAUTHBEARER is used for inter-broker communication, brokers must also be configured with a login callback handler to acquire tokens for client connections created by the broker for inter-broker communication.

To create and validate delegation tokens, all brokers must be configured with the same master key using the configuration option delegation.token.master.key.

Delegation tokens also are suitable for production use only in deployments where ZooKeeper is secure.

Reauthentication

Kafka brokers support reauthentication for connections authenticated using SASL using the configuration option connections.max.reauth.ms.

Authorization

Kafka brokers manage access control using a customizable authorizer.

Kafka has a built-in authorizer, `AclAuthorizer`, that can be enabled by configuring the authorizer class name as follows:


authorizer.class.name=kafka.security.authorizer.AclAuthorizer

Super users are granted access for all operations on all resources without any restrictions and cannot be denied access using Deny ACLs.

Security Considerations

Since AclAuthorizer stores ACLs in ZooKeeper, access to ZooKeeper should be restricted. Deployments without a secure ZooKeeper can implement custom authorizers to store ACLs in a secure external database.

Restricting user access using the principle of least privilege can limit exposure if a user is compromised.

ACLs should be removed immediately when a user principal is no longer in use, for instance, when a person leaves the organization.

Long-running applications can be configured with service credentials rather than credentials associated with a specific user to avoid any disruption when employees leave the organization.

Auditing

Kafka brokers can be configured to generate comprehensive _log4j_ logs for auditing and debugging. The logging level as well as the appenders used for logging and their configuration options can be specified in log4j.properties.

Securing the platform

Kafka supports externalizing passwords in a secure store.

Customizable configuration providers can be configured for Kafka brokers and clients to retrieve passwords from a secure third-party password store.

Passwords may also be stored in encrypted form in configuration files with custom configuration providers that perform decryption.

Sensitive broker configuration options can also be stored encrypted in ZooKeeper using the Kafka configs tool without using custom providers.

Chapter 12: Administering Kafka

While Apache Kafka implements authentication and authorization to control topic operations, default configurations do not restrict the use of these tools. This means that these CLI tools can be used without any authentication required, which will allow operations such as topic changes to be executed with no security check or audit.

Topic Operations

The kafka-topics.sh tool provides access to most topic operations. It allows us to create, modify, delete, and list information about topics in the cluster. While some topic configurations are possible through this command, they have been deprecated, and it is recommended to use the more robust method of using the kafka-config.sh tool for configuration changes.

Consumer Groups

The kafka-consumer-groups.sh tool helps manage and gain insight into the consumer groups that are consuming from topics in the cluster.

Dynamic Configuration Changes

The kafka-configs.sh is the main tool for modifying all the configurations for topics, clients, brokers, and more that can be updated dynamically during runtime without having to shut down or redeploy a cluster.

Producing and Consuming

To manually produce or consume some sample messages these tools are provided: kafka-console-consumer.sh and kafka-console-producer.sh.

Example

Consuming the earliest message from the __consumer_offsets topic:


kafka-console-consumer.sh --bootstrap-server localhost:9092
                          --topic __consumer_offsets --from-beginning --max-messages 1
                          --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
                          --consumer-property exclude.internal.topics=false

This will show something like this:


[my-group-name,my-topic,0]::[OffsetMetadata[1,NO_METADATA]
CommitTime 1623034799990 ExpirationTime 1623639599990]
Processed a total of 1 messages

Partition Management

A default Kafka installation contains a few scripts for working with the management of partitions. One of these tools allows for the reelection of leader replicas; another is a low-level utility for assigning partitions to brokers. Together these tools can assist in situations where a more manual hands-on approach to balance message traffic within a cluster of Kafka brokers is needed.

The kafka-leader-election.sh utility can be used to trigger manually a new leader election.

Example

Start a preferred leader election for all topics in a cluster:


kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type PREFERRED --all-topic-partitions

It is also possible to start elections on specific partitions or topics. This can be done by passing in a topic name with the `--topic` option and a partition with the `--partition` option directly. It is also possible to pass in a list of several partitions to be elected through a JSON file with the topic names.

Example

Start a preferred replica election with a specified list of partitions in a file named partitions.json, with the following content:


{
    "partitions": [
        {
            "partition": 1,
            "topic": "my-topic"
        },
        {
            "partition": 2,
            "topic": "foo"
        }
    ]
}

kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type PREFERRED --path-to-json-file partitions.json

The kafka-reassign-partitions.sh can be used to change the replica assignments manually for a partition.

The kafka-dump-log.sh tool can be used to decode the log segments for a partition.

The kafka-replica-verification.sh tool can be used to validate that the replicas for a topic's partitions are the same across the cluster.

Unsafe Operations

Chapter 13: Monitoring Kafka

If the metrics are consumed by automation, they should be very specific. It's OK to have a large number of metrics, each describing small details, because computers process a lot of data easily. The more specific the data is, the easier it is to create automation that acts on it, because the data does not leave as much room for interpretation as to its meaning. On the other hand, if the metrics will be consumed by humans, presenting a large number of metrics will be overwhelming. This becomes even more important when defining alerts based on those measurements.

Application Health Checks

Two ways to do a health check:

Service-Level Objectives (SLOs)

A service-level indicator (SLI) is a metric that describes one aspect of a service's reliability.

A service-level objective (SLO), which can also be called a service-level threshold (SLT), combines an SLI with a target value.

The term operational-level agreement (OLA) is less frequently used. It describes agreements between multiple internal services or support providers in the overall delivery of an SLA.

Further reads to dive deeper into these topics are:

Kafka Broker Metrics

Diagnosing Cluster Problems

Categories of cluster problems:

The under-replicated partitions measurement, provided on each broker in a cluster, gives a count of the number of partitions for which the broker is the leader replica, where the follower replicas are not caught up. This measurement provides insight into a number of problems with the Kafka cluster, from a broker being down to resource exhaustion.

List under-replicated partitions in a cluster:


kafka-topics.sh --bootstrap-server kafka1.example.com:9092/kafka-cluster --describe --under-replicated

Cluster-level problems are usually either Unbalanced load or resource exhaustion.

Host level problems can be any of:

Configuration management system, such as Chef or Puppet can be used to maintain consistent configurations across the operating systems running the Kafka brokers.

In general, the number of request handler threads should be set equal to the number of processors in the system, including hyperthreaded processors.

Logging

Two loggers write to separate files on disk: kafka.controller and kafka.server.ClientQuotaManager, both at the INFO level.

The kafka.request.logger is also useful when debugging issues with Kafka, turned on at either DEBUG or TRACE level.

Client Monitoring

Producer Metrics

The overall producer metrics bean provides attributes describing everything from the sizes of the message batches to the memory buffer utilization.

Consumer Metrics

The overall consumer bean has metrics regarding the lower-level network operations, and the fetch manager bean has metrics regarding bytes, request, and record rates.

Unlike the producer client, the metrics provided by the consumer are useful to look at but not useful for setting up alerts on.

Quotas

A Kafka broker does not use error codes in the response to indicate a client is being throttled. So, it's not obvious to and application that throttling is happening without monitoring the metrics that are provided to show the amount of time that the client is being throttled. The metrics that must be monitored are:

Lag Monitoring

For Kafka consumers, the most important thing to monitor is the consumer lag (the difference between the last message produced in a specific partition and the last message processed by the consumer).

The preferred method of consumer lag monitoring is to have an external process that can watch both the state of the partition on the broker, tracking the offset of the most recently produced message, and the state of the consumer, tracking the last offset the consumer group has committed for the partition.

Burrow can be used to monitor consumer groups. More on this can be checked at this blog post .

Xinfra Monitor can be used to continually produce and consume data from a topic that is spread across all brokers in a cluster. This way, we can monitor if we can produce messages to the broker(s) and if we can consume them. It measures the availability of both produce and consume requests on each broker, as well as the total produce to consume latency.

Chapter 14: Stream Processing

Starting from version 0.10.0, Kafka includes a stream processing library as part of its collection of client libraries, called Kafka Streams (Streams API). It allows developers to consume, process, and produce events in their apps, without relying on an external processing framework.

What Is Stream Processing?

Data stream (event stream, streaming data) is an abstraction that represents an unbounded data. Here, unbounded means infinite and ever-growing. That's because the dataset is unbounded, as, over time, new records keep arriving. Some of its characteristics are:

Stream processing refers to the continuous processing of event streams. It's a programming paradigm, just like request-response and batch processing are.

In a databases context, online transaction processing (OLTP) is the equivalente to request-response is.

The collection of all the information that can be handled by the stream processing pipeline, such as counting the number of events by type, moving averages, and joining two streams to create an enriched stream of information is called a state. In this case it's not enough to look at each event by itself; we need to keep track of more information — how many events of each type did we see this hour, all events that require joining, sums, averages, etc.

Kafka Streams uses Kafka’s transactions to implement exactly-once guarantees for stream processing applications. The exactly-once guarantees provided by the Kafka Streams library can be enabled by setting processing.guarantee to exactly_once.

Stream Processing Design Patterns

The Stream Processing Library

Apache Kafka has two stream APIs: a low-level Processor API and a high-level Streams DSL (see presentation ).

Some examples:

Kafka Streams: Architecture Overview

Every streams application implements and executes one topology (or DAG — or directed acyclic graph).

The main testing tool for Kafka Streams applications is TopologyTestDriver.

Two popular integration test frameworks used for testing Kafka application are: EmbeddedKafkaCluster and Testcontainers.

The blog post Testing Kafka Streams — A Deep Dive has deep explanations and detailed code examples of topologies and tests.

Kafka Streams handles scenarios where there are dependencies between tasks (i.e.: because a processing step requires an input from multiple partitions) by assigning all the partitions needed for one join to the same task so the task can consume from all the relevant partitions and perform the join independently.

Use Cases for Stream Processing

How to Choose a Stream Processing Framework to use with Apache Kafka

Different types of applications call for different stream processing solutions:

There are also some global considerations that should taken into account:

Books that Cover more in Depth Stream Processing