首页 > 代码库 > spark使用KryoRegistrator java代码示例

spark使用KryoRegistrator java代码示例

转载引用自:http://www.cnblogs.com/tovin/p/3833985.html

最近在使用spark开发过程中发现当数据量很大时,如果cache数据将消耗很多的内存。为了减少内存的消耗,测试了一下 Kryo serialization的使用

代码包含三个类,KryoTest、MyRegistrator、Qualify。

 我们知道在Spark默认使用的是Java自带的序列化机制。如果想使用Kryo serialization,只需要添加KryoTest类中的红色部分,指定spark序列化类

另外还需要增加MyRegistrator类,注册需要用Kryo序列化的类

public class KryoTest {    public static void main(String[] args) {        SparkConf conf = new SparkConf();        conf.setMaster("local");        conf.setAppName("KryoTest");        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");        conf.set("spark.kryo.registrator", "MyRegistrator");                JavaSparkContext sc = new JavaSparkContext(conf);        JavaRDD<String> rdd = sc.textFile("/home/hdpusr/qualifying.txt");        JavaRDD<Qualify> map = rdd.map(new Function<String, Qualify>() {            /* (non-Javadoc)             * @see org.apache.spark.api.java.function.Function#call(java.lang.Object)             */            public Qualify call(String v1) throws Exception {                // TODO Auto-generated method stub                String s[] =  v1.split(",");                Qualify q = new Qualify();                q.setA(Integer.parseInt(s[0]));                q.setB(Long.parseLong(s[1]));                q.setC(s[2]);                                                return q;            }        });        map.persist(StorageLevel.MEMORY_AND_DISK_SER());        System.out.println(map.count());    }}
import org.apache.spark.serializer.KryoRegistrator;import com.esotericsoftware.kryo.Kryo;public class MyRegistrator implements KryoRegistrator{    /* (non-Javadoc)     * @see org.apache.spark.serializer.KryoRegistrator#registerClasses(com.esotericsoftware.kryo.Kryo)     */    public void registerClasses(Kryo arg0) {        // TODO Auto-generated method stub        arg0.register(Qualify.class);    }}
import java.io.Serializable;public class Qualify implements Serializable{    int a;    long b;    String c;    public int getA() {        return a;    }    public void setA(int a) {        this.a = a;    }    public long getB() {        return b;    }    public void setB(long b) {        this.b = b;    }    public String getC() {        return c;    }    public void setC(String c) {        this.c = c;    }    }

下面我们看看使用Java serialization 与Kryo serialization的效果对比

Java serialization

技术分享  

 

Kryo serialization

技术分享

从实际跑的数据可以看出还是能节省不少内存的。当内存不够用的时候建议使用Kryo serialization这种方式

spark使用KryoRegistrator java代码示例