Kafka Configuration

LogScale uses Apache Kafka internally for queuing incoming messages and for storing shared state when running LogScale in a cluster setup. This page describes how to configure Kafka.

LogScale has built-in API endpoints for controlling Kafka. Using the API it is possible to specify partition size and replication factor on the ingest queue. It's also possible to use other Kafka tools, such as the command-line tools included in the Kafka distribution.

Note

Do not apply compression inside Kafka to the queues below. LogScale compresses the messages when relevant; letting Kafka compress them as well slows down the system and, if LZ4 is applied, causes problems with garbage collection (GC) because of its use of JNI. Set compression.type to producer on these queues.
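
As a rough sketch of how the setting could be applied to one topic with Kafka's own tooling, the helper below prints a kafka-configs.sh command for a given topic. The function name compression_cmd and the BOOTSTRAP variable are illustrative assumptions, and humio-ingest is used only as an example topic name; run the printed command from the bin directory of your Kafka installation once it looks right.

```shell
#!/bin/sh
# Sketch: print (not run) the kafka-configs.sh command that sets
# compression.type=producer on one topic.
# BOOTSTRAP and compression_cmd are illustrative names, not LogScale settings.
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"

compression_cmd() {
  # $1 = topic name, for example humio-ingest
  echo "./kafka-configs.sh --bootstrap-server $BOOTSTRAP --alter" \
       "--entity-type topics --entity-name $1" \
       "--add-config compression.type=producer"
}

compression_cmd humio-ingest
```

Printing the command first makes it easy to review the topic name and broker address before changing anything on a live cluster.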

Topic Management

Kafka can be used in two modes: either LogScale manages its Kafka topics (this is the default), or it does not. When LogScale manages the topics, it creates any that do not exist and also inspects and manages their configuration. When it does not, it neither creates topics nor changes configurations; you must create and properly configure in Kafka the topics listed in the Topics section.

To disable LogScale's automatic management, and manage topics manually, set the configuration flag KAFKA_MANAGED_BY_HUMIO to false.

Topic Configuration

We recommend that the ingest queue is configured with a backstop limit on how much data it will retain. During normal operation, LogScale will delete data from the ingest queue once it is no longer needed, but there are error scenarios where this might not happen. The most common examples we have seen of this not happening include:

  • Bucket Storage is configured, and LogScale is configured for USING_EPHEMERAL_DISKS=true, and the bucket storage solution is experiencing downtime. In this configuration, logs are retained in Kafka until they can be replicated to bucket storage.

  • The LogScale cluster is partially down, such that ingest is still flowing into Kafka, but digest work is not being performed as the responsible nodes are not running. This can happen if most of the LogScale cluster is down, and may also happen in clusters making use of node roles if all nodes capable of digest work are down.

If data flows into Kafka without being deleted, Kafka will fill its disk and eventually crash, pulling LogScale down with it. We therefore recommend setting a retention limit in Kafka for the ingest queue so that the disk can never be completely filled.

We recommend setting the limit to 90% of the disk size, divided by the partition count for the queue. For example, for a setup where Kafka is on a 1TB disk and the LogScale ingest queue has 24 partitions, we would recommend setting the limit to 0.9 × 1TB / 24 = 37500000000 bytes.

The limit is configured via Kafka's management tooling. In your Kafka install directory, you should find a bin directory. Within that directory, run the command:

$ ./kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name QUEUE_NAME_HERE --add-config retention.bytes=YOUR_LIMIT_HERE

The queue name is humio-ingest, preceded by the value of the KAFKA_QUEUE_PREFIX setting, which is empty by default. For example, assuming a 1TB disk and 24 partitions, and KAFKA_QUEUE_PREFIX=test-, you would run:

$ ./kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name test-humio-ingest --add-config retention.bytes=37500000000
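
The rule of 90% of the disk size divided by the partition count can be worked out with shell arithmetic. The following is a minimal sketch; the function name retention_bytes is hypothetical, and it assumes the disk size is given in bytes and that all partitions of the queue share that one disk.

```shell
#!/bin/sh
# Sketch: compute a per-partition retention.bytes backstop as
# 90% of the disk size divided by the partition count.
# retention_bytes is a hypothetical helper, not part of Kafka or LogScale.

retention_bytes() {
  disk_bytes=$1    # total disk size in bytes
  partitions=$2    # partition count of the ingest queue
  echo $(( disk_bytes * 90 / 100 / partitions ))
}

# A 1TB disk (10^12 bytes) and 24 partitions
limit=$(retention_bytes 1000000000000 24)
echo "retention.bytes=$limit"    # prints retention.bytes=37500000000
```

The resulting number is the value to pass to --add-config retention.bytes= in the kafka-configs.sh command.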

Kafka Client Properties

It is possible to add extra Kafka configuration properties to LogScale's Kafka consumers and Kafka producers by pointing to a properties file using EXTRA_KAFKA_CONFIGS_FILE. For example, this enables LogScale to connect to a Kafka cluster using SSL and SASL. Remember to map the configuration file into the LogScale Docker container if running LogScale in a Docker container.
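
As an illustration, the script below writes a small properties file with standard Kafka client settings for SASL over SSL and points EXTRA_KAFKA_CONFIGS_FILE at it. This is a sketch under assumptions: the file location, the PLAIN mechanism, the user name, the passwords and the truststore path are all placeholders, and your Kafka cluster may require different security settings.

```shell
#!/bin/sh
# Sketch: create an extra Kafka client properties file for LogScale.
# Every value below is a placeholder; adjust to match your Kafka cluster.
props_file="${TMPDIR:-/tmp}/logscale-kafka-client.properties"

cat > "$props_file" <<'EOF'
# Encrypt traffic with SSL and authenticate with SASL
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="logscale" password="changeme";
# Truststore holding the CA certificate that signed the broker certificates
ssl.truststore.location=/etc/ssl/kafka/truststore.jks
ssl.truststore.password=changeme
EOF

# Tell LogScale where to find the extra properties
export EXTRA_KAFKA_CONFIGS_FILE="$props_file"
```

When LogScale runs in Docker, the same file has to be mounted into the container (for example with docker run -v) and EXTRA_KAFKA_CONFIGS_FILE set to the path as seen inside the container.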

Retention Settings

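Show the ingest queue configuration. This only shows properties set specifically for the topic, not the defaults specified in kafka.properties. The commands in this section use the older --zookeeper option; newer Kafka releases do not accept it for topic configuration, in which case use --bootstrap-server with a broker address instead, as in the Topic Configuration section.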

logscale
<kafka_dir>/bin/kafka-configs.sh --zookeeper $ZOOKEEPER_HOST:2181 --entity-name humio-ingest --entity-type topics --describe

Set retention on the ingest queue to 7 days.

logscale
<kafka_dir>/bin/kafka-configs.sh --zookeeper $ZOOKEEPER_HOST:2181 --entity-name humio-ingest --entity-type topics --alter --add-config retention.ms=604800000

Set retention on the ingest queue to 1GB (per partition).

logscale
<kafka_dir>/bin/kafka-configs.sh --zookeeper $ZOOKEEPER_HOST:2181 --entity-name humio-ingest --entity-type topics --alter --add-config retention.bytes=1073741824

Note

The setting retention.bytes is per partition. By default LogScale has 24 partitions for ingest.
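
The values used above can be derived with shell arithmetic, which also shows what the per-partition limit means for the topic as a whole. This is a small sketch; the variable names are illustrative.

```shell
#!/bin/sh
# Sketch: derive the retention values used above.
# Variable names are illustrative.

days=7
partitions=24                    # default ingest partition count
per_partition_bytes=1073741824   # 1GB, as 1024 * 1024 * 1024 bytes

# retention.ms is expressed in milliseconds
retention_ms=$(( days * 24 * 60 * 60 * 1000 ))

# retention.bytes applies to each partition, so the topic as a whole
# may hold up to partitions * retention.bytes (before replication)
topic_total_bytes=$(( per_partition_bytes * partitions ))

echo "retention.ms=$retention_ms"                     # 604800000
echo "topic maximum: $topic_total_bytes bytes"        # 25769803776
```

With the default 24 partitions, a 1GB per-partition limit therefore allows the ingest queue to grow to about 24GB in total.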

Broker Settings

If you use the Kafka brokers only for LogScale, you can configure the Kafka brokers to allow large messages on all topics. This example allows up to 100 MB in each message. Larger sizes require more memory for replication.

default
# max message size for all topics by default:
message.max.bytes=104857600

Sample Kafka Configuration

It is important to set log.dirs to the location where Kafka should store the data. Without such a setting, Kafka defaults to /tmp/kafka-logs, which is very likely NOT where you want it.

Important

The log.dirs parameter is the location of the Kafka data, not execution logs.

When deploying Kafka with ZooKeeper from the tarball installation, you need to configure each of the two services with a unique host ID number:

  • For Kafka this is the broker.id configuration value in the server.properties file:

    ini
    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=1
  • For ZooKeeper this is a file called myid in the ZooKeeper data directory that contains the node ID number. You can create it using:

    shell
    $ echo 1 >/kafka/zookeeper/myid

When creating a multi-node Kafka cluster these numbers must be unique for each host:

Host    Kafka broker.id    ZooKeeper myid
kafka1  1                  1
kafka2  2                  2
kafka3  3                  3

A sample server.properties file for the host kafka1:

ini
############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

############################# Socket Server Settings #############################

listeners=PLAINTEXT://kafka1:9092
# Keep the compression codec chosen by the producer; the broker does not recompress
compression.type=producer

############################# Log Basics #############################

# A comma-separated list of directories under which to store log files
log.dirs=/kafka/kafka

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=48

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1000073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
auto.create.topics.enable=false
unclean.leader.election.enable=false

############################# ZooKeeper #############################
zookeeper.connect=kafka1:2181

Sample ZooKeeper Configuration

The example below shows the typical zookeeper.properties configuration file for a three-node ZooKeeper cluster:

ini
dataDir=/kafka/zookeeper
clientPort=2181
clientPortAddress=kafka1
tickTime=2000
initLimit=5
syncLimit=2
autopurge.purgeInterval=1
admin.enableServer=false
4lw.commands.whitelist=*
server.1=kafka1:2888:3888
server.2=kafka2:2888:3888
server.3=kafka3:2888:3888

Where:

  • dataDir=/kafka/zookeeper

    The directory where the ZooKeeper snapshot data will be stored. Although ZooKeeper is an in-memory system, it periodically saves this data to a snapshot so that the data can be reloaded in the event of a failure or reboot. Do not place this information on an ephemeral disk or filesystem such as /tmp, as these are cleaned during reboot.

  • clientPort=2181

    The client TCP/IP port to listen for connections.

  • clientPortAddress=kafka1

    The address to listen on. This should be an address that other hosts on the network can reach, not localhost.

  • tickTime=2000

    The time for a tick, measured in milliseconds. In this case, a 2s tick.

  • initLimit=5

    The time allowed for followers to connect and sync to the leader during initialization. This is measured in ticks, so in this case 5 ticks of 2000 ms gives 10 seconds.

  • syncLimit=2

    How far a node can be out of sync compared to other hosts before an issue is reported. Measured in ticks; in this example, 2 ticks of 2000 ms gives 4 seconds.

  • autopurge.purgeInterval=1

    Automatically removes old snapshots stored on disk. The value is the interval in hours between purge runs, so in this case the purge runs every hour.

  • admin.enableServer=false

    Disables ZooKeeper's embedded HTTP AdminServer.

  • 4lw.commands.whitelist=*

    Enables all four-letter word commands (the * wildcard allows every command).

  • server.1=kafka1:2888:3888

    The node ID, hostname and port numbers of each node within the ZooKeeper cluster. Note that the number after server. must match the number in the myid file and the broker.id parameter in Kafka.

Tip

This file can be copied to all the nodes within your ZooKeeper cluster as the configuration should be the same on each host.

In addition to this file, the myid file within the configured dataDir (for example, /kafka/zookeeper/myid in the above configuration) must contain the node ID of the current host. For example, if this host is kafka1, you would use the following command to set the node ID:

shell
$ echo 1 >/kafka/zookeeper/myid
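
When the same step has to be repeated on several hosts, the node ID can be derived from the host name. The helper below is a sketch only: write_myid is a hypothetical function, and it assumes hosts are named kafka1, kafka2, and so on, as in the examples on this page.

```shell
#!/bin/sh
# Sketch: write a ZooKeeper myid file from a host name.
# Assumes hosts are named kafka<N>; write_myid is a hypothetical helper,
# not part of ZooKeeper or Kafka.

write_myid() {
  host=$1            # for example kafka1
  data_dir=$2        # the dataDir from zookeeper.properties
  id=${host#kafka}   # strip the "kafka" prefix, leaving the number

  # Refuse anything that is not a non-empty string of digits
  case $id in
    ''|*[!0-9]*) echo "cannot derive node ID from '$host'" >&2; return 1 ;;
  esac

  echo "$id" > "$data_dir/myid"
}
```

For example, running write_myid kafka1 /kafka/zookeeper on the first host writes 1 to /kafka/zookeeper/myid, matching server.1 in the configuration above.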