Technologie

Kafka 101 Tutorial - Erste Schritte mit Confluent Kafka

29. März 2023

Button Github Button Linkedin Button Twitter


Wir zeigen, wie man einen lokalen Kafka-Cluster mit Docker Containern betreibt. Wir werden auch zeigen, wie man mit Hilfe der CLI Ereignisse aus Kafka produziert und konsumiert.

Die Infrastruktur

Der Cluster wird mit Confluent-Images eingerichtet. Insbesondere werden wir 4 Dienste einrichten:

  1. Zookeeper
  2. Kafka Server (der Broker)
  3. Confluent Schema Registry (zur Verwendung in einem späteren Artikel…)
  4. Confluent Control Center (die Benutzeroberfläche zur Interaktion mit dem Cluster)

Beachten Sie, dass mit Kafka 3.4 die Möglichkeit eingeführt wird, einen Kafka-Cluster vom Zookeeper- in den KRaft-Modus zu verschieben. Zum Zeitpunkt der Erstellung dieses Artikels hat Confluent das neue Docker-Image mit Kafka 3.4 noch nicht veröffentlicht. Daher verwenden wir in diesem Tutorial weiterhin Zookeeper. Eine Diskussion über Zookeeper und KRaft finden Sie in diesem Artikel.

Die Dienste werden mithilfe von docker-compose hochgefahren und orchestriert. Lassen Sie uns kurz die Konfigurationen durchgehen.

Zookeeper

Wie üblich müssen wir Zookeeper mit unserem Kafka-Cluster verbinden. Zookeeper ist für die Speicherung von Metadaten über den Cluster zuständig (z. B. wo sich die Partitionen befinden, welche Replik die führende ist, usw.). Dieser “zusätzliche” Dienst, der immer zusammen mit einem Kafka-Cluster gestartet werden muss, wird bald veraltet sein, da die Verwaltung der Metadaten vollständig in den Kafka-Cluster integriert wird, indem der neue Kafka-Raft-Metadatenmodus, abgekürzt KRaft, verwendet wird.

Die Confluent-Implementierung von Zookeeper bietet ein paar Konfigurationen, die hier verfügbar sind.

Insbesondere müssen wir Zookeper mitteilen, auf welchem Port es auf Verbindungen von Clients, in unserem Fall Apache Kafka, hören soll. Dies wird mit dem Schlüssel ZOOKEEPER_CLIENT_PORT konfiguriert. Sobald dieser Port ausgewählt ist, muss der entsprechende Port im Container freigegeben werden. Diese Konfiguration allein reicht aus, um die Kommunikation zwischen dem Kafka-Cluster und Zookeeper zu ermöglichen. Die entsprechende Konfiguration ist unten verfügbar, wie sie in unserer docker-compose Datei verwendet wird.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
version: '3.3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.2.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  broker:
    ...
  schema-registry:
    ...
  control-center:
    ...

Kafka Server

Wir müssen auch einen einzelnen Kafka-Broker konfigurieren, mit einer minimalen, praktikablen Konfiguration. Wir müssen die Port-Zuordnungen und die Netzwerkeinstellungen (Zookeeper, angekündigte Listener usw.) festlegen. Außerdem legen wir einige grundlegende Konfigurationen für Logging und Metriken fest.

Details zur Konfiguration finden Sie auf der Confluent-Website; und alle Konfigurationen können hier gefunden werden.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
version: '3.3'
services:
  zookeeper:
    ...
  broker:
    image: confluentinc/cp-server:7.2.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.apache.kafka=ERROR, kafka=ERROR, kafka.cluster=ERROR,kafka.controller=ERROR, kafka.coordinator=ERROR,kafka.log=ERROR,kafka.server=ERROR,kafka.zookeeper=ERROR,state.change.logger=ERROR
      KAFKA_LOG4J_ROOT_LOGLEVEL: ERROR
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_ENABLE: 'true'

  schema-registry:
    ...
  control-center:
    ...

Start des Clusters

Um den Cluster zu starten, klonen Sie zunächst das Repository; und wechseln lokal mit cd in das Repository.

Stellen Sie sicher, dass die Ports, die von localhost gemappt werden, nicht bereits verwendet werden und dass Sie keine laufenden Container mit denselben Namen wie die in unserer Datei docker-compose.yaml definierten haben (überprüfen Sie den Konfigurationsschlüssel container_name).

1
2
git clone git@github.com:theodorecurtil/kafka_101.git
cd kafka_101

Um den Cluster zu starten, führen Sie einfach diesen Befehl aus

1
docker-compose up -d

Abhängig von der Docker-Version, die Sie haben, könnte der Befehl folgendermassen lauten

1
docker compose up -d

Um zu überprüfen, ob alle Dienste gestartet sind, führen Sie den folgenden Befehl aus

1
docker-compose ps

Die Ausgabe sollte sein

1
2
3
4
5
NAME                IMAGE                                             COMMAND                  SERVICE             CREATED             STATUS              PORTS
broker              confluentinc/cp-server:7.2.1                      "/etc/confluent/dock…"   broker              9 seconds ago       Up 7 seconds        0.0.0.0:9092->9092/tcp, :::9092->9092/tcp, 0.0.0.0:9101->9101/tcp, :::9101->9101/tcp, 0.0.0.0:29092->29092/tcp, :::29092->29092/tcp
control-center      confluentinc/cp-enterprise-control-center:7.2.1   "/etc/confluent/dock…"   control-center      9 seconds ago       Up 6 seconds        0.0.0.0:9021->9021/tcp, :::9021->9021/tcp
schema-registry     confluentinc/cp-schema-registry:7.2.1             "/etc/confluent/dock…"   schema-registry     9 seconds ago       Up 7 seconds        0.0.0.0:8081->8081/tcp, :::8081->8081/tcp
zookeeper           confluentinc/cp-zookeeper:7.2.1                   "/etc/confluent/dock…"   zookeeper           9 seconds ago       Up 7 seconds        2888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp, 3888/tcp

Sie sollten nun in der Lage sein, auf den Container control-center zuzugreifen, der die Confluent-Benutzeroberfläche für die Verwaltung von Kafka-Clustern auf localhost:9021 darstellt. In den Online-Ressourcen finden Sie eine Führung durch Confluent control center.

Produzieren und Konsumieren von Messages mit der CLI

Bei Kafka gibt es die Begriffe Producer und Consumers. Producers sind Client-Anwendungen, die Daten in den Cluster schreiben. Consumers hingegen lesen Daten aus dem Cluster und führen letztendlich Arbeit auf den gelesenen Daten aus, wie beispielsweise eine Flink-Anwendung. Confluent bietet CLI-Tools, um Messages über die Befehlszeile zu erzeugen und zu konsumieren. In diesem Abschnitt werden wir uns das Folgende ansehen:

  1. Erstellen eines Topics
  2. Schreiben (Produzieren) in das Topic
  3. Lesen (konsumieren) aus dem Topic

Um auf die CLI-Tools zuzugreifen, müssen wir in den broker-Container gehen.

1
docker exec -it broker bash

Ein Topic erstellen

In diesem Beispiel erstellen wir ein einfaches Topic im Cluster mit den Standardkonfigurationen. Wir erstellen ein Topic mit dem Namen my-amazing-topic mit einem Replikationsfaktor von 1 und einer Partitionierung von 1. Das bedeutet, dass Messages nicht repliziert werden (1 Message wird nur auf einem Server gespeichert) und nicht partitioniert werden (1 Partition ist gleichbedeutend mit keiner Partitionierung). Das bedeutet, dass das Topic in 1 Log aufgeteilt wird.

Um dieses Topic zu instanziieren, führen Sie den folgenden Befehl innerhalb des Containers broker aus

1
2
3
4
kafka-topics --bootstrap-server localhost:9092 --create \
  --topic my-amazing-topic \
  --partitions 1 \
  --replication-factor 1

Wenn der Befehl erfolgreich ist, wird er Folgendes ausgeben

1
Created topic my-amazing-topic.

Sie können auch überprüfen, ob das Topic erfolgreich erstellt wurde, indem Sie zur Registerkarte Topics der Web-Benutzeroberfläche navigieren; dort sollte das neu erstellte Topic mit dem Status Healthy aufgeführt sein.

Produzieren für das Topic

Nachdem wir ein Topic mit Standardkonfiguration erstellt haben, können wir nun beginnen, Datensätze an dieses Topic zu senden. Führen Sie dazu den folgenden Befehl im Container aus und senden Sie Ihre Messages.

1
2
3
[appuser@broker ~]$ kafka-console-producer --bootstrap-server localhost:9092 --topic my-amazing-topic
>foo
>bar

Mit diesem Befehl werden 2 Messages an das Topic my-amazing-topic ohne Schlüssel und mit den Werten foo und bar, einige Zeichenketten, erstellt.

Man kann sehen, dass die Messages für das Topic erstellt wurden und im Topic verbleiben, indem man zur Registerkarte Topics navigiert.

Wenn Sie auf die Registerkarte Schema klicken, werden Sie feststellen, dass kein Schema vorhanden ist. Das bedeutet, dass das Topic Datensätze mit unterschiedlichen Schemata enthalten kann, z. B. Strings oder Json-Strings. Es wird kein Schema erzwungen, was in der Produktion natürlich keine gute Praxis ist; daher die Notwendigkeit des Containers schema-registry. Aber machen Sie sich jetzt keine Sorgen, wir werden diesen Punkt in unserem nächsten Blog-Beitrag ansprechen, in dem wir eine kleine Producer-Anwendung bauen werden, die Avro-Datensätze mit Schema-Validierung an Kafka sendet.

Verbrauchen aus dem Topic

Der letzte Schritt besteht darin, die soeben erstellten Messages aus dem Topic zu konsumieren. Geben Sie dazu im Container den folgenden Befehl ein

1
2
3
4
5
[appuser@broker ~]$ kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic my-amazing-topic \
  --from-beginning
foo
bar

Den Cluster beenden

Wenn Sie genug mit Ihrem Kafka-Cluster gespielt haben, möchten Sie ihn vielleicht abschalten. Um dies zu tun, cd in dieses Projekt-Repos wieder und docker-compose die Infrastruktur nach unten.

1
2
cd kafka_101
docker-compose down

Was kommt als nächstes?

In einem nächsten Blog-Beitrag werden wir sehen, wie man - ausgehend von dieser Vanilla-Kafka-Infrastruktur - Avro-Datensätze für Kafka erzeugt. Bleiben Sie dran!

📝 Gefällt dir, was du liest?

Dann lass uns in Kontakt treten! Unser Engineering-Team wird sich so schnell wie möglich bei dir melden.