大数据平台spark开发指南

客服9005发表于:2018年07月05日 17:49:20更新于:2019年01月03日 09:59:26

大数据平台spark开发指南

 

    目前大数据平台上所有的资源都需要经过信云智进行购买,购买完成后,在多租户平台上会看到所购买的资源和连接信息,使用连接信息连接集群进行任务提交,下面以spark任务为例进行说明。

    SPARK 主要分为spark sql,spark streaming,spark mllib,spark graphx四个模块,以下分别以spark streaming 和spark sql为例,说明如何在大数据平台上进行开发。

点击下载开发程序示例

 

一.资源购买

 

  • 登录信云智,购买资源

image.png

                    此处购买的是spark资源,购买资源量可以自行调整。

  • 登录多租户,查看购买资源的连接信息

                     登录后可以在服务列表中看到购买的spark资源的访问信息

image.png

image.png

  • 下载连接集群时需要的keytab

登录多租户管理平台,点击其中的下载keytab文件,下载keytab文件来进行集群连接

 

image.png

 

 

二.任务提交

        spark的任务提交,目前在生产环境,都是提交到yarn cluster,可以分为使用jar提交,或者使用python脚本提交,如下分别进行说明。

  • jar包方式,代码编写完成后,需要打包成jar,并将jar上传到client节点,以如下计算pi的代码为例,打包为sparkDemo.jar,然后上传到client节点

import org.apache.spark.SparkConf

import org.apache.spark.streaming.{Seconds, StreamingContext}

/**

* Created by yujin on 2018/6/14.

*/

object HdfsWordCount {

def main(args: Array[String]) {

if (args.length < 1) {

System.err.println("Usage: HdfsWordCount <directory>")

System.exit(1)

}

 

val sparkConf = new SparkConf().setAppName("HdfsWordCount")

// Create the context

val ssc = new StreamingContext(sparkConf, Seconds(2))

 

// Create the FileInputDStream on the directory and use the

// stream to count words in new files created

val lines = ssc.textFileStream(args(0))

val words = lines.flatMap(_.split(" "))

val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

wordCounts.print()

ssc.start()

ssc.awaitTermination()

}

}

 

提交脚本如下,注意需要提供keytab 和principal,来进行认证,提供queue来进行任务执行。

 

spark-submit --class com.citic.spark.sql.streaming.HdfsWordCount --master yarn-cluster     --num-executors 1     --driver-memory 512m     --executor-memory 512m     --executor-cores 3   --queue da0496f4-ed15-4311-a0c9-a2f54e3618dc  --keytab /root/xxx.keytab --principal  xxx@EXAMPLE.REALM   /root/sparkDemo.jar  10

 

 

 

 

  • python脚本形式

 

以如下的spark sql的脚本为例

from pyspark.sql import DataFrameReader

from pyspark.sql import SQLContext

import sys

 

from pyspark.sql import SparkSession

import collections

 

spark = SparkSession.builder.appName("SparkSQL_example").config("spark.sql.warehouse.dir","spark-warehouse").enableHiveSupport().getOrCreate()

 

df = spark.sql("select * from test.users")

 

print(df.show(10))

 

spark.stop()

 

编写完成后,提交任务命令如下(以提交任务到yarn cluster为例)

 

如果运行的是spark on hive,需要将hive-site.xml配置文件通过--files选项进行提交

/usr/hdp/2.6.0.3-8/spark2/bin/spark-submit --master yarn --deploy-mode cluster  --conf spark.executorEnv.PYTHONHASHSEED=321 --executor-memory 2G --executor-cores 4 --num-executors 5  --queue test_queue  --keytab /root/huangzhe.keytab --principal xxx@EXAMPLE.REALM --files /usr/hdp/2.6.0.3-8/spark2/conf/hive-site.xml  /root/data_process.py

 

三. 任务监控

任务提交之后,可以通过yarn的监控界面进行任务运行情况的监控,如下

image.png

 

以上为大数据平台开发的完整流程.


中信云服务团队