首页 > 代码库 > (六)storm-kafka源码走读之PartitionManager

(六)storm-kafka源码走读之PartitionManager

PartitionManager算是storm-kafka的核心类了,现在开始简单分析一下。还是先声明一下,metric部分这里不做分析。

PartitionManager主要负责的是消息的发送、容错处理,所以PartitionManager会有三个集合

  •  _pending尚未发送的message的offset集合, 是个TreeSet<Long>()
  • failed : 发送失败的offset 集合,是个TreeSet<Long>()
  • _waitingToEmit: 存放待发射的message,是个LinkedList,从kafka读到的message全部先放在这里
还有几个变量需要说一下:
  • Long _emittedToOffset;   从kafka读到的offset,从kafka读到的messages会放入_waitingToEmit,放入这个list,我们就认为一定会被emit,所以emittedToOffset可以认为是从kafka读到的offset 
  • Long _committedTo;  已经写入zk的offset
  • lastCompletedOffset() 已经被成功处理的offset,由于message是要在storm里面处理的,其中是可能fail的,所以正在处理的offset是缓存在_pending中的 
    如果_pending为空,那么lastCompletedOffset=_emittedToOffset 
    如果_pending不为空,那么lastCompletedOffset为pending list里面第一个offset,因为后面都还在等待ack
public long lastCompletedOffset() {
        if (_pending.isEmpty()) {
            return _emittedToOffset;
        } else {
            return _pending.first();
        }
    }

来看初始化的过程:
     public PartitionManager(DynamicPartitionConnections connections, String topologyInstanceId, ZkState state, Map stormConf, SpoutConfig spoutConfig, Partition id) {
        _partition = id;
        _connections = connections;
        _spoutConfig = spoutConfig;
        _topologyInstanceId = topologyInstanceId;
        _consumer = connections.register(id.host, id.partition);
        _state = state;
        _stormConf = stormConf;
        numberAcked = numberFailed = 0;

        String jsonTopologyId = null;
        Long jsonOffset = null;
        String path = committedPath();
        try {
            Map<Object, Object> json = _state.readJSON(path); // 从zk读取offset
            LOG.info("Read partition information from: " + path +  "  --> " + json );
            if (json != null) {
                jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id");
                jsonOffset = (Long) json.get("offset");
            }
        } catch (Throwable e) {
            LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
        }

		/**
		 * 根据用户设置的startOffsetTime,值来读取offset(-2 从kafka头开始  -1 是从最新的开始 0 =无 从ZK开始)
		 **/
        Long currentOffset = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig);

        if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
            _committedTo = currentOffset;
            LOG.info("No partition information found, using configuration to determine offset");
        } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
            _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig.startOffsetTime);
            LOG.info("Topology change detected and reset from start forced, using configuration to determine offset");
        } else {
            _committedTo = jsonOffset;
            LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId );
        }

		/**
		 * 下面这个if判断是如果当前读取的offset值与提交到zk的值不一致,且相差Long.MAX_VALUE,就认为中间很大部分msg发射了没有提交,就把这部分全部放弃,避免重发
		 * 令_committedTo = currentOffset, 这个是新修复的bug,之前maxOffsetBehind=100000(好像是这个值,这个太小),我后面看下这个bug issue再把这部分解释清楚一下,现在有点迷糊了
		 **/
        if (currentOffset - _committedTo > spoutConfig.maxOffsetBehind || _committedTo <= 0) {
            LOG.info("Last commit offset from zookeeper: " + _committedTo);
            _committedTo = currentOffset;
            LOG.info("Commit offset " + _committedTo + " is more than " +
                    spoutConfig.maxOffsetBehind + " behind, resetting to startOffsetTime=" + spoutConfig.startOffsetTime);
        }

        LOG.info("Starting Kafka " + _consumer.host() + ":" + id.partition + " from offset " + _committedTo);
        _emittedToOffset = _committedTo;

        _fetchAPILatencyMax = new CombinedMetric(new MaxMetric());
        _fetchAPILatencyMean = new ReducedMetric(new MeanReducer());
        _fetchAPICallCount = new CountMetric();
        _fetchAPIMessageCount = new CountMetric();
    }
刚开始的时候需要读取message,放到_waitingToEmit中,这是fill的过程,看代码
private void fill() {
        long start = System.nanoTime();
        long offset;
		
		// 首先要判断是否有fail的offset, 如果有的话,在需要从这个offset开始往下去读取message,所以这里有重发的可能
        final boolean had_failed = !failed.isEmpty(); 

        // Are there failed tuples? If so, fetch those first.
        if (had_failed) {
            offset = failed.first(); // 取失败的最小的offset值,
        } else {
            offset = _emittedToOffset;
        }

        ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
        long end = System.nanoTime();
        long millis = (end - start) / 1000000;
        _fetchAPILatencyMax.update(millis);
        _fetchAPILatencyMean.update(millis);
        _fetchAPICallCount.incr();
        if (msgs != null) {
            int numMessages = 0;

            for (MessageAndOffset msg : msgs) {
                final Long cur_offset = msg.offset();
                if (cur_offset < offset) {
                    // Skip any old offsets.
                    continue;
                }
				
				/**
				 * 只要是没有失败的或者失败的set中含有该offset(因为失败msg有很多,我们只是从最小的offset开始读取msg的)
				 * ,就把这个message放到待发射的list中
				 **/
                if (!had_failed || failed.contains(cur_offset)) {
                    numMessages += 1;
                    _pending.add(cur_offset);
                    _waitingToEmit.add(new MessageAndRealOffset(msg.message(), cur_offset));
                    _emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset);
                    if (had_failed) { // 如果失败列表中含有该offset,就移除,因为要重新发射了。
                        failed.remove(cur_offset);
                    }
                }
            }
            _fetchAPIMessageCount.incrBy(numMessages);
        }
    }

fetchMessage过程
public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset) {
        ByteBufferMessageSet msgs = null;
        String topic = config.topic;
        int partitionId = partition.partition;
        for (int errors = 0; errors < 2 && msgs == null; errors++) { //容忍两次错误
            FetchRequestBuilder builder = new FetchRequestBuilder();
            FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes).
                    clientId(config.clientId).maxWait(config.fetchMaxWait).build();
            FetchResponse fetchResponse;
            try {
                fetchResponse = consumer.fetch(fetchRequest);
            } catch (Exception e) {
                if (e instanceof ConnectException ||
                        e instanceof SocketTimeoutException ||
                        e instanceof IOException ||
                        e instanceof UnresolvedAddressException
                        ) {
                    LOG.warn("Network error when fetching messages:", e);
                    throw new FailedFetchException(e);
                } else {
                    throw new RuntimeException(e);
                }
            }
            if (fetchResponse.hasError()) { // 主要处理offset outofrange的case,通过getOffset从earliest或latest读
                KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
                if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange && errors == 0) {
                    long startOffset = getOffset(consumer, topic, partitionId, config.startOffsetTime);
                    LOG.warn("Got fetch request with offset out of range: [" + offset + "]; " +
                            "retrying with default start offset time from configuration. " +
                            "configured start offset time: [" + config.startOffsetTime + "] offset: [" + startOffset + "]");
                    offset = startOffset;
                } else {
                    String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
                    LOG.error(message);
                    throw new FailedFetchException(message);
                }
            } else {
                msgs = fetchResponse.messageSet(topic, partitionId);
            }
        }
        return msgs;
    }



再来看next方法,这个方法就是KafkaSpout 的nextTuple真正调用的方法
//returns false if it's reached the end of current batch
    public EmitState next(SpoutOutputCollector collector) {
        if (_waitingToEmit.isEmpty()) {
            fill(); // 开始时获取message
        }
        while (true) {
            MessageAndRealOffset toEmit = _waitingToEmit.pollFirst(); //每次读取一条
            if (toEmit == null) {
                return EmitState.NO_EMITTED;
            }
			// 如果忘记了,可以再返回看下自定义scheme这篇 : http://blog.csdn.net/wzhg0508/article/details/40874155
            Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg); 
            if (tups != null) {
                for (List<Object> tup : tups) { //这个地方在讲述自定义Scheme时,提到了
                    collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
                }
                break; // 这里就是每成功发射一天msg,就break掉,返回emitstate给kafkaSpout的nextTuple中做判断和定时commit成功处理的offset到zk
            } else {
                ack(toEmit.offset); // ack 做清除工作
            }
        }
        if (!_waitingToEmit.isEmpty()) {
            return EmitState.EMITTED_MORE_LEFT;
        } else {
            return EmitState.EMITTED_END;
        }
    }

来看ack的操作:
public void ack(Long offset) {
        if (!_pending.isEmpty() && _pending.first() < offset - _spoutConfig.maxOffsetBehind) {
            // Too many things pending!
            _pending.headSet(offset - _spoutConfig.maxOffsetBehind).clear();
        }
        _pending.remove(offset);
        numberAcked++;
    }

接下来看下,commit的操作,这个操作也是在kafkaSpout中调用的
public void commit() {
        long lastCompletedOffset = lastCompletedOffset();
        if (_committedTo != lastCompletedOffset) {
            LOG.debug("Writing last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
            Map<Object, Object> data = http://www.mamicode.com/(Map) ImmutableMap.builder()>
这个commit很简单,不用我多做解释了
重点来看下fail处理吧
public void fail(Long offset) {
        if (offset < _emittedToOffset - _spoutConfig.maxOffsetBehind) {
            LOG.info(
                    "Skipping failed tuple at offset=" + offset +
                            " because it's more than maxOffsetBehind=" + _spoutConfig.maxOffsetBehind +
                            " behind _emittedToOffset=" + _emittedToOffset
            );
        } else {
            LOG.debug("failing at offset=" + offset + " with _pending.size()=" + _pending.size() + " pending and _emittedToOffset=" + _emittedToOffset);
            failed.add(offset);
            numberFailed++;
            if (numberAcked == 0 && numberFailed > _spoutConfig.maxOffsetBehind) {
                throw new RuntimeException("Too many tuple failures");
            }
        }
    }
之前storm-kafka-0.8plus的版本是这样的(摘自storm-kafka-0.8-plus 源码解析

首先作者没有cache message,而只是cache offset 
所以fail的时候,他是无法直接replay的,在他的注释里面写了,不这样做的原因是怕内存爆掉

所以他的做法是,当一个offset fail的时候, 直接将_emittedToOffset回滚到当前fail的这个offset 
下次从Kafka fetch的时候会从_emittedToOffset开始读,这样做的好处就是依赖kafka做replay,问题就是会有重复问题 
所以使用时,一定要考虑,是否可以接受重复问题

public void fail(Long offset) {
        //TODO: should it use in-memory ack set to skip anything that's been acked but not committed???
        // things might get crazy with lots of timeouts
        if (_emittedToOffset > offset) {
            _emittedToOffset = offset;
            _pending.tailSet(offset).clear();
        }
    }

大家对比一下吧

后续继续更新上述未说清楚的地方,望大家包涵

reference

http://www.cnblogs.com/fxjwind/p/3808346.html

(六)storm-kafka源码走读之PartitionManager