Kafka Stream演示程序
小編:管理員 37閱讀 2022.07.25
本教程假定你第一次,且沒有搭建現有的Kafka或ZooKeeper。但是,如果你已經啟動了Kafka和ZooKeeper,請跳過前兩個步驟。
Kafka Streams結合了在客戶端編寫和部署標準Java和Scala應用程序的簡單性以及Kafka服務器端集群技術的優勢,使這些應用程序具有高度可伸縮性,彈性,容錯性,分布式等特性。
這個快速入門示例將演示如何運行一個流應用程序。一個WordCountDemo的例子(為了方便閱讀,使用的是java8 lambda表達式)
// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long(); // Construct a `KStream` from the input topic "streams-plaintext-input", where message values // represent lines of text (for the sake of this example, we ignore whatever may be stored // in the message keys).
KStream<String, String> textLines = builder.stream("streams-plaintext-input",
Consumed.with(stringSerde, stringSerde);
KTable<String, Long> wordCounts = textLines // Split each text line, by whitespace, into words.
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) // Group the text words as message keys
.groupBy((key, value) -> value) // Count the occurrences of each word (message key).
.count() // Store the running counts as a changelog stream to the output topic. wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
從輸入的文本計算出一個詞出現的次數。但是,不像其他的WordCount的例子,你可能會看到,在有限的數據基礎上,執行的演示應用程序的行為略有不同,因為它應該是在一個無限數據的操作,數據流。類似的有界變量,它是一種動態算法,跟蹤和更新的單詞計數。然而,由于它必須假設潛在的無界輸入數據,它會定期輸出其當前狀態和結果,同時繼續處理更多的數據,因為它不知道什么時候它處理過的“所有”的輸入數據。
作為第一步,我們將啟動Kafka,然后我們將輸入數據準備到Kafka主題,然后由Kafka Streams應用程序處理。
Step 1: 下載代碼下載kafka并解壓它。注意,有多個可下載的Scala版本,我們選擇在這里使用推薦版本(2.11):
> tar -xzf kafka_2.13-2.8.0.tgz
> cd kafka_2.13-2.8.0
Step 2: 啟動kafka服務
Kafka使用Zookeeper,所以第一步啟動Zookeeper服務。
> bin/zookeeper-server-start.sh config/zookeeper.properties [2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
現在啟動 Kafka server:
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
Step 3: 準備輸入topic并啟動Kafka生產者
接下來,我們創建一個輸入主題“streams-plaintext-input”,和一個輸出主題"streams-wordcount-output":
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streams-plaintext-input Created topic "streams-plaintext-input".
注意:因為輸出主題是更新日志流(參見下面的應用程序輸出的說明),所以我們為輸出主題啟用了壓縮。
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streams-wordcount-output --config cleanup.policy=compact Created topic "streams-wordcount-output".
也可以使用kafka topic工具查看主題描述:
> bin/kafka-topics.sh --zookeeper localhost:2181 --describe Topic:streams-plaintext-input PartitionCount:1 ReplicationFactor:1 Configs: Topic: streams-plaintext-input Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic:streams-wordcount-output PartitionCount:1 ReplicationFactor:1 Configs: Topic: streams-wordcount-output Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Step 4: 啟動 Wordcount 程序
以下命令啟動WordCount演示程序:
> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
演示程序將從輸入主題streams-plaintext-input中讀取,對每個讀取消息執行WordCount算法計算,并將其當前結果連續寫入輸出主題streams-wordcount-output。 因此,除了日志條目外,不會有任何STDOUT輸出,因為結果會寫回到Kafka中。
現在我們另外開一個終端,來啟動生產者來為該主題寫入一些輸入數據:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
在開一個終端,讀取輸出主題的數據。
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
Step 5: 處理數據
現在,我們通過輸入一行文本然后按,生產一些新的消息到輸入主題streams-plaintext-input。其中消息key為空,消息value為剛剛輸入的字符串編碼文本行(實際上,應用程序的輸入數據通常會連續流入Kafka,而不是 像我們在這個快速入門中那樣手動輸入):
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input all streams lead to kafka
這些消息將被Wordcount程序處理,然后輸出數據到streams-wordcount-output主題中,我們新打開一個命令窗口,輸出消費者:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer all 1
streams 1
lead 1
to 1
kafka 1
這里,第一列是java.lang.String格式的Kafka消息key,表示正在計數的單詞,第二列是java.lang.Longformat中的消息value,表示該單詞的最新計數。
現在,用生產者繼續往streams-plaintext-input主題中發消息,輸入"hello kafka streams",然后:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input all streams lead to kafka
hello kafka streams
在消費者命令窗口,你可以觀察WordCount程序寫入到輸出主題的數據:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer all 1
streams 1
lead 1
to 1
kafka 1
hello 1
kafka 2
streams 2
在這里,最后一行打印行kafka 2和streams 2表示計數已經從1遞增到2。每當你向輸入主題寫入更多的輸入消息時,你將觀察到新的消息被添加到streams-wordcount-output主題,表示由WordCount應用程序計算出的最新字數。讓我們輸入一個最終的輸入文本行“join kafka summit”,然后在控制臺生產者中輸入主題streams-wordcount-input之前的:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-wordcount-input all streams lead to kafka
hello kafka streams join kafka summit
streams-wordcount-output主題隨后將顯示相應的更新變化(請參見最后三行):
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer all 1
streams 1
lead 1
to 1
kafka 1
hello 1
kafka 2
streams 2
join 1
kafka 3
summit 1
可以看到,Wordcount應用程序的輸出實際上是一個連續的更新流,其中每個輸出記錄(即上面原始輸出中的每一行)是單個單詞的更新計數,也就是諸如“kafka”的記錄關鍵字。 對于具有相同密鑰的多個記錄,每個后面的記錄都是前一個記錄的更新。
下面的兩張圖說明了幕后發生的事情。第一列顯示KTable
首先正在處理文本行“all streams lead to kafka”。KTable正在建立,因為每個新單詞都會生成一個新表格(用綠色背景突出顯示),并將相應的更改記錄發送到下游KStream。
當處理第二行文本“hello kafka streams”時,我們首次觀察到KTable中現有的條目正在被更新(這里是:“kafka”和“streams”)。 再次,更改記錄發送到輸出主題。
(我們跳過了第三行如何處理的說明)。這解釋了為什么輸出主題具有我們上面顯示的內容,因為它包含完整的變更記錄。
在這個例子的范圍之外,Kafka Streams在這里做的是利用表和變更日志流之間的對偶性(這里:table = KTable,changelog stream =下游KStream):你可以發布table轉換為流,并且如果你從頭到尾使用整個變更日志流,則可以重新構建表的內容。
Step 6: 停止應用最后,通過Ctrl-C停止控制臺消費者,生產者,Wordcount程序,Kafka Broker和Zokeeper服務。
相關推薦
- kafka消費者Java客戶端 一個從kafka集群中獲取消息的java客戶端。kafka客戶端從kafka集群中獲取消息,并透明地處理kafka集群中出現故障broker,透明地調節適應集群中變化的數據分區。也和broker交互,負載平衡消費者。public class KafkaConsumerK,V extends Object implements Consu…
- Apache RocketMQ 社區創建和協同創新 去年,我曾經撰寫了一篇關于 非英語系國家的社區是如何理解并使用 Apache way 進行開放式創新 的博客。在那篇文章里,我表達了作為一名開發者的期待,即能夠熟練地使用郵件列表功能,認真傾聽社區的聲音,再做出決策。此外,開源社區也可以多開展類似“ GSoC ”…