Kafka Configuration
LogScale uses Apache Kafka internally for queuing incoming messages and for storing shared state when running LogScale in a cluster setup. This page describes how to configure Kafka.
LogScale has built-in API endpoints for controlling Kafka. Using the API it is possible to specify partition size and replication factor on the ingest queue. It's also possible to use other Kafka tools, such as the command-line tools included in the Kafka distribution.
Note
Make sure to not apply compression inside Kafka to the queues below.
LogScale compresses the messages when relevant. Letting Kafka apply
compression as well slows down the system and also adds problems with GC
due to use of JNI in case LZ4 is applied. Setting
compression.type
to
producer
should be applied to these
queues.
Topic Management
It is possible to use Kafka in two modes; either LogScale manages its Kafka topics (this is the default), or it does not. If LogScale is managing, it will create topics if they do not exist, and will look at the topic configurations and manage those as well. If LogScale is not managing the Kafka topics, it will not create topics or change configurations; you must create and properly configure the topics listed in the Topics section in Kafka.
To disable LogScale's automatic management, and manage topics manually,
set the configuration flag KAFKA_MANAGED_BY_HUMIO
to
false
.
Topic Configuration
We recommend that the ingest queue is configured with a backstop limit on how much data it will retain. During normal operation, LogScale will delete data from the ingest queue once it is no longer needed, but there are error scenarios
where this might not happen. The most common examples we have seen of this not happening include:
Bucket Storage is configured, and LogScale is configured for
USING_EPHEMERAL_DISKS=true
, and the bucket storage solution is experiencing downtime. In this configuration, logs are retained in Kafka until they can be replicated to bucket storage.The LogScale cluster is partially down, such that ingest is still flowing into Kafka, but digest work is not being performed as the responsible nodes are not running. This can happen if most of the LogScale cluster is down, and may also happen in clusters making use of node roles if all nodes capable of digest work are down.
Since the result of data flowing into Kafka and not being deleted is that Kafka will fill its disk and eventually crash, which will pull down LogScale alongside it, we recommend setting a limit on retention in Kafka for the ingest queue such that the disk will never be completely filled.
We recommend setting the limit to be 90% of the disk size, divided by
the partition count for the queue. For example, for a setup where Kafka
is on a 1TB disk, and the LogScale ingest queue has 24 partitions, we
would recommend setting the limit to 1TB / 24 =
41666666700
bytes.
The limit is configured via Kafka's management tooling. In your Kafka
install directory, you should find a bin
directory.
Within that directory, run the command
$ ./kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name QUEUE_NAME_HERE
--add-config retention.bytes=YOUR_LIMIT_HERE
The queue name is determined by the KAFKA_QUEUE_PREFIX
setting, which is empty by default. For example, assuming a 1TB disk and
24 partitions, and KAFKA_QUEUE_PREFIX=test-
, you would
run
$ ./kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name test-humio-ingest
--add-config retention.bytes=41666666700
Kafka Client Properties
It is possible to add extra Kafka configuration properties to LogScale's
Kafka consumers and Kafka producers by pointing to a properties file
using EXTRA_KAFKA_CONFIGS_FILE
. For example, this enables
LogScale to connect to a Kafka cluster using SSL and SASL. Remember to
map the configuration file into the LogScale Docker container if running
LogScale in a Docker container.
Retention Settings
Show ingest queue configuration. This only shows properties set
specifically for the topic — not the default ones specified in
kafka.properties
<kafka_dir>/bin/kafka-configs.sh --zookeeper $ZOOKEEPER_HOST:2181 --entity-name humio-ingest --entity-type topics --describe
Set retention on the ingest queue to 7 days.
<kafka_dir>/bin/kafka-configs.sh --zookeeper $ZOOKEEPER_HOST:2181 --entity-name humio-ingest --entity-type topics --alter --add-config retention.ms=604800000
Set retention on the ingest queue to 1GB (per partition)
<kafka_dir>/bin/kafka-configs.sh --zookeeper $ZOOKEEPER_HOST:2181 --entity-name humio-ingest --entity-type topics --alter --add-config retention.bytes=1073741824
Note
The setting retention.bytes
is per partition. By default
LogScale has 24 partitions for ingest.
Broker Settings
If you use the Kafka brokers only for LogScale, you can configure the Kafka brokers to allow large messages on all topics. This example allows up to 100 MB in each message. Larger sizes require more memory for replication.
# max message size for all topics by default:
message.max.bytes=104857600
Sample Kafka Configuration
It is important to set log.dirs
to the location
where Kafka should store the data. Without such a setting, Kafka
defaults to /tmp/kafka-logs
, which is very likely
NOT where you want it.
Important
The log.dirs
parameter is the location of the
Kafka data, not execution logs.
When deploying Kafka using ZooKeeper using the Tarball installation you need to configure the two services with a unique host ID number:
For Kafka this is the
broker.id
configuration value in theserver.properties
file:ini# The id of the broker. This must be set to a unique integer for each broker. broker.id=1
For ZooKeeper this is a file called
myid
in the ZooKeeper data directory that contains the node ID number. You can create it using:shell$
echo 1 >/kafka/zookeeper/myid
When creating a multi-node Kafka cluster these numbers must be unique for each host:
Host |
Kafka broker.id
|
ZooKeeper myid
|
---|---|---|
kafka1 | 1 | 1 |
kafka2 | 2 | 2 |
kafka3 | 3 | 3 |
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
############################# Socket Server Settings #############################
listeners=PLAINTEXT://kafka1:9092
#use compression
compression.type=producer
############################# Log Basics #############################
# A comma-separated list of directories under which to store log files
log.dirs=/kafka/kafka
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion
log.retention.hours=48
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1000073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
auto.create.topics.enable=false
unclean.leader.election.enable=false
############################# ZooKeeper #############################
zookeeper.connect=kafka1:2181
Sample ZooKeeper Configuration
The example below shows the typical
zookeeper.properties
configuration file for a
three-node ZooKeeper cluster:
dataDir=/kafka/zookeeper
clientPort=2181
clientPortAddress=kafka1
tickTime=2000
initLimit=5
syncLimit=2
autopurge.purgeInterval=1
admin.enableServer=false
4lw.commands.whitelist=*
server.1=kafka1:2888:3888
server.2=kafka2:2888:3888
server.3=kafka3:2888:3888
Where:
dataDir=/kafka/zookeeper
The directory where the ZooKeeper snapshot data will be stored. Although ZooKeeper is an in-memory system, ZooKeeper periodically saves this data to a snapshot so that the data can be reloaded in the event of a failure or reboot. Do not place this information on a ephemeral disk or filesystem such as
/tmp
as these systems are cleaned during reboot.clientPort=2181
The client TCP/IP port to listen for connections.
clientPortAddress=kafka1
The address to listen on. This should be a public IP address accessible from the network to other hosts and not
localhost
tickTime=2000
The time for a tick, measured in milliseconds. In this case, a 2s tick.
initLimit=5
The timeout before ZooKeeper raises a warning during initialization if it can't reach other hosts. This is measured in the number of ticks, so in this case, 5 ticks at 2000 would be 10 seconds.
syncLimit=2
How far a node can be out of sync compared to other hosts before reporting an issue. Measure in the number of ticks; in this example 4 seconds.
autopurge.purgeInterval=1
Automatically removes old snapshots stored on disk.
admin.enableServer=false
Disables administration.
4lw.commands.whitelist=*
Enables four-letter word commands.
server.1=kafka1:2888:3888
The node ID, hostname and port numbers of each node within the ZooKeeper cluster. Note that the number after
server.
must match the number in themyid
file and thebroker.id
parameter in Kafka.
Tip
This file can be copied to all the nodes within your ZooKeeper cluster as the configuration should be the same on each host.
In addition to this file, the myid
within the
configured dataDir
(for example,
/kafka/zookeeper/myid
in the above configuration)
must contain the node ID of the current host. For example, if this host
is kafka1
then you would use the following command to
set the node id:
$ echo 1 >/kafka/zookeeper/myid