Erste Schritte mit Apache Kafka und Python

Erste Schritte mit Apache Kafka und Python
In dieser Lektion werden wir sehen, wie wir Apache Kafka mit Python verwenden und eine Beispielanwendung mit dem Python -Client für Apache Kafka erstellen können.

Um diese Lektion abzuschließen, müssen Sie eine aktive Installation für Kafka auf Ihrer Maschine haben. Lesen Sie Apache Kafka auf Ubuntu, um zu wissen, wie das geht.

Installieren Sie den Python -Client für Apache Kafka

Bevor wir mit Apache Kafka im Python -Programm arbeiten können, müssen wir den Python -Client für Apache Kafka installieren. Dies kann verwendet werden Pip (Python -Paketindex). Hier ist ein Befehl, um dies zu erreichen:

PIP3 Installieren Sie Kafka-Python

Dies ist eine schnelle Installation am Terminal:

Python Kafka Client -Installation mit PIP

Nachdem wir eine aktive Installation für Apache Kafka haben und auch den Python Kafka -Client installiert haben, sind wir bereit, mit dem Codieren zu beginnen.

Einen Produzenten machen

Das erste, was Sie Nachrichten über Kafka veröffentlichen müssen, ist eine Produzentenanwendung, die Nachrichten an Themen in Kafka senden kann.

Beachten Sie, dass Kafka -Produzenten asynchrone Nachrichtenproduzenten sind. Dies bedeutet, dass die Operationen, die während einer Nachricht zur Kafka-Topic-Partition veröffentlicht werden, nicht blockiert werden. Um die Dinge einfach zu halten, werden wir einen einfachen JSON -Verlag für diese Lektion schreiben.

Machen Sie zu Beginn eine Instanz für den Kafka -Produzenten:

vom Kafka -Import Kafkaproducer
JSON importieren
pprint importieren
Produzent = Kafkaproducer (
bootstrap_servers = 'localhost: 9092',
value_serializer = lambda v: json.Dumps (v).codieren ('utf-8'))

Das Attribut bootstrap_servers informiert über den Host & Port für den Kafka -Server. Das Attribut Value_Serializer dient nur zum Zweck der JSON -Serialisierung der aufgetretenen JSON -Werte.

Um mit dem Kafka -Produzenten zu spielen, versuchen wir, die Metriken im Zusammenhang mit dem Produzenten und Kafka -Cluster zu drucken:

Metriken = Produzent.Metriken ()
pprint.pprint (Metriken)

Wir werden jetzt Folgendes sehen:

Kafka Mterics

Lassen Sie uns nun endlich versuchen, eine Nachricht an die Kafka -Warteschlange zu senden. Ein einfaches JSON -Objekt ist ein gutes Beispiel:

Hersteller.send ('LinuxHint', 'Thema': 'Kafka')

Der LinuxHint ist die Thema Partition, auf der das JSON -Objekt gesendet wird. Wenn Sie das Skript ausführen, erhalten Sie keine Ausgabe, da die Nachricht gerade an die Themenpartition gesendet wird. Es ist Zeit, einen Verbraucher zu schreiben, damit wir unsere Anwendung testen können.

Verbraucher machen

Jetzt sind wir bereit, eine neue Verbindung als Verbraucheranwendung herzustellen und die Nachrichten vom Kafka -Thema zu erhalten. Beginnen Sie mit einer neuen Instanz für den Verbraucher:

vom Kafka -Import Kafkaconsumer
Aus Kafka Import Topicpartition
drucken ('Verbindung herstellen.'))
Consumer = Kafkaconsumer (Bootstrap_Servers = 'localhost: 9092')

Weisen Sie nun dieser Verbindung ein Thema und einen möglichen Offset -Wert zu.

print ('zuweisen Thema zuweisen.'))
Verbraucher.zuweisen ([topicPartition ('LinuxHint', 2)])

Schließlich sind wir bereit, die MSSAGE zu drucken:

drucken ('Meldung abrufen.'))
Für die Nachricht im Verbraucher:
print ("offset:" + str (message [0]) + "\ t msg:" + str (meldung))

Dadurch erhalten wir eine Liste aller veröffentlichten Nachrichten auf der Kafka Consumer -Topic -Partition. Die Ausgabe für dieses Programm ist:

Kafka -Verbraucher

Nur für eine kurze Referenz finden Sie hier das komplette Produzenten -Skript:

vom Kafka -Import Kafkaproducer
JSON importieren
pprint importieren
Produzent = Kafkaproducer (
bootstrap_servers = 'localhost: 9092',
value_serializer = lambda v: json.Dumps (v).codieren ('utf-8'))
Hersteller.send ('LinuxHint', 'Thema': 'Kafka')
# Metriken = Produzent.Metriken ()
# pprint.pprint (Metriken)

Und hier ist das vollständige Verbraucherprogramm, das wir verwendet haben:

vom Kafka -Import Kafkaconsumer
Aus Kafka Import Topicpartition
drucken ('Verbindung herstellen.'))
Consumer = Kafkaconsumer (Bootstrap_Servers = 'localhost: 9092')
print ('zuweisen Thema zuweisen.'))
Verbraucher.zuweisen ([topicPartition ('LinuxHint', 2)])
drucken ('Meldung abrufen.'))
Für die Nachricht im Verbraucher:
print ("offset:" + str (message [0]) + "\ t msg:" + str (meldung))

Abschluss

In dieser Lektion haben wir uns angesehen, wie wir Apache Kafka in unseren Python -Programmen installieren und beginnen können. Wir haben gezeigt, wie einfach es ist, einfache Aufgaben im Zusammenhang mit Kafka in Python mit dem demonstrierten Kafka -Kunden für Python auszuführen.