
How to Write a Custom Hive UDF

Hive allows users to write their own user-defined functions (UDFs) for use in queries. There are three kinds of UDF in Hive:

UDF: operates on a single data row and produces a single data row;

UDAF: operates on multiple data rows and produces a single data row;

UDTF: operates on a single data row and produces multiple data rows (a table) as output.

 

The workflow for building your own UDF is as follows:

Step 1: Extend UDF, UDAF, or UDTF and implement the required methods.

For a UDF example, see http://svn.apache.org/repos/asf/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/udf/example/UDFExampleAdd.java

package org.apache.hadoop.hive.contrib.udf.example;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;

/**
 * UDFExampleAdd.
 */
// A UDF operates on a single data row and produces a single data row.
// The class must extend UDF and implement at least one evaluate() method.
// evaluate() is not declared in UDF itself; Hive checks via reflection
// that the user's UDF provides one.
@Description(name = "example_add",
    value = "_FUNC_(expr) - Example UDAF that returns the sum")
public class UDFExampleAdd extends UDF {

  // The actual logic: sum the arguments, skipping NULLs.
  public Integer evaluate(Integer... a) {
    int total = 0;
    for (Integer element : a) {
      if (element != null) {
        total += element;
      }
    }
    return total;
  }

  public Double evaluate(Double... a) {
    double total = 0;
    for (Double element : a) {
      if (element != null) {
        total += element;
      }
    }
    return total;
  }
}
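Because the evaluate() methods are plain Java, their logic can be exercised without a Hive installation. A minimal standalone sketch (the class name SumLogic is hypothetical; it merely replicates the Integer overload above and does not depend on any Hive classes):

```java
// Standalone replica of UDFExampleAdd.evaluate(Integer...), for illustration only.
public class SumLogic {

  // Sum the arguments, skipping NULLs, exactly like the UDF above.
  public static Integer evaluate(Integer... a) {
    int total = 0;
    for (Integer element : a) {
      if (element != null) {
        total += element;
      }
    }
    return total;
  }

  public static void main(String[] args) {
    System.out.println(evaluate(1, 2, null, 3)); // prints 6
  }
}
```

Note that NULL arguments are treated as 0 rather than making the whole result NULL; a production UDF would need to decide which behavior its users expect.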

For a UDAF example, see

http://svn.apache.org/repos/asf/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/udaf/example/UDAFExampleAvg.java

package org.apache.hadoop.hive.contrib.udaf.example;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

/**
 * This is a simple UDAF that calculates average.
 *
 * It should be very easy to follow and can be used as an example for writing
 * new UDAFs.
 *
 * Note that Hive internally uses a different mechanism (called GenericUDAF) to
 * implement built-in aggregation functions, which are harder to program but
 * more efficient.
 */
// A UDAF consumes multiple data rows and produces a single data row.
// The class must extend UDAF and contain one or more static inner classes
// that implement UDAFEvaluator.
@Description(name = "example_avg",
    value = "_FUNC_(col) - Example UDAF to compute average")
public final class UDAFExampleAvg extends UDAF {

  /**
   * The internal state of an aggregation for average.
   *
   * Note that this is only needed if the internal state cannot be represented
   * by a primitive.
   *
   * The internal state can also contain fields with types like
   * ArrayList<String> and HashMap<String,Double> if needed.
   */
  public static class UDAFAvgState {
    private long mCount;
    private double mSum;
  }

  /**
   * The actual class for doing the aggregation. Hive will automatically look
   * for all internal classes of the UDAF that implement UDAFEvaluator.
   */
  public static class UDAFExampleAvgEvaluator implements UDAFEvaluator {

    UDAFAvgState state;

    public UDAFExampleAvgEvaluator() {
      super();
      state = new UDAFAvgState();
      init();
    }

    /**
     * Reset the state of the aggregation.
     */
    public void init() {
      state.mSum = 0;
      state.mCount = 0;
    }

    /**
     * Iterate through one row of original data.
     *
     * The number and type of arguments must match how the UDAF is called
     * from the Hive command line.
     *
     * This function should always return true.
     */
    public boolean iterate(Double o) {
      if (o != null) {
        state.mSum += o;
        state.mCount++;
      }
      return true;
    }

    /**
     * Terminate a partial aggregation and return the state. If the state is a
     * primitive, just return primitive Java classes like Integer or String.
     */
    // Hive calls this when it needs a partial aggregation result; it returns
    // an object encapsulating the current state of the aggregation.
    public UDAFAvgState terminatePartial() {
      // This is SQL standard - average of zero items should be null.
      return state.mCount == 0 ? null : state;
    }

    /**
     * Merge with a partial aggregation.
     *
     * This function should always have a single argument which has the same
     * type as the return value of terminatePartial().
     */
    // Hive calls this to combine two partial aggregation states.
    public boolean merge(UDAFAvgState o) {
      if (o != null) {
        state.mSum += o.mSum;
        state.mCount += o.mCount;
      }
      return true;
    }

    /**
     * Terminates the aggregation and returns the final result.
     */
    // Hive calls this when it needs the final aggregation result.
    public Double terminate() {
      // This is SQL standard - average of zero items should be null.
      return state.mCount == 0 ? null
          : Double.valueOf(state.mSum / state.mCount);
    }
  }

  private UDAFExampleAvg() {
    // prevent instantiation
  }
}
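The evaluator lifecycle (init → iterate → terminatePartial → merge → terminate) can be traced outside Hive by replicating the state logic in plain Java. A minimal standalone sketch simulating two map tasks whose partial averages are merged on the reduce side (class and method names here are hypothetical; the real evaluator needs hive-exec on the classpath):

```java
// Standalone replica of UDAFExampleAvg's evaluator logic, for illustration only.
// It mirrors how Hive drives the evaluator during a two-stage
// (map-side partial + reduce-side final) aggregation.
public class AvgLifecycleDemo {

  static class State { long count; double sum; }

  // One map task: init(), then iterate() per row, then terminatePartial().
  static State partial(double... rows) {
    State s = new State();
    for (double r : rows) {
      s.sum += r;
      s.count++;
    }
    return s;
  }

  // merge(): fold another task's partial state into this one.
  static void merge(State into, State from) {
    if (from != null) {
      into.sum += from.sum;
      into.count += from.count;
    }
  }

  // terminate(): final result; null for empty input, per the SQL standard.
  static Double terminate(State s) {
    return s.count == 0 ? null : s.sum / s.count;
  }

  public static void main(String[] args) {
    State finalState = partial(1.0, 2.0);       // "map task 1"
    merge(finalState, partial(3.0, 4.0, 5.0));  // fold in "map task 2"
    System.out.println(terminate(finalState));  // prints 3.0 (avg of 1..5)
  }
}
```

The key design point is that terminatePartial()'s return type and merge()'s argument type must match, since one task's partial state is fed directly into another's merge.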

Step 2: Register the finished UDF with Hive, using one of the two methods below.

Method 1

(1) Package the classes into a jar file.

(2) Enter the Hive shell and register the jar file with add jar.

(3) Give the class an alias for use in queries.

Example commands:

add jar UDFExample.jar;  -- register the jar
create temporary function my_add as 'org.apache.hadoop.hive.contrib.udf.example.UDFExampleAdd';  -- the UDF is defined only for this Hive session
create temporary function my_avg as 'org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleAvg';
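Once registered, the aliases are called like built-in functions. A sketch, assuming a table t with integer columns a and b and a double column score (the table and column names are hypothetical):

```sql
SELECT my_add(a, b) FROM t;   -- per-row sum via the UDF
SELECT my_avg(score) FROM t;  -- aggregate average via the UDAF
```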

A UDF registered this way is only valid in the current Hive session. To make it permanent, register it in the Hive source code, as described in Method 2.

Method 2

(1) Register the UDF in org.apache.hadoop.hive.ql.exec.FunctionRegistry:

registerUDF("my_add", UDFExampleAdd.class, false);
registerUDAF("my_avg", UDAFExampleAvg.class);

(2) Rebuild and package the Hive source.

(3) Deploy the Hive package and the UDF package, placing the UDF jar on Hive's classpath.
