文章目录
  1. 1. Sqoop
  2. 2. Hive
  3. 3. Shell
  4. 4. Email
  5. 5. Map-Reduce
  6. 6. Fs (HDFS)
  7. 7. DistCp Action
  8. 8. Pig Action
  9. 9. Java Action
  10. 10. Sub-workflow Action
  11. 11. 总结
  12. 12. 参考资料

Oozie 工作流中,Action节点包括即 Oozie 支持的 jobs 和 sub-workflows。其中,Oozie 支持的 Hadoop jobs 类型有 Map ReducePigHiveSqoop。另外,Oozie 还支持的 jobs 类型还有 JavaStreamingFs(Hadoop File System 操作)SshShellEmailDistCp

Sqoop

Sqoop action 执行一个 Sqoop job。工作流一直等待直到 Sqoop job 完成后才会进入到工作流中的下一个 action。在运行 Sqoop action 之前,须要设置 job-trackername-nodeSqoop commandarg elements。Sqoop action 在执行 Sqoop job 前可以创建或删除 HDFS 目录。

通过 job-xml 可以指定 Sqoop action 配置文件,或通过 configuration 元素在线对 Sqoop action 配置。其中,可以在在线配置中使用 EL 表达式,并且在线配置的值会覆盖在通过 job-xml 的配置值。

语法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
...
<action name="[NODE-NAME]">
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>[JOB-TRACKER]</job-tracker>
<name-node>[NAME-NODE]</name-node>
<prepare>
<delete path="[PATH]"/>
...
<mkdir path="[PATH]"/>
...
</prepare>
<configuration>
<property>
<name>[PROPERTY-NAME]</name>
<value>[PROPERTY-VALUE]</value>
</property>
...
</configuration>
<command>[SQOOP-COMMAND]</command>
<arg>[SQOOP-ARGUMENT]</arg>
...
<file>[FILE-PATH]</file>
...
<archive>[FILE-PATH]</archive>
...
</sqoop>
<ok to="[NODE-NAME]"/>
<error to="[NODE-NAME]"/>
</action>
...
</workflow-app>

其中,prepare (Optional)元素指出在开始 job 之前将要删除或创建的 HDFS 目录,具体目录须以 hdfs://HOST:PORT 开始。job-xml (Optional)元素指定了配置 Sqoop job 的文件。configuration 元素设置对 Sqoop job 的配置属性。

Sqoop 命令可以通过 command 或多个 arg 设置。在使用 command 时,Oozie 会以空格分割 command 中的命令成为多个参数。在使用多个 arg 时,Oozie 会将每一个arg 中的值作为参数传递给 Sqoop job。当在单个参数中的变量中有空格时,须要使用多个 arg 方式。

所有的以上的元素中的值均可以通过 EL 表达式进行参数化设置。

command 例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
...
<action name="myfirsthivejob">
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-traker>foo:8021</job-tracker>
<name-node>bar:8020</name-node>
<prepare>
<delete path="${jobOutput}"/>
</prepare>
<configuration>
<property>
<name>mapred.compress.map.output</name>
<value>true</value>
</property>
</configuration>
<command>import --connect jdbc:hsqldb:file:db.hsqldb --table TT --target-dir hdfs://localhost:8020/user/tucu/foo -m 1</command>
</sqoop>
<ok to="myotherjob"/>
<error to="errorcleanup"/>
</action>
...
</workflow-app>

arg 例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
...
<action name="myfirsthivejob">
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-traker>foo:8021</job-tracker>
<name-node>bar:8020</name-node>
<prepare>
<delete path="${jobOutput}"/>
</prepare>
<configuration>
<property>
<name>mapred.compress.map.output</name>
<value>true</value>
</property>
</configuration>
<arg>import</arg>
<arg>--connect</arg>
<arg>jdbc:hsqldb:file:db.hsqldb</arg>
<arg>--table</arg>
<arg>TT</arg>
<arg>--target-dir</arg>
<arg>hdfs://localhost:8020/user/tucu/foo</arg>
<arg>-m</arg>
<arg>1</arg>
</sqoop>
<ok to="myotherjob"/>
<error to="errorcleanup"/>
</action>
...
</workflow-app>

Hive

Hive action 执行一个 Hive job。工作流一直等待直到 Hive job 完成后才会进入到工作流中的下一个 action。在运行 Hive action 之前,须要设置 job-trackername-nodesctript,以及一些必要的必要的参数。Hive action 在执行 Hive job 前可以创建或删除 HDFS 目录。

通过 job-xml 可以指定 Hive action 配置文件,或通过 configuration 元素在线对 Hive action 配置。其中,可以在在线配置中使用 EL 表达式,并且在线配置的值会覆盖在通过 job-xml 的配置值。

Oozie Hive action 支持在 Hive 脚本中以 ${VARIABLES} 方式设置变量参数。

语法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
...
<action name="[NODE-NAME]">
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>[JOB-TRACKER]</job-tracker>
<name-node>[NAME-NODE]</name-node>
<prepare>
<delete path="[PATH]"/>
...
<mkdir path="[PATH]"/>
...
</prepare>
<job-xml>[HIVE SETTINGS FILE]</job-xml>
<configuration>
<property>
<name>[PROPERTY-NAME]</name>
<value>[PROPERTY-VALUE]</value>
</property>
...
</configuration>
<script>[HIVE-SCRIPT]</script>
<param>[PARAM-VALUE]</param>
...
<param>[PARAM-VALUE]</param>
<file>[FILE-PATH]</file>
...
<archive>[FILE-PATH]</archive>
...
</hive>
<ok to="[NODE-NAME]"/>
<error to="[NODE-NAME]"/>
</action>
...
</workflow-app>

其中,prepare (Optional)元素指出在开始 job 之前将要删除或创建的 HDFS 目录,具体目录须以 hdfs://HOST:PORT 开始。job-xml (Optional)元素指定了配置 Hive job 的文件。configuration 元素设置对 Hive job 的配置属性。

script 须包含 Hive 脚本的路径,Hive 脚本可以通过 ${VARIABLES} 方式设置变量参数,这些变量的值可以通过 params 元素设置,并传递给 Hive 脚本。

所有的以上的元素中的值均可以通过 EL 表达式进行参数化设置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
...
<action name="myfirsthivejob">
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>foo:8021</job-tracker>
<name-node>bar:8020</name-node>
<prepare>
<delete path="${jobOutput}"/>
</prepare>
<configuration>
<property>
<name>mapred.compress.map.output</name>
<value>true</value>
</property>
</configuration>
<script>myscript.q</script>
<param>InputDir=/home/tucu/input-data</param>
<param>OutputDir=${jobOutput}</param>
</hive>
<ok to="myotherjob"/>
<error to="errorcleanup"/>
</action>
...
</workflow-app>

Shell

Shell action 执行一个 Shell 命令。工作流一直等待直到 Shell 命令完成后才会进入到工作流中的下一个 action。在运行 Shell 命令之前,须要设置 job-trackername-nodeexec,以及一些必要的必要的参数。Shell 命令在执行前可以创建或删除 HDFS 目录。

通过 job-xml 可以指定 Shell 配置文件,或通过 configuration 元素在线对 Shell 配置。其中,可以在在线配置中使用 EL 表达式,并且在线配置的值会覆盖在通过 job-xml 的配置值。

Shell job 的标准输出 STDOUT 可以被工作流中 Shell job 之后的 job 使用,标准化输出须满足2个条件:

  • 输出的格式须满足 Java Properties 文件的格式
  • 输出内容的大小须不超过 2KB

语法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.3">
...
<action name="[NODE-NAME]">
<shell xmlns="uri:oozie:shell-action:0.1">
<job-tracker>[JOB-TRACKER]</job-tracker>
<name-node>[NAME-NODE]</name-node>
<prepare>
<delete path="[PATH]"/>
...
<mkdir path="[PATH]"/>
...
</prepare>
<job-xml>[SHELL SETTINGS FILE]</job-xml>
<configuration>
<property>
<name>[PROPERTY-NAME]</name>
<value>[PROPERTY-VALUE]</value>
</property>
...
</configuration>
<exec>[SHELL-COMMAND]</exec>
<argument>[ARG-VALUE]</argument>
...
<argument>[ARG-VALUE]</argument>
<env-var>[VAR1=VALUE1]</env-var>
...
<env-var>[VARN=VALUEN]</env-var>
<file>[FILE-PATH]</file>
...
<archive>[FILE-PATH]</archive>
...
<capture-output/>
</shell>
<ok to="[NODE-NAME]"/>
<error to="[NODE-NAME]"/>
</action>
...
</workflow-app>

其中,prepare (Optional)元素指出在开始 job 之前将要删除或创建的 HDFS 目录,具体目录须以 hdfs://HOST:PORT 开始。job-xml 元素指定了配置 Shell job 的文件。configuration 元素设置对 Shell job 的配置属性。

exec 必须包含 Shell 命令文件的路径,并且,Shell 命令可以使用一个或多个参数,可以通过 argument 设置并传递给 Shell 命令。

env-var (Optional)包含了传递给 Shell 命令的环境,须仅包含一对变量与值,如果变量为 $PATH,须按照 Unix 惯例,即 PATH=$PATH:mypath。不能使用 ${PATH},否则会被 Oozie 的 EL 识别程序识别并取代。

capture-output (Optional)捕获 Shell 命令的标准化输出 STDOUT 。Shell 命令的标准化输出须以 Java Properties 文件格式,且不超过2KB,被工作流以 String action:output(String node, String key) 函数获取。

所有的以上的元素中的值均可以通过 EL 表达式进行参数化设置。

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<workflow-app xmlns='uri:oozie:workflow:0.3' name='shell-wf'>
<start to='shell1' />
<action name='shell1'>
<shell xmlns="uri:oozie:shell-action:0.1">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<exec>${EXEC}</exec>
<argument>A</argument>
<argument>B</argument>
<file>${EXEC}#${EXEC}</file> <!--Copy the executable to compute node's current working directory -->
</shell>
<ok to="end" />
<error to="fail" />
</action>
<kill name="fail">
<message>Script failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name='end' />
</workflow-app>

在提交 Oozie job 时 job 的 属性文件示例如下:

1
2
3
4
5
6
7
8
9
10
oozie.wf.application.path=hdfs://localhost:8020/user/kamrul/workflows/script#Execute is expected to be in the Workflow directory.
#Shell Script to run
EXEC=script.sh
#CPP executable. Executable should be binary compatible to the compute node OS.
#EXEC=hello
#Perl script
#EXEC=script.pl
jobTracker=localhost:8021
nameNode=hdfs://localhost:8020
queueName=default

运行打包成 jar 后的 Java 程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<workflow-app xmlns='uri:oozie:workflow:0.3' name='shell-wf'>
<start to='shell1' />
<action name='shell1'>
<shell xmlns="uri:oozie:shell-action:0.1">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<exec>java</exec>
<argument>-classpath</argument>
<argument>./${EXEC}:$CLASSPATH</argument>
<argument>Hello</argument>
<file>${EXEC}#${EXEC}</file> <!--Copy the jar to compute node current working directory -->
</shell>
<ok to="end" />
<error to="fail" />
</action>
<kill name="fail">
<message>Script failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name='end' />
</workflow-app>

在提交 Oozie job 时 job 的 属性文件示例如下:

1
2
3
4
5
oozie.wf.application.path=hdfs://localhost:8020/user/kamrul/workflows/script#Hello.jar file is expected to be in the Workflow directory.
EXEC=Hello.jar
jobTracker=localhost:8021
nameNode=hdfs://localhost:8020
queueName=default

相比于在 CLI 中执行 Shell 脚本,Shell Action 有以下限制:

  • 不支持交互;
  • 不能使用 sudo 切换到其他的用户;
  • 用户须明确的上传需要的第三方包(如 jarlib 和运行的 etc );
  • 由于 Oozie 在 Hadoop 的计算节点执行 Shell 命令,因此,Oozie 仅支持在计算节点已安装的功能,或上传的第三方包支持的功能。

Email

Email action 允许发送邮件。Email action 需提供接收方邮件地址,抄送方地址(optional),邮件主题和邮件内容。多个邮件接收者可以通过逗号间隔。

Email action 异步执行,工作流须在邮件发送出去后才进入工作流的下一个 action。

所有的以上的元素中的值均可以通过 EL 表达式进行参数化设置。

语法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
...
<action name="[NODE-NAME]">
<email xmlns="uri:oozie:email-action:0.2">
<to>[COMMA-SEPARATED-TO-ADDRESSES]</to>
<cc>[COMMA-SEPARATED-CC-ADDRESSES]</cc> <!-- cc is optional -->
<subject>[SUBJECT]</subject>
<body>[BODY]</body>
<content_type>[CONTENT-TYPE]</content_type> <!-- content_type is optional -->
</email>
<ok to="[NODE-NAME]"/>
<error to="[NODE-NAME]"/>
</action>
...
</workflow-app>

其中,tocc 指出接受邮件的邮箱地址,多个接收者用逗号间隔。subjectbody 指出邮件的主题和内容。

Email action 支持 SMTP 协议。在 oozie-site.xml 中需要进行配置。其中,oozie.email.smtp.host 指定 SMTP 服务器地址;oozie.email.smtp.port 指定连接 SMTP 服务器的端口号,默认为25;oozie.email.from.address 指定邮件发送方地址;oozie.email.smtp.auth 以布尔值(truefalse )指定是否需要身份验证;oozie.email.smtp.username 指定登陆 SMTP 服务器的邮箱地址,而 oozie.email.smtp.password 指定登陆 SMTP 服务器的邮箱的密码。

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
...
<action name="an-email">
<email xmlns="uri:oozie:email-action:0.1">
<to>bob@initech.com,the.other.bob@initech.com</to>
<cc>will@initech.com</cc>
<subject>Email notifications for ${wf:id()}</subject>
<body>The wf ${wf:id()} successfully completed.</body>
</email>
<ok to="myotherjob"/>
<error to="errorcleanup"/>
</action>
...
</workflow-app>

Map-Reduce

map-reduce action 执行一个 Hadoop map/reduce job。这里,Hadoop jobs 可以是 Java Map/Reduce jobs 或 streaming jobs。

map-reduce action 通过配置,可以在执行 map/reduce job 前进行系统清理和目录创建。这样可以确保 Oozie 在一个 Hadoop job 执行失败后重试(因为在执行 Hadoop job 前,Hadoop 会检查 job 的输出目录是否存在。当在失败后重试时,未清理输出目录会造成重试失败)。

工作流会等到 Hadoop map/reduce job 执行结束后继续执行工作流后面的 action。在运行 Hadoop map/reduce job 前须要进行必要的 Hadoop JobConf 属性配置。Hadoop JobConf 属性配置如下:

  • config-default.xml,或
  • 工作流定义中的 tag,或
  • JobConf XML 文件,或
  • map-reduce action 中配置

配置属性按照上述文件属性加载,上述顺序中后面的配置的属性值会覆盖前面的值。Streaming 和 map-reduce action 中的配置可以使用 EL 表达式。

语法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
...
<action name="[NODE-NAME]">
<map-reduce>
<job-tracker>[JOB-TRACKER]</job-tracker>
<name-node>[NAME-NODE]</name-node>
<prepare>
<delete path="[PATH]"/>
...
<mkdir path="[PATH]"/>
...
</prepare>
<streaming>
<mapper>[MAPPER-PROCESS]</mapper>
<reducer>[REDUCER-PROCESS]</reducer>
<record-reader>[RECORD-READER-CLASS]</record-reader>
<record-reader-mapping>[NAME=VALUE]</record-reader-mapping>
...
<env>[NAME=VALUE]</env>
...
</streaming>
<!-- Either streaming or pipes can be specified for an action, not both -->
<pipes>
<map>[MAPPER]</map>
<reduce>[REDUCER]</reducer>
<inputformat>[INPUTFORMAT]</inputformat>
<partitioner>[PARTITIONER]</partitioner>
<writer>[OUTPUTFORMAT]</writer>
<program>[EXECUTABLE]</program>
</pipes>
<job-xml>[JOB-XML-FILE]</job-xml>
<configuration>
<property>
<name>[PROPERTY-NAME]</name>
<value>[PROPERTY-VALUE]</value>
</property>
...
</configuration>
<file>[FILE-PATH]</file>
...
<archive>[FILE-PATH]</archive>
...
</map-reduce> <ok to="[NODE-NAME]"/>
<error to="[NODE-NAME]"/>
</action>
...
</workflow-app>

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
<workflow-app name="foo-wf" xmlns="uri:oozie:workflow:0.1">
...
<action name="myfirstHadoopJob">
<map-reduce>
<job-tracker>foo:8021</job-tracker>
<name-node>bar:8020</name-node>
<prepare>
<delete path="hdfs://foo:8020/usr/tucu/output-data"/>
</prepare>
<job-xml>/myfirstjob.xml</job-xml>
<configuration>
<property>
<name>mapred.input.dir</name>
<value>/usr/tucu/input-data</value>
</property>
<property>
<name>mapred.output.dir</name>
<value>/usr/tucu/input-data</value>
</property>
<property>
<name>mapred.reduce.tasks</name>
<value>${firstJobReducers}</value>
</property>
<property>
<name>oozie.action.external.stats.write</name>
<value>true</value>
</property>
</configuration>
</map-reduce>
<ok to="myNextAction"/>
<error to="errorCleanup"/>
</action>
...
</workflow-app>

Streaming 例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
...
<action name="firstjob">
<map-reduce>
<job-tracker>foo:8021</job-tracker>
<name-node>bar:8020</name-node>
<prepare>
<delete path="${output}"/>
</prepare>
<streaming>
<mapper>/bin/bash testarchive/bin/mapper.sh testfile</mapper>
<reducer>/bin/bash testarchive/bin/reducer.sh</reducer>
</streaming>
<configuration>
<property>
<name>mapred.input.dir</name>
<value>${input}</value>
</property>
<property>
<name>mapred.output.dir</name>
<value>${output}</value>
</property>
<property>
<name>stream.num.map.output.key.fields</name>
<value>3</value>
</property>
</configuration>
<file>/users/blabla/testfile.sh#testfile</file>
<archive>/users/blabla/testarchive.jar#testarchive</archive>
</map-reduce>
<ok to="end"/>
<error to="kill"/>
</action>
...
</workflow-app>

Pipes 例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
...
<action name="firstjob">
<map-reduce>
<job-tracker>foo:8021</job-tracker>
<name-node>bar:8020</name-node>
<prepare>
<delete path="${output}"/>
</prepare>
<pipes>
<program>bin/wordcount-simple#wordcount-simple</program>
</pipes>
<configuration>
<property>
<name>mapred.input.dir</name>
<value>${input}</value>
</property>
<property>
<name>mapred.output.dir</name>
<value>${output}</value>
</property>
</configuration>
<archive>/users/blabla/testarchive.jar#testarchive</archive>
</map-reduce>
<ok to="end"/>
<error to="kill"/>
</action>
...
</workflow-app>

Fs (HDFS)

fs action 允许操作 HDFS 中的文件,包括movedeletemkdirchmodtouchzchgrp。FS 命令异步执行,工作流等待 fs action 执行结束后才进入下一个 action。

fs action 中的路径可以通过 EL 表达式参数化,路径须为绝对路径。

特别地,fs action 为非原子操作,即在中途操作失败不能回滚。在执行 fs action 前,会检查是否源路径存在,及目标路径不存在。

语法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.5">
...
<action name="[NODE-NAME]">
<fs>
<delete path='[PATH]'/>
...
<mkdir path='[PATH]'/>
...
<move source='[SOURCE-PATH]' target='[TARGET-PATH]'/>
...
<chmod path='[PATH]' permissions='[PERMISSIONS]' dir-files='false' />
...
<touchz path='[PATH]' />
...
<chgrp path='[PATH]' group='[GROUP]' dir-files='false' />
</fs>
<ok to="[NODE-NAME]"/>
<error to="[NODE-NAME]"/>
</action>
...
</workflow-app>

delete 命令删除指定的路径,如果是目录,则会递归的删除目录下文件,再删除目录。

mkdir 命令将会创建指定的目录,路径中不存在的目录都会创建,如果都存在,不执行任何操作。

move 命令执行前,源路径必须存在,在执行命令时:

  • 目标路径的文件系统 URI(如 hdfs://{nameNode})会跳过,move 会认为目标文件系统 URI 与源路径的一样。如果目标路径包含文件系统 URI,必须与源路径的一样;
  • 目标路径的父目录必须存在;
  • 如果目标路径是一个文件名称,该文件不能存在;
  • 如果目标路径已经存在,move 会将源文件或目录作为目录路径的子文件或子目录。

chmod 命令更改指定路径的权限。权限通过 Unix 标识符(如 -rwxrw-rw-)或八进制(如 755)表示。当更改一个目录的权限时,默认的目录和目录下一级文件权限会改变。若只改变目录权限,可以通过设置 dir-filesfalse 实现。如果需要递归的改变目录及其下的所有级的文件或目录,需要提供 recursive 元素。

touchz 命令在指定的路径下创建一个大小为零的文件,如果有文件在指定的路径下存在,则会执行一个 touch 操作。touchz 命令只能在绝对路径下使用。

chgrp 命令改变指定路径的群组。当对一个目录执行命令时,默认的目录和目录下一级文件群组会改变。若只改变目录群组,可以通过设置 dir-filesfalse 实现。如果需要递归的改变目录及其下的所有级的文件或目录,需要提供 recursive 元素。

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.5">
...
<action name="hdfscommands">
<fs>
<delete path='hdfs://foo:8020/usr/tucu/temp-data'/>
<mkdir path='archives/${wf:id()}'/>
<move source='${jobInput}' target='archives/${wf:id()}/processed-input'/>
<chmod path='${jobOutput}' permissions='-rwxrw-rw-' dir-files='true'><recursive/></chmod>
<chgrp path='${jobOutput}' group='testgroup' dir-files='true'><recursive/></chgrp>
</fs>
<ok to="myotherjob"/>
<error to="errorcleanup"/>
</action>
...
</workflow-app>

DistCp Action

DistCp action 使用 Hadoop distributed copy 在不同集群间或同一集群内拷贝文件。

语法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.4">
...
<action name="[NODE-NAME]">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode1}</name-node>
<arg>${nameNode1}/path/to/input.txt</arg>
<arg>${nameNode2}/path/to/output.txt</arg>
</distcp>
<ok to="[NODE-NAME]"/>
<error to="[NODE-NAME]"/>
</action>
...
</workflow-app>

其中,第一个 arg 指定输入,第二个 arg 指定输出。如果在2个安全的集群间使用 DistCp,下面的配置需要添加的 action 的配置中:

1
2
3
4
<property>
<name>oozie.launcher.mapreduce.job.hdfs-servers</name>
<value>${nameNode1},${nameNode2}</value>
</property>

Pig Action

pig action 执行一个 Pig 任务。工作流等待 pig action 结束后继续执行之后的 action。在运行 Pig action 之前,须要设置 job-trackername-nodescript,以及一些必要的必要的参数。pig 命令在执行前可以创建或删除 HDFS 目录或文件,或 HCatalog 分区的清理。这样可以确保 Oozie 在一个 Pig job 执行失败后重试(因为 Pig 会在临时目录下产生中间结果数据。当在失败后重试时,未清理临时目录及其下的中间结果数据,会造成重试失败)。

Hadoop JobConf 属性配置如下:

  • config-default.xml,或
  • 工作流定义中的 tag,或
  • JobConf XML 文件,或
  • pig action 中配置

配置属性按照上述文件属性加载,上述顺序中后面的配置的属性值会覆盖前面的值。在线的属性值可以使用 EL 表达式参数化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.2">
...
<action name="[NODE-NAME]">
<pig>
<job-tracker>[JOB-TRACKER]</job-tracker>
<name-node>[NAME-NODE]</name-node>
<prepare>
<delete path="[PATH]"/>
...
<mkdir path="[PATH]"/>
...
</prepare>
<job-xml>[JOB-XML-FILE]</job-xml>
<configuration>
<property>
<name>[PROPERTY-NAME]</name>
<value>[PROPERTY-VALUE]</value>
</property>
...
</configuration>
<script>[PIG-SCRIPT]</script>
<param>[PARAM-VALUE]</param>
...
<param>[PARAM-VALUE]</param>
<argument>[ARGUMENT-VALUE]</argument>
...
<argument>[ARGUMENT-VALUE]</argument>
<file>[FILE-PATH]</file>
...
<archive>[FILE-PATH]</archive>
...
</pig>
<ok to="[NODE-NAME]"/>
<error to="[NODE-NAME]"/>
</action>
...
</workflow-app>

prepare (optional)元素指定了在执行任务前将要删除的路径。目录的清理和删除 Hcatalog 表分区两者只能执行其中一个。Hcatalog 表分区的 URI 为 hcat://[metastore server]:[port]/[database name]/[table name]/[partkey1]=[value];[partkey2]=[value]

job-xml(optional)元素指向 Hadoop JobConf job.xml文件。configuration (optional)元素包含 Hadoop jobs 的 JobConf 属性。在 configuration 元素中的属性值将会覆盖在 job-xml 中的属性值。在 Pig 运行期间,在线配置的属性值和在 job-xml 中的值将会传递给 Hadoop jobs。

script 元素指定 pig 脚本,并可以通过 ${VARIABLE} 参数化。这些参数化的变量可以在 params 中指定。

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.2">
...
<action name="myfirstpigjob">
<pig>
<job-tracker>foo:8021</job-tracker>
<name-node>bar:8020</name-node>
<prepare>
<delete path="${jobOutput}"/>
</prepare>
<configuration>
<property>
<name>mapred.compress.map.output</name>
<value>true</value>
</property>
<property>
<name>oozie.action.external.stats.write</name>
<value>true</value>
</property>
</configuration>
<script>/mypigscript.pig</script>
<argument>-param</argument>
<argument>INPUT=${inputDir}</argument>
<argument>-param</argument>
<argument>OUTPUT=${outputDir}/pig-output3</argument>
</pig>
<ok to="myotherjob"/>
<error to="errorcleanup"/>
</action>
...
</workflow-app>

Java Action

java action 执行指定的 Java 类的 public static void main(String[] args) 方法,在 Hadoop 集群中作为 map-reduce job 执行。工作流等待 java action 执行结束后执行工作流中之后的 action。

java action 在执行前需要配置 job-trackername-nodemain Java classJVM选项和其他参数。

Java 类执行成功后过渡到 OK action,有异常抛出时过渡到 error action。在 main Java class 中不能调用 System.exit(int n),否则出现错误过渡。

java action 通过配置,可以在执行 java job 前创建或删除 HDFS 目录或文件,或 HCatalog 分区的清理。这样可以确保 Oozie 在一个 Java job 执行失败后重试(因为在执行 Hadoop job 前,Hadoop 会检查 job 的输出目录是否存在。当在失败后重试时,未清理输出目录会造成重试失败)。

命令在执行前可以创建或删除 HDFS 目录或文件,或 HCatalog 分区的清理。这样可以确保 Oozie 在一个 Java job 执行失败后重试(因为 Java 应用程序会产生中间结果的临时数据。当在失败后重试时,未清理临时中间结果数据,会造成重试失败)。

在线配置属性值是可以使用 EL 表达式。

capture-output 元素可以用于传递 Java 应用结果给后续 Oozie 上下文使用,可以通过 EL 函数获取。结果须为 Java Properties 格式的文件,文件的名字以 JavaMainMapper.OOZIE_JAVA_MAIN_CAPTURE_OUTPUT_FILE 指定。

另外,为了在安全的集群上成功执行 Java action,需要按照下面的代码传递 Hadoop 代理 token:

1
2
3
4
// propagate delegation related props from launcher job to MR job
if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
jobConf.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
}

语法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
...
<action name="[NODE-NAME]">
<java>
<job-tracker>[JOB-TRACKER]</job-tracker>
<name-node>[NAME-NODE]</name-node>
<prepare>
<delete path="[PATH]"/>
...
<mkdir path="[PATH]"/>
...
</prepare>
<job-xml>[JOB-XML]</job-xml>
<configuration>
<property>
<name>[PROPERTY-NAME]</name>
<value>[PROPERTY-VALUE]</value>
</property>
...
</configuration>
<main-class>[MAIN-CLASS]</main-class>
<java-opts>[JAVA-STARTUP-OPTS]</java-opts>
<arg>ARGUMENT</arg>
...
<file>[FILE-PATH]</file>
...
<archive>[FILE-PATH]</archive>
...
<capture-output />
</java>
<ok to="[NODE-NAME]"/>
<error to="[NODE-NAME]"/>
</action>
...
</workflow-app>

其中,prepare(optional)元素指定了在执行任务前将要删除的路径。目录的清理和删除 Hcatalog 表分区两者只能执行其中一个。Hcatalog 表分区的 URI 为 hcat://[metastore server]:[port]/[database name]/[table name]/[partkey1]=[value];[partkey2]=[value]

java-opts(optional)元素和 java-opt(optional)元素包含了启动 JVM 的命令。

arg (optional)元素包含了 main 函数的参数,每一个 arg 的值作为单个参数按照先后顺序传递给 main 方法。

所有的以上的元素中的值均可以通过 EL 表达式进行参数化设置。

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
...
<action name="myfirstjavajob">
<java>
<job-tracker>foo:8021</job-tracker>
<name-node>bar:8020</name-node>
<prepare>
<delete path="${jobOutput}"/>
</prepare>
<configuration>
<property>
<name>mapred.queue.name</name>
<value>default</value>
</property>
</configuration>
<main-class>org.apache.oozie.MyFirstMainClass</main-class>
<java-opts>-Dblah</java-opts>
<arg>argument1</arg>
<arg>argument2</arg>
</java>
<ok to="myotherjob"/>
<error to="errorcleanup"/>
</action>
...
</workflow-app>

Sub-workflow Action

sub-workflow action 是一个工作流作为另外一个工作流的节点,子工作流可以与父工作流在同一个 Oozie 系统中,也可以位于不同 Oozie 系统中。父工作流会等待子工作流执行结束后执行后面的 action。

语法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
...
<action name="[NODE-NAME]">
<sub-workflow>
<app-path>[WF-APPLICATION-PATH]</app-path>
<propagate-configuration/>
<configuration>
<property>
<name>[PROPERTY-NAME]</name>
<value>[PROPERTY-VALUE]</value>
</property>
...
</configuration>
</sub-workflow>
<ok to="[NODE-NAME]"/>
<error to="[NODE-NAME]"/>
</action>
...
</workflow-app>

其中,app-path 元素指定了子工作流的路径,propagate-configuration (optional)指出子工作流的配置,configuration 部分设置需要运行子工作流的属性。子工作流的配置信息可以通过 EL 表达式参数化。

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
...
<action name="a">
<sub-workflow>
<app-path>child-wf</app-path>
<configuration>
<property>
<name>input.dir</name>
<value>${wf:id()}/second-mr-output</value>
</property>
</configuration>
</sub-workflow>
<ok to="end"/>
<error to="kill"/>
</action>
...
</workflow-app>

在上例中,名称为 child-wf 运行在 Oozie 实例 http://myhost:11000/oozie 中。configuration 参数 input.dir 作为属性将会传递给子工作流中。

oozie-site.xml 中设置 oozie.subworkflow.classpath.inheritancetrue,或在 job.properties 中设置 oozie.wf.subworkflow.classpath.inheritancetrue,则子工作流可以继承父工作流的 jars。当子工作流和父工作流有冲突的 jar 时,优选子工作流 jar

为了阻止子工作流不定的递归执行,可以在 oozie-site.xml 中 设置 oozie.action.subworkflow.max.depth,即子工作流被最多调用几次。

总结

Oozie Workflow 支持的 job 类型非常丰富,本篇主要对 Oozie Workflow 中支持的 jobs ,分别给出定义这些 job 的语法,并给出了例子说明。

参考资料

  1. WorkflowFunctionalSpec
文章目录
  1. 1. Sqoop
  2. 2. Hive
  3. 3. Shell
  4. 4. Email
  5. 5. Map-Reduce
  6. 6. Fs (HDFS)
  7. 7. DistCp Action
  8. 8. Pig Action
  9. 9. Java Action
  10. 10. Sub-workflow Action
  11. 11. 总结
  12. 12. 参考资料