文章目录
  1. Spark Custom Window Functions (UDWF)
    1. 1. 简介
    2. 2. 基于函数的方式自定义
    3. 3. 基于SQL的方式使用
      1. 3.1 编写my_row_number函数
      2. 3.2 注册MyRowNumber
      3. 3.3 注意点(坑)
      4. 3.4 使用
    4. 4. 总结

[TOC]

Spark Custom Window Functions (UDWF)

1. 简介

Spark可以自定义UDF、UDAF、UDTF等自定义函数。当然也可以使用开窗函数WindowFunction(如row_number、first_value、lead等)。

关于一些内置函数的使用, 这里就不阐述了。后面将讲诉如何自定义(官方没提供,得看源码了解机制。。。)

注意: 写文章的时候, spark版本是2.1.x。spark2.3.x以及2.4.x的API有所不同(未测试可行性)

2. 基于函数的方式自定义

这里可以直接参考:http://blog.nuvola-tech.com/2017/10/spark-custom-window-function-for-sessionization/

描述的很清楚,基于Spark Core的方式来使用。

3. 基于SQL的方式使用

自定义函数的编写和使用,刚刚的文章其实已经描述的比较清楚了。这里我将以最简单的row_number为例,来注册给Spark的udf,然后在sql中来使用。

3.1 编写my_row_number函数

1
2
3
4
case class MyRowNumber() extends RowNumberLike {
override val evaluateExpression = rowNumber
override def prettyName: String = "my_row_number"
}

这里直接抄的 org.apache.spark.sql.catalyst.expressions.RowNumber 类。

3.2 注册MyRowNumber

这里需要查看原生的RowNumber是如何注册的,它是在FunctionRegistry中进行注册的:

然后再来看看expressions里面是啥,这里面其实就是一个个函数的表达式,我们找到row_number

OK!其实最关键的就是看这个expression是咋实现的,我们就可以进行注册了。

1
2
3
4
5
6
7
8
FunctionRegistry.builtin.registerFunction(
"my_row_number",
new ExpressionInfo(MyRowNumber.getClass.getCanonicalName, "my_row_number"),
// 这个可以参照 FunctionRegistry#expression 的编写, 我这里比较简单, 没有函数入参
e => {
MyRowNumber()
}
)

3.3 注意点(坑)

这里有一点需要注意,原先注册udf是这样定义的:

1
spark.udf.register("my_udf", MyUDF _)

查看源码,其实他也是通过 FunctionRegistry 来进行注册的。(辛亏spark.udf是lazy的)

所以UDWF的注册一定要放在spark.udf.register的注册之前,不然UDWF无法成功注册进去(因为是copy的一份,可以看下源码)

3.4 使用

1
2
3
4
5
6
7
8
9
10
11
select 
id,
age,
my_row_number() over(partition by id order by age) rn
from (
select 1 id, 10 age
union all
select 2 id, 10 age
union all
select 1 id, 20 age
) t

4. 总结

总体来说还是比较简单的,更复杂的UDWF(比如含有入参)可以自寻摸索。我从官方jira没看到相关的issue,如果有更好的注册方式,麻烦告知。

文章目录
  1. Spark Custom Window Functions (UDWF)
    1. 1. 简介
    2. 2. 基于函数的方式自定义
    3. 3. 基于SQL的方式使用
      1. 3.1 编写my_row_number函数
      2. 3.2 注册MyRowNumber
      3. 3.3 注意点(坑)
      4. 3.4 使用
    4. 4. 总结