Kafka Connect Log Format
Kafka Connect is a framework for connecting Kafka with other systems such as LogScale. If your data is already in Kafka, consider this approach for sending it to LogScale. We have worked with Confluent, achieved Gold Verification, and now offer a Kafka Connector that uses our fast and efficient HEC endpoint. You can get started by visiting the Confluent Hub page or the GitHub repository.
LogScale HEC Kafka Connector
This guide provides step-by-step guidance on how to build, integrate and operate the LogScale HEC connector within the Kafka platform.
The purpose of the LogScale HEC Sink connector is to read messages from a Kafka topic and submit them as events to the HTTP Event Collector (HEC) of a running LogScale system.
The LogScale HEC connector uses Maven to build and test itself. The
version of Kafka to build for is indicated in the
pom.xml file by the line:
Out of the box, Kafka 2.2.0 is supported. This can (and should) be changed to match your current Kafka or Confluent Platform version; to check which version this is, refer to the Confluent Platform Versions page.
Scripts are provided to automatically build and package the connector:
bin/compile.sh compiles and packages the connector, producing the "uber jar".
Alternatively, you can run:
mvn -DskipTests=true clean install
mvn -DskipTests=true assembly:assembly -DdescriptorId=jar-with-dependencies
Installing on plain Kafka
To install the connector for "plain Kafka", copy the uber jar
kafka-connect-hec-sink-1.0-SNAPSHOT-jar-with-dependencies.jar into the
KAFKA_HOME/libs/ folder. Set your configuration properties in
Installing on the Confluent platform
To install the connector for the Confluent platform, build the uber jar and copy it into the proper directory:
mkdir /CONFLUENT_HOME/share/java/kafka-connect-hec-sink
cp target/kafka-connect-hec-sink-1.0-SNAPSHOT-jar-with-dependencies.jar /CONFLUENT_HOME/share/java/kafka-connect-hec-sink/.
See the Install Connectors Confluent page for more information.
Creating a Confluent Hub Archive
This connector utilizes the
plugin to create a Confluent Hub compatible archive. Use
mvn package to create the archive.
For an example configuration using standalone mode, refer to
for distributed mode, refer to
URL to the LogScale HEC endpoint, like
||Name of the LogScale repo you wish to send data to, such as sandbox. This configuration element must be a non-empty string and is required.|
The ingest token as supplied by your LogScale
installation, for example,
||Maximum number of events to send per call to the HEC endpoint. This configuration element must be an integer greater than zero and is required.|
||When set, defines the name of the field which will automatically be set to hold the Kafka topic name the event originated from. It may be useful to use a tag, like #httpd. This configuration element must be a non-empty string and is optional.|
||When set, defines the name of the field which will automatically be set to the partition of the Kafka topic the event originated from. This configuration element must be a non-empty string and is optional.|
||Maximum number of times a call to the HEC endpoint will be retried before failing (and throwing an exception). This configuration element is optional, with a default value of 10 retries.|
NOTE: There are many more potentially useful Kafka Connect worker configuration options, with further information on handling errors (parsing and otherwise).
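The batching and retry settings described above can be pictured with a short Python sketch. This is not the connector's code: the helper names chunk and post_with_retries, the send callable, and the use of IOError to signal a failed call are all assumptions made for illustration. It also assumes "retried 10 times" means one initial attempt plus up to 10 retries.

```python
def chunk(records, buffer_size):
    """Split records into batches of at most buffer_size events (hypothetical helper)."""
    if buffer_size <= 0:
        raise ValueError("buffer size must be an integer greater than zero")
    for start in range(0, len(records), buffer_size):
        yield records[start:start + buffer_size]


def post_with_retries(send, batch, max_retries=10):
    """Call send(batch); on IOError retry up to max_retries times, then re-raise."""
    retries_used = 0
    while True:
        try:
            return send(batch)
        except IOError:
            if retries_used >= max_retries:
                raise  # retries exhausted: fail by raising the last error
            retries_used += 1


# Usage: a stand-in sender that fails twice before succeeding.
calls = {"count": 0}

def flaky_send(batch):
    calls["count"] += 1
    if calls["count"] <= 2:
        raise IOError("simulated failed HTTP request")
    return len(batch)

batches = list(chunk(list(range(7)), 3))
posted = post_with_retries(flaky_send, batches[0])
```

In this sketch a batch is only reported as posted once a call succeeds; a sender that never succeeds raises after the final retry.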
Data & Schema
Keys & Values
Keys are currently
ignored. Values are converted to HEC-compatible JSON based on the
connector configuration (see the examples below).
Connectors require key and value converters.
These determine how the HEC Sink handles each message and are described below.
Schema evolution is not an issue: the connector converts each Avro message to JSON on the fly (within the constraints of the Avro data types given above) before submitting it to the LogScale HEC endpoint. The only restriction is the set of supported Avro data types.
Metrics are exposed through JMX and are also written to standard output every 30 seconds.
More information about the method used to generate these metrics can be found here.
||Number of active sink tasks.|
||Number of flushes requested by Connect.|
||Number of parsing errors. NOTE: these errors occur when coercing sink records into LogScale HEC records, not during initial Kafka ingest. See the notes regarding parsing errors & logging in the configuration section above.|
||Number of records put.|
||Number of connector task starts.|
||Number of connector task stops.|
||Failed HTTP requests to LogScale HEC endpoint.|
||Number of records posted to LogScale HEC endpoint.|
||Number of times connector had to wait for a batch to post to LogScale as a result of a requested flush from Connect.|
||Batch sizes received from Kafka.|
||Batch sizes submitted to LogScale HEC endpoint.|
||Full end-to-end processing time.|
||HEC endpoint POST time.|
If the metric
reflects values consistently smaller than the configuration property
humio.hec.buffer_size and you are sure
there is sufficient throughput on the assigned topic partition to fill
the buffer, check the Kafka configuration property
max.poll.records (its default is 500);
you may need to increase it.
HEC Event Field Support & Techniques
The LogScale HEC endpoint supports several more fields that are not explicitly handled by this connector. The techniques outlined below may give you some ideas on how to use these fields by way of Kafka Connect's Single Message Transformations (SMTs). Alternatively, these fields may also be set (or modified) by way of LogScale's Parsers.
SMT Rename Example:
You can use the technique below to rename a field using an SMT, for example, if your messages already carry a timestamp under a different field name:
transforms=replacefield
transforms.replacefield.renames=other_time_field:time
transforms.replacefield.type=org.apache.kafka.connect.transforms.ReplaceField$Value
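To make the effect of this rename concrete, the following Python sketch mimics what the transformation does to a record value before the connector sees it. It is an illustration only, not Kafka Connect code: the function names and the sample record are hypothetical, and only the old:new syntax of the renames setting is taken from the configuration above.

```python
def parse_renames(spec):
    """Parse a 'old:new,old2:new2' renames string into a dict."""
    mapping = {}
    for pair in spec.split(","):
        old, new = pair.strip().split(":", 1)
        mapping[old] = new
    return mapping


def rename_fields(value, spec):
    """Return a copy of value with top-level field names renamed per spec."""
    mapping = parse_renames(spec)
    return {mapping.get(name, name): field for name, field in value.items()}


# Hypothetical record value carrying its timestamp in other_time_field.
record = {"other_time_field": 1554821349, "message": "GET /index.html"}
renamed = rename_fields(record, "other_time_field:time")
print(renamed)
```

After the rename, the timestamp sits in the time field and all other fields are unchanged.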
SMT Field Insertion Example:
You can use the technique below to insert a field with a static value using an SMT. For example, if you wish to configure events to use a specific time zone, you can set a static value:
transforms=insert_tz
transforms.insert_tz.static.field=timezone
transforms.insert_tz.static.value=Europe/Copenhagen
transforms.insert_tz.type=org.apache.kafka.connect.transforms.InsertField$Value
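As a rough picture of what this configuration does to each message, the Python sketch below adds a static field to a record value. It is an illustration only, not Kafka Connect code: the function name insert_static_field and the sample record are hypothetical.

```python
def insert_static_field(value, field, static_value):
    """Return a copy of value with field set to static_value."""
    updated = dict(value)  # copy so the original record is left untouched
    updated[field] = static_value
    return updated


# Hypothetical record value before the transformation is applied.
record = {"message": "user logged in"}
with_tz = insert_static_field(record, "timezone", "Europe/Copenhagen")
print(with_tz)
```

Every record passing through the transformation receives the same timezone value, regardless of its original content.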
We have created a Docker Compose setup to streamline testing of the LogScale HEC sink connector. There are two ways you can use it: completely integrated with and managed by the test framework, or by running the Docker Compose environment separately and then running the supplied tests or testing the connector itself in standalone mode.
Unit tests currently cover the internal
Record object functionality,
as well as each of
schema and data conversion functionality.
The end-to-end test performs the following steps (assuming managed docker):
extracts the host names and ports for all required services (Kafka Broker, Kafka Connect, Schema Registry, and LogScale) from the src/test/resources/docker-compose.yml Docker Compose file;
constructs the relevant URLs, property objects and other identifiers to start the system and perform the tests;
starts the Docker containers described in src/test/resources/docker-compose.yml and waits for them to successfully start;
extracts the LogScale ingest token from the humio-data directory mounted from within the running docker container;
registers (or updates) the sink connector with Kafka Connect;
pushes 10 unique messages to the configured Kafka topic, flushes the Kafka producer, and waits for them to have been processed;
performs a count() query against the LogScale docker instance, verifying the correct number of messages has been received;
performs a check to ensure a unique token placed in the messages is present in the output of each message;
shuts down the docker containers.
If any of the above steps fail, the test as a whole fails.
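The two verification steps at the end of this list (the message count and the unique token check) can be sketched in Python as follows. This is not the project's test code: the function names are hypothetical, and the received list stands in for the events that a query against the LogScale instance would return.

```python
import uuid


def make_messages(count, token):
    """Build count unique test messages that all carry the same run token."""
    return [
        {"id": i, "token": token, "message": f"test message {i}"}
        for i in range(count)
    ]


def verify(received, expected_count, token):
    """Pass only if the count matches and every event carries the run token."""
    if len(received) != expected_count:
        return False
    return all(event.get("token") == token for event in received)


token = uuid.uuid4().hex          # unique per test run
sent = make_messages(10, token)
received = list(sent)             # stand-in for the events returned by the query
ok = verify(received, 10, token)
```

Tagging every message with a token that is unique to the run lets the check distinguish events from this run from any data left over from earlier runs.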
Distributed Testing with Managed Docker Compose
In this context, "managed Docker Compose" means the test framework
handles the work of starting, managing, and stopping the Docker
Compose element, as well as automatically configuring all dependencies
based on data in
(hostnames, ports, ingest tokens).
Unit & end-to-end integration test
Inspect src/test/resources/docker/docker-compose.yml and verify the port assignments are unused and available on your machine. If they are not, make sure any ports you change are reflected in references made by other services.
Inspect src/test/resources/config/json_sink_connector.json, editing configuration properties if necessary.
Run bin/compile.sh if you have not already; this will build the "uber jar" required for the managed instance of Connect.
Run mvn test to run the full test suite (including unit tests).
Results specific to distributed testing will be contained within the
EndToEndJsonTest test output. For
further insight as to its mechanics and what/how everything is being
tested, refer to
In this context, it is assumed that you are managing the
Docker Compose environment yourself and that the tests expect it to be running.
This is generally only useful if you want to run the connector in
standalone mode and generate some load with
JsonDataPump, or test it by some other
means to suit your needs.
Things to know:
Killing docker and restarting it can retain state, depending on how it was stopped. When in doubt, run bin/stop-docker-instance.sh and then restart it. This ensures you start with a blank slate.
Before running the connector in standalone mode, but after docker has been started, run the utility
bin/get-ingest-token.sh. Output will look similar to this:
$ bin/get-ingest-token.sh
defaulting to humio index (repo) sandbox
extracting sandbox ingest key from global data snapshot...
located sandbox key: sandbox_e5W4sRju9jCXqMsEULfKvZnc
LaBtqQmFXSOrKhG4HuYyk4JZiov2BGhuyB2GitW6dgNi
The last line of the output (e.g.,
should be placed in the file
the value for the
humio.hec.ingest_token property. If
you see errors here, it probably cannot find data in
./humio-data, which is mounted from
the humio service running in docker; stop and restart the docker
services with the scripts provided in
bin (see things to know above).
Unit and end-to-end integration test:
Start docker with
Execute tests with the environment variable MANAGE_DOCKER set to false:
$ MANAGE_DOCKER=false mvn test
Stop docker with bin/stop-docker-instance.sh.
Testing the HEC connector with JsonDataPump:
Start docker with bin/start-docker-instance.sh.
config/HECStandaloneSinkConnector.properties, editing port assignments if necessary.
Inspect connector properties in
config/HECSinkConnector.properties. If you have not updated the ingest token (see things to know above), do so now.
Inspect the standalone worker properties in
config/HECStandaloneSinkConnector.properties, ensuring the Kafka broker port is correct (if you haven't edited the
docker-compose.yml file, you're good to go), and ensure that the
rest.port port is not already in use on your machine.
Start the connector in standalone mode with bin/start-connector-standalone.sh.
Generate messages for the configured topic (e.g., hectopic, the value in the configuration properties if you have not changed it) by running JsonDataPump.
This utility will run until killed!
Stop docker with bin/stop-docker-instance.sh.
For more information, see Real-Time Streaming Log Management for Apache Kafka with Humio.