首页 > 代码库 > [Hive]Hive自定义函数UDF

[Hive]Hive自定义函数UDF

当Hive提供的内置函数无法满足你的业务处理需要时,此时就可以考虑使用用户自定义函数

用户自定义函数(user defined function),针对单条记录。 

编写一个UDF,需要继承UDF类,并实现evaluate()函数。在查询执行过程中,查询中对应的每个应用到这个函数的地方都会对这个类进行实例化。对于每行输入都会调用到evaluate()函数。而evaluate()函数处理的值会返回给Hive。同时用户是可以重载evaluate方法的。Hive会像Java的方法重载一样,自动选择匹配的方法。

1. 自定义Java类

下面自定义一个Java类OperationAddUDF,实现了Int,Double,Float以及String类型的加法操作。

  1. package com.sjf.open.hive.udf;
  2. import org.apache.hadoop.hive.ql.exec.UDF;
  3. import org.apache.hadoop.hive.serde2.ByteStream;
  4. import org.apache.hadoop.hive.serde2.io.DoubleWritable;
  5. import org.apache.hadoop.hive.serde2.lazy.LazyInteger;
  6. import org.apache.hadoop.io.FloatWritable;
  7. import org.apache.hadoop.io.IntWritable;
  8. import org.apache.hadoop.io.Text;
  9. /**
  10. * Created by xiaosi on 16-11-19.
  11. */
  12. public class OperationAddUDF extends UDF {
  13.    private final ByteStream.Output out = new ByteStream.Output();
  14.    /**
  15.     * IntWritable
  16.     * @param num1
  17.     * @param num2
  18.     * @return
  19.     */
  20.    public IntWritable evaluate(IntWritable num1, IntWritable num2){
  21.        if(num1 == null || num2 == null){
  22.            return null;
  23.        }
  24.        return new IntWritable(num1.get() + num2.get());
  25.    }
  26.    /**
  27.     * DoubleWritable
  28.     * @param num1
  29.     * @param num2
  30.     * @return
  31.     */
  32.    public DoubleWritable evaluate(DoubleWritable num1, DoubleWritable num2){
  33.        if(num1 == null || num2 == null){
  34.            return null;
  35.        }
  36.        return new DoubleWritable(num1.get() + num2.get());
  37.    }
  38.    /**
  39.     * FloatWritable
  40.     * @param num1
  41.     * @param num2
  42.     * @return
  43.     */
  44.    public FloatWritable evaluate(FloatWritable num1, FloatWritable num2){
  45.        if(num1 == null || num2 == null){
  46.            return null;
  47.        }
  48.        return new FloatWritable(num1.get() + num2.get());
  49.    }
  50.    /**
  51.     * Text
  52.     * @param num1
  53.     * @param num2
  54.     * @return
  55.     */
  56.    public Text evaluate(Text num1, Text num2){
  57.        if(num1 == null || num2 == null){
  58.            return null;
  59.        }
  60.        try{
  61.            Integer n1 = Integer.valueOf(num1.toString());
  62.            Integer n2 = Integer.valueOf(num2.toString());
  63.            Integer result = n1 + n2;
  64.            out.reset();
  65.            LazyInteger.writeUTF8NoException(out, result);
  66.            Text text = new Text();
  67.            text.set(out.getData(), 0, out.getLength());
  68.            return text;
  69.        }
  70.        catch (Exception e){
  71.            return null;
  72.        }
  73.    }
  74. }

UDF中evaluate()函数的参数和返回值类型只能是Hive可以序列化的数据类型。例如,如果用户处理的全是数值,那么UDF的输出参数类型可以是基本数据类型int,Integer封装的对象或者是一个IntWritable对象,也就是Hadoop对整型封装后的对象。用户不需要特别的关心将调用到哪个类型,因为当类型不一致的时候,Hive会自动将数据类型转换成匹配的类型。null值在Hive中对于任何数据类型都是合法的,但是对于Java基本数据类型,不能是对象,也不能是null。

2 Hive中使用

如果想在Hive中使用UDF,那么需要将Java代码进行编译,然后将编译后的UDF二进制类文件打包成一个Jar文件。然后,在Hive会话中,将这个Jar文件加入到类路径下,在通过CREATE FUNCTION 语句定义好使用这个Java类的函数:

2.1 添加Jar文件到类路径下
  1. hive (test)> add jar /home/xiaosi/open-hive-1.0-SNAPSHOT.jar;
  2. Added [/home/xiaosi/open-hive-1.0-SNAPSHOT.jar] to class path
  3. Added resources: [/home/xiaosi/open-hive-1.0-SNAPSHOT.jar]

需要注意的是,Jar文件路径是不需要用引号括起来的,同时,到目前为止这个路径需要是当前文件系统的全路径。Hive不仅仅将这个Jar文件加入到classpath下,同时还将其加入到分布式缓存中,这样整个集群的机器都是可以获得该Jar文件的。

2.2 创建函数add
  1. hive (test)> create temporary function add as ‘com.sjf.open.hive.udf.OperationAddUDF‘;
  2. OK
  3. Time taken: 0.004 seconds

注意的是create temporary function语句中的temporary关键字,当前会话中声明的函数只会在当前会话中有效。因此用户需要在每个会话中都增加Jar文件然后创建函数。不过如果用户需要频繁的使用同一个Jar文件和函数的话,那么可以将相关语句增加到$HOME/.hiverc文件中去。

2.3 使用

现在这个数值相加函数可以像其他的函数一样使用了。

  1. hive (test)> select add(12, 34) from employee_part;
  2. OK
  3. 46
  4. Time taken: 0.078 seconds, Fetched: 1 row(s)
  5. hive (test)> select add(12.3, 20.1) from employee_part;
  6. OK
  7. 32.400000000000006
  8. Time taken: 0.098 seconds, Fetched: 1 row(s)
  9. hive (test)> select add("12", "45") from employee_part;
  10. OK
  11. 57
  12. Time taken: 0.077 seconds, Fetched: 1 row(s)
2.4 删除UDF

当我们使用完自定义UDF后,我们可以通过如下命令删除此函数:

  1. hive (test)> drop temporary function if exists add;

来源于《Hive 编程指南》



[Hive]Hive自定义函数UDF