首页 > 代码库 > (三)storm-kafka源码走读之如何构建一个KafkaSpout

(三)storm-kafka源码走读之如何构建一个KafkaSpout

上一节介绍了config的相关信息,这一节说下,这些参数分别是什么,在zookeeper中的存放路径是怎样的,之前QQ群里有很多不知道该怎么传入正确的参数来new 一个kafkaSpout,其主要还是参数传递正确就可。


看SpoutConfig的构造函数

public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
        super(hosts, topic);
        this.zkRoot = zkRoot;
        this.id = id;
    }

需要一个BrokerHosts,看代码:

public class ZkHosts implements BrokerHosts {
    private static final String DEFAULT_ZK_PATH = "/brokers";

    public String brokerZkStr = null;
    public String brokerZkPath = null; // e.g., /kafka/brokers
    public int refreshFreqSecs = 60;

    public ZkHosts(String brokerZkStr, String brokerZkPath) {
        this.brokerZkStr = brokerZkStr;
        this.brokerZkPath = brokerZkPath;
    }

    public ZkHosts(String brokerZkStr) {
        this(brokerZkStr, DEFAULT_ZK_PATH);
    }
}
需要brokerZKStr,这个其实就是hosts列表,多个host以逗号隔开,因为zookeeper解析string时是以逗号分隔的,这里附上zookeeper的解析代码

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            boolean canBeReadOnly)
        throws IOException
    {
        LOG.info("Initiating client connection, connectString=" + connectString
                + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);

        watchManager.defaultWatcher = watcher;

        ConnectStringParser connectStringParser = new ConnectStringParser(
                connectString);
        HostProvider hostProvider = new StaticHostProvider(
                connectStringParser.getServerAddresses());
        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), canBeReadOnly);
        cnxn.start();
    }

其中主要StringParser做解析的,看俺如何解析的就知道了

public ConnectStringParser(String connectString) {
        // parse out chroot, if any
        int off = connectString.indexOf('/');
        if (off >= 0) {
            String chrootPath = connectString.substring(off);
            // ignore "/" chroot spec, same as null
            if (chrootPath.length() == 1) {
                this.chrootPath = null;
            } else {
                PathUtils.validatePath(chrootPath);
                this.chrootPath = chrootPath;
            }
            connectString = connectString.substring(0, off);
        } else {
            this.chrootPath = null;
        }

        String hostsList[] = connectString.split(",");
        for (String host : hostsList) {
            int port = DEFAULT_PORT;
            int pidx = host.lastIndexOf(':');
            if (pidx >= 0) {
                // otherwise : is at the end of the string, ignore
                if (pidx < host.length() - 1) {
                    port = Integer.parseInt(host.substring(pidx + 1));
                }
                host = host.substring(0, pidx);
            }
            serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
        }
    }

好了,这里就说到这了。

刚才说到brokerZKStr需要,还有一个参数就是zkpath,这个可以自己定,也有个默认值 “/brokers”

SpoutConfig还有个zkroot,这个zkroot其实就是Consumer端消费的信息存放地方,好了给个例子:

String topic = “test”;  //
        String zkRoot = “/kafkastorm”; //
        String spoutId = “id”; //读取的status会被存在,/kafkastorm/id下面,所以id类似consumer group
        
        BrokerHosts brokerHosts = new ZkHosts("10.1.110.24:2181,10.1.110.22:2181"); // 这里使用默认的/brokers

        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, spoutId);
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); // 下一节介绍 scheme
        
        /*spoutConfig.zkServers = new ArrayList<String>(){{ //只有在local模式下需要记录读取状态时,才需要设置
            add("10.118.136.107");
        }};
        spoutConfig.zkPort = 2181;*/
        
        spoutConfig.forceFromStart = true; 
        spoutConfig.startOffsetTime = -1;//从最新的开始消费 
        // spoutConfig.metricsTimeBucketSizeInSecs = 6;

        builder.setSpout(SqlCollectorTopologyDef.KAFKA_SPOUT_NAME, new KafkaSpout(spoutConfig), 1);

这里就成功建了一个KafkaSpout,如果项目运行成功的话,

可以到zk master上看下相关信息,

./bin/zkCli.sh -server 10.1.110.24:2181



而对于StaticHosts来说,看官方解释:

StaticHosts

This is an alternative implementation where broker -> partition information is static. In order to construct an instance of this class you need to first construct an instance of GlobalPartitionInformation.

    Broker brokerForPartition0 = new Broker("localhost");//localhost:9092
    Broker brokerForPartition1 = new Broker("localhost", 9092);//localhost:9092 but we specified the port explicitly
    Broker brokerForPartition2 = new Broker("localhost:9092");//localhost:9092 specified as one string.
    GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
    partitionInfo.addPartition(0, brokerForPartition0);//mapping form partition 0 to brokerForPartition0
    partitionInfo.addPartition(1, brokerForPartition1);//mapping form partition 1 to brokerForPartition1
    partitionInfo.addPartition(2, brokerForPartition2);//mapping form partition 2 to brokerForPartition2
    StaticHosts hosts = new StaticHosts(partitionInfo);
个人认为是需要开发人员,自己知道partition与broker之间的对应关系,正确关联起来。而storm-kafka 0.9.0.1的版本是,不需要指定,我只需要传入zkServer list,partition总数,由kafkautil利用两个for(遍历所有broker和partition)循环,临时生成Consumer去连接消费一下试试,如果有数据,那么就把partitionId和brokerHost关系存到Map中去。可想而知0.9.3-rc1为什么要改成这样了。如果该broker没有该partition信息,后果会怎样???笔者没有测试过,有测试过的请留言,说一下情况。

Reference

http://www.cnblogs.com/fxjwind/p/3808346.html

(三)storm-kafka源码走读之如何构建一个KafkaSpout