1. spark提交任务到集群中
先将代码进行打包记得要报local本地模式注释掉
(1) Standalone集群
1 2 3 4 5
| bin/spark-submit \ --class chapter1.WordCount \ --master spark://node01:7077 \ /root/WordCount.jar \ hdfs://node01:8020/wordcount/input/words.txt
|
(2) HA高可用模式
1 2 3 4 5
| bin/spark-submit \ --class chapter1.WordCount \ --master spark://node01:7077,node02:7077,node03:7077 \ 主节点都要写上 /root/WordCount.jar \ hdfs://node01:8020/wordcount/input/words.txt
|
(3) Yarn模式
yarn模式有两种方式分别是 YarnOnClient 和 YarnOnCluster
区别就是Driver运行在哪里,YarnOnClient 是driver运行在当前节点上,一般是测试学习的时候使用,容易出现网络流量激增, YarnOnClaster 是driver运行在当前集群的某个节点上,一般是生产环境下使用,不容易发生网络流量激增
1 2 3 4 5 6
| ./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ YARN模式 --deploy-mode cluster \ 这里可以指定为 Client 或Cluster /path/to/examples.jar \ 1000
|
2. WordCount执行流程

RDD有依赖关系分别为宽依赖和窄依赖,我们把RDD之间根据依赖关系连接起来的链条称为血统.
窄依赖:父RDD的一个分区只能被子RDD中的一个分区所依赖.
宽依赖:父RDD的一个分区能被子RDD中多个分区所依赖.
常见的窄依赖操作有:map,flatMap,union,filter,coalesce等
常见的宽依赖操作有:distinct,reduceByKey,groupByKey,repartition等
reduceByKey和groupBykey那个效率高?
reduceByKey效率更高,因为reduceByKey采用了预聚合,就是对数据操作之前,提前对数据进行一次操作,提高了执行效率
3. spark 作业提交流程

4. RDD简介
rdd是spark底层的抽象数据模型,rdd又叫弹性分布式数据集,它的特点是不可变,可分区,分区内的数据可以并行计算的集合.rdd算子分为两类一种是 transformation 转换算子一种是 action 行动算子,转换算子是惰性计算 只做连接不计算,只有遇到行动算子才运算,行动算子是带动转换算子做运算的,然后将结果输出或保存到指定地点,如何区分转换算子和行动算子,看返回值类型,返回值类型是rdd的就是转换算子,返回值类型不是rdd的就是行动算子,rdd不能携带数据,只能携带元数据信息.
5. RDD算子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
| val rdd: RDD[Int] = sc.parallelize(1 to 10) val result: RDD[Int] = rdd.map(_ * 10)
println(result.collect().toBuffer)
val rdd: RDD[String] = sc.parallelize(List("hello word", "apple word")) val result: RDD[String] = rdd.flatMap(_.split("\\s+")) result.foreach(println)
val rdd: RDD[Int] = sc.parallelize(1 to 16, 4) val result: RDD[String] = rdd.mapPartitions(x => Iterator(x.mkString("-"))) result.foreach(println)
val rdd: RDD[Int] = sc.parallelize(1 to 16, 4) val result: RDD[Any] = rdd.mapPartitionsWithIndex((index, item) => Iterator(index + ":" + item.mkString("-"))) result.foreach(println)
val rdd: RDD[Int] = sc.parallelize(1 to 10)
val result: RDD[Int] = rdd.sample(true, 0.5) println(result.collect().toBuffer)
val rdd: RDD[Int] = sc.parallelize(1 to 10) val result: Array[Int] = rdd.takeSample(true, 6) println(result.toBuffer)
val rdd1: RDD[Int] = sc.parallelize(1 to 6, 3)
println(rdd1.getNumPartitions) val rdd2: RDD[Int] = sc.parallelize(5 to 8, 2) println(rdd2.getNumPartitions) val result: RDD[Int] = rdd1.union(rdd2) println(result.getNumPartitions)
result.mapPartitionsWithIndex((index, item) => Iterator(index + ":" + item.mkString("-"))).foreach(println)
val rdd1: RDD[(Int, String)] = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))) val rdd2: RDD[(Int, Int)] = sc.parallelize(Array((1, 4), (2, 5), (5, 6))) val result: RDD[(Int, (String, Int))] = rdd1.join(rdd2) result.foreach(println) leftOutJoin算子 以左rdd为基准 按key进行整合 打印出来的话 第一是key 第二个是数组 左边是rdd1中的数据 右边是option当中的数据 有数据则是Some 没有数据就是None val result1: RDD[(Int, (String, Option[Int]))] = rdd1.leftOuterJoin(rdd2) result1.foreach(println)
rightOutJoin算子 以右边rdd为基准 按key进行整合 打印出来的话 第一是key 第二个是数组 左边是option当中的数据 有数据则是Some 没有数据就是None 右边是rdd2中的数据 val result2: RDD[(Int, (Option[String], Int))] = rdd1.rightOuterJoin(rdd2) result2.foreach(println)
fullOuterJoin算子 打印出来的话 , 第一个就是rdd1和rdd2中的key 第二个是option中的数据 val result3: RDD[(Int, (Option[String], Option[Int]))] = rdd1.fullOuterJoin(rdd2) result3.foreach(println)
coalesce算子 窄依赖 默认是减小分区数 第一个参数是分区数 第二个参数是是否开启shuffle 默认是false 要是先要增加分区数 需要把第二个参数设置成true val rdd: RDD[Int] = sc.parallelize(1 to 16, 4)
println(rdd.getNumPartitions)
val result: RDD[String] = rdd.mapPartitionsWithIndex((index, item) => Iterator(index + ":" + item.mkString("-"))) result.foreach(println)
val result1: RDD[Int] = rdd.coalesce(3)
println(result1.getNumPartitions)
val result2: RDD[String] = result1.mapPartitionsWithIndex((index, item) => Iterator(index + ":" + item.mkString("-"))) result2.foreach(println)
val result4: RDD[Int] = rdd.coalesce(5, true)
println(result4.getNumPartitions)
val result5: RDD[String] = result4.mapPartitionsWithIndex((index, item) => Iterator(index + ":" + item.mkString("-"))) result5.foreach(println)
val rdd: RDD[Int] = sc.parallelize(1 to 16, 4)
println(rdd.getNumPartitions)
rdd.mapPartitionsWithIndex((index, item) => Iterator(index + ":" + item.mkString("-"))).foreach(println) val rdd3: RDD[Int] = rdd.repartition(5)
println(rdd3.getNumPartitions)
rdd3.mapPartitionsWithIndex((index, item) => Iterator(index + ":" + item.mkString("-"))).foreach(println)
|