<pre id="bdfbb"><ruby id="bdfbb"></ruby></pre>

    <pre id="bdfbb"><b id="bdfbb"></b></pre>

    <pre id="bdfbb"><del id="bdfbb"><mark id="bdfbb"></mark></del></pre>

          <p id="bdfbb"></p>
          <p id="bdfbb"><del id="bdfbb"><dfn id="bdfbb"></dfn></del></p>

          Kafka Streams WordCountDemo.java

          小編:管理員 23閱讀 2022.07.25

          /*
           * Licensed to the Apache Software Foundation (ASF) under one or more
           * contributor license agreements. See the NOTICE file distributed with
           * this work for additional information regarding copyright ownership.
           * The ASF licenses this file to You under the Apache License, Version 2.0
           * (the "License"); you may not use this file except in compliance with
           * the License. You may obtain a copy of the License at
           *
           *    http://www.apache.org/licenses/LICENSE-2.0
           *
           * Unless required by applicable law or agreed to in writing, software
           * distributed under the License is distributed on an "AS IS" BASIS,
           * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
           * See the License for the specific language governing permissions and
           * limitations under the License.
           */ package org.apache.kafka.streams.examples.wordcount; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Produced; import java.util.Arrays; import java.util.Locale; import java.util.Properties; import java.util.concurrent.CountDownLatch; /**
           * Demonstrates, using the high-level KStream DSL, how to implement the WordCount program
           * that computes a simple word occurrence histogram from an input text.
           * <p>
           * In this example, the input stream reads from a topic named "streams-plaintext-input", where the values of messages
           * represent lines of text; and the histogram output is written to topic "streams-wordcount-output" where each record
           * is an updated count of a single word.
           * <p>
           * Before running this example you must create the input topic and the output topic (e.g. via
           * {@code bin/kafka-topics.sh --create ...}), and write some data to the input topic (e.g. via
           * {@code bin/kafka-console-producer.sh}). Otherwise you won't see any data arriving in the output topic.
           */ public final class WordCountDemo { public static final String INPUT_TOPIC = "streams-plaintext-input"; public static final String OUTPUT_TOPIC = "streams-wordcount-output"; static Properties getStreamsConfig() { final Properties props = new Properties();
                  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
                  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                  props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
                  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
                  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data // Note: To re-run the demo, you need to use the offset reset tool: // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return props;
              } static void createWordCountStream(final StreamsBuilder builder) { final KStream<String, String> source = builder.stream(INPUT_TOPIC); final KTable<String, Long> counts = source
                      .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
                      .groupBy((key, value) -> value)
                      .count(); // need to override value serde to Long type counts.toStream().to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
              } public static void main(final String[] args) { final Properties props = getStreamsConfig(); final StreamsBuilder builder = new StreamsBuilder();
                  createWordCountStream(builder); final KafkaStreams streams = new KafkaStreams(builder.build(), props); final CountDownLatch latch = new CountDownLatch(1); // attach shutdown handler to catch control-c Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") { @Override public void run() {
                          streams.close();
                          latch.countDown();
                      }
                  }); try {
                      streams.start();
                      latch.await();
                  } catch (final Throwable e) {
                      System.exit(1);
                  }
                  System.exit(0);
              }
          }
          關聯標簽:
          国产 亚洲 中文 在线 字幕,99资源网,超碰国产97一区二区三区,无码中文人妻中文中文人妻
            <pre id="bdfbb"><ruby id="bdfbb"></ruby></pre>

            <pre id="bdfbb"><b id="bdfbb"></b></pre>

            <pre id="bdfbb"><del id="bdfbb"><mark id="bdfbb"></mark></del></pre>

                  <p id="bdfbb"></p>
                  <p id="bdfbb"><del id="bdfbb"><dfn id="bdfbb"></dfn></del></p>