Kafka Connect Log Format
Kafka Connect is a framework for connecting Kafka with other systems such as LogScale. If you have your data in Kafka, consider this approach for sending data to LogScale. We have worked with Confluent, achieved Gold Verification, and are now able to offer our new Kafka Connector, which uses our fast and efficient HEC endpoint. You can get started by visiting the Confluent Hub page or the GitHub repository.
LogScale HEC Kafka Connector
This guide describes how to build, integrate, and operate the LogScale HEC connector within the Kafka platform.
The purpose of the LogScale HEC Sink connector is to read messages from a Kafka topic and submit them as events to the HTTP Event Collector (HEC) endpoint of a running LogScale system (see Ingesting with HTTP Event Collector (HEC)).
Installation
The LogScale HEC connector uses maven to build and test itself. The version of Kafka to build for is indicated in the pom.xml file by the line:
<kafka.version>2.2.0</kafka.version>
Out of the box, Kafka 2.2.0 is supported. This can (and should) be changed to match your current Kafka or Confluent Platform version; to check which version this is, refer to the Confluent Platform Versions page.
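For example, if your cluster runs a newer Kafka release (the version below is purely illustrative; substitute your own), you would edit the line and rebuild:
<kafka.version>3.6.1</kafka.version>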
Scripts are provided to automatically build and package the connector jar. bin/compile.sh automatically compiles and packages the connector, with the resulting "uber jar" located at target/kafka-connect-hec-sink-1.0-SNAPSHOT-jar-with-dependencies.jar.
Alternatively, you can run:
mvn -DskipTests=true clean install
mvn -DskipTests=true assembly:assembly -DdescriptorId=jar-with-dependencies
Installing on plain Kafka
To install the connector for "plain Kafka", copy the uber jar
kafka-connect-hec-sink-1.0-SNAPSHOT-jar-with-dependencies.jar into the
KAFKA_HOME/libs/ folder. Set your configuration properties in
KAFKA_HOME/config/connect-distributed.properties
and
KAFKA_HOME/config/connect-standalone.properties
.
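Assuming the KAFKA_HOME environment variable points at your Kafka installation, the copy step might look like this (a sketch; adjust paths to your layout):
cp target/kafka-connect-hec-sink-1.0-SNAPSHOT-jar-with-dependencies.jar $KAFKA_HOME/libs/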
Installing on the Confluent platform
To install the connector for the Confluent platform, build the uber jar and copy it into the proper directory:
mkdir /CONFLUENT_HOME/share/java/kafka-connect-hec-sink
cp target/kafka-connect-hec-sink-1.0-SNAPSHOT-jar-with-dependencies.jar /CONFLUENT_HOME/share/java/kafka-connect-hec-sink/.
See the Install Connectors Confluent page for more information.
Creating a Confluent Hub Archive
This connector utilizes the kafka-connect-maven-plugin maven plugin to create a Confluent Hub compatible archive. Use mvn package to create the archive target/components/packages/humio-kafka-connect-hec-sink-1.0-SNAPSHOT.zip.
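If you use the confluent-hub CLI, the archive can typically be installed directly from the local file; a sketch, assuming the CLI is installed and on your PATH:
confluent-hub install target/components/packages/humio-kafka-connect-hec-sink-1.0-SNAPSHOT.zip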
Configuration
For an example configuration using standalone mode, refer to config/HECSinkConnector.properties, and for distributed mode, refer to src/test/resources/config/json_sink_connector.json.
Configuration Properties
Property | Description |
---|---|
humio.hec.url | URL to the LogScale HEC endpoint, like http://machine:3000/api/v1/ingest/hec. This configuration element must be a non-empty string and is required. |
humio.repo | Name of the LogScale repo you wish to send data to, such as sandbox. This configuration element must be a non-empty string and is required. |
humio.hec.ingest_token | The ingest token as supplied by your LogScale installation, for example, 2dWGywwQOtncDPt8c41ccycfWcWG. You can find this value within the LogScale interface for a specific repo under Settings. This configuration element must be a non-empty string and is required. |
humio.hec.buffer_size | Maximum number of events to send per call to the HEC endpoint. This configuration element must be an integer greater than zero and is required. |
humio.hec.fields.topic | When set, defines the name of the field which will automatically be set to hold the Kafka topic name the event originated from. It may be useful to use a tag, like #httpd. This configuration element must be a non-empty string and is optional. |
humio.hec.fields.partition | When set, defines the name of the field which will automatically be set to the partition of the Kafka topic the event originated from. This configuration element must be a non-empty string and is optional. |
humio.hec.fields.use_kafka_timestamp | When true, will automatically set the time field of every event sent to the HEC endpoint to the Kafka message time value. This configuration element must be either true or false and is optional. |
humio.hec.retry.max | Maximum number of times a call to the HEC endpoint will be retried before failing (and throwing an exception). This configuration element is optional, with a default value of 10 retries. |
humio.hec.retry.delay_sec | Related to humio.hec.retry.max: retries use an exponential backoff strategy with an initial delay of humio.hec.retry.delay_sec seconds. This configuration element is optional, with a default value of 10 seconds. |
humio.hec.ignore_parsing_errors | When true, ignore parsing errors during operation as a result of transforming the message into a LogScale HEC message. If a message cannot be parsed, it will be skipped. Topic name, partition id, and Kafka offset are logged (DEBUG level). Also see metric com.humio.connect.hec.HECSinkTask.parsing-errors. Default value is false. NOTE: you may be more interested in the Kafka Connect configuration option errors.tolerance; see the NOTE below. |
humio.hec.log_parsing_errors | When true, log parsing errors (INFO level) as a result of transforming the message into a LogScale HEC message. Topic name, partition id, Kafka offset, and the native Kafka Connect sinkRecord and sinkRecord.value (as strings) are logged. Also see metric com.humio.connect.hec.HECSinkTask.parsing-errors. Default value is true. NOTE: you may be more interested in the Kafka Connect configuration options errors.log.enable and errors.log.include.messages; see the NOTE below. |
NOTE: The Kafka Connect worker provides many more potentially useful configuration options, including further options for handling errors (parsing and otherwise); refer to the Kafka Connect documentation for details.
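To illustrate how the properties above fit together, here is a minimal standalone-mode sketch; the connector class name and topic are assumptions for the example, so check config/HECSinkConnector.properties in the repository for the authoritative values:
# Minimal sketch; connector.class and topics are assumed values for illustration.
name=hec-sink
connector.class=com.humio.connect.hec.HECSinkConnector
tasks.max=1
topics=hectopic
humio.hec.url=http://machine:3000/api/v1/ingest/hec
humio.repo=sandbox
humio.hec.ingest_token=2dWGywwQOtncDPt8c41ccycfWcWG
humio.hec.buffer_size=500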
Data & Schema
Keys & Values
Kafka message keys are currently ignored. Values are converted to HEC-compatible JSON based on connector configuration (see the examples below).
Schema Configuration
Connectors require key and value converters, for example:
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
These determine the method by which the HEC Sink handles each message and are described below.
Converter | Description |
---|---|
org.apache.kafka.connect.storage.StringConverter | With StringConverter, messages are placed in the HEC event as a raw string if JSON parsing fails. If all messages on your Kafka topic(s) are JSON, you should use JsonConverter, as it will be more efficient. |
org.apache.kafka.connect.json.JsonConverter | With JsonConverter, messages are placed in the HEC event as the given JSON object without modification. |
io.confluent.connect.avro.AvroConverter | With AvroConverter, messages are converted to JSON and placed in the HEC event. Currently the connector handles SinkRecord records (Structs) with support for the following types: INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, BOOLEAN, STRING, ARRAY, MAP, and STRUCT, with full support for nested value structures. BYTES is not currently supported. We also do not currently support maps with non-string keys. |
Note
Regarding schema evolution, because this connector converts any given Avro message to JSON on-the-fly (within the constraints of the Avro data types given above) for submission to the LogScale HEC endpoint, schema evolution is not an issue, and there are no restrictions other than being limited to the aforementioned supported Avro data types.
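As a hedged illustration, the converter is chosen in your worker or connector configuration. For Avro you would typically also point the converter at your Schema Registry, and for schemaless JSON you would disable the JSON schema envelope; the Schema Registry host below is a placeholder:
# Avro values (Schema Registry URL is a placeholder):
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
# Schemaless JSON values (disable the schema/payload envelope):
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false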
Kafka Connect Metrics
Metrics are exposed through JMX as well as dumped to standard out every 30 seconds.
More information about the method used to generate these metrics can be found here.
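If you want to browse these metrics interactively, one common approach (an assumption about your environment, not a requirement of the connector) is to expose a JMX port on the Connect worker JVM and attach a JMX client such as jconsole:
# Expose JMX on the Connect worker (port is arbitrary), then start the worker:
export JMX_PORT=9999
bin/connect-distributed.sh config/connect-distributed.properties
# In another terminal, attach a JMX client:
jconsole localhost:9999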
Counters
Name | Description |
---|---|
com.humio.connect.hec.HECSinkTask.active-tasks | Number of active sink tasks. |
com.humio.connect.hec.HECSinkTask.flushes | Number of flushes requested by Connect. |
com.humio.connect.hec.HECSinkTask.parsing-errors | Number of parsing errors. NOTE: these parse errors occur when coercing the sink records into LogScale HEC records, not during initial Kafka ingest. See the notes regarding parsing errors and logging in the configuration section above. |
com.humio.connect.hec.HECSinkTask.put-records | Number of records put. |
com.humio.connect.hec.HECSinkTask.task-starts | Number of connector task starts. |
com.humio.connect.hec.HECSinkTask.task-stops | Number of connector task stops. |
com.humio.connect.hec.client.HECClientImpl.failed-hec-requests | Failed HTTP requests to the LogScale HEC endpoint. |
com.humio.connect.hec.client.HECClientImpl.posted-records | Number of records posted to the LogScale HEC endpoint. |
com.humio.connect.hec.service.HECServiceImpl.flush-lock-waits | Number of times the connector had to wait for a batch to post to LogScale as a result of a requested flush from Connect. |
Histograms
Name | Description |
---|---|
com.humio.connect.hec.HECSinkTask.put-batch-sizes | Batch sizes received from Kafka. |
com.humio.connect.hec.client.HECClientImpl.posted-batch-sizes | Batch sizes submitted to the LogScale HEC endpoint. |
Timers
Name | Description |
---|---|
com.humio.connect.hec.HECSinkTask.put-calls | Full end-to-end processing time. |
com.humio.connect.hec.client.HECClientImpl.hec-http-requests | HEC endpoint POST time. |
If the metric com.humio.connect.hec.HECSinkTask.put-batch-sizes reflects values consistently smaller than the configuration property humio.hec.buffer_size, and you are sure there is sufficient throughput on the assigned topic partition to fill the buffer, check the Kafka consumer configuration property max.poll.records (its default is 500); you may have to increase it.
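For example, in a distributed worker you can raise this by prefixing the consumer property in the worker configuration (a sketch; tune the value to your actual throughput):
# In KAFKA_HOME/config/connect-distributed.properties (worker-level consumer setting):
consumer.max.poll.records=2000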
HEC Event Field Support & Techniques
The LogScale HEC endpoint supports several more fields which are not explicitly handled by this connector. The techniques outlined below for each field may give you some ideas on how to use these fields by way of Kafka Connect's Single Message Transformations (SMTs). Alternatively, these fields may also be set (or modified) by way of LogScale's Parsers.
SMT Rename Example: time
You can use the below technique to rename a field using SMT, for example, if your messages already have a timestamp:
transforms=replacefield
transforms.replacefield.renames=other_time_field:time
transforms.replacefield.type=org.apache.kafka.connect.transforms.ReplaceField$Value
SMT Field Insertion Example: timezone, sourcetype, source and host
You can use the below technique to leverage SMT to insert a field with a static value. For example, if you wish to configure events to use a specific time zone, you can set a static value:
transforms=insert_tz
transforms.insert_tz.static.field=timezone
transforms.insert_tz.static.value=Europe/Copenhagen
transforms.insert_tz.type=org.apache.kafka.connect.transforms.InsertField$Value
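The same InsertField transform can be chained to set several static HEC fields at once. The field names follow the heading above; the values are placeholders you would replace with your own:
transforms=insert_sourcetype,insert_source,insert_host
transforms.insert_sourcetype.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.insert_sourcetype.static.field=sourcetype
transforms.insert_sourcetype.static.value=access_log
transforms.insert_source.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.insert_source.static.field=source
transforms.insert_source.static.value=webshop
transforms.insert_host.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.insert_host.static.field=host
transforms.insert_host.static.value=web-01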
Testing
We have created a Docker Compose setup to streamline testing of the LogScale HEC sink connector. There are two ways you can use it: completely integrated with and managed by the test framework, or running the Docker Compose environment separately and then running the supplied tests or testing the connector itself in standalone mode.
Unit Tests
Unit tests currently cover the internal Record object functionality, HECSinkConnectorConfig instantiation, as well as each of AvroConverter, JsonRawStringRecordConverter & JsonSchemalessRecordConverter for schema and data conversion functionality.
End-to-End Test
The end-to-end test performs the following steps (assuming managed docker):
extracts the host names and ports for all required services (Kafka Broker, Kafka Connect, Schema Registry, and LogScale) from the src/test/resources/docker-compose.yml Docker Compose file;
constructs the relevant URLs, property objects and other identifiers to start the system and perform the tests;
starts the Docker containers described in src/test/resources/docker-compose.yml and waits for them to successfully start;
extracts the LogScale ingest token from the humio-data directory mounted from within the running docker container;
registers (or updates) the sink connector with Kafka Connect;
pushes 10 unique messages to the configured Kafka topic, flushes the Kafka producer, and waits for them to have been processed;
performs a count() query against the LogScale docker instance, verifying the correct number of messages has been received;
performs a check to ensure a unique token placed in the messages is present in the output of each message;
shuts down the docker containers.
If any of the above steps fail, the test as a whole fails.
Pre-Requisites
Docker
Maven
Java 8+
Distributed Testing with Managed Docker Compose
In this context, "managed Docker Compose" means the test framework
handles the work of starting, managing, and stopping the Docker Compose
element, as well as automatically configuring all dependencies based on
data in
src/test/resources/docker/docker-compose.yml
(hostnames, ports, ingest tokens).
Unit & end-to-end integration test
Inspect src/test/resources/docker/docker-compose.yml and verify the port assignments are unused and available on your machine. If they are not, ensure any ports you change are reflected in references made by other services.
Inspect src/test/resources/config/json_sink_connector.json, editing configuration properties if necessary.
Execute bin/compile.sh if you have not already; this will build the "uber jar" required for the managed instance of Connect.
Execute mvn test to run the full test suite (including unit tests).
Results specific to distributed testing will be contained within the EndToEndJsonTest test output. For further insight into its mechanics and what is being tested (and how), refer to the source.
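If you only want to run the end-to-end test on its own, a standard Surefire test filter should work (an assumption about the build setup rather than a documented entry point):
mvn -Dtest=EndToEndJsonTest test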
Standalone testing
In this context, the assumption is that you are managing the Docker Compose element yourself, and the tests assume it is running. This is generally only useful if you want to run the connector in standalone mode and generate some load with JsonDataPump, or test it by some other means to suit your needs.
Things to know:
Killing docker and restarting it may retain state, depending on how it was stopped. When in doubt, run bin/stop-docker-instance.sh and restart it. This will ensure you start with a blank slate.
Before running the connector in standalone mode, but after docker has been started, run the utility bin/get-ingest-token.sh. Output will look similar to this:
$ bin/get-ingest-token.sh
defaulting to humio index (repo) sandbox
extracting sandbox ingest key from global data snapshot...
located sandbox key: sandbox_e5W4sRju9jCXqMsEULfKvZnc
LaBtqQmFXSOrKhG4HuYyk4JZiov2BGhuyB2GitW6dgNi
The last line of the output (e.g. LaBtqQmFXSOrKhG4HuYyk4JZiov2BGhuyB2GitW6dgNi) should be placed in the file config/HECSinkConnector.properties as the value for the humio.hec.ingest_token property. If you see errors here, it probably cannot find data in ./humio-data, which is mounted from the humio service running in docker; stop and restart the docker services with the scripts provided in bin (see things to know above).
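Using the example output above, the resulting line in config/HECSinkConnector.properties would look like this:
humio.hec.ingest_token=LaBtqQmFXSOrKhG4HuYyk4JZiov2BGhuyB2GitW6dgNi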
Unit and end-to-end integration test:
Start docker with bin/start-docker-instance.sh.
Execute tests with the environment variable MANAGE_DOCKER=false, e.g.
$ MANAGE_DOCKER=false mvn test
Stop docker with bin/stop-docker-instance.sh.
Testing the HEC connector with JsonDataPump:
Start docker with bin/start-docker-instance.sh.
Inspect config/HECStandaloneSinkConnector.properties, editing port assignments if necessary.
Inspect connector properties in config/HECSinkConnector.properties. If you have not updated the ingest token (see things to know above), do so now.
Inspect the standalone worker properties in config/HECStandaloneSinkConnector.properties, ensuring the Kafka broker port is correct (if you haven't edited the docker-compose.yml file, you're good to go), and ensure that the rest.port is not already in use on your machine.
Start the connector in standalone mode with bin/start-connector-standalone.sh.
Generate messages for the configured topic (e.g. hectopic, the value in the configuration properties if you've not changed it) by running
bin/json-data-pump.sh hectopic
Note
This utility will run until killed!
Stop docker with bin/stop-docker-instance.sh.
For more information, see Real-Time Streaming Log Management for Apache Kafka with LogScale.