Technologie

Einrichtung von Kafka Connect im verteilten Modus und Bereitstellung auf einem PaaS

28. März 2022

Kafka Connect wurde entwickelt, um Events schnell und fehlertolerant von einer Source/ Quelle zu einem Sink/ Ziel zu übertragen. Diese Anleitung beschreibt die Einrichtung eines Kafka CDC (Change Data Capture) Connectors, der alle Events von MongoDB an ein Kafka Topic in der Confluent Cloud weiterleitet.

Das grundlegende Konzept von Kafka Connect‍‍

Um Kafka Conenct fehlertoleran einzurichten, müssen wir mehrere Worker Nodes einsetzen. Ein Failover soll immer dann stattfinden, wenn ein Node nicht mehr verfügbar ist. Um eine ausreichende Verfügbarkeit zu gewährleisten, werden wir in diesem Artikel ein Cluster mit 3 Nodes einrichten. Ein Worker Node ist ein Konsument, der MongoDB Change Stream Events konsumiert und an ein Kafka-Topic in der Confluent Cloud weiterleitet.

Dabei ist zu beachten, dass die Change Stream Events von MongoDB vollständig quittierte Schreibzugriffe (“fully acknowledged writes”) auf den MongoDB-Cluster sind und keine Schreibzugriffe aus dem WAL-Log. So haben wir die Garantie, dass wir die korrekten/konsistenten Events konsumieren, die in MongoDB geschrieben und von allen Nodes bestätigt wurden.‍

Wir müssen mehrere Worker Nodes einsetzen, um einen Kafka-Connect-Cluster zu bilden. Es gibt keine Zookeeper oder andere Komponente, die wir neben den Workern einsetzen müssen. Die Worker verwenden die Kafka-Consumer-API und sind somit Teil einer Kafka-Consumer-Gruppe. Sie hören auf Rebalances und reagieren entsprechend. Wenn ein Node nicht mehr verfügbar ist, findet ein Rebalance statt und der Polling-Task wird neu verteilt.‍

Worker Nodes kommunizieren‍

  1. mit Kafka-Brokern in Confluent
  2. untereinander

Auf einen Cluster von Worker, können wir ein Connector-Plugin installieren. In unserem Fall ist es ein Source-Connector Plugin, das die Daten von MongoDB pollt. Diese Plugins erweitern den SourceTask von Kafka-Connect selbst und geben eine Liste von SourceRecords zurück, die an die Kafka-Connect Library übergeben werden. Das bedeutet, dass das Plugin den Task konfiguriert, der von der Library ausgeführt werden soll. Die Kafka-Connect Library, die Basis auf jedem Worker, kümmert sich um diesen Task und stellt sicher, dass er im Cluster verteilt ausgeführt wird und im Falle eines Fehlers auf verfügbare Nodes übergeht.‍

Daher müssen wir das Connector-Plugin installieren und konfigurieren. Kafka Connect muss diese Konfigurationen innerhalb des Clusters speichern, damit nach einem Failover der nächste Worker den Task fortsetzen kann. Ein Connector-Plugin kann normalerweise über den Confluent-Hub heruntergeladen und installiert werden. Das Plugin kann heruntergeladen und im konfigurierten plugin.path des Workers abgelegt werden. Wird Confluent-Hub verwendet, wird das Plugin in einem Standard-Plugin.path installiert.‍

1
ENV CONNECT_PLUGIN_PATH="/usr/share/java, /usr/share/confluent-hub-components"

Docker-Hub Plugin Path config

Ein Beispiel für den Confluent-Hub-Ansatz finden Sie in der mitgelieferten Dockerdatei in diesem Leitfaden.

Wenn wir 3 Worker mit dem Connector-Plugin installiert starten, erhalten wir die folgendes Setup für einen Cluster:‍‍

Sobald der Worker korrekt konfiguriert und das Plugin installiert ist, können wir einen Connector und somit inherent einen/ mehrere Tasks erzeugen. Da sich das Plugin um die Erstellung der Tasks kümmert, müssen wir also nur einen Connector mit der richtigen Konfiguration erstellen. Für das MongoDB Source-Connector Plugin wird nur ein Task pro  MongoDB Collection erzeugt. Das heisst wenn wir mehrere Collections pollen wollen, so müssten wir für dieses mehrere Connectors erstellen, die das Plugin benutzen. Das liegt daran, dass das change-stream Feature von MongoDB so aufgebaut ist, dass es über sharded Collections skaliert – der Einfachheit halber werden wir das nicht behandeln.‍

Sobald wir 3 Worker gestartet haben, können wir einen Connector, via POST der Konfiguration an den Worker-Cluster, erzeugen. Unser Setup sieht nun wie folgt aus:‍

Ja, Sie haben richtig gelesen, die Connector Konfiguration muss (im verteilten Modus) über einen HTTP POST an den Cluster gesendet werden, um einen Connector zu erstellen. Daher sollten die Workers von aussen erreichbar sein. Entsprechende Security Massnahmen können getroffen werden.

Beispiel:

1
curl -X POST -H "Content-Type: application/json" -d @config.json http://localhost:8083/connectors

POST connector

Falls wir zum Beispiel eine Collection von einem weiteren MongoDB Clustern konsumieren wollen, müssen wir einen weiteren Connector erstellen. Der weitrere Connector hat eine andere Konfiguration für die Datenbankverbindung, die zu konsumierende Collection etc.

‍Ein vollständiges Beispiel einer Konfiguration eines Connectors finden Sie in der nachfolgenden Schritt-für-Schritt-Anleitung.

Um zu verstehen, was ein Connector genau ist, sehen wir uns nun solch ein Setup mit mehreren Connectoren an, die sich auf zwei verschiedene Datenbank-Cluster (zwei verschiedene MongoDB Cluster) verbinden.

Anstelle von zwei MongoDBs könnten wir natürlich auch ein anderes Plugin installieren, wie z. B. das Debezium PostgreSQL CDC Source Connector Plugin, und einen Connector zu erstellen, der sich mit einer PostgresSQL-Datenbank verbindet.‍

Details zum Worker: Aufbau und Funktionen

REST-API des Workers

Die REST-API jedes Workers wird hauptsächlich dazu verwendet, einen Connector zu erzeugen, aber auch zur Überwachung der Connectoren und Tasks. Via die REST-API, kann man auch Connector-Konfigurationen validieren bevor man sie effekt anfügt. Anfragen an einen der Follower-Worker werden immer an den Leader weitergeleitet.

Wir haben mehrere wichtige API-Endpunkte:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# Validate a connector config for a specific plugin
curl -X PUT -H "Content-Type: application/json" -d @config.json http://localhost:8083/conntector-plugins/MongoSourceConnector/config/validate

# Add a connector
curl -X POST -H "Content-Type: application/json" -d @config.json http://localhost:8083/connectors

# Listing connectors
curl -s -X GET "http://localhost:8083/connectors/"

# Deleting a connector
curl -s -X DELETE "http://localhost:8083/connectors/name-from-config-json"

Important connector endpoints

Die gesamte API finden Sie hier:

https://docs.confluent.io/4.1.0/connect/references/restapi.html‍

Einige weitere praktische Beispiele finden Sie hier:

https://developer.confluent.io/learn-kafka/kafka-connect/rest-api/

‍Warum die Worker miteinander kommunizieren müssen

Wenn ein Worker Node dem Cluster beitritt, lädt er das “status topic” (internal compacted topic), in dem die aktuelle Leader-Url gespeichert ist. Die Follower leiten die REST-Anfragen immer an diese Leader-Url weiter. Wieso das wichtig ist, zeigt das nachfolgende Beispiel.

Wenn wir zum Beispiel einen Connector erzeugen oder löschen, muss eine neue Konfiguration im internen Config-Topic gespeichert werden. Wenn wir dieselbe Konfiguration für einen Connector an zwei Follower-Nodes senden und die Anfragen nicht an den Leader weitergeleitet würden, könnten beide denselben Connector zum internen Config-Topic hinzufügen und wir hätten am Ende einen doppelten Connector. Daher werden die Anfragen immer an den Leader weitergeleitet, um sicherzustellen, dass keine falschen Zustände entstehen.‍

(To see that in code / Um das im Code zu sehen)‍

Wir müssen also sicherstellen, dass das Netzwerk so konfiguriert ist, dass die Worker kommunizieren können: Jeder Worker gibt einen Hostnamen und einen Port bekannt und kann bei einem Rebalancing zum Leader gewählt werden.‍

Die folgenden Einstellungen müssen in der Konfiguration hinzugefügt werden:‍

1
2
ENV CONNECT_REST_ADVERTISED_HOST_NAME="kafka-connect-worker1.yourdomain.com"
ENV CONNECT_REST_PORT="8083"

REST Configs

Ein vollständiges Beispiel finden Sie unten in der Schritt-für-Schritt-Anleitung.‍

Loadbalancer-Probleme

Wichtig ist hierbei, dass die Worker jeweils ihre eigene Route haben und ihnen kein Loadbalancer vorgeschaltet ist. Wenn Sie Ihren Kafka-Connect-Workern einen Loadbalancer mit einer Random/Round-Robin-Policy vorschalten, kann es gelegentlich zu dem unten beschriebenen Fehler kommen, da einige Ihrer Anfragen nicht an den Leader, sondern an den Follower weitergeleitet werden. Wie oben beschrieben, würde dies zu Problemen führen, sodass der unten stehende Fehler auftritt:

1
2
3
4
{
	"error_code": 409,
	"message": "Cannot complete request because of a conflicting operation (e.g. worker rebalance)"
}

‍Error with loadbalancers‍

Warum die Worker nicht nur Daten produzieren, sondern auch mit dem Kafka-Broker kommunizieren

Im verteilten Modus muss es natürlich eine Möglichkeit geben, den Connector/ Worker/ Task Status zu speichern, damit alle Worker Nodes darauf zugreifen können und bei einem Fehler entsprechend reagieren können.

  • Der aktuelle Offset muss gespeichert werden (wo wir z. B. beim Konsumieren einer MongoDB-Collection aufgehört haben).
  • Die Konfiguration eines Connectors muss gespeichert werden, damit ein Worker weiss, wohin er sich im Falle eines Failovers verbinden muss.
  • Der Status der Tasks und der ihr aktuell zugewiesene Worker müssen gespeichert werden.
  • Der Status des data topic (für einen Task) muss gespeichert werden.‍

Kafka-Connect speichert diese Informationen derzeit in 3 internen compacted Topics:‍

1
2
3
ENV CONNECT_CONFIG_STORAGE_TOPIC="connect-config"
ENV CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets"
ENV CONNECT_STATUS_STORAGE_TOPIC="connect-status"

Internal Topics‍

Details zum Connector

Um die Connector Konfiguration in der Schritt-für-Schritt-Anleitung zu verstehen, müssen wir zunächst ein paar weitere Punkte zu den Connectors behandeln.

Transformationen

Jeder Connector verbindet sich mit einer Datenquelle. Im Falle unserer MongoDB können wir den Connector so konfigurieren, dass er nicht nur die Patches jeder CRUD-Operation abruft, sondern das gesamte Dokument, das eingefügt, aktualisiert oder gelöscht wurde. Im Falle von MongoDB hat dieses „vollständige Dokument“ jedoch eine bestimmte Struktur und Properties. Wenn Sie z.B. beschliessen, dass Sie bestimmtes Property nicht benötigen oder dass Properties Ihres vollständigen Dokuments miteinander verkettet werden sollen, um den Key des Evenst zu bilden, benötigen Sie Transformationen.

Lassen Sie uns unsere MongoDB-Collection „Events“ beobachten (an die “change stream events” subscriben).‍

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
watchCursor = db.getCollection('events').watch(
  [],
  { fullDocument : "updateLookup" }
);
   
while (!watchCursor.isExhausted()) {
  if (watchCursor.hasNext()) {
    printjson(watchCursor.next());
  }
}

MongoDB watch cursor

Wenn wir ein Event hinzufügen und das “fullDocument” beobachten, erhalten wir Folgendes:‍

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
{
	"_id": {
		"_data": "8261CB1425000000032B022C0100296E5A10043894F093C2C54A54B1265E075C8DAD33463C5F6964003C36316362313432356539643563653030383832343731343630000004"
	},
	"operationType": "insert",
	"clusterTime": Timestamp(1640698917, 3),
	"fullDocument": {
		"_id": "61cb1425e9d5ce00882471460",
		"streamId": "c7bf6fb3-1b83-442b-9f80-8fc57ad2ac83",
		"aggregateId": "c7bf6fb3-1b83-442b-9f80-8fc57ad2ac83",
		"aggregate": "component",
		"context": "planning",
		"streamRevision": 31,
		"commitId": "61cb1425e9d5ce0088247146",
		"commitSequence": 0,
		"commitStamp": ISODate("2021-12-28T13:41:57.142Z"),
		"payload": {
			"name": "componentChanged",
			"payload": {
				"siteId": "4a48c386-100b-4b23-a39d-6585a6ba790e",
				"identifier": "Alarm type I",
				"labelling": "change stream test1",
				"orientationprice": {
					"amount": 0
				}
			},
			"aggregate": {
				"id": "c7bf6fb3-1b83-442b-9f80-8fc57ad2ac83",
				"name": "component",
				"revision": 32
			},
			"correlationId": "9cda2efd-c8f9-4c66-90c4-84a672f61e21",
			"version": 0,
			"context": {
				"name": "planning"
			},
			"id": "b8c1390f-b683-4177-92ee-e052340b8b5a",
			"occurredAt": ISODate("2021-12-28T13:41:57.142Z")
		},
		"position": null,
		"id": "61cb1425e9d5ce00882471460",
		"restInCommitStream": 0,
		"dispatched": false
	},
	"ns": {
		"db": "rs_38b089be-3f80-42a3-b0fe-8c4382ca5c83",
		"coll": "events"
	},
	"documentKey": {
		"_id": "61cb1425e9d5ce00882471460"
	}
}

MongoDB full document

Wie Sie sehen können, haben wir eine Menge umliegender Properties die wir nicht gebrauchen. Daher müssen wir Transformationen hinzufügen. Dazu gibt es verschiedene Möglichkeiten:

  • Wir transformieren das Dokument in MongoDB mithilfe von Pipelines in der Query.
  • Wir transformieren das Dokument nach dem Abruf innerhalb von Kafka Connect unter Verwendung des MongoDB-Plugins.
  • Wir transformieren das Dokument nach dem Abruf innerhalb von Kafka Connect unter Verwendung der eingebauten SMTs.
  • Wir transformieren das Dokument mit einer Kombination der oben genannten Methoden.

Beachten Sie, dass die eingebauten SMTs meist von einem Schema abhängen. MongoDB wird grösstenteils ohne Schema verwendet, sodass wir ein Schema im Plugin angeben oder ableiten müssen, um sie verwenden zu können. Je nachdem, welches Plugin Sie wählen, können Sie also Transformationen anwenden, aber das ist nicht immer der Fall. Sie sind nicht immer gut dokumentiert und müssen manchmal in der code base und den Tests der code base des Plugins nachgeschlagen werden.

Wenn unser Ziel darin besteht, einen schön aussehenden Schlüssel aus mehreren Properties innerhalb des vollständigen Dokuments zu generieren, den wir später für den eigentlichen Key in Kafka extrahieren können, können wir eine MongoDB-Pipeline-Transformation verwenden. Diese wird während der Überwachungs-Query der Collection ausgeführt, und wir können anschliessend die in MongoDB eingebaute Plugin-Transformation verwenden, um sie als Schlüssel zu extrahieren. Hierfür benötigen wir keine eingebaute SMTs.

  • Für das Beispiel soll der Schlüssel die folgende Struktur haben: context_aggregate_aggregateId_eventName.
  • In code: Setup pipeline
  • In code: Process next document
  • In code: Apply output.schema.key to document

Schauen wir uns an, wie eine Pipeline in MongoDB selbst funktioniert:‍

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
watchCursor = db.getCollection("events").watch(
  [
    {
      $addFields: {
        key: {
          $concat: [
            "$fullDocument.context",
            "_",
            "$fullDocument.aggregate",
            "_",
            "$fullDocument.aggregateId",
            "_",
            "$fullDocument.payload.name",
          ],
        },
      },
    },
  ],
  { fullDocument: "updateLookup" }
);

while (!watchCursor.isExhausted()) {
  if (watchCursor.hasNext()) {
    printjson(watchCursor.next());
  }
}

MongoDB Pipeline

1
2
3
4
5
6
7
8
{
        ...

	"documentKey" : {
		"_id" : "61cb1795e9d5ce00882471470"
	},
	"key" : "planning_component_c7bf6fb3-1b83-442b-9f80-8fc57ad2ac83_componentChanged"
}

MongoDB full document after pipeline

Wir müssen diese Pipeline zu unserer Connector-Konfiguration hinzufügen, um das Connector-Plugin anzuweisen, sie zu der Überwachung hinzuzufügen. Diese wird ausgeführt, um neue Dokumente aus dem Plugin zu streamen. Ausserdem müssen wir dieses neu generierte Property extrahieren und als Key verwenden. Dies kann über den output.schema.key erfolgen. Damit es angewendet werden kann, muss output.format.key auf „schema“ gesetzt werden. Da der Key vom Typ String ist und unser Key in Kafka ein String sein soll, können wir den key.converter auf „StringConverter“ setzen.

1
2
3
4
5
6
7
8
{
	...
	"output.schema.key": "{\"type\" : \"record\", \"name\" : \"key\",\"fields\" : [{\"name\": \"key\", \"type\": [\"string\",  \"null\"]}]}",
	"output.format.key": "schema",
	"key.converter": "org.apache.kafka.connect.storage.StringConverter",
	"key.converter.schemas.enable": false
	...
}

‍Connector config.json with MongoDB-Pipeline‍

Schritt-für-Schritt-Anleitung: So installiert man den Cluster

Als Beispiel werden wir einen Cluster von Worker Nodes in einer Cloud-Foundry-Umgebung einrichten. Cloud-Foundry wurde genommen, weil sich die anzuwendenden Schritte sehr anschaulich und verständlich darstellen lassen. Sie benötigen keine Kenntnisse über Cloud-Foundry, um dieser Anleitung zu folgen. Die Schritte können auch auf z.B. Kubernetes übernommen werden.

Was Sie in dieser Anleitung erwartet:

  • Wir werden Confluent-Cloud verwenden, um unseren Kafka-Cluster einzurichten.
  • Wir verbinden uns mit einer MongoDB-Instanz.
  • Wir wenden eine MongoDB-Pipeline an, um eins verkettetes Property für den “Key” zu erzeugen.
  • Wir extrahieren das neue erzeugte “Key” Property und verwenden es als Key innerhalb des Kafka-Topics.

‍1. Schritt: Einrichten von Confluent Cloud

Falls Sie sich noch nicht angemeldet haben, gehen Sie zu https://confluent.cloud/login und erstellen Sie einen Basis-Cluster.

Wenn Sie Ihren Cluster über längere Zeit mit dem Setup aus diesem Leitfaden betreiben, können unter Umständen Kosten anfallen. Das liegt daran, das die internen Themen eine grosse Anzahl an Partitionen verwenden.‍

Beispiel Kostenübersicht:

Wenn man das obere Setup für ca. 1 Monat laufen lässt, belaufen sich die Kosten auf ungefähr 70$. Um Kosten zu sparen, können Sie das Setup in der Zeit, in der Sie es nicht benötigen, abschalten, oder einen lokalen Kafka-Cluster betreiben.

Kafka’s Beschreibung:

If you choose to create this topic manually, always create it as a compacted, highly replicated (3x or more) topic with a large number of partitions (e.g., 25 or 50, just like Kafka’s built-in __consumer_offsets topic) to support large Kafka Connect clusters.

z. B. für das internal offset topic ist der Standart 25 Partitionen:

offset.storage.topic = default 25 partitions‍

Für den Zweck unseres Setups könnten wir die Anzahl der Partitionen viel niedriger ansetzen, aber die Optimierung des Clusters ist nicht Teil dieser Anleitung. Denken Sie auch an den Zweck Ihres Clusters. Wenn Sie expandieren möchten, benötigen Sie mehr Partitionen.‍

‍2. Schritt: Einrichten des Workers (Dockerfile)

Wir werden das confluentinc/cp-kafka-connect-base Docker-Image wiederverwenden, das die Worker-Library enthält, um unser eigenes Docker-Image für einen Worker zu erstellen und zu konfigurieren.

Die Env-Vars können später überschrieben werden, indem die Env-Vars nach dem Deployen der Anwendung gesetzt werden.‍

1
cf set-env appName CONNECT_REST_PORT 80

Override env var via cf set-env

Wir werden

CONNECT_REST_ADVERTISED_HOST_NAME

und

CONNECT_REST_PORT

für jeden Worker später überschreiben müssen.

Aufgabe: Erstellen Sie ein neues Verzeichnis und speichern Sie das folgende Dockerfile.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# Retrieve the base kafka connect image on dockerhub
FROM confluentinc/cp-kafka-connect-base

USER root
ENV CONNECT_PLUGIN_PATH="/usr/share/java, /usr/share/confluent-hub-components"
ENV CONNECT_REST_ADVERTISED_HOST_NAME="kafka-connect-workerNumber.intra.yourdomain.com"
ENV CONNECT_REST_PORT="8083"
ENV CONNECT_GROUP_ID="compose-connect-group"
ENV CONNECT_CONFIG_STORAGE_TOPIC="connect-config"
ENV CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets"
ENV CONNECT_STATUS_STORAGE_TOPIC="connect-status"
ENV CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter"
ENV CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter"
ENV CONNECT_BOOTSTRAP_SERVERS="someId.eu-central-1.aws.confluent.cloud:9092"
ENV CONNECT_SECURITY_PROTOCOL="SASL_SSL"
ENV CONNECT_SASL_JAAS_CONFIG='org.apache.kafka.common.security-icon.svg.plain.PlainLoginModule required username="someUserName" password="somePassword";'
ENV CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM="https"
ENV CONNECT_SASL_MECHANISM="PLAIN"
ENV CONNECT_CLIENT_BOOTSTRAP_SERVERS="someId.eu-central-1.aws.confluent.cloud:9092"
ENV CONNECT_CLIENT_SECURITY_PROTOCOL="SASL_SSL"
ENV CONNECT_CLIENT_SASL_JAAS_CONFIG='org.apache.kafka.common.security-icon.svg.plain.PlainLoginModule required username="someUserName" password="somePassword";'
ENV CONNECT_CLIENT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM="https"
ENV CONNECT_PRODUCER_SASL_MECHANISM="PLAIN"
ENV CONNECT_PRODUCER_SECURITY_PROTOCOL="SASL_SSL"
ENV CONNECT_PRODUCER_SASL_JAAS_CONFIG='org.apache.kafka.common.security-icon.svg.plain.PlainLoginModule required username="someUserName" password="somePassword";'
ENV CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY="All"
ENV CONNECT_PRODUCER_BATCH_SIZE=655360
ENV CONNECT_OFFSET_FLUSH_INTERVAL_MS=30000
ENV CONNECT_OFFSET_FLUSH_TIMEOUT_MS=10000

RUN confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:1.6.1


COPY config.json .

COPY run.sh /usr/share/java

RUN ["chmod", "+x", "/usr/share/java/run.sh"]

ENTRYPOINT ["/usr/share/java/run.sh"]

Full Dockerfile‍

‍3. Schritt: Vorbereiten der Connector-Konfiguration (config.json)

Wie oben beschrieben, hat jeder Connector seine eigene Konfiguration, die wir überprüfen können und dann via POST anfügen müssen. Unter anderem, enthält die Konfiguration die DB Connection and die Ziel MongoDB Collection.

  • Die oben beschriebene Transformationspipeline sowie der output.schema.key müssen in dieser Konfiguration angewendet werden.
  • Weiter müssen wir die DB connection.uri und collection etc. anfügen.

Aufgabe: Kopieren Sie die folgende Konfiguration und erstellen Sie eine config.json Datei im Stammverzeichnis des neu erstellten Verzeichnisses und ersetzen Sie die erforderlichen Konfigurationseigenschaften.‍

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
{
	"name": "<connectorName>",
	"config": {
		"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
		"connection.uri": "<databaseUri>",
		"database": "<database>",
		"collection": "<collectionName>",
		"topic.creation.default.partitions": 1,
		"topic.creation.default.replication.factor": 3,
		"copy.existing": true,
		"topic.prefix": "<topicName>",
		"publish.full.document.only": true,
		"pipeline": "[{$addFields : { \"key\" : { \"$concat\": [\"$fullDocument.context\", \"_\", \"$fullDocument.aggregate\", \"_\", \"$fullDocument.aggregateId\", \"_\", \"$fullDocument.payload.name\"]}}}]",
		"output.schema.key": "{\"type\" : \"record\", \"name\" : \"key\",\"fields\" : [{\"name\": \"key\", \"type\": [\"string\",  \"null\"]}]}",
		"output.format.key": "schema",
		"key.converter": "org.apache.kafka.connect.storage.StringConverter",
		"key.converter.schemas.enable": false,
		"output.format.value": "json",
		"value.converter": "org.apache.kafka.connect.storage.StringConverter",
		"producer.override.batch.size": "655360",
		"producer.override.buffer.memory": "8388608"
	}
}

Full Connector config.json to POST‍

‍4. Schritt: Starten Sie den Worker und POSTen Sie die Connector-Konfiguration (run.sh)

Um den Worker zu starten, erstellen wir eine Datei und speichern sie in dem neu erstellten Verzeichnis. Es muss ein sleep infinity hinzugefügt werden, der verhindert, dass der Worker angehalten wird. Nachdem

Aufgabe:

Erstellen Sie eine Datei im Stammverzeichnis mit den Namen run.sh und dem folgenden Inahlt:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
#!/usr/bin/env bash
# Launch the worker
/etc/confluent/docker/run &

# Wait for it to start running
# Change the port here if not using the default
bash -c ' \
echo -e "\n\n=============\nWaiting for Kafka Connect to start listening on localhost ‚è≥\n=============\n"
while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -ne 200 ] ; do
  echo -e "\t" $(date) " Kafka Connect listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) " (waiting for 200)"
  sleep 5
done
echo -e $(date) "\n\n--------------\n\o/ Kafka Connect is ready! Listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) "\n--------------\n"
# Now create your connector
echo "Connectors can now be posted to the cluster."
sleep infinity
'

‍Full run.sh‍

‍5. Schritt: Erstellen Sie das Worker Image

Aufgabe: Erstellen Sie Ihr Image mit einem Tag.‍

1
2
3
4
5
6
7
# Tag build and tage your image
docker build ./ -t username/kafka-connect-worker:1.0.0
# Push your image
docker push username/kafka-connect-worker:1.0.0
# Create a latest alias tag
docker tag username/kafka-connect-worker:1.0.0 username/kafka-connect-worker
docker push username/kafka-connect-worker

Build Docker Image‍

‍6. Schritt: Übertragen Sie Ihre Anwendung auf das PaaS

Wir werden unser Image mit einem Container pushen, der 2 GB Arbeitsspeicher und 2 GB Festplattenplatz enthält. Wir wollen 3 Worker laufen lassen, um unseren Cluster zu bilden.

Für Cloud Foundry-Benutzer können Sie den folgenden CLI Commands verwenden:‍

1
2
3
cf push kafka-connect-mongodb-worker1 --docker-image username/kafka-connect-worker:1.0.0 -k 2G -m 2G -u "process"
cf push kafka-connect-mongodb-worker2 --docker-image username/kafka-connect-worker:1.0.0 -k 2G -m 2G -u "process"
cf push kafka-connect-mongodb-worker3 --docker-image username/kafka-connect-worker:1.0.0 -k 2G -m 2G -u "process"

Push workers to cf‍

‍7. Schritt: Netzwerkkommunikation zwischen den Arbeitnehmern

Wir müssen sicherstellen, dass unsere eingesetzte Worker-Node-App die Ports geöffnet hat, die für die Kommunikation

  • mit Kafka-Brokern
  • untereinander

nötig sind.‍

In Cloud Foundry müssen wir dazu die folgenden Schritte mit der cf-cli ausführen:‍

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# ---- REPEAT THIS FOR ALL WORKERS ----
# Get the APP-GUID of the first worker
cf app kafka-connect-worker1 --guid
# Make sure the app listens on both ports: 9092 (kafka broker), 8083 (worker's REST port)
cf curl /v2/apps/APP-GUID -X PUT -d '{"ports": [9092, 8083]}'

# Now create a new internal app-to-app route for which we forward only traffic on the port 8083 for the cluster internal worker nodes.
## Create internal app-to-app only route
cf map-route kafka-connect-worker1 apps.internal --hostname kafka-connect-worker1
## Get the route-mapping URL
cf curl /v2/apps/APP-GUID/routes
## Get route_guide from the route-mapping
cf curl /v2/routes/af63546d-5e50-4481-8471-d0a5ba06c63e/route_mappings
## Make the route forward traffic to port 8083 to the app
cf curl /v2/route_mappings -X POST -d '{"app_guid": "APP-GUID", "route_guid": "ROUTE-GUID", "app_port": 8083}'

# Overwrite the REST settings of the worker
cf set-env kafka-connect-worker1 CONNECT_REST_ADVERTISED_HOST_NAME "kafka-worker1.apps.internal"
cf set-env kafka-connect-worker1 CONNECT_REST_PORT 80
# ---- REPEAT THIS FOR ALL WORKERS ----

# We need to add a network policy so that the workers can talk via the internal routes with each other
cf add-network-policy kafka-connect-worker1 --destination-app kafka-connect-worker2 --port 8083 --protocol tcp
cf add-network-policy kafka-connect-worker1 --destination-app kafka-connect-worker3 --port 8083 --protocol tcp
cf add-network-policy kafka-connect-worker2 --destination-app kafka-connect-worker1 --port 8083 --protocol tcp
cf add-network-policy kafka-connect-worker2 --destination-app kafka-connect-worker3 --port 8083 --protocol tcp
cf add-network-policy kafka-connect-worker3 --destination-app kafka-connect-worker1 --port 8083 --protocol tcp
cf add-network-policy kafka-connect-worker3 --destination-app kafka-connect-worker2 --port 8083 --protocol tcp

Workers internal routes setup

Sie müssen dies für jeden der 3 Worker Nodes, die wir geschoben haben, wiederholen.‍

‍8. Schritt: Erstellen einer public/ externen Route

Wir haben die internen Routen erstellent und somit Cluster-Kommunikation intakt. Eine externe Route muss nun angefügt werden, damit wir die in Schritt 3 erstellte Connector-Konfiguration via POST an unseren Cluster senden können. Die Route die wir erstellen werden, wird den Traffic round-robin an einen der erstellten Workers weiterleiten. Der Worker selbst, wird diesen Request dann innerhalb des Clusters an den Master-Worker weiterleiten. In dieser Anleitung werden wir keine Sicherheitsmassnahmen bezüglich dieser Route treffen, Sie sollten aber eine Authentifizierung mitandenken.‍

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
echo "{
      \"host\": \"kafka-connect-external\",
      \"relationships\": {
        \"domain\": {
          \"data\": { \"guid\": \"$domainGuid\" }
        },
        \"space\": {
          \"data\": { \"guid\": \"$spaceGuid\" }
        }
      },
      \"metadata\": {
        \"labels\": {},
        \"annotations\": {}
      }
    }" > payload.json

cf curl "/v3/routes" -X POST -d @payload.json

# Add all the destinations of the workers to the external route so the traffic ends up on one of them # in a round robin fashion
destinations="{ \"destinations\": ["
  for i in ${!workers[@]}; do
    workername=${workers[$i]}
    cf app kafka-connect-"$spacename"-"$workername" --guid
    appGuid=$result
    destinations+="{
        \"app\": {
          \"guid\": \"$appGuid\",
          \"process\": {
            \"type\": \"web\"
          }
        },
        \"port\": 8083,
        \"protocol\": \"http1\"
      }"

      len=${#workers[@]}
      pos=$(( i + 1 ))
      if [ $pos -ne $len ]; then
        destinations+=","
      fi
  done
destinations+="]}"
echo "$destinations" > payload.json
cf curl "/v3/routes/$routeGuid/destinations" -X POST -d @payload.json

Worker’s external route - round robin to one of the workers‍

‍9. Schritt: POST der Connector-KonfigEs ist nun Zeit unsere in Schritt 3 erstellte Connector-Konfiguration via POST an unseren Cluster zu senden. Wir verwenden dazu unsere neu erstellte Route.

1
2
3
4
5
# POST the connector config
curl https://kafka-connect-external.yourdomain.com/connectors -X POST -d @config.json

# Validate if it worked
curl https://kafka-connect-externa.yourdomain.com/connectors -H "Accept: application/json" 

‍POST connector config‍

‍10. Schritt: Testen Sie Ihren Cluster

Wenn Sie einen Worker stoppen, können Sie eine Reblances beobachten. Stoppen Sie zum Beispiel den Worker, der gerade dabei ist einen Task auszuführen, sollte ein anderer Worker innerhalb des Clusters diesen Task übernehmen und weiterführen. Beachten Sie dabei aber: wenn es nur eine Collection zu verarbeiten gibt, kann es nur ein Worker-Node sein, der arbeitet. In Confluent-Cloud können Sie die internen Topics inspizieren und nachsehen, welcher Worker den Task ausführt – diese sollte sich ändern sobald ein anderer Worker-Node den Task übernimmt..

Sie haben Fragen zu Kafka Connect? Wir sind Ihnen gerne behilflich. Schicken Sie uns über das Kontaktformular eine Nachricht und wir melden uns bei Ihnen. ‍‍

Download the full deployment script here

📝 Gefällt dir, was du liest?

Dann lass uns in Kontakt treten! Unser Engineering-Team wird sich so schnell wie möglich bei dir melden.