<pre id="bdfbb"><ruby id="bdfbb"></ruby></pre>

    <pre id="bdfbb"><b id="bdfbb"></b></pre>

    <pre id="bdfbb"><del id="bdfbb"><mark id="bdfbb"></mark></del></pre>

          <p id="bdfbb"></p>
          <p id="bdfbb"><del id="bdfbb"><dfn id="bdfbb"></dfn></del></p>

          Kafka Stream演示程序

          小編:管理員 37閱讀 2022.07.25

          本教程假定你第一次,且沒有搭建現有的Kafka或ZooKeeper。但是,如果你已經啟動了Kafka和ZooKeeper,請跳過前兩個步驟。

          Kafka Streams結合了在客戶端編寫和部署標準Java和Scala應用程序的簡單性以及Kafka服務器端集群技術的優勢,使這些應用程序具有高度可伸縮性,彈性,容錯性,分布式等特性。

          這個快速入門示例將演示如何運行一個流應用程序。一個WordCountDemo的例子(為了方便閱讀,使用的是java8 lambda表達式)

          // Serializers/deserializers (serde) for String and Long types
          final Serde<String> stringSerde = Serdes.String();
          final Serde<Long> longSerde = Serdes.Long(); // Construct a `KStream` from the input topic "streams-plaintext-input", where message values // represent lines of text (for the sake of this example, we ignore whatever may be stored // in the message keys).
          KStream<String, String> textLines = builder.stream("streams-plaintext-input",
              Consumed.with(stringSerde, stringSerde);
          
          KTable<String, Long> wordCounts = textLines // Split each text line, by whitespace, into words.
              .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) // Group the text words as message keys
              .groupBy((key, value) -> value) // Count the occurrences of each word (message key).
              .count() // Store the running counts as a changelog stream to the output topic. wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long())); 

          從輸入的文本計算出一個詞出現的次數。但是,不像其他的WordCount的例子,你可能會看到,在有限的數據基礎上,執行的演示應用程序的行為略有不同,因為它應該是在一個無限數據的操作,數據流。類似的有界變量,它是一種動態算法,跟蹤和更新的單詞計數。然而,由于它必須假設潛在的無界輸入數據,它會定期輸出其當前狀態和結果,同時繼續處理更多的數據,因為它不知道什么時候它處理過的“所有”的輸入數據。

          作為第一步,我們將啟動Kafka,然后我們將輸入數據準備到Kafka主題,然后由Kafka Streams應用程序處理。

          Step 1: 下載代碼

          下載kafka并解壓它。注意,有多個可下載的Scala版本,我們選擇在這里使用推薦版本(2.11):

          > tar -xzf kafka_2.13-2.8.0.tgz
          > cd kafka_2.13-2.8.0 
          Step 2: 啟動kafka服務

          Kafka使用Zookeeper,所以第一步啟動Zookeeper服務。

          > bin/zookeeper-server-start.sh config/zookeeper.properties [2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
          ...

          現在啟動 Kafka server:

          > bin/kafka-server-start.sh config/server.properties
          [2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
          [2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
          ...
          Step 3: 準備輸入topic并啟動Kafka生產者

          接下來,我們創建一個輸入主題“streams-plaintext-input”,和一個輸出主題"streams-wordcount-output":

          > bin/kafka-topics.sh --create     --zookeeper localhost:2181  --replication-factor 1  --partitions 1  --topic streams-plaintext-input Created topic "streams-plaintext-input".

          注意:因為輸出主題是更新日志流(參見下面的應用程序輸出的說明),所以我們為輸出主題啟用了壓縮。

          > bin/kafka-topics.sh --create     --zookeeper localhost:2181  --replication-factor 1  --partitions 1  --topic streams-wordcount-output  --config cleanup.policy=compact  Created topic "streams-wordcount-output".

          也可以使用kafka topic工具查看主題描述:

          > bin/kafka-topics.sh --zookeeper localhost:2181 --describe Topic:streams-plaintext-input PartitionCount:1 ReplicationFactor:1 Configs: Topic: streams-plaintext-input Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic:streams-wordcount-output PartitionCount:1 ReplicationFactor:1 Configs: Topic: streams-wordcount-output Partition: 0 Leader: 0 Replicas: 0 Isr: 0 
          Step 4: 啟動 Wordcount 程序

          以下命令啟動WordCount演示程序:

          > bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo 

          演示程序將從輸入主題streams-plaintext-input中讀取,對每個讀取消息執行WordCount算法計算,并將其當前結果連續寫入輸出主題streams-wordcount-output。 因此,除了日志條目外,不會有任何STDOUT輸出,因為結果會寫回到Kafka中。

          現在我們另外開一個終端,來啟動生產者來為該主題寫入一些輸入數據:

          > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input 

          在開一個終端,讀取輸出主題的數據。

          > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092     --topic streams-wordcount-output  --from-beginning  --formatter kafka.tools.DefaultMessageFormatter  --property print.key=true  --property print.value=true  --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer  --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer 
          Step 5: 處理數據

          現在,我們通過輸入一行文本然后按,生產一些新的消息到輸入主題streams-plaintext-input。其中消息key為空,消息value為剛剛輸入的字符串編碼文本行(實際上,應用程序的輸入數據通常會連續流入Kafka,而不是 像我們在這個快速入門中那樣手動輸入):

          > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input all streams lead to kafka

          這些消息將被Wordcount程序處理,然后輸出數據到streams-wordcount-output主題中,我們新打開一個命令窗口,輸出消費者:

          > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092     --topic streams-wordcount-output  --from-beginning  --formatter kafka.tools.DefaultMessageFormatter  --property print.key=true  --property print.value=true  --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer  --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer  all     1
          streams 1
          lead    1
          to      1
          kafka   1

          這里,第一列是java.lang.String格式的Kafka消息key,表示正在計數的單詞,第二列是java.lang.Longformat中的消息value,表示該單詞的最新計數。

          現在,用生產者繼續往streams-plaintext-input主題中發消息,輸入"hello kafka streams",然后:

          > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input all streams lead to kafka
          hello kafka streams

          在消費者命令窗口,你可以觀察WordCount程序寫入到輸出主題的數據:

          > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092     --topic streams-wordcount-output  --from-beginning  --formatter kafka.tools.DefaultMessageFormatter  --property print.key=true  --property print.value=true  --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer  --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer  all     1
          streams 1
          lead    1
          to      1
          kafka   1
          hello   1
          kafka   2
          streams 2

          在這里,最后一行打印行kafka 2和streams 2表示計數已經從1遞增到2。每當你向輸入主題寫入更多的輸入消息時,你將觀察到新的消息被添加到streams-wordcount-output主題,表示由WordCount應用程序計算出的最新字數。讓我們輸入一個最終的輸入文本行“join kafka summit”,然后在控制臺生產者中輸入主題streams-wordcount-input之前的:

          > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-wordcount-input all streams lead to kafka
          hello kafka streams join kafka summit

          streams-wordcount-output主題隨后將顯示相應的更新變化(請參見最后三行):

          > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092     --topic streams-wordcount-output  --from-beginning  --formatter kafka.tools.DefaultMessageFormatter  --property print.key=true  --property print.value=true  --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer  --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer  all     1
          streams 1
          lead    1
          to      1
          kafka   1
          hello   1
          kafka   2
          streams 2
          join    1
          kafka   3
          summit  1

          可以看到,Wordcount應用程序的輸出實際上是一個連續的更新流,其中每個輸出記錄(即上面原始輸出中的每一行)是單個單詞的更新計數,也就是諸如“kafka”的記錄關鍵字。 對于具有相同密鑰的多個記錄,每個后面的記錄都是前一個記錄的更新。

          下面的兩張圖說明了幕后發生的事情。第一列顯示KTable 當前狀態的演變,它計數count的單詞出現次數。 第二列顯示從KTable的狀態更新以及發送到輸出主題streams-wordcount-output的更改記錄。

          screenshot

          screenshot

          首先正在處理文本行“all streams lead to kafka”。KTable正在建立,因為每個新單詞都會生成一個新表格(用綠色背景突出顯示),并將相應的更改記錄發送到下游KStream。

          當處理第二行文本“hello kafka streams”時,我們首次觀察到KTable中現有的條目正在被更新(這里是:“kafka”和“streams”)。 再次,更改記錄發送到輸出主題。

          (我們跳過了第三行如何處理的說明)。這解釋了為什么輸出主題具有我們上面顯示的內容,因為它包含完整的變更記錄。

          在這個例子的范圍之外,Kafka Streams在這里做的是利用表和變更日志流之間的對偶性(這里:table = KTable,changelog stream =下游KStream):你可以發布table轉換為流,并且如果你從頭到尾使用整個變更日志流,則可以重新構建表的內容。

          Step 6: 停止應用

          最后,通過Ctrl-C停止控制臺消費者,生產者,Wordcount程序,Kafka Broker和Zokeeper服務。

          關聯標簽:
          国产 亚洲 中文 在线 字幕,99资源网,超碰国产97一区二区三区,无码中文人妻中文中文人妻
            <pre id="bdfbb"><ruby id="bdfbb"></ruby></pre>

            <pre id="bdfbb"><b id="bdfbb"></b></pre>

            <pre id="bdfbb"><del id="bdfbb"><mark id="bdfbb"></mark></del></pre>

                  <p id="bdfbb"></p>
                  <p id="bdfbb"><del id="bdfbb"><dfn id="bdfbb"></dfn></del></p>