文章目录
  1. Oozie-3_工作流程定义
  2. 1. 简介
  3. 2. 流程定义
    1. 2.1 控制节点
      1. 2.1.1 start 节点
      2. 2.1.2 end 节点
      3. 2.1.3 kill 节点
      4. 2.1.4 decision 节点
      5. 2.1.5 fork 和 join 节点
    2. 2.2 动作节点
      1. 2.2.1 Action Basics 基本特性
      2. 2.2.2 Map-Reduce Action
      3. 2.2.4 Pig Action
      4. 2.2.5 Hive Action
      5. 2.2.6 Fs (HDFS) action
      6. 2.2.7 SSH Action
      7. 2.2.8 Sub-workflow Action
      8. 2.2.9 Java Action
      9. 2.2.10 Email Action
      10. 2.2.11 Spark 节点
        1. 2.2.12 spark执行python
  4. 3. 流程定义中的参数化(EL 表达式)
    1. 3.1 变量
    2. 3.2 方法
  5. 4. 工作流回调
  6. 5. 工作流程部署

Oozie-3_工作流程定义

[TOC]

1. 简介

Workflow Application

  • DAG(Directed Acyclical Graphs), 有向无环图。
    • 如果存在Cycle, Oozie会立即fail。
  • HPDL:Hadoop Process Definition Language(xml)
  • 流程定义组成:控制流节点动作节点
  • xml中支持 Java EL 表达式
  • 支持action节点的start/end/failure 和 workflow end/failure的 HTTP 回调

每个workflow job的包括的状态:PREP , RUNNING , SUSPENDED , SUCCEEDED , KILLED and FAILED

生命周期:

–>
–> PREP
PREP –> RUNNING、KILLED
RUNNING –> SUSPENDED 、 SUCCEEDED 、 KILLED 、 FAILED
SUSPENDED –> RUNNING 、 KILLED

2. 流程定义

一个流程定义包含 control flow nodesaction nodes , 节点之间通过 箭头连线 连接。

  • 控制流节点:start, end, decision, fork, join, kill
  • 动作节点:map-reduce, pig, hive 等等。

2.1 控制节点

2.1.1 start 节点

  1. start节点是流程定义的开始节点,必须存在且只能存在一个。
  2. 当流程启动的时候,会自动流向start节点定义的下一个节点。

语法:

1
2
3
4
5
6
7
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
...
<start to="[NODE-NAME]"/>
...
</workflow-app>

# to:指向的下一个节点(节点名称)

例子:

1
2
3
4
5
<workflow-app name="foo-wf" xmlns="uri:oozie:workflow:0.1">
...
<start to="firstHadoopJob"/>
...
</workflow-app>

2.1.2 end 节点

  1. end节点是流程定义的结束节点,表示流程执行 成功 (SUCCEEDED)。
  2. 当多个任务指向end节点,一个action到达end的时候,其他actions就会被 kill,但是这种情况也会被视为 SUCCEEDED。
  3. end必须存在且只能存在一个。

语法:

1
2
3
4
5
6
7
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
...
<end name="[NODE-NAME]"/>
...
</workflow-app>

# name:表示节点的名称

例子:

1
2
3
4
<workflow-app name="foo-wf" xmlns="uri:oozie:workflow:0.1">
...
<end name="end"/>
</workflow-app>

2.1.3 kill 节点

  1. kill节点允许workflow job执行 kill 操作。当job到达kill节点的时候,表示job执行出错。
  2. 同理当多个action指向kill节点,有一个到达,其他actions将被kill。
  3. kill节点可以有 0 或多个

语法:

1
2
3
4
5
6
7
8
9
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
...
<kill name="[NODE-NAME]">
<message>[MESSAGE-TO-LOG]</message>
</kill>
...
</workflow-app>

# kill节点没有连线节点。

例子

1
2
3
4
5
6
7
<workflow-app name="foo-wf" xmlns="uri:oozie:workflow:0.1">
...
<kill name="killBecauseNoInput">
<message>Input unavailable</message>
</kill>
...
</workflow-app>

2.1.4 decision 节点

  1. decision节点允许流程对节点的流向做出选择,就行switch-case一样。
  2. 节点需要根据表达式(true or false)做出选择,表达式是JSP Expression Language(EL)。

语法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
...
<decision name="[NODE-NAME]">
<switch>
<case to="[NODE_NAME]">[PREDICATE]</case>
...
<case to="[NODE_NAME]">[PREDICATE]</case>
<default to="[NODE_NAME]"/>
</switch>
</decision>
...
</workflow-app>

# PREDICATE:EL表达式
# 必须有一个default节点,避免不必要的错误

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<workflow-app name="foo-wf" xmlns="uri:oozie:workflow:0.1">
...
<decision name="mydecision">
<switch>
<case to="reconsolidatejob">
${fs:fileSize(secondjobOutputDir) gt 10 * GB}
</case>
<case to="rexpandjob">
${fs:filSize(secondjobOutputDir) lt 100 * MB}
</case>
<case to="recomputejob">
${ hadoop:counters('secondjob')[RECORDS][REDUCE_OUT] lt 1000000 }
</case>
<default to="end"/>
</switch>
</decision>
...
</workflow-app>

2.1.5 fork 和 join 节点

  1. fork允许将一条路径分支成多个路径执行。
  2. join等待当前 上个fork节点 的每个执行连线到达。
  3. fork和join必须 成对出现

语法:

1
2
3
4
5
6
7
8
9
10
11
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
...
<fork name="[FORK-NODE-NAME]">
<path start="[NODE-NAME]" />
...
<path start="[NODE-NAME]" />
</fork>
...
<join name="[JOIN-NODE-NAME]" to="[NODE-NAME]" />
...
</workflow-app>

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
...
<fork name="forking">
<path start="firstparalleljob"/>
<path start="secondparalleljob"/>
</fork>
<action name="firstparallejob">
<map-reduce>
<job-tracker>foo:8021</job-tracker>
<name-node>bar:8020</name-node>
<job-xml>job1.xml</job-xml>
</map-reduce>
<ok to="joining"/>
<error to="kill"/>
</action>
<action name="secondparalleljob">
<map-reduce>
<job-tracker>foo:8021</job-tracker>
<name-node>bar:8020</name-node>
<job-xml>job2.xml</job-xml>
</map-reduce>
<ok to="joining"/>
<error to="kill"/>
</action>
<join name="joining" to="nextaction"/>
...
</workflow-app>

tips: Oozie默认提供fork和join的校验机制。如果你确保workflow能够执行,可以关闭forkjoin校验。
方式一(局部):在 job.properties 中指定 oozie.wf.validate.ForkJoin 为false
方式二(全局):在 oozie-site.xml 中指定 oozie.validate.ForkJoin 为false

2.2 动作节点

Action Nodes可以被认为是工作流触发的 computation(计算) / processing(处理) 任务。

2.2.1 Action Basics 基本特性

1、 远程执行

1
对于Oozie而言,这些触发的任务都是处在远程机器上的。

2、 异步执行

1
2
3
对于Oozie而言,任务的触发的都是异步的。所以Oozie必须等待Action执行完毕才能继续执行下一个Action Node. Oozie有两种方式去探测执行的任务: 回调 和 轮询
1. 回调:当一个任务执行,Oozie会提供一个唯一的回调URL,每个任务需要执行URL进行通知。
2. 轮询:有时候因为网络等原因,并不能成功进行URL回调。这是Oozie就会去轮询执行的任务。

3、OK 和 ERROR

1
2
3
每个Action都有只有两个节点流向:OK 和 ERROR
1. 如果任务成功执行完毕,就会转向 ok 节点
2. 否则转向 error 节点,并且需要向Oozie提供 error-code 和 error-message 等信息

4、恢复机制

1
2
3
4
5
Oozie 为执行失败的任务提供了恢复的机制。
1. 如果是失败是在状态转移的时候(如网络,远程机无法访问等),Oozie就会根据预定的重试时间间隔和重试次数去执行
2. 如果不是在状态转移发生异常,Job就会被Suspend,需要手动去恢复

如果一直重试还是不能解决问题,那么Oozie直接将任务流向 error 节点。

2.2.2 Map-Reduce Action

  1. Map-Reduce jobs可以预先对HDFS进行目录的清理和初始化。
  2. properties的配置可以配置在如下文件中,且后者覆盖前者
    1. the config-default.xml
    2. JobConf XML file bundled with the workflow application
    3. tag in workflow definition
    4. Inline map-reduce action configuration
  3. 配置都可以是 EL 表达式

语法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
...
<action name="[NODE-NAME]">
<map-reduce>
<job-tracker>[JOB-TRACKER]</job-tracker>
<name-node>[NAME-NODE]</name-node>
<prepare>
<delete path="[PATH]"/>
...
<mkdir path="[PATH]"/>
...
</prepare>
<streaming>
<mapper>[MAPPER-PROCESS]</mapper>
<reducer>[REDUCER-PROCESS]</reducer>
<record-reader>[RECORD-READER-CLASS]</record-reader>
<record-reader-mapping>[NAME=VALUE]</record-reader-mapping>
...
<env>[NAME=VALUE]</env>
...
</streaming>
<!-- Either streaming or pipes can be specified for an action, not both -->
<pipes>
<map>[MAPPER]</map>
<reduce>[REDUCER]</reducer>
<inputformat>[INPUTFORMAT]</inputformat>
<partitioner>[PARTITIONER]</partitioner>
<writer>[OUTPUTFORMAT]</writer>
<program>[EXECUTABLE]</program>
</pipes>
<job-xml>[JOB-XML-FILE]</job-xml>
<configuration>
<property>
<name>[PROPERTY-NAME]</name>
<value>[PROPERTY-VALUE]</value>
</property>
...
</configuration>
<file>[FILE-PATH]</file>
...
<archive>[FILE-PATH]</archive>
...
</map-reduce>
<ok to="[NODE-NAME]"/>
<error to="[NODE-NAME]"/>
</action>
...
</workflow-app>

# name-node 和 job-tracker 都是必须的
# prepare 表示执行前的准备工作
# streaming 和 pipes: hadoop streaming和pipes的支持,只能指定其中一个
# job-xml:指向 JobConf 的 job.xml, 但是里面的配置会被 configuration节点覆盖。
```

> 例子:

* 标准example
* Streaming Example
* Pipes Example
* 参考:[Map-Reduce示例][2]

### 2.2.3 Sqoop Action

1. 运行一个Sqoop Job,需要配置 `job-tracker`, `name-node`, `command`和一些其他配置

> 语法:

```xml
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
...
<action name="[NODE-NAME]">
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>[JOB-TRACKER]</job-tracker>
<name-node>[NAME-NODE]</name-node>
<prepare>
<delete path="[PATH]"/>
...
<mkdir path="[PATH]"/>
...
</prepare>
<configuration>
<property>
<name>[PROPERTY-NAME]</name>
<value>[PROPERTY-VALUE]</value>
</property>
...
</configuration>
<command>[SQOOP-COMMAND]</command>
<arg>[SQOOP-ARGUMENT]</arg>
...
<file>[FILE-PATH]</file>
...
<archive>[FILE-PATH]</archive>
...
</sqoop>
<ok to="[NODE-NAME]"/>
<error to="[NODE-NAME]"/>
</action>
...
</workflow-app>

## command:执行的sqoop命令,比如一条完整的命令:
* import --connect jdbc:hsqldb:file:db.hsqldb --table TT --target-dir hdfs://localhost:8020/user/tucu/foo -m 1

## arg:可以将命令的一些参数写在里面,比如:
* <arg>import</arg>
* <arg>--connect</arg>
* <arg>jdbc:hsqldb:file:db.hsqldb</arg>
* <arg>--table</arg>
* <arg>TT</arg>
* <arg>--target-dir</arg>
* <arg>hdfs://localhost:8020/user/tucu/foo</arg>
* <arg>-m</arg>
* <arg>1</arg>

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<?xml version="1.0" encoding="UTF-8"?>
<workflow-app name="workflow-app-sqoop" xmlns="uri:oozie:workflow:0.2">
<start to="sqoop"/>
<action name="sqoop">
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>localhost:8032</job-tracker>
<name-node>hdfs://localhost:9000</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>default</value>
</property>
</configuration>
<command> import --connect jdbc:mysql://localhost:3306/test --username root --password ****** --table users -m 1 --target-dir /mimosa/test/users </command>
</sqoop>
<ok to="end"/>
<error to="kill"/>
</action>
<kill name="kill">
<message> Map/Reduce failed, error message[\$\{wf:errorMessage(wf:lastErrorNode())}] </message>
</kill>
<end name="end"/>
</workflow-app>

# --password 的密码自行修改

注意:
1、由于使用到了mysql的数据源,需要将mysql的驱动包拷贝到Oozie sharelib的sqoop目录下。
2、运行实质就是执行 sqoop 的命令。

2.2.4 Pig Action

  1. Pig Job需要配置 job-tracker, name-node, pig script和一些其他配置
  2. 配置的一些说明基本与Map-Reduce一致
  3. script 中可以包含 ${var}, var 需要在 param节点中指定

语法(schema 0.2):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.2">
...
<action name="[NODE-NAME]">
<pig>
<job-tracker>[JOB-TRACKER]</job-tracker>
<name-node>[NAME-NODE]</name-node>
<prepare>
<delete path="[PATH]"/>
...
<mkdir path="[PATH]"/>
...
</prepare>
<job-xml>[JOB-XML-FILE]</job-xml>
<configuration>
<property>
<name>[PROPERTY-NAME]</name>
<value>[PROPERTY-VALUE]</value>
</property>
...
</configuration>
<script>[PIG-SCRIPT]</script>
<param>[PARAM-VALUE]</param>
...
<param>[PARAM-VALUE]</param>
<argument>[ARGUMENT-VALUE]</argument>
...
<argument>[ARGUMENT-VALUE]</argument>
<file>[FILE-PATH]</file>
...
<archive>[FILE-PATH]</archive>
...
</pig>
<ok to="[NODE-NAME]"/>
<error to="[NODE-NAME]"/>
</action>
...
</workflow-app>

Example(schema 0.2):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.2">
...
<action name="myfirstpigjob">
<pig>
<job-tracker>foo:8021</job-tracker>
<name-node>bar:8020</name-node>
<prepare>
<delete path="${jobOutput}"/>
</prepare>
<configuration>
<property>
<name>mapred.compress.map.output</name>
<value>true</value>
</property>
<property>
<name>oozie.action.external.stats.write</name>
<value>true</value>
</property>
</configuration>
<script>/mypigscript.pig</script>
<argument>-param</argument>
<argument>INPUT=${inputDir}</argument>
<argument>-param</argument>
<argument>OUTPUT=${outputDir}/pig-output3</argument>
</pig>
<ok to="myotherjob"/>
<error to="errorcleanup"/>
</action>
...
</workflow-app>

语法 (schema 0.1)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
...
<action name="[NODE-NAME]">
<pig>
<job-tracker>[JOB-TRACKER]</job-tracker>
<name-node>[NAME-NODE]</name-node>
<prepare>
<delete path="[PATH]"/>
...
<mkdir path="[PATH]"/>
...
</prepare>
<job-xml>[JOB-XML-FILE]</job-xml>
<configuration>
<property>
<name>[PROPERTY-NAME]</name>
<value>[PROPERTY-VALUE]</value>
</property>
...
</configuration>
<script>[PIG-SCRIPT]</script>
<param>[PARAM-VALUE]</param>
...
<param>[PARAM-VALUE]</param>
<file>[FILE-PATH]</file>
...
<archive>[FILE-PATH]</archive>
...
</pig>
<ok to="[NODE-NAME]"/>
<error to="[NODE-NAME]"/>
</action>
...
</workflow-app>

Example(schema 0.1):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
...
<action name="myfirstpigjob">
<pig>
<job-tracker>foo:8021</job-tracker>
<name-node>bar:8020</name-node>
<prepare>
<delete path="${jobOutput}"/>
</prepare>
<configuration>
<property>
<name>mapred.compress.map.output</name>
<value>true</value>
</property>
</configuration>
<script>/mypigscript.pig</script>
<param>InputDir=/home/tucu/input-data</param>
<param>OutputDir=${jobOutput}</param>
</pig>
<ok to="myotherjob"/>
<error to="errorcleanup"/>
</action>
...
</workflow-app>

2.2.5 Hive Action

Hive Action的配置基本与Pig Action一致

语法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
...
<action name="[NODE-NAME]">
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>[JOB-TRACKER]</job-tracker>
<name-node>[NAME-NODE]</name-node>
<prepare>
<delete path="[PATH]"/>
...
<mkdir path="[PATH]"/>
...
</prepare>
<job-xml>[HIVE SETTINGS FILE]</job-xml>
<configuration>
<property>
<name>[PROPERTY-NAME]</name>
<value>[PROPERTY-VALUE]</value>
</property>
...
</configuration>
<script>[HIVE-SCRIPT]</script>
<param>[PARAM-VALUE]</param>
...
<param>[PARAM-VALUE]</param>
<file>[FILE-PATH]</file>
...
<archive>[FILE-PATH]</archive>
...
</hive>
<ok to="[NODE-NAME]"/>
<error to="[NODE-NAME]"/>
</action>
...
</workflow-app>


## job-xml:hive的一些配置信息,一般为 $HIVE_HOME/conf/hive-site.xml
## script:hive的脚本文件

例子:

  • 作业目录:
1
2
3
- hive-site.xml
- hive.q
- workflow.xml
  • workflow.xml 文件内容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<?xml version="1.0" encoding="UTF-8"?>
<workflow-app name="workflow-app-hive" xmlns="uri:oozie:workflow:0.2">
<start to="hive"/>
<action name="hive">
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>localhost:8032</job-tracker>
<name-node>hdfs://localhost:9000</name-node>
<job-xml>hive-site.xml</job-xml>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>default</value>
</property>
</configuration>
<script>hive.q</script>
<param>inpath=/users.txt</param>
</hive>
<ok to="end"/>
<error to="kill"/>
</action>
<kill name="kill">
<message> Map/Reduce failed, error message[\$\{wf:errorMessage(wf:lastErrorNode())}] </message>
</kill>
<end name="end"/>
</workflow-app>
  • hive-site.xml 文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
<?xml version="1.0" encoding="UTF-8" ?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>Driver class name for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://localhost:3306/hive?createDatabaseIfNotExist=true</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
<description>Username to use against metastore database</description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<!-- 密码自行修改 -->
<value>******</value>
<description>password to use against metastore database</description>
</property>


<property>
<name>hive.exec.local.scratchdir</name>
<value>/usr/local/var/hive/iotmp</value>
<description>Local scratch space for Hive jobs</description>
</property>
<property>
<name>hive.downloaded.resources.dir</name>
<value>/usr/local/var/hive/${hive.session.id}_resources</value>
<description>Temporary local directory for added resources in the remote file system.</description>
</property>
<property>
<name>hive.querylog.location</name>
<value>/usr/local/var/hive/iotemp</value>
<description>Location of Hive run time structured log file</description>
</property>
<property>
<name>hive.server2.logging.operation.log.location</name>
<value>/usr/local/var/hive/${system:user.name}/operation_logs</value>
<description>Top level directory where operation logs are stored if logging functionality is enabled</description>
</property>

</configuration>
  • hive.q文件
1
load data inpath '${inpath}' into table users;

注意事项:
1、类似于sqoop,如果元数据是存储在mysql中,那么需要在oozie的sharelib的hive下导入驱动包。
2、一个坑:由于我的Oozie使用的是CDH版本,需要在导入一个Jar包( hadoop-core-2.6.0-mr1-cdh5.7.0.jar 这个包应该能在Oozie目录下找到,自行搜索一把),否则报错找不到 MRVersion 的类(这个类比较坑,Apache的hadoop没有,CDH的版本才有)
3、运行实质是启动Hive的命令 hive -e "..." -hivevar ...

2.2.6 Fs (HDFS) action

Fs Action 可以通过 Workflow Application 操作 HDFS 中的文件和目录。支持的命令(节点)有 move , delete , mkdir , chmod , touchz and chgrp .

Important:如果某个命令在运行中失败,之前运行成功的命令将 不会回滚

语法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.5">
...
<action name="[NODE-NAME]">
<fs>
<delete path='[PATH]'/>
...
<mkdir path='[PATH]'/>
...
<move source='[SOURCE-PATH]' target='[TARGET-PATH]'/>
...
<chmod path='[PATH]' permissions='[PERMISSIONS]' dir-files='false' />
...
<touchz path='[PATH]' />
...
<chgrp path='[PATH]' group='[GROUP]' dir-files='false' />
</fs>
<ok to="[NODE-NAME]"/>
<error to="[NODE-NAME]"/>
</action>
...
</workflow-app>

# delete:删除操作,如果是目录将递归删除,并最后删除目录。
# mkdir:如果目录存在将什么也不做
# move:移动操作
- source 必须存在
- target 的 PATH 中的父目录必须存在
- target PATH 如果是一个文件,那么目标文件不能存在(目录同理)
# chmod:修改权限
- permissions 可以是 符号表示(-rwxrw-rw-),或者数字表示(755)
- dir-files 如果是目录,是否影响目录下的文件权限
# chgrp:修改所组
# touchz:新建一个length为0的文件,如果文件存在执行touch操作

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.5">
...
<action name="hdfscommands">
<fs>
<delete path='hdfs://foo:8020/usr/tucu/temp-data'/>
<mkdir path='archives/${wf:id()}'/>
<move source='${jobInput}' target='archives/${wf:id()}/processed-input'/>
<chmod path='${jobOutput}' permissions='-rwxrw-rw-' dir-files='true'><recursive/></chmod>
<chgrp path='${jobOutput}' group='testgroup' dir-files='true'><recursive/></chgrp>
</fs>
<ok to="myotherjob"/>
<error to="errorcleanup"/>
</action>
...
</workflow-app>

同时支持schema 0.4 中的 name-node 等节点。参考 Global Configurations

2.2.7 SSH Action

Ssh Action在远程机器上的后台运行一个 secure shell

语法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
...
<action name="[NODE-NAME]">
<ssh>
<host>[USER]@[HOST]</host>
<command>[SHELL]</command>
<args>[ARGUMENTS]</args>
...
<capture-output/>
</ssh>
<ok to="[NODE-NAME]"/>
<error to="[NODE-NAME]"/>
</action>
...
</workflow-app>

# capture-output:如果配置,表明Oozie可以捕获 SSH 命令的输出,输出必须是 Java Properties的格式且大小不能超过2KB。这些输出可以被 decision nodes 使用。

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
...
<action name="myssjob">
<ssh>
<host>foo@bar.com<host>
<command>uploaddata</command>
<args>jdbc:derby://bar.com:1527/myDB</args>
<args>hdfs://foobar.com:8020/usr/tucu/myData</args>
</ssh>
<ok to="myotherjob"/>
<error to="errorcleanup"/>
</action>
...
</workflow-app>

2.2.8 Sub-workflow Action

Sub-workflow Action 运行一个子流程,子流程job可以运行在其他的Oozie系统中。

语法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
...
<action name="[NODE-NAME]">
<sub-workflow>
<app-path>[WF-APPLICATION-PATH]</app-path>
<propagate-configuration/>
<configuration>
<property>
<name>[PROPERTY-NAME]</name>
<value>[PROPERTY-VALUE]</value>
</property>
...
</configuration>
</sub-workflow>
<ok to="[NODE-NAME]"/>
<error to="[NODE-NAME]"/>
</action>
...
</workflow-app>

# propagate-configuration:如果配置表明 父流程 的配置将传播给 子流程
# configuration:传播给 子流程 的配置

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
...
<action name="a">
<sub-workflow>
<app-path>child-wf</app-path>
<configuration>
<property>
<name>input.dir</name>
<value>${wf:id()}/second-mr-output</value>
</property>
</configuration>
</sub-workflow>
<ok to="end"/>
<error to="kill"/>
</action>
...
</workflow-app>

# child-wf :HDFS上的app-path,将会被运行在当前所在的Oozie系统中(http://myhost:11000/oozie),指定的工作流程必须已经部署

Tips:

  • libs的继承
    • 在 oozie-site.xml 指定 oozie.subworkflow.classpath.inheritance (boolean)
    • 在 job.properties 指定 oozie.wf.subworkflow.classpath.inheritance (boolean)
    • 如果同时配置,后者优先前者。后者默认是 false
  • 子流程递归的深度
    • 在 oozie-site.xml 指定 oozie.action.subworkflow.max.depth (数字,默认50)

2.2.9 Java Action

  1. Java Action 执行一个 main 方法,在 Hadoop 集群中被当做一个 Map-Reduce Job with a single Mapper task。
  2. 需要配置 job-tracker, name-node, main Java class, JVM options and arguments.
  3. Java Class 不能调用 System.exit(int n),不然会当 error 处理.
  4. 如果是在一个安全认证的集群中运行,Java Class需要指定Hadoop 授权令牌。比如:

    1
    2
    3
    4
    	// propagate delegation related props from launcher job to MR job
    if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
    jobConf.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
    }

语法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
...
<action name="[NODE-NAME]">
<java>
<job-tracker>[JOB-TRACKER]</job-tracker>
<name-node>[NAME-NODE]</name-node>
<prepare>
<delete path="[PATH]"/>
...
<mkdir path="[PATH]"/>
...
</prepare>
<job-xml>[JOB-XML]</job-xml>
<configuration>
<property>
<name>[PROPERTY-NAME]</name>
<value>[PROPERTY-VALUE]</value>
</property>
...
</configuration>
<main-class>[MAIN-CLASS]</main-class>
<java-opts>[JAVA-STARTUP-OPTS]</java-opts>
<arg>ARGUMENT</arg>
...
<file>[FILE-PATH]</file>
...
<archive>[FILE-PATH]</archive>
...
<capture-output />
</java>
<ok to="[NODE-NAME]"/>
<error to="[NODE-NAME]"/>
</action>
...
</workflow-app>

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
...
<action name="myfirstjavajob">
<java>
<job-tracker>foo:8021</job-tracker>
<name-node>bar:8020</name-node>
<prepare>
<delete path="${jobOutput}"/>
</prepare>
<configuration>
<property>
<name>mapred.queue.name</name>
<value>default</value>
</property>
</configuration>
<main-class>org.apache.oozie.MyFirstMainClass</main-class>
<java-opts>-Dblah</java-opts>
<arg>argument1</arg>
<arg>argument2</arg>
</java>
<ok to="myotherjob"/>
<error to="errorcleanup"/>
</action>
...
</workflow-app>

Tips: 可以配置 oozie.launcher.action.main.class 来覆盖 Main Class,而避免重新部署流程。

1
2
3
4
<property>
<name>oozie.launcher.action.main.class</name>
<value>org.my.CustomMain</value>
</property>

2.2.10 Email Action

Oozie 允许使用 email 节点进行邮件的发送,但是在发送之前需要对smtp等相关信息进行配置,在 oozie-site.xml 中配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
<!-- email 设置 -->
<property>
<name>oozie.email.smtp.host</name>
<value>smtp.163.com</value>
<description>
The host where the email action may find the SMTP server (localhost by default).
</description>
</property>
<property>
<name>oozie.email.smtp.port</name>
<value>25</value>
<description>
The port to connect to for the SMTP server (25 by default).
</description>
</property>
<property>
<name>oozie.email.from.address</name>
<value>chenjhhx@163.com</value>
<description>
The from address to be used for mailing all emails (oozie@localhost by default).
</description>
</property>
<property>
<name>oozie.email.smtp.auth</name>
<value>true</value>
<description>
Boolean property that toggles if authentication is to be done or not. (false by default).
</description>
</property>
<property>
<name>oozie.email.smtp.username</name>
<value>chenjhhx@163.com</value>
<description>
If authentication is enabled, the username to login as (empty by default).
</description>
</property>
<property>
<!-- 可以用自己的账户进行测试 -->
<name>oozie.email.smtp.password</name>
<value>**********</value>
<description>
If authentication is enabled, the username's password (empty by default).
</description>
</property>

Tips: 在使用之前需要确保你的邮箱 smpt等相关服务 是打开的。这个可以进入你邮箱的设置里面进行设置。

语法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
...
<action name="[NODE-NAME]">
<email xmlns="uri:oozie:email-action:0.2">
<to>[COMMA-SEPARATED-TO-ADDRESSES]</to>
<cc>[COMMA-SEPARATED-CC-ADDRESSES]</cc> <!-- cc is optional -->
<subject>[SUBJECT]</subject>
<body>[BODY]</body>
<content_type>[CONTENT-TYPE]</content_type> <!-- content_type is optional -->
</email>
<ok to="[NODE-NAME]"/>
<error to="[NODE-NAME]"/>
</action>
...
</workflow-app>

## to:表示发送给谁,多个以逗号分隔
## cc:表示抄送,与to同理
## subject:主题
## body:邮件内容
## content_type:内容的ContentType, 如可以是 text/html;charset=UTF-8(注意如果不填,中文是乱码)

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
...
<action name="an-email">
<email xmlns="uri:oozie:email-action:0.1">
<to>bob@initech.com,the.other.bob@initech.com</to>
<cc>will@initech.com</cc>
<subject>Email notifications for ${wf:id()}</subject>
<body>The wf ${wf:id()} successfully completed.</body>
<content_type>text/html;charset=UTF-8</content_type>
</email>
<ok to="myotherjob"/>
<error to="errorcleanup"/>
</action>
...
</workflow-app>

2.2.11 Spark 节点

语法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.3">
...
<action name="[NODE-NAME]">
<spark xmlns="uri:oozie:spark-action:0.1">
<job-tracker>[JOB-TRACKER]</job-tracker>
<name-node>[NAME-NODE]</name-node>
<prepare>
<delete path="[PATH]"/>
...
<mkdir path="[PATH]"/>
...
</prepare>
<job-xml>[SPARK SETTINGS FILE]</job-xml>
<configuration>
<property>
<name>[PROPERTY-NAME]</name>
<value>[PROPERTY-VALUE]</value>
</property>
...
</configuration>
<master>[SPARK MASTER URL]</master>
<mode>[SPARK MODE]</mode>
<name>[SPARK JOB NAME]</name>
<class>[SPARK MAIN CLASS]</class>
<jar>[SPARK DEPENDENCIES JAR / PYTHON FILE]</jar>
<spark-opts>[SPARK-OPTIONS]</spark-opts>
<arg>[ARG-VALUE]</arg>
...
<arg>[ARG-VALUE]</arg>
...
</spark>
<ok to="[NODE-NAME]"/>
<error to="[NODE-NAME]"/>
</action>
...
</workflow-app>

### master: 可以指定spark的master url:spark://host:port, mesos://host:port, yarn-cluster, yarn-master, or local

### mode: 可以指定(client, cluster)。配合master参数一起使用,比如(master=yarn, mode=client 相当于master=yarn-client)

Tips

  • 一般spark可以来跑spark jar,指定class名称,选择jars,然后配置一些opts参数等等。如果spark是单机模式启动、可以指定为local或者sparkMaster的url。如果是 spark on yarn, 则需指定为 yarn-cluster
  • spark on yarn需要指定一些运行时的参数,可以通过 spark-opts 指定 --conf ... 或者在 oozie-site.xml指定spark的配置参数目录 oozie.service.SparkConfigurationService.spark.configurations。一般包括如下参数,
    • spark.yarn.historyServer.address=http://SPH-HOST:18088
    • spark.eventLog.dir=hdfs://NN:8020/user/spark/applicationHistory
    • spark.eventLog.enabled=true

2.2.12 spark执行python

除了指定执行的jar之外,spark节点还可以执行Python脚本。比如:

1
2
3
4
5
6
7
8
9
10
<spark xmlns="uri:oozie:spark-action:0.1">
<job-tracker>haier-test-namenode:8032</job-tracker>
<name-node>hdfs://haier-test-namenode:8020</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>pyspark</name>
<class>/tmp/spark-yarn.py</class>
<jar>/tmp/spark-yarn.py</jar>
<spark-opts>--executor-memory 1G --executor-cores 2</spark-opts>
</spark>
  • 不过这样执行可能会出现如下问题:
1
2
3
4
5
java.util.NoSuchElementException: key not found: SPARK_HOME
## https://issues.apache.org/jira/browse/OOZIE-2482
## https://community.cloudera.com/t5/Batch-Processing-and-Workflow/quot-Failing-Oozie-Launcher-key-not-found-SPARK-HOME-quot/td-p/39830

## 试了很多解决方案,有的没有用。。
  • 解决方案:
1
2
3
4
5
6
7
## 在configuration中接入配置
<property>
<name>oozie.launcher.yarn.app.mapreduce.am.env</name>
<value>SPARK_HOME=/opt/cloudera/parcels/CDH/lib/spark/</value>
</property>

## 个人感觉有点挫。。。

3. 流程定义中的参数化(EL 表达式)

3.1 变量

  1. 参数变量的定义使用 JSP Expression Language,比如 ${NAME}
  2. 变量名遵循一般变量的定义规则,需符合正则:[A-Za-z_][0-9A-Za-z_]*。比如 "job.tracker" 就是错误的定义。
  3. 参数也可以通过 xml 内联的方式引入,如:
1
2
3
4
5
6
7
8
9
10
11
12
<workflow-app name='hello-wf' xmlns="uri:oozie:workflow:0.4">
<parameters>
<property>
<name>inputDir</name>
</property>
<property>
<name>outputDir</name>
<value>out-dir</value>
</property>
</parameters>
...
</workflow-app>

3.2 方法

参考:

  1. Basic EL Constants And Functions
  2. Workflow EL Functions
  3. Hadoop EL Constants And Functions
  4. Hadoop Jobs EL Function
  5. HDFS EL Functions
  6. HCatalog EL Functions

4. 工作流回调

在一个 Workflow Job 或者 Action 的开始和结束,都可以配置回调的 HTTP GET URL。

参考:Workflow_Notifications

5. 工作流程部署

工作流程的应用需要上传到 HDFS 中,目录结构如下:

1
2
3
4
- /workflow.xml
- /config-default.xml
|
- /lib/ (*.jar;*.so)

一个工作流应用至少包含 workflow.xml

文章目录
  1. Oozie-3_工作流程定义
  2. 1. 简介
  3. 2. 流程定义
    1. 2.1 控制节点
      1. 2.1.1 start 节点
      2. 2.1.2 end 节点
      3. 2.1.3 kill 节点
      4. 2.1.4 decision 节点
      5. 2.1.5 fork 和 join 节点
    2. 2.2 动作节点
      1. 2.2.1 Action Basics 基本特性
      2. 2.2.2 Map-Reduce Action
      3. 2.2.4 Pig Action
      4. 2.2.5 Hive Action
      5. 2.2.6 Fs (HDFS) action
      6. 2.2.7 SSH Action
      7. 2.2.8 Sub-workflow Action
      8. 2.2.9 Java Action
      9. 2.2.10 Email Action
      10. 2.2.11 Spark 节点
        1. 2.2.12 spark执行python
  4. 3. 流程定义中的参数化(EL 表达式)
    1. 3.1 变量
    2. 3.2 方法
  5. 4. 工作流回调
  6. 5. 工作流程部署