_00023 Kafka Odd Tricks_001: Class-Level Data Transfer and Parsing with a Custom Encoder

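Author: 妳那伊抹微笑
Blog: http://blog.csdn.net/u012185296
Post title: _00023 Kafka Odd Tricks_001: Class-Level Data Transfer and Parsing with a Custom Encoder
Signature: The greatest distance in the world is not the ends of the earth or the far side of the sea; it is me standing in front of you while you cannot sense that I exist
Tech focus: Flume+Kafka+Storm+Redis/Hbase+Hadoop+Hive+Mahout+Spark ... cloud computing
Reprint notice: Reprinting is allowed, but the original source, the author information, and this copyright notice must be given as a hyperlink. Thank you!
QQ group: 214293307, 云计算之嫣然伊笑 (looking forward to learning and improving together with you)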

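# Kafka Advanced Topics: Class-Level Data Transfer and Parsing with a Custom Encoder

# Preface

All project code and jars used in this post have been uploaded to the shared files of QQ group 214293307; download them from there if you need them.

The Eclipse project for this post, "_00023 Kafka Odd Tricks_001: Class-Level Data Transfer and Parsing with a Custom Encoder", can be downloaded from http://download.csdn.net/detail/u012185296/7633405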

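# How sending class-level data works

Put simply, you serialize an object of the class into a byte[], and later deserialize that byte[] back into an object. The only prerequisite is that the class implements the java.io.Serializable interface. Pretty simple, right?

After that, all you need is a custom Encoder. Below is a reference example that uses a User class.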
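The source listings in this post call a `BeanUtils` helper whose code is not shown. As a rough idea of what such a helper might look like, here is a minimal sketch built on standard Java object streams. The class name `SerializationSketch` and both method names are made up for illustration and are not the author's actual implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for the post's BeanUtils (its real source is not shown).
class SerializationSketch {

    // Serialize any Serializable object into a byte[].
    static byte[] objectToBytes(Serializable obj) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(obj);
        } catch (IOException e) {
            // Wrapped so callers (for example an encoder) need no checked exception.
            throw new IllegalStateException("serialization failed", e);
        }
        // The stream is closed (and therefore flushed) before the bytes are read.
        return buffer.toByteArray();
    }

    // Rebuild the object from bytes produced by objectToBytes.
    static Object bytesToObject(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("deserialization failed", e);
        }
    }
}
```

With a helper like this, the encoder side would call `objectToBytes(user)` and the consumer side would cast the result of `bytesToObject(bytes)` back to `User`. The class must be on the classpath of both producer and consumer for the cast to work.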

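# Class-level producer and consumer with a custom Encoder

Here the producer sends instances of a User class; see the source code below for details.

# Custom Partitioner: hashCode-based partitioning

See the source code below for details.

# Running UserProducer: output (run from Eclipse)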

    log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
    log4j:WARN Please initialize the log4j system properly.
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    User [addr=addr000, age=age0, id=id000, name=name000, sex=sex0]
    encoder ---> User [addr=addr000, age=age0, id=id000, name=name000, sex=sex0]
    encoder ---> User [addr=addr000, age=age0, id=id000, name=name000, sex=sex0]
    hash partition ---> User [addr=addr000, age=age0, id=id000, name=name000, sex=sex0]
    User [addr=addr001, age=age1, id=id001, name=name001, sex=sex1]
    encoder ---> User [addr=addr001, age=age1, id=id001, name=name001, sex=sex1]
    encoder ---> User [addr=addr001, age=age1, id=id001, name=name001, sex=sex1]
    hash partition ---> User [addr=addr001, age=age1, id=id001, name=name001, sex=sex1]
    User [addr=addr002, age=age2, id=id002, name=name002, sex=sex0]
    encoder ---> User [addr=addr002, age=age2, id=id002, name=name002, sex=sex0]
    encoder ---> User [addr=addr002, age=age2, id=id002, name=name002, sex=sex0]
    hash partition ---> User [addr=addr002, age=age2, id=id002, name=name002, sex=sex0]
    User [addr=addr003, age=age3, id=id003, name=name003, sex=sex1]
    encoder ---> User [addr=addr003, age=age3, id=id003, name=name003, sex=sex1]
    encoder ---> User [addr=addr003, age=age3, id=id003, name=name003, sex=sex1]
    hash partition ---> User [addr=addr003, age=age3, id=id003, name=name003, sex=sex1]
    User [addr=addr004, age=age4, id=id004, name=name004, sex=sex0]
    encoder ---> User [addr=addr004, age=age4, id=id004, name=name004, sex=sex0]
    encoder ---> User [addr=addr004, age=age4, id=id004, name=name004, sex=sex0]
    hash partition ---> User [addr=addr004, age=age4, id=id004, name=name004, sex=sex0]
    User [addr=addr005, age=age5, id=id005, name=name005, sex=sex1]
    encoder ---> User [addr=addr005, age=age5, id=id005, name=name005, sex=sex1]
    encoder ---> User [addr=addr005, age=age5, id=id005, name=name005, sex=sex1]
    hash partition ---> User [addr=addr005, age=age5, id=id005, name=name005, sex=sex1]
    User [addr=addr006, age=age6, id=id006, name=name006, sex=sex0]
    encoder ---> User [addr=addr006, age=age6, id=id006, name=name006, sex=sex0]
    encoder ---> User [addr=addr006, age=age6, id=id006, name=name006, sex=sex0]
    hash partition ---> User [addr=addr006, age=age6, id=id006, name=name006, sex=sex0]
    User [addr=addr007, age=age7, id=id007, name=name007, sex=sex1]
    encoder ---> User [addr=addr007, age=age7, id=id007, name=name007, sex=sex1]
    encoder ---> User [addr=addr007, age=age7, id=id007, name=name007, sex=sex1]
    hash partition ---> User [addr=addr007, age=age7, id=id007, name=name007, sex=sex1]
    User [addr=addr008, age=age8, id=id008, name=name008, sex=sex0]
    encoder ---> User [addr=addr008, age=age8, id=id008, name=name008, sex=sex0]
    encoder ---> User [addr=addr008, age=age8, id=id008, name=name008, sex=sex0]
    hash partition ---> User [addr=addr008, age=age8, id=id008, name=name008, sex=sex0]
    User [addr=addr009, age=age9, id=id009, name=name009, sex=sex1]
    encoder ---> User [addr=addr009, age=age9, id=id009, name=name009, sex=sex1]
    encoder ---> User [addr=addr009, age=age9, id=id009, name=name009, sex=sex1]
    hash partition ---> User [addr=addr009, age=age9, id=id009, name=name009, sex=sex1]
    producer is successful .

As the output shows, UserProducer has sent the User objects to Kafka. Now it is up to the consumer to fetch the data back out of Kafka.
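One detail in the producer output: `encoder --->` is printed twice per message. A likely explanation is that the producer is typed `Producer<User, User>` and the same User is passed as both key and message; in the Kafka 0.8 producer, `key.serializer.class` falls back to `serializer.class` when it is not set, so UserEncoder would run once for the key and once for the message. The sketch below shows how the properties could be set up if you wanted a plain String key instead. The class name `ProducerPropsSketch` and the `build` method are hypothetical; `kafka.serializer.StringEncoder` is the String encoder that ships with Kafka 0.8.

```java
import java.util.Properties;

// Hypothetical helper that only builds the producer properties; it does not need Kafka to run.
class ProducerPropsSketch {

    // brokerList is something like "rs229:9092" (adjust for your cluster).
    static Properties build(String brokerList) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        // Message values are still encoded by the custom encoder from this post.
        props.put("serializer.class", "com.yting.cloud.kafka.encoder.UserEncoder");
        // Keys get their own encoder instead of falling back to serializer.class.
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        return props;
    }
}
```

Under this assumption the producer would be declared as `Producer<String, User>` and the key could be, for example, the user id, so the custom encoder would run only once per message.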

# Running UserSimpleConsumer: output (run from Eclipse)

# Output for partition 0

    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    log4j:WARN No appenders could be found for logger (kafka.network.BlockingChannel).
    log4j:WARN Please initialize the log4j system properly.
    0: User [addr=addr000, age=age0, id=id000, name=name000, sex=sex0]
    1: User [addr=addr002, age=age2, id=id002, name=name002, sex=sex0]
    2: User [addr=addr006, age=age6, id=id006, name=name006, sex=sex0]
    3: User [addr=addr009, age=age9, id=id009, name=name009, sex=sex1]

Offsets 0 to 3, 4 records in total.

# Output for partition 1

    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    log4j:WARN No appenders could be found for logger (kafka.network.BlockingChannel).
    log4j:WARN Please initialize the log4j system properly.
    0: User [addr=addr001, age=age1, id=id001, name=name001, sex=sex1]
    1: User [addr=addr003, age=age3, id=id003, name=name003, sex=sex1]
    2: User [addr=addr004, age=age4, id=id004, name=name004, sex=sex0]
    3: User [addr=addr005, age=age5, id=id005, name=name005, sex=sex1]
    4: User [addr=addr007, age=age7, id=id007, name=name007, sex=sex1]
    5: User [addr=addr008, age=age8, id=id008, name=name008, sex=sex0]

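Offsets 0 to 5, 6 records in total.

The two partitions together hold exactly 10 records, matching the 10 messages the producer sent.

Both serialization and deserialization worked.

# Source code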

# User.java

    package com.yting.cloud.kafka.entity;

    import java.io.Serializable;

    /**
     * User entity
     *
     * @Author 王扬庭
     * @Time 2014-07-18
     */
    public class User implements Serializable {
        private static final long serialVersionUID = 6345666479504626985L;
        private String id;
        private String name;
        private String sex;
        private String age;
        private String addr;

        public User() {
        }

        public User(String id, String name, String sex, String age, String addr) {
            this.id = id;
            this.name = name;
            this.sex = sex;
            this.age = age;
            this.addr = addr;
        }

        public String getId() {
            return id;
        }

        public void setId(String id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getSex() {
            return sex;
        }

        public void setSex(String sex) {
            this.sex = sex;
        }

        public String getAge() {
            return age;
        }

        public void setAge(String age) {
            this.age = age;
        }

        public String getAddr() {
            return addr;
        }

        public void setAddr(String addr) {
            this.addr = addr;
        }

        @Override
        public String toString() {
            return "User [addr=" + addr + ", age=" + age + ", id=" + id + ", name="
                    + name + ", sex=" + sex + "]";
        }
    }

# HashSimplePartitioner.java

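    package com.yting.cloud.kafka.partition;

    import kafka.producer.Partitioner;
    import kafka.utils.VerifiableProperties;

    /**
     * Based on the SimplePartitioner example from the Kafka website. That example
     * targets 0.8.0, which differs from 0.8.1, so it has been adapted here.
     *
     * @Author 王扬庭
     * @Time 2014-07-18
     */
    public class HashSimplePartitioner implements Partitioner {
        // Kafka creates the partitioner reflectively and passes in the producer properties.
        public HashSimplePartitioner(VerifiableProperties props) {
        }

        @Override
        public int partition(Object key, int numPartitions) {
            System.out.println("hash partition ---> " + key);
            // Mask the sign bit so a negative hashCode() cannot yield a negative partition.
            return (key.hashCode() & 0x7fffffff) % numPartitions;
        }
    }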
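A partitioner has to return an index in the range 0 to numPartitions - 1. In Java, `hashCode()` may be negative and `%` keeps the sign of its left operand, so a plain modulo can produce a negative index. The sketch below isolates that arithmetic in a hypothetical `PartitionSketch` class that needs no Kafka classes.

```java
// Hypothetical, Kafka-free illustration of hash-based partition selection.
class PartitionSketch {

    // Map any key to a partition index in [0, numPartitions).
    static int partitionFor(Object key, int numPartitions) {
        // Clearing the sign bit keeps the result non-negative even for negative hash codes.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```

Note also that the User class in this post does not override `hashCode()`, so the key's hash is the default identity hash from `Object`, which is not tied to the field values. Two User objects with the same fields can therefore land in different partitions, and the split between partitions can differ from run to run. Keying by a stable value such as the user id, or overriding `hashCode()`, would make the assignment deterministic.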

 

# UserEncoder.java

    package com.yting.cloud.kafka.encoder;

    import com.yting.cloud.kafka.entity.User;
    import com.yting.cloud.kafka.util.BeanUtils;

    import kafka.serializer.Encoder;
    import kafka.utils.VerifiableProperties;

    /**
     * UserEncoder
     *
     * @Author 王扬庭
     * @Time 2014-07-18
     */
    public class UserEncoder implements Encoder<User> {

        // Kafka creates the encoder reflectively and passes in the producer properties.
        public UserEncoder(VerifiableProperties props) {
        }

        @Override
        public byte[] toBytes(User user) {
            System.out.println("encoder ---> " + user);
            return BeanUtils.object2Bytes(user);
        }
    }

 

# UserProducer.java

    package com.yting.cloud.kafka.producer;

    import java.util.*;

    import com.yting.cloud.kafka.entity.User;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    /**
     * Producer example from the Kafka website. I test from Eclipse on my local
     * machine against the remote servers, so some of the code was modified.
     *
     * @Author 王扬庭
     * @Time 2014-07-18
     */
    public class UserProducer {
        public static void main(String[] args) {
            long events = 10;

            Properties props = new Properties();
    //      props.put("metadata.broker.list", "broker1:9092,broker2:9092");
            // When running from Eclipse, rs229 must also be in the local hosts file (or use the IP address instead)
            props.put("metadata.broker.list", "rs229:9092");
            props.put("serializer.class", "com.yting.cloud.kafka.encoder.UserEncoder"); // adjust for your project
            props.put("partitioner.class", "com.yting.cloud.kafka.partition.HashSimplePartitioner");
            props.put("zookeeper.connect", "rs229:2181,rs227:2181,rs226:2181,rs198:2181,rs197:2181/kafka"); // adjust for your cluster
            props.put("request.required.acks", "1");

            ProducerConfig config = new ProducerConfig(props);

            Producer<User, User> producer = new Producer<User, User>(config);

            for (long nEvents = 0; nEvents < events; nEvents++) {
                User msg = new User("id00" + nEvents, "name00" + nEvents, "sex" + nEvents % 2, "age" + nEvents, "addr00" + nEvents);
                System.out.println(msg);
                // The same User object is used as both the key and the message.
                KeyedMessage<User, User> data = new KeyedMessage<User, User>("test-user-001", msg, msg);
                producer.send(data);
            }
            producer.close();

            System.out.println("producer is successful .");
        }
    }

# UserSimpleConsumer.java

    package com.yting.cloud.kafka.consumer;

    import kafka.api.FetchRequest;
    import kafka.api.FetchRequestBuilder;
    import kafka.api.PartitionOffsetRequestInfo;
    import kafka.common.ErrorMapping;
    import kafka.common.TopicAndPartition;
    import kafka.javaapi.*;
    import kafka.javaapi.consumer.SimpleConsumer;
    import kafka.message.MessageAndOffset;

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import com.yting.cloud.kafka.entity.User;
    import com.yting.cloud.kafka.util.BeanUtils;

    /**
     * SimpleConsumer example from the Kafka website. I test from Eclipse on my
     * local machine against the remote servers, so some of the code was modified.
     *
     * @Author 王扬庭
     * @Time 2014-07-18
     */
    public class UserSimpleConsumer {
        public static void main(String args[]) {
            UserSimpleConsumer example = new UserSimpleConsumer();
            long maxReads = 100;
            String topic = "test-user-001";
            int partition = 0; // read partition 0
    //      int partition = 1; // switch to this line to read partition 1
            List<String> seeds = new ArrayList<String>();
            seeds.add("rs229");
            seeds.add("rs227");
            seeds.add("rs226");
            seeds.add("rs198");
            seeds.add("rs197");
            int port = Integer.parseInt("9092");
            try {
                example.run(maxReads, topic, partition, seeds, port);
            } catch (Exception e) {
                System.out.println("Oops:" + e);
                e.printStackTrace();
            }
        }

        private List<String> m_replicaBrokers = new ArrayList<String>();

        public UserSimpleConsumer() {
            m_replicaBrokers = new ArrayList<String>();
        }

        public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
            // find the meta data about the topic and partition we are interested in
            PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic,
                    a_partition);
            if (metadata == null) {
                System.out
                        .println("Can't find metadata for Topic and Partition. Exiting");
                return;
            }
            if (metadata.leader() == null) {
                System.out
                        .println("Can't find Leader for Topic and Partition. Exiting");
                return;
            }
            String leadBroker = metadata.leader().host();
            String clientName = "Client_" + a_topic + "_" + a_partition;

            SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            long readOffset = getLastOffset(consumer, a_topic, a_partition,
                    kafka.api.OffsetRequest.EarliestTime(), clientName);

            int numErrors = 0;
            while (a_maxReads > 0) {
                if (consumer == null) {
                    consumer = new SimpleConsumer(leadBroker, a_port, 100000,
                            64 * 1024, clientName);
                }
                FetchRequest req = new FetchRequestBuilder().clientId(clientName)
                        // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
                        .addFetch(a_topic, a_partition, readOffset, 100000)
                        .build();
                FetchResponse fetchResponse = consumer.fetch(req);

                if (fetchResponse.hasError()) {
                    numErrors++;
                    // Something went wrong!
                    short code = fetchResponse.errorCode(a_topic, a_partition);
                    System.out.println("Error fetching data from the Broker:"
                            + leadBroker + " Reason: " + code);
                    if (numErrors > 5)
                        break;
                    if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                        // We asked for an invalid offset. For simple case ask for
                        // the last element to reset
                        readOffset = getLastOffset(consumer, a_topic, a_partition,
                                kafka.api.OffsetRequest.LatestTime(), clientName);
                        continue;
                    }
                    consumer.close();
                    consumer = null;
                    leadBroker = findNewLeader(leadBroker, a_topic, a_partition,
                            a_port);
                    continue;
                }
                numErrors = 0;

                long numRead = 0;
                for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(
                        a_topic, a_partition)) {
                    long currentOffset = messageAndOffset.offset();
                    if (currentOffset < readOffset) {
                        System.out.println("Found an old offset: " + currentOffset
                                + " Expecting: " + readOffset);
                        continue;
                    }
                    readOffset = messageAndOffset.nextOffset();
                    ByteBuffer payload = messageAndOffset.message().payload();

                    byte[] bytes = new byte[payload.limit()];
                    payload.get(bytes);
                    // === Deserialization happens here ===
                    User user = (User) BeanUtils.bytes2Object(bytes);
                    System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + user);
                    // ====================================
                    numRead++;
                    a_maxReads--;
                }

                if (numRead == 0) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException ie) {
                    }
                }
            }
            if (consumer != null)
                consumer.close();
        }

        public static long getLastOffset(SimpleConsumer consumer, String topic,
                int partition, long whichTime, String clientName) {
            TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
                    partition);
            Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
            requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
                    whichTime, 1));
            kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                    requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
                    clientName);
            OffsetResponse response = consumer.getOffsetsBefore(request);

            if (response.hasError()) {
                System.out
                        .println("Error fetching data Offset Data the Broker. Reason: "
                                + response.errorCode(topic, partition));
                return 0;
            }
            long[] offsets = response.offsets(topic, partition);
            return offsets[0];
        }

        private String findNewLeader(String a_oldLeader, String a_topic,
                int a_partition, int a_port) throws Exception {
            for (int i = 0; i < 3; i++) {
                boolean goToSleep = false;
                PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port,
                        a_topic, a_partition);
                if (metadata == null) {
                    goToSleep = true;
                } else if (metadata.leader() == null) {
                    goToSleep = true;
                } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host())
                        && i == 0) {
                    // first time through if the leader hasn't changed give
                    // ZooKeeper a second to recover
                    // second time, assume the broker did recover before failover,
                    // or it was a non-Broker issue
                    goToSleep = true;
                } else {
                    return metadata.leader().host();
                }
                if (goToSleep) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException ie) {
                    }
                }
            }
            System.out
                    .println("Unable to find new leader after Broker failure. Exiting");
            throw new Exception(
                    "Unable to find new leader after Broker failure. Exiting");
        }

        private PartitionMetadata findLeader(List<String> a_seedBrokers,
                int a_port, String a_topic, int a_partition) {
            PartitionMetadata returnMetaData = null;
            loop: for (String seed : a_seedBrokers) {
                SimpleConsumer consumer = null;
                try {
                    consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                    List<String> topics = Collections.singletonList(a_topic);
                    TopicMetadataRequest req = new TopicMetadataRequest(topics);
                    kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                    List<TopicMetadata> metaData = resp.topicsMetadata();
                    for (TopicMetadata item : metaData) {
                        for (PartitionMetadata part : item.partitionsMetadata()) {
                            if (part.partitionId() == a_partition) {
                                returnMetaData = part;
                                break loop;
                            }
                        }
                    }
                } catch (Exception e) {
                    System.out.println("Error communicating with Broker [" + seed
                            + "] to find Leader for [" + a_topic + ", "
                            + a_partition + "] Reason: " + e);
                } finally {
                    if (consumer != null)
                        consumer.close();
                }
            }
            if (returnMetaData != null) {
                m_replicaBrokers.clear();
                for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                    m_replicaBrokers.add(replica.host());
                }
            }
            return returnMetaData;
        }
    }
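The consumer copies the message payload out of a `ByteBuffer` before deserializing it, sizing the array with `payload.limit()`. That matches the number of readable bytes only when the buffer's position is 0, which evidently holds in the runs shown in this post. A slightly more defensive variant sizes the array with `remaining()` and reads from a duplicate, as sketched below in plain Java. The `PayloadSketch` class and its `toBytes` method are hypothetical names used only for illustration.

```java
import java.nio.ByteBuffer;

// Hypothetical, Kafka-free illustration of copying a payload buffer into a byte[].
class PayloadSketch {

    // Copy the readable bytes of a buffer without moving the caller's position.
    static byte[] toBytes(ByteBuffer payload) {
        // duplicate() shares the content but has its own position and limit.
        ByteBuffer view = payload.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }
}
```

The resulting byte[] is what would then be handed to the deserialization step (`BeanUtils.bytes2Object` in the listing above).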

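# Closing remarks

Finally done. Putting all of this together took a lot of time, but if you run into this problem and the post helps you, that is good enough. If you found it useful, please give it an upvote; it costs you nothing.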

# Time: 2014-07-18 11:08:22