Disclaimer: While I usually reword several pieces of the content I take as notes from books like this one, many other pieces are just literal texts extracted from the book(s). The authors of those literal texts extracted from the books are the only owners of such content. If you're the co/author, publisher, or, in general, a copyright owner of this book or related material and consider the content of this website (https://lealceldeiro.com/) doesn't comply with your work's copyright policy, please get in touch.
Main notes taken from the book Kafka: The Definitive Guide.
Data within Kafka is stored durably, in order, and can be read deterministically.
In addition, the data can be distributed within the system to provide additional protections against failures, as well as significant opportunities for scaling performance.
The unit of data within Kafka is the message.
A message can have an optional piece of metadata, which is referred to as a key.
The message and the key are byte arrays and have no specific meaning to Kafka.
The offset, an integer value that continually increases, is another piece of metadata that Kafka adds to each message as it is produced.
A single Kafka server is called a broker.
Kafka brokers are designed to operate as part of a cluster.
Within a cluster of brokers, one broker will also function as the cluster controller.
A partition is owned by a single broker in the cluster, and that broker is called the leader of the partition.
A replicated partition is assigned to additional brokers, called followers of the partition.
For this to work, Docker must be installed.
The book starts by mentioning the installation of Kafka and ZooKeeper, but after KRaft was marked as Production Ready, only the Kafka image is actually needed when version 2.8 or later of Apache Kafka is used.
The following example uses bitnami/kafka and is based on the official documentation examples.
This setup gets the Apache Kafka server running inside one container and accessed by an Apache Kafka client running inside another container. This is possible because containers attached to the same network can communicate with each other using the container name as the hostname.
docker network create app-tier --driver bridge
Use the `--network app-tier` argument with the `docker run` command to attach the Apache Kafka container to the `app-tier` network.
docker run -d -p 9094:9094 --name kafka-server --hostname kafka-server --network app-tier \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092,EXTERNAL://localhost:9094 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-server:9093 \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
bitnami/kafka:3.6.1
To create a cluster (with more than one broker) see Setting up an Apache Kafka cluster.
The following parameters are optional in this minimal setup, and are only required if the server will be accessed from the host machine by another app running locally:

- `-p 9094:9094`
- `EXTERNAL://:9094` in `KAFKA_CFG_LISTENERS`
- `EXTERNAL://localhost:9094` in `KAFKA_CFG_ADVERTISED_LISTENERS`
- `EXTERNAL:PLAINTEXT` in `KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP`

Also, after the container has been created and run for the first time, only this command is needed to start it again in subsequent runs:
docker container start kafka-server
This new container instance running the client will connect to the server created in the previous step.
docker run -it --rm --network app-tier bitnami/kafka:3.6.1 kafka-topics.sh --bootstrap-server kafka-server:9092 --list
At this point there shouldn't be any topics displayed.
docker run -it --rm --network app-tier bitnami/kafka:3.6.1 kafka-topics.sh --bootstrap-server kafka-server:9092 \
--create --replication-factor 1 --partitions 1 --topic topic1
docker run -it --rm --network app-tier bitnami/kafka:3.6.1 kafka-topics.sh --bootstrap-server kafka-server:9092 \
--describe --topic topic1
docker run -it --rm --network app-tier \
bitnami/kafka:3.6.1 kafka-console-producer.sh --bootstrap-server kafka-server:9092 --topic topic1
When the console waits for input (symbol `>` visible), enter some messages and hit Enter (once for every message). To finish producing messages, send an end-of-file (EOF) character to close the client. In most common terminals, this is done with `Ctrl+D` (`^D`). If that doesn't work, use `Ctrl+C` (`^C`).
docker run -it --rm --network app-tier \
bitnami/kafka:3.6.1 kafka-console-consumer.sh --bootstrap-server kafka-server:9092 \
--topic topic1 --from-beginning
Check the docker `run` command reference for more details about the following options:

- `-d`: run the container in the background and print the container ID
- `-it`: instructs Docker to allocate a pseudo-TTY connected to the container's stdin, creating an interactive shell in the container
- `--rm`: automatically remove the container when it exits
Some important broker configuration parameters:

- `broker.id`: must be unique for each broker within a single Kafka cluster
- `listeners`: defined as `<protocol>://<hostname>:<port>`, e.g., `PLAINTEXT://localhost:9092,SSL://:9091`
- `zookeeper.connect`: location of the ZooKeeper used for storing the broker metadata, in the format `hostname:port/path`
- `log.dirs`: comma-separated list of paths on disk where all the message log segments are persisted
- `num.recovery.threads.per.data.dir`: number of recovery threads per log directory
- `auto.create.topics.enable`: whether to automatically create topics under certain conditions
- `auto.leader.rebalance.enable`: can be specified to ensure leadership is balanced as much as possible
- `delete.topic.enable`: can be set to prevent arbitrary deletions of topics (`false` disables topic deletion)
Faster disk writes will equal lower produce latency.
Generally, observations show that HDD drives are typically more useful for clusters with very high storage needs but aren't accessed as often, while SSDs are better options if there is a very large number of client connections.
The total traffic for a cluster can be balanced across the cluster by having multiple partitions per topic, which will allow additional brokers to augment the available capacity if the density on a single broker will not suffice.
The normal mode of operation for a Kafka consumer is reading from the end of the partitions, where the consumer is caught up and lagging behind the producers very little, if at all. In this situation, the messages the consumer is reading are optimally stored in the system's page cache, resulting in faster reads than if the broker has to reread the messages from disk. Therefore, having more memory available to the system for page cache will improve the performance of consumer clients.
Kafka itself does not need much heap memory configured for the JVM. Even a broker that is handling 150000 messages per second and a data rate of 200 megabits per second can run with a 5GB heap.
It is not recommended to have Kafka running on a system with any other significant application, as it will have to share the use of the page cache. This will decrease the consumer performance for Kafka.
The available network throughput will specify the maximum amount of traffic that Kafka can handle. This can be a governing factor, combined with disk storage, for cluster sizing. To prevent the network from being a major governing factor, it is recommended to run with at least 10 Gb NICs (Network Interface Cards). Older machines with 1 Gb NICs are easily saturated and aren't recommended.
Processing power is not as important as disk and memory until you begin to scale Kafka very large, but it will affect overall performance of the broker to some extent.
Typically, the size of a cluster is bounded by key areas such as disk capacity, replica capacity per broker, CPU capacity, and network capacity.
Currently —2023—, in a well-configured environment, it is recommended to not have more than 14000 partition replicas per broker and 1 million replicas per cluster.
Requirements in the broker configuration to allow multiple Kafka brokers to join a single cluster:
- the `zookeeper.connect` parameter
- the `broker.id` parameter

The current number of dirty pages can be determined by checking:
cat /proc/vmstat | egrep "dirty|writeback"
The most common choices for local filesystems are either Ext4 (fourth extended filesystem) or Extents File System (XFS), with XFS being the preferred option because it outperforms Ext4 for most workloads with minimal tuning required.
As of this writing, the Garbage-First garbage collector (G1GC) is the recommended one, but it's worth checking the official Kafka documentation related to this topic to get the latest updates; a best practice is to use G1GC for Java 1.8 and later.
Configuration options for G1GC used to adjust its performance:

- `MaxGCPauseMillis`: preferred pause time for each garbage-collection cycle
- `InitiatingHeapOccupancyPercent`: specifies the percentage of the total heap that may be in use before G1GC starts a collection cycle
A datacenter environment that has a concept of fault zones is preferable.
It is recommended to use tools that keep your cluster balanced properly to maintain rack awareness, such as Cruise Control .
Overall, the best practice is to have each Kafka broker in a cluster installed in a different rack, or at the very least not share single points of failure for infrastructure services such as power and network.
The source code of a sample Java Kafka Producer, based on the content from this chapter, can be found here.
Messages to Kafka are produced by creating a `ProducerRecord`, which includes the topic we want to send the record to and a value; optionally, it can also include a key, a partition, a timestamp, and/or a collection of headers.
When the `ProducerRecord` is sent, the first thing the producer will do is serialize the key and value objects to byte arrays, so they can be sent over the network.
If a partition isn't explicitly specified, the data is sent to a partitioner (which chooses a partition for the message, usually, based on the key).
Then, the record is added to a batch of records that will also be sent to the same topic and partition. A separate thread is responsible for sending those batches of records to the appropriate Kafka brokers.
Mandatory properties of a Kafka producer:
- `bootstrap.servers`: list of `host:port` pairs of brokers that the producer will use to establish the initial connection to the Kafka cluster
- `key.serializer`: name of a class that will be used to serialize the keys of the records that will be produced to Kafka
- `value.serializer`: name of a class that will be used to serialize the values of the records that will be produced to Kafka
All the configuration options are available in the official docs.
Primary methods of sending messages: fire-and-forget, synchronous send, and asynchronous send (with a callback), as shown in the sketch below.
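As an illustration (not the book's sample code), a minimal sketch of the three approaches, assuming a hypothetical topic `customer-contacts` and string serializers:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

public class SendExamples {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("customer-contacts", "key", "value");

            // 1. Fire-and-forget: send and ignore the result
            producer.send(record);

            // 2. Synchronous send: block until the broker acknowledges the write
            RecordMetadata metadata = producer.send(record).get();
            System.out.println("Written to partition " + metadata.partition()
                    + " at offset " + metadata.offset());

            // 3. Asynchronous send: register a callback invoked when the broker responds
            producer.send(record, (recordMetadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();
                }
            });
        }
    }
}
```

The synchronous variant trades throughput for an immediate error, while the callback variant keeps the send path non-blocking.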
Some of the parameters that have a significant impact on memory use, performance, and reliability of the producers:
- `client.id`: logical identifier for the client and the application it is used in
- `acks`: controls how many partition replicas must receive the record before the producer can consider the write successful (valid values: `0`, `1`, `all`)
- `max.block.ms`: controls how long the producer may block when calling `send()` and when explicitly requesting metadata via `partitionsFor()`
- `delivery.timeout.ms`: limits the amount of time spent from the point a record is ready for sending (`send()` returned successfully and the record is placed in a batch) until either the broker responds or the client gives up, including time spent on retries
- `request.timeout.ms`: controls how long the producer will wait for a reply from the server when sending data
- `retries`: controls how many times the producer will retry sending the message before giving up and notifying the client of an issue; `retries=0` disables retrying
- `retry.backoff.ms`: controls the time to wait between one retry and the next when resending a message
- `linger.ms`: controls the amount of time to wait for additional messages before sending the current batch
- `buffer.memory`: sets the amount of memory the producer will use to buffer messages waiting to be sent to brokers
- `compression.type`: sets the compression algorithm to be used to compress the data before sending it to the brokers; valid values are `snappy`, `gzip`, `lz4`, and `zstd`
- `batch.size`: controls the amount of memory in bytes used for each batch when the producer batches multiple messages together before sending them to the broker
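For illustration only, a hedged sketch of how some of these parameters could be set when building a producer configuration (the values are arbitrary examples, not recommendations from the book):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ProducerTuningConfig {
    public static Properties tuningProps() {
        Properties props = new Properties();
        // Reliability: wait for all in-sync replicas to acknowledge the write
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 500);
        // Throughput/latency: small batching delay and compression
        props.put(ProducerConfig.LINGER_MS_CONFIG, 20);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        // Memory available for buffering unsent records
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64L * 1024 * 1024);
        return props;
    }
}
```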
Apache Avro is a language-neutral data serialization format.
Generating Avro classes can be done either using the `avro-tools.jar` or the Avro Maven plugin, both part of Apache Avro (see docs).
When partitioning keys is important, the easiest solution is to create topics with sufficient partitions, see How to Choose the Number of Topics/Partitions in a Kafka Cluster?
Records can also include headers. Record headers give us the ability to add some metadata about the Kafka record, without adding any extra information to the key/value pair of the record itself.
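A minimal sketch of attaching headers to a record, assuming hypothetical header names (`privacy-level`, `data-origin`) and topic:

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.producer.ProducerRecord;

public class HeaderExample {
    public static ProducerRecord<String, String> withHeaders() {
        ProducerRecord<String, String> record =
                new ProducerRecord<>("customer-contacts", "key", "value");
        // Attach metadata without touching the key/value pair itself
        record.headers().add("privacy-level", "restricted".getBytes(StandardCharsets.UTF_8));
        record.headers().add("data-origin", "crm-system".getBytes(StandardCharsets.UTF_8));
        return record;
    }
}
```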
Interceptors can help us to modify the behavior of our Kafka client application without modifying its code.
Common use cases for producer interceptors include capturing monitoring and tracing information; enhancing the message with standard headers, especially for lineage tracking purposes; and redacting sensitive information.
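A hedged sketch of a producer interceptor that only counts sent and acknowledged records (a simplified stand-in for the monitoring use case above); the class name is hypothetical:

```java
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/** Counts produced and acknowledged records; a minimal monitoring-style interceptor. */
public class CountingInterceptor implements ProducerInterceptor<String, String> {
    private long sent = 0;
    private long acked = 0;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        sent++;        // called before the record is serialized and sent
        return record; // the record could also be modified here (e.g., add standard headers)
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            acked++;   // called when the broker acknowledges the write
        }
    }

    @Override
    public void close() {
        System.out.printf("sent=%d acked=%d%n", sent, acked);
    }

    @Override
    public void configure(Map<String, ?> configs) { }
}
```

Such an interceptor would be registered through the producer's `interceptor.classes` configuration.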
Kafka brokers have the ability to limit the rate at which messages are produced and consumed. This is done via the quota mechanism. Kafka has three quota types: produce, consume, and request.
Produce and consume quotas limit the rate at which clients can send and receive data, measured in bytes per second. Request quotas limit the percentage of time the broker spends processing client requests.
When a client reaches its quota, the broker will start throttling the client's requests to prevent it from exceeding the quota. This means that the broker will delay responses to client requests; in most clients this will automatically reduce the request rate (since the number of in-flight requests is limited) and bring the client traffic down to a level allowed by the quota. To protect the broker from misbehaved clients sending additional requests while being throttled, the broker will also mute the communication channel with the client for the period of time needed to achieve compliance with the quota.
The source code of a sample Java Kafka Consumer, based on the content from this chapter, can be found here.
Applications that need to read data from Kafka use a KafkaConsumer to subscribe to Kafka topics and receive messages from these topics.
A consumer group must be created for each application that needs all the messages from one or more topics.
Consumers are added to an existing consumer group to scale the reading and processing of messages from the topics, so each additional consumer in a group will only get a subset of the messages.
Moving partition ownership from one consumer to another is called a rebalance.
Types of rebalances: eager rebalances (all consumers stop consuming, give up ownership of all partitions, rejoin the group, and get a new partition assignment) and cooperative (incremental) rebalances (only a subset of partitions is reassigned, while consumers keep processing the partitions they retain).
Consumers maintain membership in a consumer group and ownership of the partitions assigned to them by sending heartbeats to a Kafka broker designated as the group coordinator.
The first consumer to join the group becomes the group leader.
By default, the identity of a consumer as a member of its consumer group is transient, unless it's
configured with a unique group.instance.id
, which makes the consumer a static member of the
group.
If two consumers join the same group with the same group.instance.id, the second consumer will get an error saying that a consumer with this ID already exists.
To consume records from a Kafka broker: create a `KafkaConsumer` (create a Java `Properties` instance with the properties to be passed to the consumer; three mandatory properties: `bootstrap.servers`, `key.deserializer`, and `value.deserializer`).

- `bootstrap.servers`: connection string to a Kafka cluster
- `key.deserializer` and `value.deserializer`: specify classes that turn byte arrays into Java objects

Another common property is `group.id`, and it specifies the consumer group the `KafkaConsumer` instance belongs to.
At the heart of the Consumer API is a simple loop for polling the server for more data.
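A minimal sketch of that loop, assuming a hypothetical topic `customer-contacts` and consumer group `contacts-readers`:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SimplePollLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "contacts-readers");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("customer-contacts"));
            while (true) {
                // poll() handles group coordination, heartbeats, and fetching in one call
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d key=%s value=%s%n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
```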
Multiple consumers that belong to the same group cannot coexist in the same thread, and multiple threads cannot safely use the same consumer. One consumer per thread is the rule.
To run multiple consumers in the same group in one application, each of them needs to run in its own thread.
It is useful to wrap the consumer logic in its own object and then use Java's ExecutorService
to start
multiple threads, each with its own consumer. See an example in this
Tutorial
.
All the consumer configuration is documented in the Apache Kafka documentation .
Assignment strategies (used to configure `partition.assignment.strategy` with one of the `org.apache.kafka.clients.consumer.*Assignor` values): Range, RoundRobin, Sticky, and Cooperative Sticky.
When it's known exactly which partitions the consumer should read, the consumer does not subscribe to a topic; instead, it assigns itself a few partitions. A consumer can either subscribe to topics (and be part of a consumer group) or assign itself partitions, but not both at the same time.
Example:
Duration timeout = Duration.ofMillis(100);
List<TopicPartition> partitions = new ArrayList<>();
List<PartitionInfo> partitionInfos = consumer.partitionsFor("topic");

if (partitionInfos != null) {
    for (PartitionInfo partition : partitionInfos) {
        partitions.add(new TopicPartition(partition.topic(), partition.partition()));
    }
    consumer.assign(partitions);

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(timeout);
        for (ConsumerRecord<String, String> record : records) {
            // do some work with record
        }
        consumer.commitSync();
    }
}
Kafka's AdminClient is asynchronous. It is useful for application developers who want to create topics on the fly and validate that the topics they are using are configured correctly for their application. It is also useful for operators and SREs who want to create tooling and automation around Kafka or need to recover from an incident.
Example:
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient admin = AdminClient.create(props);
// Do something with the AdminClient instance
admin.close(Duration.ofSeconds(30));
All `AdminClient` configuration can be found in the Apache Kafka official documentation. Some noteworthy options are:

- `client.dns.lookup`
- `request.timeout.ms`
`AdminClient` result objects throw `ExecutionException` when Kafka responds with an error. This is because `AdminClient` results are wrapped `Future` objects, and those wrap exceptions. The cause of the `ExecutionException` always needs to be examined to get the error that Kafka returned.
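A minimal sketch of unwrapping the cause, assuming we try to create a topic that may already exist:

```java
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;

public class CreateTopicExample {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic topic = new NewTopic("topic1", 1, (short) 1);
            try {
                // get() turns the asynchronous result into a blocking call
                admin.createTopics(Collections.singleton(topic)).all().get();
            } catch (ExecutionException e) {
                // The real Kafka error is the cause, not the ExecutionException itself
                if (e.getCause() instanceof TopicExistsException) {
                    System.out.println("Topic already exists, nothing to do");
                } else {
                    throw new RuntimeException(e.getCause());
                }
            }
        }
    }
}
```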
Kafka uses ZooKeeper's ephemeral node feature to elect a controller and to notify the controller when nodes join and leave the cluster. The controller is responsible for electing leaders among the partitions and replicas whenever it notices nodes join and leave the cluster. The controller uses the epoch number to prevent a "split brain" scenario where two nodes believe each is the current controller.
From the list of replicas for a partition displayed by the kafka-topics.sh
tool, the first replica in
the list is always the preferred leader.
This is true no matter who is the current leader and even if the replicas were reassigned to different brokers using the replica reassignment tool.
When replicas are manually reassigned, the first replica specified will be the preferred one.
Common types of client requests: produce requests (sent by producers, containing messages clients write to Kafka brokers), fetch requests (sent by consumers and follower replicas when they read messages from Kafka brokers), and admin requests (sent by admin clients when performing metadata operations, such as creating and deleting topics).
New brokers know how to handle old requests, but old brokers don't know how to handle new requests. In release 0.10.0, `ApiVersionRequest` was added, which allows clients to ask the broker which versions of each request are supported and to use the correct version accordingly. Clients that use this new capability correctly will be able to talk to older brokers by using a version of the protocol that is supported by the broker they are connecting to.
Reliability is not just a matter of specific Kafka features. An entire reliable system needs to be built, including the application architecture, the way applications use the producer and consumer APIs, producer and consumer configuration, topic configuration, and broker configuration. Making the system more reliable always has trade-offs in application complexity, performance, availability, or disk-space usage. By understanding all the options and common patterns and understanding requirements for each use case, more informed decisions can be made regarding how reliable the application and Kafka deployment need to be and which trade-offs make sense.
Guarantees provided by Kafka:

- The order of messages is preserved within a partition.
- Produced messages are considered "committed" when they are written to the partition on all of its in-sync replicas.
- Committed messages won't be lost as long as at least one replica remains alive.
- Consumers can only read messages that are committed.
These guarantees can then be "tweaked" by changing Kafka's configuration, depending on the business needs and trade-offs made taking into account other important considerations, such as availability, high throughput, low latency, and hardware costs.
If we allow out-of-sync replicas to become leaders, we risk data loss and inconsistencies. If we don't allow them to become leaders, we face lower availability as we must wait for the original leader to become available before the partition is back online.
There are two important things that everyone who writes applications that produce to Kafka must pay attention to:

- use the `acks` configuration to match reliability requirements
- handle errors correctly, both in configuration and in code

Some failure conditions under which integration tests can be run:
Trogdor, the test framework for Apache Kafka, can be used to simulate system faults.
Burrow, the Kafka consumer lag checking tool, can be useful for monitoring consumer lag.
Mechanisms that provide exactly-once guarantees: the idempotent producer (which avoids duplicates caused by producer retries) and transactions (the basis of exactly-once semantics in Kafka Streams).
When the idempotent producer is enabled, each message will include a unique producer ID (PID) and a sequence number, which, together with the target topic and partition, uniquely identify each message.
The idempotent producer will only prevent duplicates caused by the retry mechanism of the producer itself, whether the retry is caused by producer, network, or broker errors. But nothing else.
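A minimal sketch of enabling the idempotent producer, assuming settings compatible with idempotence (`acks=all` and at most five in-flight requests per connection):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class IdempotentProducerConfig {
    public static Properties idempotentProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Enables the PID + sequence-number mechanism described above
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // Idempotence requires acks=all and a bounded number of in-flight requests
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        return props;
    }
}
```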
Exactly-once processing means that consuming, processing, and producing are done atomically. Either the offset of the original message is committed and the result is successfully produced or neither of these things happen. We need to make sure that partial results—where the offset is committed but the result isn't produced, or vice versa—can't happen. To support this behavior, Kafka transactions introduce the idea of atomic multi-partition writes.
A transactional producer is simply a Kafka producer that is configured with a `transactional.id` and has been initialized using `initTransactions()`.
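A hedged sketch of a consume-process-produce loop using a transactional producer, assuming hypothetical topics `input`/`output`, group `processor-group`, and a trivial uppercase transformation; error handling (e.g., aborting the transaction on failure) is omitted for brevity:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProcessor {
    public static void main(String[] args) {
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-processor-1");

        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "processor-group");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
             KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            producer.initTransactions();
            consumer.subscribe(Collections.singletonList("input"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
                if (records.isEmpty()) continue;

                producer.beginTransaction();
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (ConsumerRecord<String, String> record : records) {
                    producer.send(new ProducerRecord<>("output", record.key(), record.value().toUpperCase()));
                    offsets.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1));
                }
                // Commit the consumed offsets as part of the same transaction
                producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                producer.commitTransaction();
            }
        }
    }
}
```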
Transactions provide exactly-once guarantees when used within chains of consume-process-produce stream processing tasks. In other contexts, transactions will either straight-out not work or will require additional effort in order to achieve the guarantees we want.
Two mistakes are assuming that exactly-once guarantees apply on actions other than producing to Kafka, and that consumers always read entire transactions and have information about transaction boundaries.
The most common and most recommended way to use transactions is to enable exactly-once guarantees in Kafka Streams.
To enable exactly-once guarantees for a Kafka Streams application, we simply set the `processing.guarantee` configuration to either `exactly_once` or `exactly_once_beta`.
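A minimal sketch of that configuration, assuming a hypothetical application id:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class ExactlyOnceStreamsConfig {
    public static Properties props() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-eos-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Turns on transactional, exactly-once processing inside Kafka Streams
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        return props;
    }
}
```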
The main value Kafka provides to data pipelines is its ability to serve as a very large, reliable buffer between various stages in the pipeline.
bin/connect-distributed.sh config/connect-distributed.properties
The Debezium Project provides a collection of high-quality, open source, change capture connectors for a variety of databases.
Kafka can be looked at as a platform that can handle data integration (with Connect), application integration (with producers and consumers), and stream processing. Kafka could be a viable replacement for an ETL tool that only integrates data stores.
Apache Kafka's built-in cross-cluster replicator is called MirrorMaker.
Some principles that should guide these architectures:
The main benefit of this architecture is that data is always produced to the local datacenter and events from each datacenter are only mirrored once—to the central datacenter.
The main drawback of this architecture is the direct result of its benefits and simplicity. Processors in one regional datacenter can't access data in another.
One benefit of this architecture is the ability to serve users from a nearby datacenter, which typically has performance benefits, without sacrificing functionality due to limited availability of data.
Another benefit is redundancy and resilience. Since every datacenter has all the functionality, if one datacenter is unavailable, you can direct users to a remaining datacenter. This type of failover only requires network redirects of users, typically the easiest and most transparent type of failover.
The main drawback of this architecture is the challenge in avoiding conflicts when data is read and updated asynchronously in multiple locations.
The benefits of this setup are simplicity in setup and the fact that it can be used in pretty much any use case.
The disadvantages are waste of a good cluster and the fact that failover between Kafka clusters is, in fact, much harder than it looks.
One advantage of this architecture is in the synchronous replication—some types of business simply require that their DR site is always 100% synchronized with the primary site.
Another advantage is that both datacenters and all brokers in the cluster are used. There is no waste as in active-standby architectures.
This architecture is limited in the type of disasters it protects against. It only protects from datacenter failures, not any kind of application or Kafka failures. The operational complexity is also limited. This architecture demands physical infrastructure that not all companies can provide.
MirrorMaker is highly configurable. In addition to the cluster settings to define the topology, Kafka Connect, and connector settings, every configuration property of the underlying producer, consumers, and admin client used by MirrorMaker can be customized.
Example: start MirrorMaker with the configuration options specified in the properties file:
bin/connect-mirror-maker.sh etc/kafka/connect-mirror-maker.properties
Any number of these processes can be started to form a dedicated MirrorMaker cluster that is scalable and
fault-tolerant. The processes mirroring to the same cluster will find each other and balance load between them
automatically. Usually when running MirrorMaker in a production environment, we'd want to run MirrorMaker as a
service, running in the background with `nohup` and redirecting its console output to a log file. The tool also
has -daemon
as a command-line option that should do that for us.
Production deployment systems like Ansible, Puppet, Chef, and Salt are often used to automate deployment and manage the many configuration options. MirrorMaker may also be run inside a Docker container. MirrorMaker is completely stateless and doesn't require any disk storage (all the data and state are stored in Kafka itself).
When deploying MirrorMaker in production, it is important to monitor it as follows:
Each Kafka security protocol combines a transport layer (PLAINTEXT or SSL) with an optional authentication layer (SSL or SASL):
PLAINTEXT
SSL
SASL_PLAINTEXT
SASL_SSL
Example: Configure SSL for the inter-broker and internal listeners, and SASL_SSL for the external listener:
listeners=EXTERNAL://:9092,INTERNAL://10.0.0.2:9093,BROKER://10.0.0.2:9094
advertised.listeners=EXTERNAL://broker1.example.com:9092,INTERNAL://broker1.local:9093,BROKER://broker1.local:9094
listener.security.protocol.map=EXTERNAL:SASL_SSL,INTERNAL:SSL,BROKER:SSL
inter.broker.listener.name=BROKER
Then clients are configured with a security protocol and bootstrap servers that determine the broker listener. Metadata returned to clients contains only the endpoints corresponding to the same listener as the bootstrap servers:
security.protocol=SASL_SSL
bootstrap.servers=broker1.example.com:9092,broker2.example.com:9092
Kafka brokers support the following SASL mechanisms out of the box:
GSSAPI
PLAIN
SCRAM-SHA-256 and SCRAM-SHA-512
OAUTHBEARER
Kafka uses the Java Authentication and Authorization Service (JAAS) for configuring SASL.
Generic Security Service Application Program Interface (GSS-API) is a framework for providing security services to applications using different authentication mechanisms.
SCRAM applies a one-way cryptographic hash function on the password combined with a random salt to avoid the actual password being transmitted over the wire or stored in a database.
Kafka provides safeguards by supporting only the strong hashing algorithms SHA-256 and SHA-512 and avoiding weaker algorithms like SHA-1. This is combined with a high default iteration count of 4,096 and unique random salts for every stored key to limit the impact if ZooKeeper security is compromised.
Kafka supports SASL/OAUTHBEARER for client authentication, enabling integration with third-party OAuth servers. The built-in implementation of OAUTHBEARER uses unsecured JSON Web Tokens (JWTs) and is not suitable for production use. Custom callbacks can be added to integrate with standard OAuth servers to provide secure authentication using the OAUTHBEARER mechanism in production deployments.
Because the built-in implementation of SASL/OAUTHBEARER in Kafka does not validate tokens, it only requires the login module to be specified in the JAAS configuration. If the listener is used for inter-broker communication, details of the token used for client connections initiated by brokers must also be provided.
The option unsecuredLoginStringClaim_sub
is the subject claim that determines the KafkaPrincipal for
the connection by default. Example:
sasl.enabled.mechanisms=OAUTHBEARER
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
listener.name.external.oauthbearer.sasl.jaas.config=\
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule \
required unsecuredLoginStringClaim_sub="kafka";
Clients must be configured with the subject claim option `unsecuredLoginStringClaim_sub`. Other claims and token lifetime may also be configured:
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=\
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule \
required unsecuredLoginStringClaim_sub="John";
To integrate Kafka with third-party OAuth servers for using bearer tokens in production, Kafka clients must be
configured with sasl.login.callback.handler.class
to acquire tokens from the OAuth server using the
long-term password or a refresh token.
If OAUTHBEARER is used for inter-broker communication, brokers must also be configured with a login callback handler to acquire tokens for client connections created by the broker for inter-broker communication.
To create and validate delegation tokens, all brokers must be configured with the same master key using the
configuration option delegation.token.master.key
.
Delegation tokens also are suitable for production use only in deployments where ZooKeeper is secure.
Kafka brokers support reauthentication for connections authenticated using SASL using the configuration option
connections.max.reauth.ms
.
Kafka brokers manage access control using a customizable authorizer.
Kafka has a built-in authorizer, `AclAuthorizer`, that can be enabled by configuring the authorizer class name as follows:
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
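ACLs can also be managed programmatically through the AdminClient; a minimal sketch (not from the book), assuming a hypothetical principal `User:alice` and topic `topic1`:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class AclExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Allow the principal User:alice to read from topic1 from any host
            AclBinding readTopic = new AclBinding(
                    new ResourcePattern(ResourceType.TOPIC, "topic1", PatternType.LITERAL),
                    new AccessControlEntry("User:alice", "*", AclOperation.READ, AclPermissionType.ALLOW));
            admin.createAcls(Collections.singleton(readTopic)).all().get();
        }
    }
}
```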
Super users are granted access for all operations on all resources without any restrictions and cannot be denied access using Deny ACLs.
Since AclAuthorizer stores ACLs in ZooKeeper, access to ZooKeeper should be restricted. Deployments without a secure ZooKeeper can implement custom authorizers to store ACLs in a secure external database.
Restricting user access using the principle of least privilege can limit exposure if a user is compromised.
ACLs should be removed immediately when a user principal is no longer in use, for instance, when a person leaves the organization.
Long-running applications can be configured with service credentials rather than credentials associated with a specific user to avoid any disruption when employees leave the organization.
Kafka brokers can be configured to generate comprehensive _log4j_ logs for auditing and debugging. The logging level
as well as the appenders used for logging and their configuration options can be specified in
log4j.properties
.
Kafka supports externalizing passwords in a secure store.
Customizable configuration providers can be configured for Kafka brokers and clients to retrieve passwords from a secure third-party password store.
Passwords may also be stored in encrypted form in configuration files with custom configuration providers that perform decryption.
Sensitive broker configuration options can also be stored encrypted in ZooKeeper using the Kafka configs tool without using custom providers.
While Apache Kafka implements authentication and authorization to control topic operations, default configurations do not restrict the use of these tools. This means that these CLI tools can be used without any authentication required, which will allow operations such as topic changes to be executed with no security check or audit.
The `kafka-topics.sh` tool provides access to most topic operations. It allows us to create, modify, delete, and list information about topics in the cluster. While some topic configurations are possible through this command, they have been deprecated, and it is recommended to use the more robust `kafka-configs.sh` tool for configuration changes.
The kafka-consumer-groups.sh
tool helps manage and gain insight into the consumer groups that are
consuming from topics in the cluster.
The `kafka-configs.sh` tool is the main one for modifying all the configurations for topics, clients, brokers, and more, which can be updated dynamically during runtime without having to shut down or redeploy a cluster.
To manually produce or consume some sample messages these tools are provided: kafka-console-consumer.sh
and kafka-console-producer.sh
.
Consuming the earliest message from the __consumer_offsets
topic:
kafka-console-consumer.sh --bootstrap-server localhost:9092
--topic __consumer_offsets --from-beginning --max-messages 1
--formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
--consumer-property exclude.internal.topics=false
This will show something like this:
[my-group-name,my-topic,0]::[OffsetMetadata[1,NO_METADATA]
CommitTime 1623034799990 ExpirationTime 1623639599990]
Processed a total of 1 messages
A default Kafka installation contains a few scripts for working with the management of partitions. One of these tools allows for the reelection of leader replicas; another is a low-level utility for assigning partitions to brokers. Together these tools can assist in situations where a more manual hands-on approach to balance message traffic within a cluster of Kafka brokers is needed.
The kafka-leader-election.sh
utility can be used to trigger manually a new leader election.
Start a preferred leader election for all topics in a cluster:
kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type PREFERRED --all-topic-partitions
It is also possible to start elections on specific partitions or topics. This can be done by passing in a topic name with the `--topic` option and a partition with the `--partition` option directly. It is also possible to pass in a list of several partitions to be elected through a JSON file with the topic names.
Start a preferred replica election with a specified list of partitions in a file named partitions.json
,
with the following content:
{
"partitions": [
{
"partition": 1,
"topic": "my-topic"
},
{
"partition": 2,
"topic": "foo"
}
]
}
kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type PREFERRED --path-to-json-file partitions.json
The kafka-reassign-partitions.sh
can be used to change the replica assignments manually for a
partition.
The kafka-dump-log.sh
tool can be used to decode the log segments for a partition.
The kafka-replica-verification.sh
tool can be used to validate that the replicas for a topic's
partitions are the same across the cluster.
If the metrics are consumed by automation, they should be very specific. It's OK to have a large number of metrics, each describing small details, because computers process a lot of data easily. The more specific the data is, the easier it is to create automation that acts on it, because the data does not leave as much room for interpretation as to its meaning. On the other hand, if the metrics will be consumed by humans, presenting a large number of metrics will be overwhelming. This becomes even more important when defining alerts based on those measurements.
Two ways to do a health check: an external process that reports whether the broker is up or down (a health check), and alerting on the absence of metrics reported by the Kafka broker (stale metrics).
A service-level indicator (SLI) is a metric that describes one aspect of a service's reliability.
A service-level objective (SLO), which can also be called a service-level threshold (SLT), combines an SLI with a target value.
The term operational-level agreement (OLA) is less frequently used. It describes agreements between multiple internal services or support providers in the overall delivery of an SLA.
Further reading to dive deeper into these topics:
Categories of cluster problems:
The under-replicated partitions measurement, provided on each broker in a cluster, gives a count of the number of partitions for which the broker is the leader replica, where the follower replicas are not caught up. This measurement provides insight into a number of problems with the Kafka cluster, from a broker being down to resource exhaustion.
List under-replicated partitions in a cluster:
kafka-topics.sh --bootstrap-server kafka1.example.com:9092/kafka-cluster --describe --under-replicated
Cluster-level problems are usually either unbalanced load or resource exhaustion.
Host-level problems can be any of:
A configuration management system, such as Chef or Puppet, can be used to maintain consistent configurations across the operating systems running the Kafka brokers.
In general, the number of request handler threads should be set equal to the number of processors in the system, including hyperthreaded processors.
Two loggers write to separate files on disk: kafka.controller
and
kafka.server.ClientQuotaManager
, both at the INFO
level.
The kafka.request.logger
is also useful when debugging issues with Kafka, turned on at either
DEBUG
or TRACE
level.
The overall producer metrics bean provides attributes describing everything from the sizes of the message batches to the memory buffer utilization.
The overall consumer bean has metrics regarding the lower-level network operations, and the fetch manager bean has metrics regarding bytes, request, and record rates.
Unlike the producer client, the metrics provided by the consumer are useful to look at but not useful for setting up alerts on.
A Kafka broker does not use error codes in the response to indicate that a client is being throttled. So, it's not obvious to an application that throttling is happening without monitoring the metrics that show the amount of time the client is being throttled. The metrics that must be monitored are:
- `kafka.consumer:type=consumer-fetch-manager-metrics,client-id=<THE_CLIENTID>`, attribute `fetch-throttle-time-avg`
- `kafka.producer:type=producer-metrics,client-id=<THE_CLIENTID>`, attribute `produce-throttle-time-avg`
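These attributes can also be read programmatically from the client's `metrics()` map (in addition to JMX); a minimal sketch for the producer side (the consumer side is analogous):

```java
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public class ThrottleMetrics {
    /** Prints the average produce throttle time reported by the given producer. */
    public static void printProduceThrottleTime(KafkaProducer<?, ?> producer) {
        for (Map.Entry<MetricName, ? extends Metric> entry : producer.metrics().entrySet()) {
            MetricName name = entry.getKey();
            if ("producer-metrics".equals(name.group())
                    && "produce-throttle-time-avg".equals(name.name())) {
                System.out.println("produce-throttle-time-avg = " + entry.getValue().metricValue());
            }
        }
    }
}
```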
For Kafka consumers, the most important thing to monitor is the consumer lag (the difference between the last message produced in a specific partition and the last message processed by the consumer).
The preferred method of consumer lag monitoring is to have an external process that can watch both the state of the partition on the broker, tracking the offset of the most recently produced message, and the state of the consumer, tracking the last offset the consumer group has committed for the partition.
Burrow can be used to monitor consumer groups. More on this can be checked at this blog post .
Xinfra Monitor can be used to continually produce and consume data from a topic that is spread across all brokers in a cluster. This way, we can monitor if we can produce messages to the broker(s) and if we can consume them. It measures the availability of both produce and consume requests on each broker, as well as the total produce to consume latency.
Starting from version 0.10.0, Kafka includes a stream processing library as part of its collection of client libraries, called Kafka Streams (Streams API). It allows developers to consume, process, and produce events in their apps, without relying on an external processing framework.
A data stream (event stream, streaming data) is an abstraction that represents unbounded data. Here, unbounded means infinite and ever-growing: over time, new records keep arriving. Some of its characteristics are:
Stream processing refers to the continuous processing of event streams. It's a programming paradigm, just like request-response and batch processing are.
In a database context, online transaction processing (OLTP) is the equivalent of request-response.
The collection of all the information that can be handled by the stream processing pipeline, such as counting the number of events by type, moving averages, and joining two streams to create an enriched stream of information is called a state. In this case it's not enough to look at each event by itself; we need to keep track of more information — how many events of each type did we see this hour, all events that require joining, sums, averages, etc.
Kafka Streams uses Kafka’s transactions to implement exactly-once guarantees for stream processing applications.
The exactly-once guarantees provided by the Kafka Streams library can be enabled by setting `processing.guarantee` to `exactly_once`.
Two types of joins: equi-join and foreign-key join.
Apache Kafka has two stream APIs: a low-level Processor API and a high-level Streams DSL (see presentation ).
Some examples:
Every streams application implements and executes one topology (also called a DAG, or directed acyclic graph).
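A minimal sketch of such a topology using the Streams DSL, assuming hypothetical `input` and `output` topics and a trivial uppercase transformation:

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class UppercaseTopology {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "uppercase-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // The topology (DAG): read from "input", transform each value, write to "output"
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("input");
        source.mapValues(value -> value.toUpperCase()).to("output");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```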
The main testing tool for Kafka Streams applications is TopologyTestDriver
.
Two popular integration test frameworks used for testing Kafka application are: EmbeddedKafkaCluster and Testcontainers.
The blog post Testing Kafka Streams — A Deep Dive has deep explanations and detailed code examples of topologies and tests.
Kafka Streams handles scenarios where there are dependencies between tasks (i.e.: because a processing step requires an input from multiple partitions) by assigning all the partitions needed for one join to the same task so the task can consume from all the relevant partitions and perform the join independently.
Different types of applications call for different stream processing solutions:
There are also some global considerations that should be taken into account: