Reading Kafka Producer Data with Spark Streaming
In the Kafka bin directory, start the console producer to generate messages (each line typed at the prompt is sent to the topic as one message):
./kafka-console-producer --broker-list nodexx:9092 --topic 201609
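If the topic 201609 does not exist yet, it can be created first with the kafka-topics tool that ships alongside the console producer. A minimal sketch, assuming ZooKeeper runs at nodexx:2181 and a single-node cluster:

./kafka-topics --create --zookeeper nodexx:2181 --replication-factor 1 --partitions 1 --topic 201609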
Then, in the Spark bin directory, run the bundled example (the broker list is a single comma-separated argument and must not contain spaces, or the topic argument will be mis-parsed):
./run-example streaming.JavaDirectKafkaWordCount nodexx:9092,nodexx:9092 201609
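Note that run-example only resolves classes bundled in the Spark examples jar. If you compile the class below yourself, the equivalent spark-submit invocation would look roughly like the following sketch; the wordcount.jar name, the local[2] master, and the Kafka assembly jar version are all assumptions, and in practice you may prefer to shade the spark-streaming-kafka dependency into your own jar instead:

./spark-submit --class JavaDirectKafkaWordCount \
  --master local[2] \
  --jars spark-streaming-kafka-assembly_2.10-1.6.0.jar \
  wordcount.jar \
  nodexx:9092,nodexx:9092 201609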
The full source of JavaDirectKafkaWordCount:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Arrays;
import java.util.regex.Pattern;

import scala.Tuple2;
import com.google.common.collect.Lists;
import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.Durations;

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 * Usage: JavaDirectKafkaWordCount <brokers> <topics>
 *   <brokers> is a list of one or more Kafka brokers
 *   <topics> is a list of one or more kafka topics to consume from
 *
 * Example:
 *   $ bin/run-example streaming.JavaDirectKafkaWordCount broker1-host:port,broker2-host:port topic1,topic2
 */
public final class JavaDirectKafkaWordCount {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) {
    if (args.length < 2) {
      System.err.println("Usage: JavaDirectKafkaWordCount <brokers> <topics>\n" +
          "  <brokers> is a list of one or more Kafka brokers\n" +
          "  <topics> is a list of one or more kafka topics to consume from\n\n");
      System.exit(1);
    }

    // StreamingExamples lives in the org.apache.spark.examples.streaming package;
    // it simply sets a quieter log level for the example.
    StreamingExamples.setStreamingLogLevels();

    String brokers = args[0];
    String topics = args[1];

    // Create context with a 2-second batch interval
    SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", brokers);

    // Create direct kafka stream with brokers and topics
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
    );

    // Get the lines, split them into words, count the words and print
    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
      @Override
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) {
        return Lists.newArrayList(SPACE.split(x));
      }
    });
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
        new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
          }
        }).reduceByKey(
        new Function2<Integer, Integer, Integer>() {
          @Override
          public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
          }
        });
    wordCounts.print();

    // Start the computation
    jssc.start();
    jssc.awaitTermination();
  }
}
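One behavior worth knowing: the direct stream starts from each partition's latest offset by default, so only messages produced after the job starts are counted. To replay a topic from the beginning, add the legacy consumer's auto.offset.reset parameter before creating the stream ("smallest" is the 0.8-era spelling of "earliest", matching the Kafka API this example targets):

    // Start from the earliest available offsets instead of the latest
    kafkaParams.put("auto.offset.reset", "smallest");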