首页 > 代码库 > Storm-Kafka模块之写入kafka-KafkaBolt的使用及实现

Storm-Kafka模块之写入kafka-KafkaBolt的使用及实现

     storm在0.9.3版本中提供了一个抽象通用的Bolt KafkaBolt用来实现数据写入kafka的目的,我们先来看一个具体的例子,然后再看如何实现的。

我们用代码加注释的方式,来看下如何使用
//1. KafkaBolt的前置组件emit出来的(可以是spout也可以是bolt)
        Spout spout = new Spout(new Fields("key", "message"));
        builder.setSpout("spout", spout);

        //2. 给KafkaBolt配置topic及前置tuple消息到kafka的mapping关系
        KafkaBolt bolt = new KafkaBolt();
        bolt.withTopicSelector(new DefaultTopicSelector("tony-S2K"))
            .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
        builder.setBolt("forwardToKafka", bolt, 1).shuffleGrouping("spout");

        Config conf = new Config();
        //3. 设置kafka producer的配置
        Properties props = new Properties();
        props.put("metadata.broker.list", "10.100.90.203:9092");
        props.put("producer.type","async");
        props.put("request.required.acks", "0"); // 0 ,-1 ,1
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props);
        conf.put("topic","tony-S2K");

        if(args.length > 0){
            // cluster submit.
            try {
                StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        }else{
            new LocalCluster().submitTopology("kafkaboltTest", conf, builder.createTopology());
        }

完整的代码参考github:
https://github.com/tonylee0329/storm-example/blob/master/src/main/java/org/tony/storm_kafka/common/KafkaBoltTestTopology.java




Storm-Kafka模块之写入kafka-KafkaBolt的使用及实现