Deploying a Kafka Cluster using Containers
There are many different Kafka distributions available, for example Strimzi or Apache Kafka.
The instructions below use Strimzi and are provided as a simplified guide, not as configuration for a production system.
Important
Kafka and LogScale should be run on different instances and hardware. This will help ensure performance for both systems when ingesting data.
For production, a valid Kafka cluster should have at least six instances (pods/nodes), consisting of three brokers and three controllers. There also needs to be a minimum of three availability zones for an HA cluster. Additionally, there must be low latency (<300ms) between pods.
When configuring Kafka, each Kafka host must have its own unique node number. For example, in a three node cluster:
| Host | Kafka ID |
|---|---|
| kafka1 | 1 |
| kafka2 | 2 |
| kafka3 | 3 |
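In a plain container deployment, the table above translates into per-host settings. A sketch of what kafka1's server.properties could look like in KRaft combined mode (host names and ports are illustrative; these are standard Kafka KRaft properties):

```properties
# server.properties for kafka1 (node 1 of 3, KRaft combined mode)
node.id=1
process.roles=broker,controller
controller.quorum.voters=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
listeners=PLAINTEXT://kafka1:9092,CONTROLLER://kafka1:9093
controller.listener.names=CONTROLLER
```

On kafka2 and kafka3, only node.id (and the host's own listener addresses) change; the quorum voters list is identical on every node.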
When using Kafka in KRaft mode, you need to configure:

- A node.id parameter (or the KAFKA_NODE_ID environment variable in Docker) to set the node ID.
- A unique cluster ID.

In ZooKeeper mode (only for versions of LogScale before 1.108), Kafka uses ZooKeeper, and the broker.id parameter (or the KAFKA_BROKER_ID environment variable in Docker) sets the unique broker ID.
When configuring each node, ensure that the listener host and port are accessible to other hosts, and that the LogScale instances can reach them over the network. The default Kafka ports should be open and accessible between Docker containers. If in doubt, refer to the Kafka documentation.
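Putting the node ID, cluster ID, and listener requirements together, a docker-compose sketch for one node of a three-node KRaft cluster could look like the following. The KAFKA_NODE_ID variable is named in this guide; the other variable names follow the common KAFKA_* convention of Kafka container images, and the image tag and cluster ID value are illustrative, so verify the exact names against your image's documentation:

```yaml
# docker-compose sketch: one KRaft node (node 1 of 3).
services:
  kafka1:
    image: apache/kafka:3.7.0              # illustrative image/tag
    environment:
      KAFKA_NODE_ID: 1                     # unique per host (see table above)
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      CLUSTER_ID: q1Sh-9_ISia_zwGINzRvyQ   # same on every node; generate once
```

The advertised listener must resolve from the other containers and from the LogScale hosts, which is why it uses the service name rather than localhost.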
Installation of Kafka/KRaft using Strimzi
This section shows you how to install Kafka in KRaft mode on Kubernetes using Strimzi.
Ensure the prerequisites are in place:
- A functioning Kubernetes cluster
- Helm installed
- kubectl configured
Install the Strimzi operator:
```shell
# Add Strimzi Helm repository
helm repo add strimzi https://strimzi.io/charts/
helm repo update

# Install Strimzi operator
helm install strimzi strimzi/strimzi-kafka-operator -n <namespace>
```
Create the Kafka cluster:
```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: logscale
spec:
  kafka:
    version: 3.5.0
    replicas: 3 # These are the broker nodes
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
    storage:
      type: persistent-claim
      size: 100Gi
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      process.roles: "broker" # Broker-only role
      node.roles: "broker" # Broker-only role
      controller.quorum.voters: "0@logscale-kafka-controller-0.logscale-kafka-controller.namespace.svc:9093,1@logscale-kafka-controller-1.logscale-kafka-controller.namespace.svc:9093,2@logscale-kafka-controller-2.logscale-kafka-controller.namespace.svc:9093"
      controller.listener.names: "CONTROLLER"
      inter.broker.protocol.version: "3.5"
    listeners:
      - name: CONTROLLER
        port: 9093
        type: internal
        tls: true
  # Separate controller configuration
  kafkaController:
    replicas: 3 # These are the controller nodes
    storage:
      type: persistent-claim
      size: 20Gi
    config:
      process.roles: "controller" # Controller-only role
      node.roles: "controller" # Controller-only role
```
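The controller.quorum.voters value is easy to get wrong: each entry has the form id@host:port, and every controller must appear. A small shell sketch that assembles the string for three controllers (the namespace and the Strimzi-style headless-service names are placeholders matching the example above):

```shell
# Build the controller.quorum.voters string for a 3-controller quorum.
# Host names follow the Strimzi pattern <pod>.<headless-service>.<namespace>.svc;
# adjust the namespace and names to your deployment.
ns="logscale-ns"
voters=""
for id in 0 1 2; do
  host="logscale-kafka-controller-${id}.logscale-kafka-controller.${ns}.svc"
  voters="${voters:+${voters},}${id}@${host}:9093"
done
echo "$voters"
```

Generating the string rather than hand-editing it avoids the most common mistakes: a missing entry, a duplicated node ID, or a stray space after a comma.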
The following is a complete real-world example of a deployed Strimzi Kafka cluster and its supporting resources (Certificate, node pools, rebalance, and user), as returned by kubectl with status fields included:
```yaml
apiVersion: v1
items:
- apiVersion: cert-manager.io/v1
  kind: Certificate
  metadata:
    annotations:
      meta.helm.sh/release-name: shared-kafka
      meta.helm.sh/release-namespace: shared-kafka
    creationTimestamp: "2025-02-26T17:57:42Z"
    generation: 1
    labels:
      app.kubernetes.io/managed-by: Helm
      helm.toolkit.fluxcd.io/name: shared-kafka
      helm.toolkit.fluxcd.io/namespace: shared-kafka
    name: shared
    namespace: shared-kafka
    resourceVersion: "42394"
    uid: ba84f7eb-4c41-40ca-afc6-65abec80c049
  spec:
    dnsNames:
    - shared-kafka-bootstrap.shared-kafka.svc.cluster.local
    - shared-kafka-bootstrap.shared-kafka.svc
    - shared-kafka-bootstrap
    - '*.shared-kafka-brokers.shared-kafka.svc.cluster.local'
    - '*.shared-kafka-brokers.shared-kafka.svc'
    issuerRef:
      kind: ClusterIssuer
      name: cluster-ca-issuer
    secretName: shared-listener-internal-tls
    usages:
    - server auth
    - client auth
  status:
    conditions:
    - lastTransitionTime: "2025-02-26T17:57:42Z"
      message: Certificate is up to date and has not expired
      observedGeneration: 1
      reason: Ready
      status: "True"
      type: Ready
    notAfter: "2025-05-27T17:57:42Z"
    notBefore: "2025-02-26T17:57:42Z"
    renewalTime: "2025-04-27T17:57:42Z"
    revision: 1
- apiVersion: kafka.strimzi.io/v1beta2
  kind: Kafka
  metadata:
    annotations:
      meta.helm.sh/release-name: shared-kafka
      meta.helm.sh/release-namespace: shared-kafka
      strimzi.io/kraft: enabled
      strimzi.io/node-pools: enabled
    creationTimestamp: "2025-02-26T17:57:42Z"
    generation: 1
    labels:
      app.kubernetes.io/instance: shared-kafka
      app.kubernetes.io/managed-by: Helm
      app.kubernetes.io/name: kafka-strimzi-cluster
      app.kubernetes.io/version: 5.2.1
      helm.sh/chart: kafka-strimzi-cluster-5.2.1
      helm.toolkit.fluxcd.io/name: shared-kafka
      helm.toolkit.fluxcd.io/namespace: shared-kafka
    name: shared
    namespace: shared-kafka
    resourceVersion: "15763933"
    uid: 588593c3-a25d-406d-ae00-2c5a61290581
  spec:
    cruiseControl:
      brokerCapacity:
        inboundNetwork: 15728640KiB/s
        outboundNetwork: 15728640KiB/s
      resources:
        limits:
          cpu: 4
          memory: 2Gi
        requests:
          cpu: 100m
          memory: 512Mi
      template:
        pod:
          affinity:
            nodeAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
                nodeSelectorTerms:
                - matchExpressions:
                  - key: kubernetes.io/os
                    operator: In
                    values:
                    - linux
    entityOperator:
      topicOperator:
        resources:
          limits:
            cpu: 300m
            memory: 256Mi
          requests:
            cpu: 100m
            memory: 128Mi
      userOperator:
        resources:
          limits:
            cpu: 300m
            memory: 256Mi
          requests:
            cpu: 100m
            memory: 128Mi
    kafka:
      authorization:
        superUsers:
        - shared-kadmin
        type: simple
      config:
        auto.create.topics.enable: false
        background.threads: 20
        default.replication.factor: 3
        min.insync.replicas: 4
        num.io.threads: 4
        num.network.threads: 2
        num.recovery.threads.per.data.dir: 4
        offsets.topic.replication.factor: 3
        replica.selector.class: org.apache.kafka.common.replica.RackAwareReplicaSelector
        socket.receive.buffer.bytes: -1
        socket.send.buffer.bytes: -1
        transaction.state.log.min.isr: 2
        transaction.state.log.replication.factor: 3
      listeners:
      - authentication:
          type: scram-sha-512
        configuration:
          brokerCertChainAndKey:
            certificate: tls.crt
            key: tls.key
            secretName: shared-listener-internal-tls
        name: tls
        port: 9093
        tls: true
        type: internal
      rack:
        topologyKey: topology.kubernetes.io/zone
  status:
    clusterId: OJhdWX-TSL6IvXcFOCMBSQ
    conditions:
    - lastTransitionTime: "2025-03-11T14:07:59.469197581Z"
      status: "True"
      type: Ready
    kafkaMetadataState: KRaft
    kafkaMetadataVersion: 3.9-IV0
    kafkaNodePools:
    - name: controller
    - name: kafka
    kafkaVersion: 3.9.0
    listeners:
    - addresses:
      - host: shared-kafka-bootstrap.shared-kafka.svc
        port: 9093
      bootstrapServers: shared-kafka-bootstrap.shared-kafka.svc:9093
      certificates:
      - |
        -----BEGIN CERTIFICATE-----
        MII.../ElE=
        -----END CERTIFICATE-----
      name: tls
    observedGeneration: 1
    operatorLastSuccessfulVersion: 0.45.0
    registeredNodeIds:
    - 0
    - 1
    - 2
    - 3
    - 4
    - 5
    - 6
- apiVersion: kafka.strimzi.io/v1beta2
  kind: KafkaNodePool
  metadata:
    annotations:
      meta.helm.sh/release-name: shared-kafka
      meta.helm.sh/release-namespace: shared-kafka
    creationTimestamp: "2025-02-26T17:57:42Z"
    generation: 1
    labels:
      app.kubernetes.io/instance: shared-kafka
      app.kubernetes.io/managed-by: Helm
      app.kubernetes.io/name: kafka-strimzi-cluster
      app.kubernetes.io/version: 5.2.1
      helm.sh/chart: kafka-strimzi-cluster-5.2.1
      helm.toolkit.fluxcd.io/name: shared-kafka
      helm.toolkit.fluxcd.io/namespace: shared-kafka
      strimzi.io/cluster: shared
    name: controller
    namespace: shared-kafka
    resourceVersion: "42490"
    uid: 5799431d-fdfa-43ba-b0b8-c8a6c63d84b8
  spec:
    jvmOptions:
      -XX:
        ExplicitGCInvokesConcurrent: "true"
        InitiatingHeapOccupancyPercent: "35"
        MaxGCPauseMillis: "20"
        UseG1GC: "true"
      -Xms: 5g
      -Xmx: 5g
    replicas: 3
    resources:
      limits:
        cpu: 1
        memory: 8Gi
      requests:
        cpu: 100m
        memory: 4Gi
    roles:
    - controller
    storage:
      class: cluster-block-base-ext4
      deleteClaim: true
      size: 64Gi
      type: persistent-claim
    template:
      pod:
        affinity:
          nodeAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
              nodeSelectorTerms:
              - matchExpressions:
                - key: kubernetes.io/os
                  operator: In
                  values:
                  - linux
                - key: karpenter.k8s.aws/instance-local-nvme
                  operator: DoesNotExist
          podAntiAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                - key: strimzi.io/pool-name
                  operator: In
                  values:
                  - controller
              topologyKey: kubernetes.io/hostname
        topologySpreadConstraints:
        - labelSelector:
            matchLabels:
              strimzi.io/pool-name: controller
          matchLabelKeys:
          - pod-template-hash
          maxSkew: 2
          topologyKey: topology.kubernetes.io/zone
          whenUnsatisfiable: ScheduleAnyway
  status:
    clusterId: OJhdWX-TSL6IvXcFOCMBSQ
    conditions: []
    labelSelector: strimzi.io/cluster=shared,strimzi.io/name=shared-kafka,strimzi.io/kind=Kafka,strimzi.io/pool-name=controller
    nodeIds:
    - 0
    - 1
    - 2
    observedGeneration: 1
    replicas: 3
    roles:
    - controller
- apiVersion: kafka.strimzi.io/v1beta2
  kind: KafkaNodePool
  metadata:
    annotations:
      meta.helm.sh/release-name: shared-kafka
      meta.helm.sh/release-namespace: shared-kafka
    creationTimestamp: "2025-02-26T17:57:42Z"
    generation: 1
    labels:
      app.kubernetes.io/instance: shared-kafka
      app.kubernetes.io/managed-by: Helm
      app.kubernetes.io/name: kafka-strimzi-cluster
      app.kubernetes.io/version: 5.2.1
      helm.sh/chart: kafka-strimzi-cluster-5.2.1
      helm.toolkit.fluxcd.io/name: shared-kafka
      helm.toolkit.fluxcd.io/namespace: shared-kafka
      strimzi.io/cluster: shared
    name: kafka
    namespace: shared-kafka
    resourceVersion: "42491"
    uid: c3317442-a761-485c-a7dc-21bbe14ac5b4
  spec:
    jvmOptions:
      -XX:
        ExplicitGCInvokesConcurrent: "true"
        InitiatingHeapOccupancyPercent: "35"
        MaxGCPauseMillis: "20"
        UseG1GC: "true"
      -Xms: 5g
      -Xmx: 5g
    replicas: 4
    resources:
      limits:
        cpu: 4
        memory: 12Gi
      requests:
        cpu: 4
        memory: 12Gi
    roles:
    - broker
    storage:
      type: jbod
      volumes:
      - class: cluster-block-base-ext4
        deleteClaim: true
        id: 0
        kraftMetadata: shared
        size: 1Ti
        type: persistent-claim
      - class: cluster-block-base-ext4
        deleteClaim: true
        id: 1
        size: 1Ti
        type: persistent-claim
      - class: cluster-block-base-ext4
        deleteClaim: true
        id: 2
        size: 1Ti
        type: persistent-claim
      - class: cluster-block-base-ext4
        deleteClaim: true
        id: 3
        size: 1Ti
        type: persistent-claim
    template:
      pod:
        affinity:
          nodeAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
              nodeSelectorTerms:
              - matchExpressions:
                - key: kubernetes.io/os
                  operator: In
                  values:
                  - linux
                - key: karpenter.k8s.aws/instance-local-nvme
                  operator: DoesNotExist
          podAntiAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                - key: strimzi.io/pool-name
                  operator: In
                  values:
                  - kafka
              topologyKey: kubernetes.io/hostname
        topologySpreadConstraints:
        - labelSelector:
            matchLabels:
              strimzi.io/pool-name: kafka
          matchLabelKeys:
          - pod-template-hash
          maxSkew: 2
          topologyKey: topology.kubernetes.io/zone
          whenUnsatisfiable: ScheduleAnyway
  status:
    clusterId: OJhdWX-TSL6IvXcFOCMBSQ
    conditions: []
    labelSelector: strimzi.io/cluster=shared,strimzi.io/name=shared-kafka,strimzi.io/kind=Kafka,strimzi.io/pool-name=kafka
    nodeIds:
    - 3
    - 4
    - 5
    - 6
    observedGeneration: 1
    replicas: 4
    roles:
    - broker
- apiVersion: kafka.strimzi.io/v1beta2
  kind: KafkaRebalance
  metadata:
    annotations:
      meta.helm.sh/release-name: shared-kafka
      meta.helm.sh/release-namespace: shared-kafka
    creationTimestamp: "2025-02-26T17:57:42Z"
    generation: 1
    labels:
      app.kubernetes.io/instance: shared-kafka
      app.kubernetes.io/managed-by: Helm
      app.kubernetes.io/name: kafka-strimzi-cluster
      app.kubernetes.io/version: 5.2.1
      helm.sh/chart: kafka-strimzi-cluster-5.2.1
      helm.toolkit.fluxcd.io/name: shared-kafka
      helm.toolkit.fluxcd.io/namespace: shared-kafka
      strimzi.io/cluster: shared
    name: shared
    namespace: shared-kafka
    resourceVersion: "160442"
    uid: 2556958d-19a2-44e4-a8ed-92790107b819
  spec:
    goals:
    - NetworkInboundCapacityGoal
    - DiskCapacityGoal
    - RackAwareGoal
    - NetworkOutboundCapacityGoal
    - CpuCapacityGoal
    - ReplicaCapacityGoal
    skipHardGoalCheck: false
  status:
    conditions:
    - lastTransitionTime: "2025-02-26T20:49:28.145067841Z"
      status: "True"
      type: ProposalReady
    observedGeneration: 1
    optimizationResult:
      afterBeforeLoadConfigMap: shared
      dataToMoveMB: 0
      excludedBrokersForLeadership: []
      excludedBrokersForReplicaMove: []
      excludedTopics: []
      intraBrokerDataToMoveMB: 0
      monitoredPartitionsPercentage: 100
      numIntraBrokerReplicaMovements: 0
      numLeaderMovements: 0
      numReplicaMovements: 0
      onDemandBalancednessScoreAfter: 100
      onDemandBalancednessScoreBefore: 100
      provisionRecommendation: ""
      provisionStatus: RIGHT_SIZED
      recentWindows: 1
    sessionId: 57a1874d-0108-48ca-a43d-9636566316d6
- apiVersion: kafka.strimzi.io/v1beta2
  kind: KafkaUser
  metadata:
    annotations:
      meta.helm.sh/release-name: partition-logscale
      meta.helm.sh/release-namespace: shared-kafka
    creationTimestamp: "2025-03-04T22:59:09Z"
    generation: 1
    labels:
      app.kubernetes.io/managed-by: Helm
      helm.toolkit.fluxcd.io/name: partition-logscale
      helm.toolkit.fluxcd.io/namespace: shared-kafka
      strimzi.io/cluster: shared
    name: partition-logscale
    namespace: shared-kafka
    resourceVersion: "6078322"
    uid: d9487a21-ea9a-424f-ab58-1d626c5cc71a
  spec:
    authentication:
      type: scram-sha-512
    authorization:
      acls:
      - operations:
        - Describe
        resource:
          type: cluster
      - host: '*'
        operations:
        - Delete
        - Describe
        - DescribeConfigs
        - Read
        - Write
        resource:
          name: partition-logscale
          patternType: prefix
          type: topic
      type: simple
  status:
    conditions:
    - lastTransitionTime: "2025-03-04T22:59:09.742170561Z"
      status: "True"
      type: Ready
    observedGeneration: 1
    secret: partition-logscale
    username: partition-logscale
```
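The KafkaUser above authenticates with SCRAM-SHA-512, and Strimzi stores the generated password base64-encoded in a Secret with the same name (partition-logscale, per its status.secret field). A sketch of retrieving and decoding it; the kubectl line is the real-world step, and the sample encoded value below is purely illustrative:

```shell
# In a live cluster, fetch the encoded password with:
#   kubectl get secret partition-logscale -n shared-kafka \
#     -o jsonpath='{.data.password}'
# Decoding works like this (sample value for illustration only):
encoded="cGFzc3dvcmQtZXhhbXBsZQ=="
password=$(printf '%s' "$encoded" | base64 -d)
echo "$password"
```

LogScale then uses this username and password in its Kafka client SCRAM configuration.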
Apply the configuration:
```shell
kubectl apply -f kafka-kraft-cluster.yaml -n <namespace>
```
Verify the installation:
```shell
kubectl get kafka -n <namespace>
kubectl get pods -n <namespace>
```
Verify Kafka is running
To verify that Kafka is running, use the following commands.

If running in ZooKeeper mode, use nc to get the status of each ZooKeeper instance. The following must respond with either leader or follower for all instances:
```shell
$ echo stat | nc 192.168.1.1 2181 | grep '^Mode: '
```
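A healthy ensemble has exactly one leader, with every other instance a follower. A sketch of checking this across all instances; the zk_stat function here simulates captured responses (replace its body with the real `echo stat | nc "$host" 2181` call), and the IP addresses are placeholders:

```shell
# Count leaders across the ZooKeeper ensemble.
# zk_stat simulates: echo stat | nc "$1" 2181
zk_stat() {
  case "$1" in
    192.168.1.1) echo "Mode: leader" ;;
    *)           echo "Mode: follower" ;;
  esac
}

leaders=0
for host in 192.168.1.1 192.168.1.2 192.168.1.3; do
  mode=$(zk_stat "$host" | sed -n 's/^Mode: //p')
  if [ "$mode" = "leader" ]; then
    leaders=$((leaders+1))
  fi
done
echo "leaders=$leaders"   # a healthy ensemble reports exactly one leader
```

Zero leaders means the ensemble has no quorum; more than one indicates a split-brain condition, and either situation needs investigation before starting LogScale.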
Optionally, use your favorite Kafka tools to validate the state of your Kafka cluster. For example, list the topics; since this is a fresh install of Kafka, the list will be empty:

```shell
$ kafka-topics.sh --bootstrap-server localhost:9092 --list
```

Note that the --zookeeper option for kafka-topics.sh was removed in Kafka 3.0; with older, ZooKeeper-based clusters, use --zookeeper localhost:2181 instead.