文章目录
  1. Oozie-9_自定义UDF和Action节点
  2. 1. 自定义UDF
  3. 2. 自定义Action节点

[TOC]

Oozie-9_自定义UDF和Action节点

1. 自定义UDF

有的时候oozie提供的EL函数没法满足需求,这个时候就需要对oozie的udf进行扩展。

  • 新建maven工程项目,这里的pom最好使用oozie项目自身的pom(在oozie的目录下可以找到)
  • 然后我们可以新建一个静态方法,比如我们需要一个获取时间字符串的函数,如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// 代码随便看看即可,关键是一个静态方法

public class TerminusCoordELFunctions {

private static final String defaultValueFormatter = "YYYY-MM-dd";

/**
* @param inValueFormat 输入日期格式,不输入默认为 "YYYY-MM-DD"
* @param inValue 输入日期的值,不输入从当前时间
* @param outFormat 输出格式,不输入默认为 "YYYY-MM-DD"
* @param interval 时间间隔
* @param intervalType 'S' 秒 'MIN' 分钟 'H' 时 'D' 天 'MON' 月 'Y'年
* @return
*/
public static String getDateFromNow(String inValueFormat, String inValue, String outFormat, int interval, String intervalType) {
if (Strings.isNullOrEmpty(inValueFormat)) {
inValueFormat = defaultValueFormatter;
}
DateTime inTime = DateTime.now();
if (!Strings.isNullOrEmpty(inValue)) {
inTime = DateTimeFormat.forPattern(inValueFormat).parseDateTime(inValue);
}
DateTime outTime = null;

if (!Strings.isNullOrEmpty(intervalType)) {
if (Objects.equal(intervalType.toUpperCase(), "S")) {
outTime = inTime.plusSeconds(interval);
} else if (Objects.equal(intervalType.toUpperCase(), "MIN")) {
outTime = inTime.plusMinutes(interval);
} else if (Objects.equal(intervalType.toUpperCase(), "H")) {
outTime = inTime.plusHours(interval);
} else if (Objects.equal(intervalType.toUpperCase(), "D")) {
outTime = inTime.plusDays(interval);
} else if (Objects.equal(intervalType.toUpperCase(), "MON")) {
outTime = inTime.plusMonths(interval);
} else if (Objects.equal(intervalType.toUpperCase(), "Y")) {
outTime = inTime.plusYears(interval);
}
} else {
outTime = inTime.plusDays(interval);
}

if (Strings.isNullOrEmpty(outFormat)) {
outFormat = defaultValueFormatter;
}

return outTime.toString(outFormat);
}
}
  • 打包项目,然后把jar包放到oozie-server的运行环境中。
1
2
3
Cloudera Manager平台上传到安装oozie那台机器的 `/var/lib/oozie/` 目录

* 注: `非Cloudera Manager平台`直接上传到 `$OOZIE_HOME/oozie-server/webapps/oozie/WEB-INF/lib/` 即可。
  • oozie-site.xml进行配置
1
2
3
4
5
6
7
8
9
<!-- oozie EL function ext -->
<property>
<name>oozie.service.ELService.ext.functions.workflow</name>
<value>
horus:getDateFromNow=io.terminus.horus.oozie.udf.TerminusCoordELFunctions#getDateFromNow
</value>
</property>

## 当然这个是针对workflow的,还有其他的配置参考 oozie-default.xml 文件中
  • 这样你就可以在workflow.xml中使用了, 如${horus:getDateFromNow('', '', 'yyyy-MM-dd', -1, 'D')}

2. 自定义Action节点

有的时候发现oozie提供的Action不够用,那么就需要进行扩展。

过程

和自定义UDF一样, 创建一个maven项目, pom同上。

  • 新建一个类继承ActionExecutor, 然后对里面的覆盖方法进行编写。
  • resources目录新建一个xml schema文件。可以参考其他节点的schema文件, 比如
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<?xml version="1.0" encoding="UTF-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
xmlns:custom="uri:custom:sshfs-action:0.1"
elementFormDefault="qualified"
targetNamespace="uri:custom:sshfs-action:0.1">
<xs:complexType name="sshfs">
<xs:sequence>
<xs:element name="protocol" type="xs:string"/>
<xs:element name="name-node" type="xs:string"/>
<xs:element name="file-path" type="xs:string"/>
<xs:element name="des-key" type="xs:string"/>
<xs:element name="file-date" type="xs:string"/>
<xs:element name="hive-url" type="xs:string"/>
<xs:element name="hive-table" type="xs:string"/>
<xs:element name="ssh-host" type="xs:string"/>
<xs:element name="ssh-port" type="xs:string"/>
<xs:element name="ssh-username" type="xs:string"/>
<xs:element name="ssh-password" type="xs:string"/>
<xs:element name="excel-password" type="xs:string"/>
<xs:element name="file-type" type="xs:string"/>
<xs:element name="split" type="xs:string"/>
</xs:sequence>
</xs:complexType>
<xs:element name="sshfs" type="custom:sshfs"/>
</xs:schema>
  • 最后打包项目,同样放到oozie的运行环境中。
  • 配置 oozie-site.xml,如下
1
2
3
4
5
6
7
8
<property>
<name>oozie.service.ActionService.executor.ext.classes</name>
<value>io.terminus.horus.oozie.udn.nodes.SshFsExecutor</value>
</property>
<property>
<name>oozie.service.SchemaService.wf.ext.schemas</name>
<value>sshfs.xsd</value>
</property>
  • 最后你就可以使用自定节点了,比如
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<sshfs xmlns="uri:custom:sshfs-action:0.1">
<protocol>http</protocol>
<name-node>hdfs://xr-test-namenode:8020</name-node>
<file-path>http://jfopr.haier.com/order/other/getSuccessOrderXls?dateStr=${horus:getDateFromNow('', '', 'yyyyMMdd', 0, 'D')}</file-path>
<des-key>%24%3DX4uY6%3B%5E4VADN*23A8%23h%3E%5D%294%24wg76V2</des-key>
<file-date>null</file-date>
<hive-url>jdbc:hive2://xr-test-namenode:10000/default</hive-url>
<hive-table>s_haibei_order</hive-table>
<ssh-host></ssh-host>
<ssh-port></ssh-port>
<ssh-username></ssh-username>
<ssh-password></ssh-password>
<excel-password>null</excel-password>
<file-type>excel</file-type>
<split>null</split>
</sshfs>
文章目录
  1. Oozie-9_自定义UDF和Action节点
  2. 1. 自定义UDF
  3. 2. 自定义Action节点