5月11日总结

5月11日总结

杲杲 所谓努力就是慢慢来的过程

1. spark提交任务到集群中

先将代码进行打包记得要报local本地模式注释掉

(1) Standalone集群
1
2
3
4
5
bin/spark-submit \
--class chapter1.WordCount \
--master spark://node01:7077 \
/root/WordCount.jar \
hdfs://node01:8020/wordcount/input/words.txt
(2) HA高可用模式
1
2
3
4
5
bin/spark-submit \
--class chapter1.WordCount \
--master spark://node01:7077,node02:7077,node03:7077 \ 主节点都要写上
/root/WordCount.jar \
hdfs://node01:8020/wordcount/input/words.txt
(3) Yarn模式

yarn模式有两种方式分别是 YarnOnClient 和 YarnOnCluster

区别就是Driver运行在哪里,YarnOnClient 是driver运行在当前节点上,一般是测试学习的时候使用,容易出现网络流量激增, YarnOnClaster 是driver运行在当前集群的某个节点上,一般是生产环境下使用,不容易发生网络流量激增

1
2
3
4
5
6
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \ YARN模式
--deploy-mode cluster \ 这里可以指定为 Client 或Cluster
/path/to/examples.jar \
1000

2. WordCount执行流程

1683793110857

RDD有依赖关系分别为宽依赖和窄依赖,我们把RDD之间根据依赖关系连接起来的链条称为血统.

窄依赖:父RDD的一个分区只能被子RDD中的一个分区所依赖.

宽依赖:父RDD的一个分区能被子RDD中多个分区所依赖.

常见的窄依赖操作有:map,flatMap,union,filter,coalesce等

常见的宽依赖操作有:distinct,reduceByKey,groupByKey,repartition等

reduceByKey和groupBykey那个效率高?

reduceByKey效率更高,因为reduceByKey采用了预聚合,就是对数据操作之前,提前对数据进行一次操作,提高了执行效率

3. spark 作业提交流程

111

4. RDD简介

rdd是spark底层的抽象数据模型,rdd又叫弹性分布式数据集,它的特点是不可变,可分区,分区内的数据可以并行计算的集合.rdd算子分为两类一种是 transformation 转换算子一种是 action 行动算子,转换算子是惰性计算 只做连接不计算,只有遇到行动算子才运算,行动算子是带动转换算子做运算的,然后将结果输出或保存到指定地点,如何区分转换算子和行动算子,看返回值类型,返回值类型是rdd的就是转换算子,返回值类型不是rdd的就是行动算子,rdd不能携带数据,只能携带元数据信息.

5. RDD算子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
//map算子
val rdd: RDD[Int] = sc.parallelize(1 to 10)
val result: RDD[Int] = rdd.map(_ * 10)
//打印输出
println(result.collect().toBuffer)


//flatMap算子 先将数据进行map处理再进行扁平化处理
val rdd: RDD[String] = sc.parallelize(List("hello word", "apple word"))
val result: RDD[String] = rdd.flatMap(_.split("\\s+"))
result.foreach(println)

//mapPartitions算子 是对分区内数据操作
val rdd: RDD[Int] = sc.parallelize(1 to 16, 4)
val result: RDD[String] = rdd.mapPartitions(x => Iterator(x.mkString("-")))
result.foreach(println)


//mapPartitionsWithIndex算子 查看每个分区内都有哪些数据
val rdd: RDD[Int] = sc.parallelize(1 to 16, 4)
val result: RDD[Any] = rdd.mapPartitionsWithIndex((index, item) => Iterator(index + ":" + item.mkString("-")))
result.foreach(println)

//sample 随机抽样 基数越大抽出来的数据越准确
val rdd: RDD[Int] = sc.parallelize(1 to 10)
//第一个参数是指定是否放回 true就是取出后再放回,会出现重复值 false就是取出后不放回,不会出现重复值
//第二个参数是比例 取出的比例 在0到1之间 [0,1] 会有浮动 所以说基数越大抽出来的数据越准确
val result: RDD[Int] = rdd.sample(true, 0.5)
println(result.collect().toBuffer)

//takeSample 指定抽出的个数
val rdd: RDD[Int] = sc.parallelize(1 to 10)
val result: Array[Int] = rdd.takeSample(true, 6)
println(result.toBuffer)


//union算子 他是窄依赖 它不仅将数据整合在一起还将分区数也整合在一起
val rdd1: RDD[Int] = sc.parallelize(1 to 6, 3)
//打印分区数
println(rdd1.getNumPartitions)
val rdd2: RDD[Int] = sc.parallelize(5 to 8, 2)
println(rdd2.getNumPartitions)
val result: RDD[Int] = rdd1.union(rdd2)
println(result.getNumPartitions)
//查询每个分区内都有哪些数据
result.mapPartitionsWithIndex((index, item) => Iterator(index + ":" + item.mkString("-"))).foreach(println)


//join算子 只适用于key-value格式的数据
//将key相同的数据整合到一起
val rdd1: RDD[(Int, String)] = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c")))
val rdd2: RDD[(Int, Int)] = sc.parallelize(Array((1, 4), (2, 5), (5, 6)))
val result: RDD[(Int, (String, Int))] = rdd1.join(rdd2)
result.foreach(println)
leftOutJoin算子
以左rdd为基准 按key进行整合 打印出来的话 第一是key 第二个是数组 左边是rdd1中的数据 右边是option当中的数据 有数据则是Some 没有数据就是None
val result1: RDD[(Int, (String, Option[Int]))] = rdd1.leftOuterJoin(rdd2)
result1.foreach(println)

rightOutJoin算子
以右边rdd为基准 按key进行整合 打印出来的话 第一是key 第二个是数组 左边是option当中的数据 有数据则是Some 没有数据就是None 右边是rdd2中的数据
val result2: RDD[(Int, (Option[String], Int))] = rdd1.rightOuterJoin(rdd2)
result2.foreach(println)

fullOuterJoin算子
打印出来的话
, 第一个就是rdd1和rdd2中的key 第二个是option中的数据
val result3: RDD[(Int, (Option[String], Option[Int]))] = rdd1.fullOuterJoin(rdd2)
result3.foreach(println)


coalesce算子 窄依赖 默认是减小分区数 第一个参数是分区数 第二个参数是是否开启shuffle 默认是false
要是先要增加分区数 需要把第二个参数设置成true
val rdd: RDD[Int] = sc.parallelize(1 to 16, 4)
//打印分区数
println(rdd.getNumPartitions)
//打印每个分区中有哪些数据
val result: RDD[String] = rdd.mapPartitionsWithIndex((index, item) => Iterator(index + ":" + item.mkString("-")))
result.foreach(println)

val result1: RDD[Int] = rdd.coalesce(3)
//打印分区数
println(result1.getNumPartitions)
//打印每个分区中有那些的数据
val result2: RDD[String] = result1.mapPartitionsWithIndex((index, item) => Iterator(index + ":" + item.mkString("-")))
result2.foreach(println)


val result4: RDD[Int] = rdd.coalesce(5, true)
//打印分区数
println(result4.getNumPartitions)
//打印每个分区中有那些的数据
val result5: RDD[String] = result4.mapPartitionsWithIndex((index, item) => Iterator(index + ":" + item.mkString("-")))
result5.foreach(println)

//rePartitions算子 重新设置分区数
//它底层是调用coalesce方法 默认是开启shuffle的
val rdd: RDD[Int] = sc.parallelize(1 to 16, 4)
//查看分区数
println(rdd.getNumPartitions)
//查看每个分区内有那些数据
rdd.mapPartitionsWithIndex((index, item) => Iterator(index + ":" + item.mkString("-"))).foreach(println)
val rdd3: RDD[Int] = rdd.repartition(5)
//查看分区数
println(rdd3.getNumPartitions)
//查看每个分区内有那些数据
rdd3.mapPartitionsWithIndex((index, item) => Iterator(index + ":" + item.mkString("-"))).foreach(println)
  • 标题: 5月11日总结
  • 作者: 杲杲
  • 创建于 : 2023-05-11 21:07:49
  • 更新于 : 2023-05-11 21:07:49
  • 链接: https://weishijay.dpdns.org/2023/05/11/5月11日总结/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。