首页 > 代码库 > Spark SQL UDF

Spark SQL UDF

目前 Spark SQL 不支持自定义UDF ,底层 SQL 引擎用的 catalyst 。

在SqlContext 中 有一个 Analyzer给的一个EmptyFunctionRegistry ,如果 SQL 引擎函数中找不到了,会到这个FunctionRegistry 中找

EmptyFunctionRegistry 中lookup 只是抛出一个异常。

所以自定义了一个 FunctionRegistry ,SqlContext

@transient
  protected[sql]lazyval analyzer:Analyzer =
    newAnalyzer(catalog, EmptyFunctionRegistry, caseSensitive =true)

class UDFRgistry extends FunctionRegistry {
    def lookupFunction(name: String, children: Seq[Expression]): Expression = {
      name.toLowerCase match {
        case "col_set" =>Collect(children(0))
        case "array" =>Array(children(0))
        case "contains" =>Contains(children)
        case _ => throw new UnsupportedOperationException
      }
    }
  }
 
class SparkSqlContext(val spctx: SparkContext) extends SQLContext(spctx) {
  @transient
  override lazy val analyzer: Analyzer =
    new Analyzer(catalog, new UDF.UDFRgistry, caseSensitive = true)
}

这样就可以找到自定义的函数了。

Spark SQL UDF