Kafka Connect Log Format

Kafka Connect is a framework for connecting Kafka with other systems such as LogScale. If your data is already in Kafka, consider this approach for sending it to LogScale. We worked with Confluent, achieved Gold Verification, and now offer our Kafka Connector, which uses our fast and efficient HEC endpoint. You can get started by visiting the Confluent Hub page or the GitHub repository.

LogScale HEC Kafka Connector

This guide provides step-by-step guidance on how to build, integrate and operate the LogScale HEC connector within the Kafka platform.

The purpose of the LogScale HEC Sink connector is to read messages from a Kafka topic and submit them as events to the HTTP Event Collector (HEC) of a running LogScale system.

Installation

The LogScale HEC connector uses Maven to build and test itself. The version of Kafka to build for is indicated in the pom.xml file by the line:

xml
<kafka.version>2.2.0</kafka.version>

Out of the box, Kafka 2.2.0 is supported. This can (and should) be changed to match your current Kafka or Confluent Platform version; to check which Kafka version your Confluent Platform release includes, refer to the Confluent Platform Versions page.

Scripts are provided to build and package the connector jar. bin/compile.sh compiles and packages the connector, placing the resulting "uber jar" at target/kafka-connect-hec-sink-1.0-SNAPSHOT-jar-with-dependencies.jar.

Alternatively, you can run:

shell
mvn -DskipTests=true clean install
mvn -DskipTests=true assembly:assembly -DdescriptorId=jar-with-dependencies

Installing on plain Kafka

To install the connector for "plain Kafka", copy the uber jar kafka-connect-hec-sink-1.0-SNAPSHOT-jar-with-dependencies.jar into the KAFKA_HOME/libs/ folder. Set your configuration properties in KAFKA_HOME/config/connect-distributed.properties (distributed mode) or KAFKA_HOME/config/connect-standalone.properties (standalone mode), depending on how you run Connect.

Installing on the Confluent platform

To install the connector for the Confluent platform, build the uber jar and copy it into the proper directory:

shell
mkdir /CONFLUENT_HOME/share/java/kafka-connect-hec-sink
cp target/kafka-connect-hec-sink-1.0-SNAPSHOT-jar-with-dependencies.jar /CONFLUENT_HOME/share/java/kafka-connect-hec-sink/.

See the Install Connectors Confluent page for more information.

Creating a Confluent Hub Archive

This connector uses the kafka-connect-maven-plugin Maven plugin to create a Confluent Hub compatible archive. Run mvn package to create the archive target/components/packages/humio-kafka-connect-hec-sink-1.0-SNAPSHOT.zip.

Configuration

For an example configuration using standalone mode, refer to config/HECSinkConnector.properties, and for distributed mode, refer to src/test/resources/config/json_sink_connector.json.

Configuration Properties

| Property | Description |
|---|---|
| humio.hec.url | URL of the LogScale HEC endpoint, for example http://machine:3000/api/v1/ingest/hec. This configuration element must be a non-empty string and is required. |
| humio.repo | Name of the LogScale repo you wish to send data to, such as sandbox. This configuration element must be a non-empty string and is required. |
| humio.hec.ingest_token | The ingest token as supplied by your LogScale installation, for example, 2dWGywwQOtnQIgrMfbH0CrHna7zcDPt8c41ccycfWcWG. You can find this value within the LogScale interface for a specific repo under Settings. This configuration element must be a non-empty string and is required. |
| humio.hec.buffer_size | Maximum number of events to send per call to the HEC endpoint. This configuration element must be an integer greater than zero and is required. |
| humio.hec.fields.topic | When set, defines the name of the field which will automatically be set to hold the Kafka topic name the event originated from. It may be useful to use a tag, like #httpd. This configuration element must be a non-empty string and is optional. |
| humio.hec.fields.partition | When set, defines the name of the field which will automatically be set to the partition of the Kafka topic the event originated from. This configuration element must be a non-empty string and is optional. |
| humio.hec.fields.use_kafka_timestamp | When true, automatically sets the time field of every event sent to the HEC endpoint to the Kafka message time value. This configuration element must be either true or false and is optional. |
| humio.hec.retry.max | Maximum number of times a call to the HEC endpoint will be retried before failing (and throwing an exception). This configuration element is optional, with a default value of 10 retries. |
| humio.hec.retry.delay_sec | Initial delay, in seconds, of the exponential backoff strategy used when retrying calls to the HEC endpoint (see humio.hec.retry.max). This configuration element is optional, with a default value of 10 seconds. |
| humio.hec.ignore_parsing_errors | When true, ignore parsing errors that occur while transforming a message into a LogScale HEC message. If a message cannot be parsed, it will be skipped. Topic name, partition ID, and Kafka offset are logged (DEBUG level). Also see metric com.humio.connect.hec.HECSinkTask.parsing-errors. Default value is false. NOTE: you may be more interested in the Kafka Connect configuration option errors.tolerance; see the note below. |
| humio.hec.log_parsing_errors | When true, log parsing errors (INFO level) that occur while transforming a message into a LogScale HEC message. Topic name, partition ID, Kafka offset, and the native Kafka Connect sinkRecord and sinkRecord.value (as strings) are logged. Also see metric com.humio.connect.hec.HECSinkTask.parsing-errors. Default value is true. NOTE: you may be more interested in the Kafka Connect configuration options errors.log.enable and errors.log.include.messages; see the note below. |
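The rules in the table above can be checked before deploying a configuration. The following is a minimal sketch only, not part of the connector: the `parse_properties` and `validate` helpers are hypothetical, the parser handles only simple `key=value` lines rather than the full Java properties syntax, and the buffer size of 500 is an arbitrary example value.

```python
# Hypothetical pre-flight check for the documented connector properties.
REQUIRED_STRINGS = ("humio.hec.url", "humio.repo", "humio.hec.ingest_token")

# Defaults as stated in the property table.
DEFAULTS = {
    "humio.hec.retry.max": "10",
    "humio.hec.retry.delay_sec": "10",
    "humio.hec.ignore_parsing_errors": "false",
    "humio.hec.log_parsing_errors": "true",
}


def parse_properties(text):
    """Parse simple key=value lines (a simplification of .properties syntax)."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def validate(props):
    """Return a list of problems; an empty list means the documented rules hold."""
    problems = []
    for key in REQUIRED_STRINGS:
        if not props.get(key):
            problems.append(f"{key} is required and must be a non-empty string")
    try:
        if int(props.get("humio.hec.buffer_size", "")) <= 0:
            problems.append("humio.hec.buffer_size must be greater than zero")
    except ValueError:
        problems.append("humio.hec.buffer_size is required and must be an integer")
    return problems


example = """
humio.hec.url=http://machine:3000/api/v1/ingest/hec
humio.repo=sandbox
humio.hec.ingest_token=2dWGywwQOtnQIgrMfbH0CrHna7zcDPt8c41ccycfWcWG
humio.hec.buffer_size=500
"""
settings = {**DEFAULTS, **parse_properties(example)}
print(validate(settings))
```

An empty list means the required properties are present and well formed; the connector performs its own validation at startup, which may be stricter than this sketch.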

NOTE: Kafka Connect offers many more potentially useful worker configuration options, including options for handling errors (parsing and otherwise); refer to the Kafka Connect documentation for details.
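To get a feel for how humio.hec.retry.max and humio.hec.retry.delay_sec interact, the sketch below lists the delay before each retry. The source only states that retries use exponential backoff with an initial delay; the doubling factor, the absence of a cap, and the `backoff_delays` helper are assumptions made for illustration.

```python
def backoff_delays(max_retries=10, initial_delay_sec=10, factor=2):
    """Delay in seconds before each retry.

    Assumption: the delay is multiplied by `factor` after every attempt.
    The connector's actual growth factor is not documented here.
    """
    return [initial_delay_sec * factor ** attempt for attempt in range(max_retries)]


delays = backoff_delays()
print(delays)
print(sum(delays))
```

Under the doubling assumption, the default settings would wait 10 seconds before the first retry and 5120 seconds before the tenth, 10230 seconds in total, so lowering either property shortens the time before a failing task gives up.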

Data & Schema

Keys & Values

Kafka message keys are currently ignored. Values are converted to HEC-compatible JSON based on connector configuration (see the examples below).
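As a rough illustration of how a message value and the optional humio.hec.fields.* properties might combine into one event, consider the sketch below. The event layout (an `event` member, a `fields` object, and `time` in epoch seconds) follows the general HEC convention and is an assumption about what the connector emits; `to_hec_event`, the field names `kafka_topic` and `kafka_partition`, and the sample topic are hypothetical.

```python
import json


def to_hec_event(value, topic, partition, kafka_timestamp_ms, config):
    """Wrap one Kafka message value in a HEC-style event (illustrative only)."""
    event = {"event": value}
    fields = {}
    topic_field = config.get("humio.hec.fields.topic")
    if topic_field:
        fields[topic_field] = topic
    partition_field = config.get("humio.hec.fields.partition")
    if partition_field:
        fields[partition_field] = partition
    if fields:
        event["fields"] = fields
    if config.get("humio.hec.fields.use_kafka_timestamp") == "true":
        # Assumption: Kafka timestamps are epoch milliseconds and the HEC
        # time field is expressed in epoch seconds.
        event["time"] = kafka_timestamp_ms / 1000.0
    return event


config = {
    "humio.hec.fields.topic": "kafka_topic",          # hypothetical field name
    "humio.hec.fields.partition": "kafka_partition",  # hypothetical field name
    "humio.hec.fields.use_kafka_timestamp": "true",
}
hec_event = to_hec_event({"status": 200}, "httpd-logs", 3, 1700000000000, config)
print(json.dumps(hec_event, sort_keys=True))
```

The message key is deliberately absent from the function signature, mirroring the statement that keys are ignored.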

Schema Configuration

Connectors require key and value converters, for example:

ini
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter

These determine the method by which the HEC Sink handles each message and are described below.

| Converter | Description |
|---|---|
| org.apache.kafka.connect.storage.StringConverter | With StringConverter, messages are placed in the HEC event as a raw string if JSON parsing fails. If all messages on your Kafka topic(s) are JSON, you should use JsonConverter, as it will be more efficient. |
| org.apache.kafka.connect.json.JsonConverter | With JsonConverter, messages are placed in the HEC event as the given JSON object without modification. |
| io.confluent.connect.avro.AvroConverter | With AvroConverter, messages are converted to JSON and placed in the HEC event. Currently the connector handles SinkRecord records (Structs) with support for the following types: INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, BOOLEAN, STRING, ARRAY, MAP, & STRUCT, with full support for nested value structures. BYTES is not currently supported. We also do not currently support maps with non-string keys. |
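The StringConverter behaviour described above amounts to "try JSON first, fall back to the raw string". A minimal sketch of that idea follows; the `string_value_to_event` helper is hypothetical and only models the documented behaviour, not the connector's Java implementation.

```python
import json


def string_value_to_event(raw):
    """Return the parsed JSON value if `raw` is valid JSON, else the raw string."""
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return raw


print(string_value_to_event('{"status": 200}'))
print(string_value_to_event("GET /index.html 200"))
```

The extra parse attempt on every message is one plausible reason JsonConverter is recommended when all messages on the topic are already JSON.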

Note

Because this connector converts each Avro message to JSON on the fly before submitting it to the LogScale HEC endpoint, schema evolution is not an issue. The only restriction is that messages must use the supported Avro data types listed above.
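The type restrictions for Avro data can be pictured as a recursive walk over the record. The sketch below is an approximation under stated assumptions: schemas are modelled as plain dictionaries rather than Kafka Connect Schema objects, the `convert` helper is hypothetical, and raising an error for unsupported types is a choice made here, not documented connector behaviour.

```python
SCALARS = {"INT8", "INT16", "INT32", "INT64", "FLOAT32", "FLOAT64", "BOOLEAN", "STRING"}


def convert(schema, value):
    """Convert a value to JSON-compatible Python data, guided by its schema."""
    kind = schema["type"]
    if kind in SCALARS:
        return value
    if kind == "ARRAY":
        return [convert(schema["items"], item) for item in value]
    if kind == "MAP":
        if schema["keys"]["type"] != "STRING":
            raise ValueError("maps with non-string keys are not supported")
        return {key: convert(schema["values"], item) for key, item in value.items()}
    if kind == "STRUCT":
        return {name: convert(sub, value[name]) for name, sub in schema["fields"].items()}
    # BYTES and anything else falls through to here.
    raise ValueError(f"unsupported type: {kind}")


schema = {
    "type": "STRUCT",
    "fields": {
        "user": {"type": "STRING"},
        "tags": {"type": "ARRAY", "items": {"type": "STRING"}},
        "counts": {"type": "MAP", "keys": {"type": "STRING"}, "values": {"type": "INT64"}},
    },
}
record = {"user": "alice", "tags": ["a", "b"], "counts": {"ok": 3}}
print(convert(schema, record))
```

Because the walk depends only on the schema attached to each record, a record written with a newer schema version converts the same way, which is consistent with the note on schema evolution.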

Kafka Connect Metrics

Metrics are exposed through JMX and are also written to standard output every 30 seconds.

More information about the method used to generate these metrics can be found here.

Counters

| Name | Description |
|---|---|
| com.humio.connect.hec.HECSinkTask.active-tasks | Number of active sink tasks. |
| com.humio.connect.hec.HECSinkTask.flushes | Number of flushes requested by Connect. |
| com.humio.connect.hec.HECSinkTask.parsing-errors | Number of parsing errors. NOTE: these errors occur when coercing sink records into LogScale HEC records, not during initial Kafka ingest. See the notes regarding parsing errors and logging in the configuration section above. |
| com.humio.connect.hec.HECSinkTask.put-records | Number of records put. |
| com.humio.connect.hec.HECSinkTask.task-starts | Number of connector task starts. |
| com.humio.connect.hec.HECSinkTask.task-stops | Number of connector task stops. |
| com.humio.connect.hec.client.HECClientImpl.failed-hec-requests | Failed HTTP requests to LogScale HEC endpoint. |
| com.humio.connect.hec.client.HECClientImpl.posted-records | Number of records posted to LogScale HEC endpoint. |
| com.humio.connect.hec.service.HECServiceImpl.flush-lock-waits | Number of times connector had to wait for a batch to post to LogScale as a result of a requested flush from Connect. |

Histograms

| Name | Description |
|---|---|
| com.humio.connect.hec.HECSinkTask.put-batch-sizes | Batch sizes received from Kafka. |
| com.humio.connect.hec.client.HECClientImpl.posted-batch-sizes | Batch sizes submitted to LogScale HEC endpoint. |

Timers

| Name | Description |
|---|---|
| com.humio.connect.hec.HECSinkTask.put-calls | Full end-to-end processing time. |
| com.humio.connect.hec.client.HECClientImpl.hec-http-requests | HEC endpoint POST time. |

If the metric com.humio.connect.hec.HECSinkTask.put-batch-sizes reflects values consistently smaller than the configuration property humio.hec.buffer_size, and you are sure there is sufficient throughput on the assigned topic partition to fill the buffer, check the Kafka configuration property max.poll.records (its default is 500); you may have to increase it.

HEC Event Field Support & Techniques

The LogScale HEC endpoint supports several more fields which are not explicitly handled by this connector. The techniques outlined below may give you some ideas on how to set these fields by way of Kafka Connect's Single Message Transforms (SMTs). Alternatively, these fields may also be set (or modified) by way of LogScale's Parsers.

SMT Rename Example: time

You can use the technique below to rename a field with an SMT, for example, if your messages already carry a timestamp under a different field name:

ini
transforms=replacefield
transforms.replacefield.renames=other_time_field:time
transforms.replacefield.type=org.apache.kafka.connect.transforms.ReplaceField$Value
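To see the effect of the rename on a single message value, the following sketch mimics it on a flat dictionary. It is a model of the outcome only; the real transformation runs inside Kafka Connect, and `rename_fields` and the sample message are hypothetical.

```python
def rename_fields(value, renames):
    """Apply a 'old:new,old2:new2' rename specification to a flat message value."""
    mapping = dict(pair.split(":", 1) for pair in renames.split(","))
    return {mapping.get(name, name): field for name, field in value.items()}


message = {"other_time_field": 1700000000, "msg": "login ok"}
print(rename_fields(message, "other_time_field:time"))
```

After the rename, the timestamp sits in the time field while all other fields are left untouched.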

SMT Field Insertion Example: timezone, sourcetype, source and host

You can use the technique below to insert a field with a static value using an SMT. For example, if you wish to configure events to use a specific time zone, you can set a static value:

ini
transforms=insert_tz
transforms.insert_tz.static.field=timezone
transforms.insert_tz.static.value=Europe/Copenhagen
transforms.insert_tz.type=org.apache.kafka.connect.transforms.InsertField$Value
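Each InsertField transform instance takes one static field, so setting several of the fields named in this section means chaining one transform per field. The sketch below generates such a chain as properties text; the `insert_field_transforms` helper, the `insert_<field>` alias naming, and the sample values `httpd` and `web-01` are hypothetical.

```python
def insert_field_transforms(static_fields):
    """Build properties lines that chain one InsertField transform per static field."""
    aliases = [f"insert_{field}" for field in static_fields]
    lines = ["transforms=" + ",".join(aliases)]
    for alias, (field, value) in zip(aliases, static_fields.items()):
        lines.append(f"transforms.{alias}.static.field={field}")
        lines.append(f"transforms.{alias}.static.value={value}")
        lines.append(
            f"transforms.{alias}.type=org.apache.kafka.connect.transforms.InsertField$Value"
        )
    return "\n".join(lines)


generated = insert_field_transforms(
    {"timezone": "Europe/Copenhagen", "sourcetype": "httpd", "host": "web-01"}
)
print(generated)
```

Transforms are applied in the order listed in the transforms property, so any rename transform that should run first needs to appear earlier in that list.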

Testing

We have created a Docker Compose setup to streamline testing of the LogScale HEC sink connector. There are two ways you can use it: completely integrated with and managed by the test framework, or by running the Docker Compose environment separately, then running the supplied tests and testing the connector itself in standalone mode.

Unit Tests

Unit tests currently cover the internal Record object functionality, HECSinkConnectorConfig instantiation, as well as each of AvroConverter, JsonRawStringRecordConverter & JsonSchemalessRecordConverter for schema and data conversion functionality.

End-to-End Test

The end-to-end test performs the following steps (assuming managed docker):

  • extracts the host names and ports for all required services (Kafka Broker, Kafka Connect, Schema Registry, and LogScale) from the src/test/resources/docker-compose.yml Docker Compose file;

  • constructs the relevant URLs, property objects and other identifiers to start the system and perform the tests;

  • starts the Docker containers described in src/test/resources/docker-compose.yml and waits for them to successfully start;

  • extracts the LogScale ingest token from the humio-data directory mounted from within the running docker container;

  • registers (or updates) the sink connector with Kafka Connect;

  • pushes 10 unique messages to the configured Kafka topic, flushes the Kafka producer, and waits for them to have been processed;

  • performs a count() query against the LogScale docker instance, verifying the correct number of messages has been received;

  • performs a check to ensure a unique token placed in the messages is present in the output of each message;

  • shuts down the docker containers.

If any of the above steps fail, the test as a whole fails.

Pre-Requisites

  • Docker

  • Maven

  • Java 8+

Distributed Testing with Managed Docker Compose

In this context, "managed Docker Compose" means the test framework handles the work of starting, managing, and stopping the Docker Compose element, as well as automatically configuring all dependencies based on data in src/test/resources/docker/docker-compose.yml (hostnames, ports, ingest tokens).

Unit & end-to-end integration test

  • Inspect src/test/resources/docker/docker-compose.yml and verify the port assignments are unused and available on your machine. If they are not, change them, and make sure any changed ports are reflected in references made by other services.

  • Inspect src/test/resources/config/json_sink_connector.json, editing configuration properties if necessary.

  • Execute bin/compile.sh if you have not already; this will build the "uber jar" required for the managed instance of Connect.

  • Execute mvn test to run the full test suite (including unit tests).

Results specific to distributed testing will be contained within the EndToEndJsonTest test output. For further insight into its mechanics, and into what is tested and how, refer to the source.

Standalone testing

In this context, the assumption will be that you are managing the docker compose element yourself, with the tests assuming it's running. This is generally only useful if you want to run the connector in standalone mode and generate some load with JsonDataPump, or test it by some other means to suit your needs.

Things to know:

  • Killing docker and restarting it "can" retain state, depending on how it was stopped. When in doubt, run bin/stop-docker-instance.sh, and restart it. This will ensure you start with a blank slate.

  • Before running the connector in standalone mode, but after docker has been started, run the utility bin/get-ingest-token.sh. Output will look similar to this:

shell
$ bin/get-ingest-token.sh
defaulting to humio index (repo) sandbox
extracting sandbox ingest key from global data snapshot...
located sandbox key: sandbox_e5W4sRju9jCXqMsEULfKvZnc
LaBtqQmFXSOrKhG4HuYyk4JZiov2BGhuyB2GitW6dgNi

The last line of the output (e.g. LaBtqQmFXSOrKhG4HuYyk4JZiov2BGhuyB2GitW6dgNi) should be placed in the file config/HECSinkConnector.properties as the value for the humio.hec.ingest_token property. If you see errors here, the script probably cannot find data in ./humio-data, which is mounted from the humio service running in docker; stop and restart the docker services with the scripts provided in bin (see the first item above).
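Copying the token by hand is easy to get wrong, so the step can be scripted. The sketch below takes the script output as text, picks the last non-empty line, and rewrites the property in properties text; `extract_ingest_token`, `set_property`, and the `REPLACE_ME` placeholder are hypothetical, and reading or writing the actual files is left out.

```python
def extract_ingest_token(script_output):
    """Return the last non-empty line of the bin/get-ingest-token.sh output."""
    lines = [line.strip() for line in script_output.splitlines() if line.strip()]
    if not lines:
        raise ValueError("no output to read a token from")
    return lines[-1]


def set_property(properties_text, key, value):
    """Replace `key` in simple key=value text, appending it if it is missing."""
    updated, replaced = [], False
    for line in properties_text.splitlines():
        if line.split("=", 1)[0].strip() == key:
            updated.append(f"{key}={value}")
            replaced = True
        else:
            updated.append(line)
    if not replaced:
        updated.append(f"{key}={value}")
    return "\n".join(updated) + "\n"


sample_output = """defaulting to humio index (repo) sandbox
extracting sandbox ingest key from global data snapshot...
located sandbox key: sandbox_e5W4sRju9jCXqMsEULfKvZnc
LaBtqQmFXSOrKhG4HuYyk4JZiov2BGhuyB2GitW6dgNi
"""
sample_properties = "humio.repo=sandbox\nhumio.hec.ingest_token=REPLACE_ME\n"

token = extract_ingest_token(sample_output)
print(set_property(sample_properties, "humio.hec.ingest_token", token))
```

If the script reports an error, its last line will not be a token, so it is worth inspecting the value before writing it to config/HECSinkConnector.properties.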

Unit and end-to-end integration test:

  • Start docker with bin/start-docker-instance.sh.

  • Execute tests with the environment variable MANAGE_DOCKER=false, e.g.

shell
$ MANAGE_DOCKER=false mvn test

  • Stop docker with bin/stop-docker-instance.sh.

Testing the HEC connector with JsonDataPump:

  • Start docker with bin/start-docker-instance.sh.

  • Inspect config/HECStandaloneSinkConnector.properties, editing port assignments if necessary.

  • Inspect connector properties in config/HECSinkConnector.properties. If you have not updated the ingest token (see things to know above), do so now.

  • Inspect the standalone worker properties in config/HECStandaloneSinkConnector.properties, ensuring the Kafka broker port is correct (if you haven't edited the docker-compose.yml file, you're good to go), and ensure that the rest.port port is not already in use on your machine.

  • Start the connector in standalone mode with bin/start-connector-standalone.sh.

  • Generate messages for the configured topic (e.g. hectopic, the value in the configuration properties if you've not changed it) by running:

    shell
    bin/json-data-pump.sh hectopic

    Note

    This utility will run until killed!

  • Stop docker with bin/stop-docker-instance.sh.

For more information, see Real-Time Streaming Log Management for Apache Kafka with LogScale.