首页 > 代码库 > Spark SQL UDF
Spark SQL UDF
目前 Spark SQL 不支持自定义UDF ,底层 SQL 引擎用的 catalyst 。
在SqlContext 中 有一个 Analyzer给的一个EmptyFunctionRegistry ,如果 SQL 引擎函数中找不到了,会到这个FunctionRegistry 中找
EmptyFunctionRegistry 中lookup 只是抛出一个异常。
所以自定义了一个 FunctionRegistry ,SqlContext
@transient protected[sql]lazyval analyzer:Analyzer = newAnalyzer(catalog, EmptyFunctionRegistry, caseSensitive =true)
class UDFRgistry extends FunctionRegistry { def lookupFunction(name: String, children: Seq[Expression]): Expression = { name.toLowerCase match { case "col_set" =>Collect(children(0)) case "array" =>Array(children(0)) case "contains" =>Contains(children) case _ => throw new UnsupportedOperationException } } } class SparkSqlContext(val spctx: SparkContext) extends SQLContext(spctx) { @transient override lazy val analyzer: Analyzer = new Analyzer(catalog, new UDF.UDFRgistry, caseSensitive = true) }
这样就可以找到自定义的函数了。
Spark SQL UDF
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。