首页 > 代码库 > Storm Transport OptionalDataException问题解决

Storm Transport OptionalDataException问题解决

    使用的Storm版本是0.9.2,在运行一段时间后(时间不定,最快几十分钟),某个worker会报如下异常

java.lang.RuntimeException: java.lang.RuntimeException: java.io.OptionalDataException
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.daemon.executor$fn__5641$fn__5653$fn__5700.invoke(executor.clj:746) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_67]
Caused by: java.lang.RuntimeException: java.io.OptionalDataException
    at backtype.storm.serialization.SerializableSerializer.read(SerializableSerializer.java:58) ~[storm-core-0.9.2-incubating.jar:0.9.2-          incubating]
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732) ~[kryo-2.21.jar:na]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109) ~[kryo-2.21.jar:na]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18) ~[kryo-2.21.jar:na]
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:629) ~[kryo-2.21.jar:na]
    at backtype.storm.serialization.KryoValuesDeserializer.deserializeFrom(KryoValuesDeserializer.java:38) ~[storm-core-0.9.2-incubating.jar:0.9. 2-incubating]
    at backtype.storm.serialization.KryoTupleDeserializer.deserialize(KryoTupleDeserializer.java:53) ~[storm-core-0.9.2-incubating.jar:0.9.2-     incubating]
    at backtype.storm.daemon.executor$mk_task_receiver$fn__5564.invoke(executor.clj:396) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.disruptor$clojure_handler$reify__745.onEvent(disruptor.clj:58) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    ... 6 common frames omitted
Caused by: java.io.OptionalDataException: null
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1370) ~[na:1.7.0_67]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) ~[na:1.7.0_67]
    at java.util.LinkedList.readObject(LinkedList.java:1136) ~[na:1.7.0_67]
    at sun.reflect.GeneratedMethodAccessor40.invoke(Unknown Source) ~[na:na]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_67]
    at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_67]
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) ~[na:1.7.0_67]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) ~[na:1.7.0_67]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) ~[na:1.7.0_67]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) ~[na:1.7.0_67]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) ~[na:1.7.0_67]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) ~[na:1.7.0_67]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) ~[na:1.7.0_67]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) ~[na:1.7.0_67]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) ~[na:1.7.0_67]
    at backtype.storm.serialization.SerializableSerializer.read(SerializableSerializer.java:56) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    ... 15 common frames omitted

    从异常本身分析,是executor thread在反序列化读取disruptor queue中的信息时出现了问题,当序列化写入的数据类型和读取的数据类型不一致的时候,可能会报OptionalDataException。但是在代码运行过程中,传输的对象本身并没有发生变化,Google了一把,基本没啥收获,最后通过检查代码发现传输的实体类没有定义serialVersionUID,补全后重新提交运行,暂时没再出现异常。按道理serialVersionUID是用于实体类代码有更改时做版本验证的,在Storm拓扑运行过程中,实体类本身并没有发生变化,报错有点诡异。

    在排查问题的过程中,发现有同学在"DisruptorQueue.java:128"同一位置报NPE异常,官方的Troubleshooting是这样解释的

    This is caused by having multiple threads issue methods on the OutputCollector. All emits, acks, and fails must happen on the same thread. One subtle way this can happen is if you make a IBasicBolt that emits on a separate thread. IBasicBolt’s automatically ack after execute is called, so this would cause multiple threads to use the OutputCollector leading to this exception. When using a basic bolt, all emits must happen in the same thread that runs execute.

    意思是使用OutputCollector进行emit,ack,fail调用时,必须在同一个线程中进行,但有同学遇到即使在同一个thread中调用,也会出现NPE问题,和该同学交流后得知,是由于Worker的JVM内存回收时暂停造成读取线程Timeout导致的,调优JVM内存参数后,问题解决。

    在排查异常的过程中,也获取了两篇不错的文章,分别是讲解Storm的Transport机制和相关调优的,分享给大家

    http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/

    https://gist.github.com/mrflip/5958028

本文出自 “lotso的博客” 博客,请务必保留此出处http://lotso.blog.51cto.com/3681673/1562425

Storm Transport OptionalDataException问题解决