Kategorien: | HowTos |
---|---|
Tags: | apachekafka® cadence |
In Teil 3 meiner Cadence-Blogserie habe ich die Drone Delivery Demo-Anwendung vorgestellt und mich dabei auf den Drohnen-Workflow konzentriert. In diesem Beitrag werden wir uns Drone Delivery aus der Perspektive der Bestellungs-Workflows ansehen, erfahren, wie die Drohnen- und Bestellungs-Workflows miteinander interagieren, einige zusätzliche Cadence+Kafka-Integrationsmuster entdecken und einige neue Cadence-Funktionen (z. B. Neuversuche, Nebeneffekte, Abfragen und Als neu fortfahren) genauer betrachten.
Damit wir tiefer in die Drone Delivery-Anwendung eintauchen können, verwenden wir dieses übersichtliche Architekturdiagramm, um den Aufbau besser verstehen zu können. Die oberen 2 Reihen sind die Apache Kafka®-Komponenten, und die unteren 2 Reihen sind die Cadence-Workflows. Ich habe die Workflow-Schritte aus Gründen der Übersichtlichkeit vereinfacht. In den unteren Reihen wird jeder Drohnen-Workflow einer physischen Drohne zugeordnet, wobei die Schritte den Ereignissen im Lebenszyklus der Drohnenlieferung entsprechen.
Aus der Bestellungsperspektive spiegeln die nummerierten Schritte die Ereignisse im Lebenszyklus von Bestellungen/Lieferungen wie folgt wider:
Sehen wir uns nun den Bestellablauf genauer an.
Der erste Schritt im Lebenszyklus einer Bestellung besteht darin, dass ein Kunde etwas über eine Drohnenlieferungs-App bestellt. Wir gehen davon aus, dass dies ein Ereignis „Neue Bestellung erstellen“ auslöst, das in das Kafka-Topic „Neue Bestellungen“ gestellt wird. Dies bringt uns zum ersten unserer neuen Cadence+Kafka-Integrationsmuster, dem unten stehenden Muster „Neuen Cadence-Workflow von Kafka starten“ (1).
Dieses Muster ist einfach. Ein unabhängiger Kafka-Consumer läuft ständig und holt die nächste Bestellung aus dem Topic „Neue Bestellungen“ ab. Mithilfe eines Cadence-Clients erstellt und startet er eine neue Bestellungs-Workflow-Instanz. Hier ist ein beispielhafter Code dafür:
WorkflowClient workflowClient =
WorkflowClient.newInstance(
new WorkflowServiceTChannel(ClientOptions.newBuilder().setHost(host).setPort(7933).build()),
WorkflowClientOptions.newBuilder().setDomain(domainName).build());
Properties kafkaProps = new Properties();
try (FileReader fileReader = new FileReader("consumer2.properties")) {
kafkaProps.load(fileReader);
} catch (IOException e) {
e.printStackTrace();
}
// uses a unique group
kafkaProps.put("group.id", "newOrder");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps)) {
consumer.subscribe(Collections.singleton(newordersTopicName));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.print("Consumer got new Order WF creation request! ");
System.out.println(String.format("topic = %s, partition = %s, offset = %d, key = %s, value = %s",
record.topic(), record.partition(), record.offset(), record.key(), record.value()));
String orderName = record.value().toString();
OrderWorkflow orderWorkflow = workflowClient.newWorkflowStub(OrderWorkflow.class);
System.out.println("Starting new Order workfow!");
WorkflowExecution workflowExecution = WorkflowClient.start(orderWorkflow::startWorkflow, orderName);
System.out.println("Started new Order workfow! Workflow ID = " + workflowExecution.getWorkflowId());
}
}
}
}
Dies ist also ein Beispiel für die Ausführung von Cadence-Code in einem Kafka-Consumer. Etwas Ähnliches haben wir bereits in Teil 2 gesehen, wo wir ein Signal an einen laufenden Cadence-Workflow in einem Kafka-Consumer gesendet haben. Der Unterschied besteht darin, dass wir in diesem Beispiel einen Cadence-Workflow starten.
Der Bestellungs-Workflow ist ganz einfach. Nach dem Start werden zufällige Bestellungs- und Lieferorte generiert (die garantiert innerhalb der Reichweite der Drohne liegen, damit sie auch zur Basis zurückkehren kann), in einer Activity wird eine Nachricht an Kafka gesendet, um mitzuteilen, dass die Drohne bereit für die Lieferung ist (siehe unten), der Status des Ortes wird in einer Schleife aktualisiert (die über ein Signal vom Drohnen-Workflow empfangen wird) und dann wird gewartet, bis der Status „orderComplete“ erreicht ist, um den Prozess zu beenden. Andere Activities sind denkbar, z. B. die Überprüfung auf Lieferverletzungen und das Senden von Standortaktualisierungen an Kafka zur Analyse und Zuordnung.
public static class OrderWorkflowImpl implements OrderWorkflow {
@Override
public String startWorkflow(String name) {
System.out.println("Started Order workflow " + name + ", ID=" + Workflow.getWorkflowInfo().getWorkflowId());
// Order creates fake order and delivery locations
// randomly generated but within range of Drones
startLocation = Workflow.sideEffect(LatLon.class, () -> DroneMaths.newDestination(baseLocation, 0.1, maxLegDistance));
System.out.println("Order WF startLocation = " + startLocation.toString());
deliveryLocation = Workflow.sideEffect(LatLon.class, () -> DroneMaths.newDestination(startLocation, 0.1, maxLegDistance));
System.out.println("Order WF deliveryLocation = " + deliveryLocation.toString());
// A real activity - request a drone - wraps a Kafka producer
activities.readyForDelivery(name);
boolean delivered = false;
String endState = "orderComplete";
while (!delivered)
{
Workflow.await(() -> newState != "");
System.out.println("order " + name + " got signal = " + newState);
updates.add(newState);
if (newState.equals(endState))
{
delivered = true;
System.out.println("Order WF exiting!");
}
lastState = newState;
newState = "";
}
return "Order " + name + " " + endState;
}
}
Wir werden die Nebeneffekte unten erklären.
Sobald die Bestellung zur Abholung bereit ist (möglicherweise nach einer Verzögerung aufgrund der Vorbereitungszeit für die Bestellung), sind wir bereit für die entscheidende Koordination zwischen den Workflows der Drohne und der Bestellung unter Verwendung des Cadence+Kafka-Musters „nächsten Auftrag aus einer Warteschlange holen“ (2, 3).
Wir erhoffen uns von dieser Interaktion, dass (a) die Drohnen bereit sind, eine Bestellung auszuliefern, (b) die Bestellungen zur Auslieferung bereit sind, (c) genau eine Bestellung genau einer Drohne zugewiesen wird. Das heißt, wir wollen nicht, dass sich Drohnen um Bestellungen „streiten“, dass Drohnen versuchen, mehr als eine Bestellung auszuliefern, oder dass Bestellungen, denen keine Drohne zugewiesen wird, nicht ausgeführt werden. (a) und (b) können in beliebiger Reihenfolge auftreten, und es können jederzeit 0 oder mehr Drohnen oder Bestellungen bereitstehen.
Wie funktioniert dieses Muster in der Praxis? Es besteht tatsächlich aus zwei Cadence+Kafka-Untermustern.
Das erste Muster (2) ist eine einfache Ein-Wege-Benachrichtigung von Cadence an Kafka. Der Bestellungs-Workflow hat eine Activity, readyForDelivery()
, die einen Kafka-Producer umschließt. Dies ist ein Fernaufruf, der fehlschlagen kann. Deshalb habe ich eine Cadence Activity verwendet, obwohl sie nicht lange läuft und nicht auf eine Antwort wartet, anders als das Cadence+Kafka-Microservices-Muster, das wir in Blog 2 demonstriert haben, das eine Benachrichtigung sendet und dann auf eine Antwort von Kafka wartet. Der Producer sendet die ID der Bestellung an das Topic „Bestellungen bereit“ und dann blockiert der Bestellungs-Workflow, während er mit Workflow.await()
auf ein Signal von einer Drohne wartet, das besagt, dass die Bestellung abgeholt wurde.
Aber wie nimmt die Drohne eine vorbereitete Bestellung an? Hier kommt das zweite Muster ins Spiel (3). Der Drohnen-Workflow hat eine Activity „Warten auf Bestellung“ (3). Diese umschließt einen Kafka-Consumer (3a), der tatsächlich im Cadence Activity-Thread läuft. Er ist also vorübergehend und dauert nur so lange, wie die Activity läuft. Der Workflow fragt das Topic „Bestellungen bereit“ ab, bis eine einzelne Bestellung zurückgegeben wird (3b), wodurch die Activity abgeschlossen wird (3c).
Kafka-Consumer werden für diesen Anwendungsfall etwas anders als normalerweise verwendet. Es gibt genau einen Consumer pro Drohnen-Workflow im Zustand „Warten auf Bestellung“. Der Consumer fragt das Topic so lange ab, bis ein einziger Datensatz zurückgegeben wird, und wird dann beendet. Um sicherzustellen, dass nur 1 Bestellung zurückgegeben wird, haben wir max.poll.records
auf 1 gesetzt.
Alle diese Consumer teilen sich eine gemeinsame Consumer-Gruppe, sodass die Bestellungen auf alle wartenden Drohnen verteilt werden, aber nur eine Drohne die jeweilige Bestellung erhalten kann. Wir verwenden keinen Kafka-Schlüssel, sodass die Datensätze einfach nach dem Round-Robin-Prinzip an die Consumer geliefert werden. Es kann ein gewisser Overhead entstehen, weil Consumer regelmäßig der Gruppe beitreten und sie wieder verlassen (hauptsächlich Verzögerung durch Neuverteilung). Und wenn die Anzahl der Drohnen steigt, muss die Anzahl der Partitionen des Topics erhöht werden, um sicherzustellen, dass es genügend Partitionen für die Anzahl der Consumer gibt. Die Regel lautet: Partitionen >= Consumer. Sie könnten versucht sein, die Anzahl der Partitionen zu Beginn sehr hoch anzusetzen, aber frühere Experimente haben gezeigt, dass zu viele Partitionen den Durchsatz des Kafka-Clusters verringern können und dass es eine optimale Anzahl von Partitionen gibt, die von der Größe des Clusters abhängt (<= 100 Partitionen ist für den normalen Betrieb in Ordnung). Wenn Sie mehr Drohnen haben, erhöhen Sie einfach die Größe Ihres Kafka-Clusters, um damit Schritt zu halten. Hier ist die Implementierung der waitForOrder()
Activity:
public static class DroneActivitiesImpl implements DroneActivities
{
public String waitForOrder(String name) {
// Kafka consumer that polls for a new Order that's been created and is ready for pickup to trigger Drone delivery trip
// Each Drone can only have 1 order at a time, and each order can only be delivered by 1 drone (or drone wars may result)
Properties kafkaProps = new Properties();
try (FileReader fileReader = new FileReader("consumer2.properties")) {
kafkaProps.load(fileReader);
} catch (IOException e) {
e.printStackTrace();
}
// set max.poll.records to 1 so we onlyu get 1 order at time.
// All consumers waiting for order are in their own shared consumer group
// NOTE that this means we need partitions >= number of Drones - assumption is this is < 100 for performance reasons
kafkaProps.put("group.id", "waitForOrder");
kafkaProps.put("max.poll.records", "1");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps)) {
consumer.subscribe(Collections.singleton(orderjobsTopicName));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.print("waitForOrder got an order! ");
System.out.println(String.format("topic = %s, partition = %s, offset = %d, key = %s, value = %s",
record.topic(), record.partition(), record.offset(), record.key(), record.value()));
// ensure that we don't get this order again
consumer.commitAsync();
return record.value().toString();
}
}
}
catch (Exception e)
{
e.printStackTrace();
}
return "";
}
}
Der Code ist in unserem Github-Repository verfügbar.
Das soll es für diesen Blog-Beitrag gewesen sein. Im nächsten Teil werden wir mit einer Zusammenfassung der verwendeten Cadence- & Kafka-Integrationsmuster fortfahren und einige der neuen Cadence-Funktionen genauer betrachten.
Ob Cadence, Debian oder PostgreSQL: mit über 22+ Jahren an Entwicklungs- und Dienstleistungserfahrung im Open Source Bereich, können credativ und Instaclustr Sie mit einem beispiellosen und individuell konfigurierbaren Support professionell begleiten und Sie in allen Fragen bei Ihrer Open Source Infrastruktur voll und ganz unterstützen.
Sie möchten mehr über Cadence lernen und über die Vorteile die es Ihrer Organisation bietet. Dann laden Sie sich unser englischsprachiges Whitepaper runter.
Sollten Sie Fragen zu unserem Artikel haben oder würden sich wünschen, dass unsere Spezialisten sich Ihr System angucken und Ihre Infrastruktur optimieren, dann schauen Sie doch vorbei und melden sich über unser Kontaktformular oder schreiben uns eine E-mail an info@credativ.de.
Über unsere Mutterfirma Instaclustr bieten wir auch eine komplett verwaltete Plattform für Cadence an.
Kategorien: | HowTos |
---|---|
Tags: | apachekafka® cadence |
über den Autor
zur Person
Paul is the Technology Evangelist at Instaclustr/Spot by NetApp. For the past five years, he has been learning new scalable Big Data technologies, solving realistic problems, building applications, and blogging and talking about Apache Cassandra, Apache Spark, Apache Kafka, Redis, Elasticsearch, PostgreSQL, Cadence, and many more open source technologies. Since learning to program on a VAX 11/780, Paul has extensive R&D, teaching, and consulting experience in distributed systems, technology innovation, software architecture and engineering, software performance and scalability, grid and cloud computing, and data analytics and machine learning. Paul has also worked at Waikato University (NZ), UNSW, CSIRO, UCL (UK), NICTA/ANU, and several tech start-ups. Paul has an MSc in Machine Learning and a BSc (Computer Science and Philosophy).
Benachrichtigungen