Oozie Spark Action 配置
  • Spark Action 用来运行spark 任务,流程任务必须等当前节点的spark任务执行完成之后才能执行后续节点任务。
  • 运行Spark Job,必须在 spark action里面配置 job-tracer,name-node,master,和一些必要的参数和配置。
  • Spark options 可以用 " spark-opts" 元素来配置
  • 同Shell Action一样 Spark Action 可以配置成创建或者删除HDFS目录之后再去执行一个Sqoop任务
  • Spark 应用的配置可以使用job-xml文件中的元素,也可以使用内部元素来配置,像EL表达式也支持在内部元素中的配置,内部元素的配置可以覆盖外部文件中的配置。

Spark Action格式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.3">
    ...
    <action name="[NODE-NAME]">
        <spark xmlns="uri:oozie:spark-action:0.1">
            <job-tracker>[JOB-TRACKER]</job-tracker>
            <name-node>[NAME-NODE]</name-node>
            <prepare>
               <delete path="[PATH]"/>
               ...
               <mkdir path="[PATH]"/>
               ...
            </prepare>
            <job-xml>[SPARK SETTINGS FILE]</job-xml>
            <configuration>
                <property>
                    <name>[PROPERTY-NAME]</name>
                    <value>[PROPERTY-VALUE]</value>
                </property>
                ...
            </configuration>
            <master>[SPARK MASTER URL]</master>
            <mode>[SPARK MODE]</mode>
            <name>[SPARK JOB NAME]</name>
            <class>[SPARK MAIN CLASS]</class>
            <jar>[SPARK DEPENDENCIES JAR / PYTHON FILE]</jar>
            <spark-opts>[SPARK-OPTIONS]</spark-opts>
            <arg>[ARG-VALUE]</arg>
                ...
            <arg>[ARG-VALUE]</arg>
            ...
        </spark>
        <ok to="[NODE-NAME]"/>
        <error to="[NODE-NAME]"/>
    </action>
    ...
</workflow-app>
  • prepare 元素 如果存在,表明在执行sqoop 命令之前需要执行的一系列 hdfs路径的创建和删除操作,并且路径必须以  hdfs://HOST:PORT  开头
  • job-xml 元素 如果存在,则作为sqoop任务的配置文件,从 schema 0.3开始支持多个job-xml元素用来支持多个job.xml文件
  • configuration 用来给spark任务传递参数
  • master用来指定spark master 例如spark://host:port, mesos://host:port, yarn-cluster, yarn-master, or local.
  • mode以集群或者客户端的模式运行spark 程序 例如 :client,cluster
  • name spark 应用的名称
  • classspark运行程序的主类名
  • jarspark 应用需要引用的其它jar包
  • spark-opts提交给驱动程序的参数。比如--conf key=value或者是在oozie-site.xml中配置的oozie.service.SparkConfiguationService.spark.configurations
  • arg spark 应用参数

Spark Action 使用实例一:Oozie自带案例运行,使用oozie调度spark程序
1,首先下载Oozie自带的例子,解压,打开到 examplesappsspark 目录,根据自己的安装环境修改之后的job.properties文件如下
1
2
3
4
5
6
7
jobTracker=hadoop-node1.novalocal:8021
master=local[*]
queueName=default
examplesRoot=xwj_test
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/user/oozie/${examplesRoot}/apps/spark
2,根据自己测试环境路径,修改后的workflow.xml 内容如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<workflow-app xmlns='uri:oozie:workflow:0.5' name='SparkFileCopy'>
    <start to='spark-node' />
 
    <action name='spark-node'>
        <spark xmlns="uri:oozie:spark-action:0.1">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/oozie/${examplesRoot}/apps/spark/output"/>
            </prepare>
            <master>${master}</master>
            <name>Spark-FileCopy</name>
            <class>org.apache.oozie.example.SparkFileCopy</class>
            <jar>${nameNode}/user/oozie/${examplesRoot}/apps/spark/lib/oozie-examples.jar</jar>
            <arg>${nameNode}/user/oozie/${examplesRoot}/apps/spark/input/data.txt</arg>
            <arg>${nameNode}/user/oozie/${examplesRoot}/apps/spark/output</arg>
        </spark>
        <ok to="end" />
        <error to="fail" />
    </action>
 
    <kill name="fail">
        <message>Workflow failed, error
            message[${wf:errorMessage(wf:lastErrorNode())}]
        </message>
    </kill>
    <end name='end' />
</workflow-app>
3,查看 org.apache.oozie.example.SparkFileCopy 内容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package org.apache.oozie.example;
 
import java.io.PrintStream;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
 
public final class SparkFileCopy
{
  public static void main(String[] args)
    throws Exception
  {
    if (args.length < 2) {
      System.err.println("Usage: SparkFileCopy <file> <file>");
      System.exit(1);
    }
 
    SparkConf sparkConf = new SparkConf().setAppName("SparkFileCopy");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    JavaRDD lines = ctx.textFile(args[0]);
    lines.saveAsTextFile(args[1]);
    System.out.println("Copied file from " + args[0] + " to " + args[1]);
    ctx.stop();
  }
}
其功能就是一个文件的拷贝
4,首先在本地的测试节点上创建文件夹
mkdir -p /opt/mydata/user/oozie/xwj_test/apps/shell/spark
5,在hdfs上创建目录 hdfs dfs -mkdir -p /user/oozie/xwj_test/apps/shell/spark
6,将上述文件上传到新建好的目录中
cd /opt/mydata/user/oozie/xwj_test/apps/shell/spark
6,将本地文件 上传到hdfs目录中
hdfs dfs -put ../spark/* /user/oozie/xwj_test/apps/shell/spark
7,查看hdfs上的目录文件是否存在
hdfs dfs -ls -r /user/oozie/xwj_test/apps/shell/spark
8,切换yarn用户重新提交任务
su yarn
oozie job -oozie http://hadoop-node0.novalocal:11000/oozie -config /opt/mydata/user/oozie/xwj_test/apps/shell/email/job.properties -run
查看结果
内容来源于网络如有侵权请私信删除
你还没有登录,请先登录注册
  • 还没有人评论,欢迎说说您的想法!