您好,欢迎访问代理记账网站
移动应用 微信公众号 联系我们

咨询热线 -

电话 15988168888

联系客服
  • 价格透明
  • 信息保密
  • 进度掌控
  • 售后无忧

史上最详细Spark-RDD算子介绍

Spark-Core

一、RDD概述

1.RDD(Resilient Distributed Dataset)

  • 弹性分布式数据集,是Spark中的最基本数据抽象

  • 在源码中是一个抽象类,代表一个弹性的,不可变、可分区、里面元素可以并行计算的集合。

  1. 弹性
    • 存储的弹性:内存与磁盘的自动切换
    • 容错的弹性:数据丢失可以自动恢复
    • 计算的弹性:计算出错重试机制
    • 分片的弹性:可根据需要重写分片
  2. 分布式
    • 数据存储在大数据集群中的不同节点上
  3. 数据集,不存储数据
    • RDD封装了计算逻辑,并不保存数据
  4. 数据抽象
    • RDD是一个抽象类,需要子类具体实现。
  5. 不可变
    • RDD封装了计算逻辑,不可以改变,想要改变只能生成新的RDD,在新的RDD中封装计算逻辑
  6. 可分区,并行计算

2.RDD的五大特性

  1. 一组分区(Partition),即是数据集的基本组成单位,标记数据来源于哪个分区

  2. 一个计算每个分区的函数

  3. RDD之间的依赖关系

  4. 一个Partitioner,即RDD的分区器,控制分区数据的流向(KV结构)

  5. 一个列表,存储存取每个Partition的优先位置,移动数据不如移动计算,除非资源不够

二、创建RDD的方式

  • RDD创建方式分为三种:sc为SparkContext,即Spark连接

    1. 从集合中创建RDD

      • sc.parallelize(集合)

      • sc.makeRDD(集合)

        makeRDD底层调用的就是parallelize

    2. 从外部存储中创建RDD

      • 从外部存储中创建RDD包括:本地文件系统,Hadoop支持数据集,如HDFS,Hbase等
      • sc.textFile(“文件路径”)
    3. 从其他RDD中转换新的RDD

      • 调用RDD转换算子,运算后产生新的RDD

三、RDD的分区规则

1.RDD从集合中创建

  • 默认:为cpu核心数的分区数量
  • 使用parallelize或makeRDD创建RDD的时候也可以指定分区的数量

分区源码:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-E2dlULNU-1628181644666)(C:\Users\Jzs\AppData\Roaming\Typora\typora-user-images\image-20210727201546272.png)]

2.RDD从外部存储中创建

  • 默认:取值为cpu核心数和2的最小值,一般为2

四、RDD的Transformation转换算子

1)value类型算子:

  • 对一个RDD生效的算子

1.map():映射

  • 功能说明:

    • 参数为一个函数f,作用于RDD中的每一个元素。

    当一个RDD执行 map(f:T=>U) 【T为每个元素的类型,U为计算后返回值的类型】

    计算的时候,会遍历RDD中的每一个数据项,依次应用函数f,产生一个新的RDD。即返回的新RDD的每一个元素都是依次应用函数f得来的。

    • 计算后的RDD不会改变原来的RDD分区
    • 一个分区内的计算是有序的,不同分区间计算是无序的
  • 代码说明:

    //创建一个1,2,3,4且分区为2 的rdd:RDD
    val rdd: RDD[Int] = sc.makeRDD(1 to 4, 2)
    
    //将rdd经过map算子转换为每个元素*2 的新RDD
    val mapRdd: RDD[Int] = rdd.map(elem => elem *2) 
    

2.mapPartitions():以分区为单位执行Map

  • 功能说明:

    • 参数(f:Iterator[T] => Iterator[U] )

      【T为每个元素的类型,U为计算后每个元素的类型】

      函数f将每个分区的元素放入一个迭代器中进行处理,返回处理后的迭代器

    • map()算子一次处理一个数据,mapPartitions一次处理一个分区的数据(批处理)

    • 计算后不改变原来的分区数

  • 代码说明:

    //创建一个RDD,分区数为2
    val rdd: RDD[Int] = sc.makeRDD(1 to 4, 2)
    
    //将一个分区的迭代器中每个元素map转换为2倍后返回新RDD  
    val rdd1 = rdd.mapPartitions(iter=>iter.map(_*2))
    

3.map()和mapPartitions()的区别

  • map()一次处理一条数据
  • mapPartitions()一次处理一个分区的数据
  • 因为mapPartitions()一次处理一个分区的数据,因此需要将整个分区数据加载到内存中,所以需要占用更多的内存资源。

4.mapPartitionsWithIndex():带分区号

  • **参数:**函数f:(Int,Iterator[T]) => Iterptor[U] 【Int表示分区编号】

  • 功能说明:

    和mapPartitions相同,但增加了分区编号

  • 代码说明

    //创建一个RDD
    val rdd: RDD[Int] = sc.makeRDD(1 to 4, 2)
    //使每个元素和所在分区号形成一个元组,组成一个新的RDD
    val indexRdd = rdd.mapPartitionsWithIndex(
        (index,iter)=>iter.map((index,_))
    )
    

5.flatMap():扁平化

  • **参数:**函数(f:list => list),将RDD中List的多个子List扁平化为一个大List

  • 功能说明:

    将RDD中的每一个元素经过函数f将数据拆分到一个大集合中返回

  • 代码说明:

    //创建一个RDD,一个集合中包含多个子集合
    val listRDD=sc.makeRDD(List(List(1,2),List(3,4),List(5,6),List(7)), 2)
    
    //通过flatMap()算子将所有子集合中数据取出放入到一个大的集合中,并收集打印
    listRDD.flatMap(list=>list).collect.foreach(println)
    
    
  • flatMap()中所有元素必须都是集合

6.golm():分区转换数据

  • 无参数

  • 功能说明:

    将RDD中的每个分区变成一个数组,返回一个新的RDD。新RDD中元素与原来的类型一致

  • 代码说明:

    //创建一个RDD,分区数为2
    val rdd = sc.makeRDD(1 to 4, 2)
    
    //将每个分区的元素转换为数组,并通过map()求出每个风区中的最大值
    val maxRdd: RDD[Int] = rdd.glom().map(_.max)
    
    //将新RDD收集并求出所有风区最大值之和
    println(maxRdd.collect().sum)
    

7.groupBy():分组

  • 参数:函数(f:T => U ) 如:elem => elem 【通过相同的元素聚合】

  • 返回值:Tuple2(elem,CompactBuffer(elem,elem,… ))

  • 功能说明:

    按照传入函数f的返回值进行分组,将计算结果相同的元素放入一个迭代器中。

  • 代码说明:

    //创建一个RDD,分区为2
    val rdd = sc.makeRDD(1 to 4, 2)
    
    //将每个%2结果相同的元素放入一个迭代器中
    rdd.groupBy(_ % 2).collect().foreach(println)
    
    //创建一个RDD
    val rdd1: RDD[String] = sc.makeRDD(List("hello","hive","hadoop","spark","scala"))
    
    //按照首字母第一个单词相同分组
    rdd1.groupBy(str=>str.charAt(0)).collect().foreach(println)
    
    //打印结果:
    (0,CompactBuffer(2, 4))
    (1,CompactBuffer(1, 3))
    (s,CompactBuffer(spark, scala))
    (h,CompactBuffer(hello, hive, hadoop))
    

8.filter():过滤

  • **参数:**函数(f:T => Boolean) 如(_%2 ==1),即保留结果为true的元素

  • 功能说明:

    ​ 将RDD中的每一个元素经过函数f后返回一个Boolean类型的结果。结果为true的元素保留,false丢弃。

  • 代码说明:

    //创建一个RDD,分区数为2
    val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4), 2)
    
    //将每个元素%2,值等于0的,即返回值为true的加入到新的RDD,返回值false的丢弃
    val filterRdd: RDD[Int] = rdd.filter(_ % 2 == 0)
    

9.sample():采样

  • 参数:

    1. withReplacement: Boolean, 【参数1:Boolean类型。true表示取出后放回抽样,所以能被重复抽到, fase表示取出后不放回,所以最多被抽1次】

    2. fraction: Double, 【参数1为false时,取值 [ 0,1 ] 表示每个元素被取出的概率

      ​ 参数1位true时,取值 [ 1-无穷 ] 表示每个元素可能被抽取的次数的概率最高 】

    3. seed: Long = Utils.random.nextLong): RDD[T] 【Seed:随机数种子,默认和当前时间戳有关,若指定随 机数种子的值,则采样后值将固定,重复执行结果不变】

  • 功能说明:

    从数据中采样

  • 代码说明:

    //3.1 创建一个RDD
    val dataRDD: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6))
    
    // 抽取数据不放回(伯努利算法)
    // 伯努利算法:又叫0、1分布。例如扔硬币,要么正面,要么反面。
    // 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不要
    // 第一个参数:抽取的数据是否放回,false:不放回
    // 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;
    // 第三个参数:随机数种子
    val sampleRDD: RDD[Int] = dataRDD.sample(false, 0.5)
    sampleRDD.collect().foreach(println)
    
    println("----------------------")
    
    // 抽取数据放回(泊松算法)
    // 第一个参数:抽取的数据是否放回,true:放回;false:不放回
    // 第二个参数:重复数据的几率,范围大于等于0.表示每一个元素被期望抽取到的次数
    // 第三个参数:随机数种子
    val sampleRDD1: RDD[Int] = dataRDD.sample(true, 2)
     sampleRDD1.collect().foreach(println)
    
    

10.distinct():去重

  • 参数:

    • 默认为空参,去重后的分区数和去重前一致
    • 指定参数(numPartitions:Int),可以改变去重后的分区数量
  • 功能说明:

    对内部的元素进行去重,去重后放入新的RDD中

  • 与HashSet的去重不同点:

    • HashSet通过将元素放入一个HashSet实现自动去重,但需要将所有元素全部放入一个HashSet,需要消耗大量内存,可能出现OOM(内存溢出)

    • distinct()去重的内部源码:map(x => (x, null)).reduceByKey((x, ) => x, numPartitions).map(._1)

      ​ 通过将元素map转换,再reduceByKey聚合最后map保留元素本身的方式去重

      这种方式使用了Spark算子的多分区并行操作,解决了内存溢出的问题

  • 代码说明:

    // 创建一个RDD
     val distinctRdd: RDD[Int] = sc.makeRDD(List(1,2,1,5,2,9,6,1))
    
    // 打印去重后生成的新RDD
    distinctRdd.distinct().collect().foreach(println)
    
    // 对RDD采用多个Task去重,提高并发度
    distinctRdd.distinct(2).collect().foreach(println)
    
    
  • distinc()去重会执行shuffle过程

11.coalesce():分区合并

  • 参数

    1. numPartitioons:Int 【合并后的分区数,可以大于原来的分区数,也可以小于原来的分区数】
    2. shuffle:Boolean = false 【默认为false,不走shuffle流程,true则走shuff了流程】
  • 功能说明:

    默认值为false,不走shuffle流程,适合用于减小分区数的操作。

    如果增加分区数,需要将shuffle设置为true。不走shuffle且扩大分区没有意义,元素仍在原来的分区之中。

  • 代码说明:

    // 创建一个RDD,分区数为2
    val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 3)
    
    // 缩减分区数为2,默认不走shuffle
    val coalesceRDD: RDD[Int] = rdd.coalesce(2)
    

12.repartition():重写分区(执行shuffle)

  • 参数 :(numPartitions:Int)设置重新分区后的分区数

  • 功能说明:

    rePartitions算子底层调用的就是coalesce,但是将shuffle值固定为true。无论如何进行分区都将执行shuffle流程。所以一般使用于扩大风区,或将数据均匀分配到分区中的情况。

  • 代码说明:

    // 创建一个RDD
    val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 2)
    
    //扩大分区
    val repartitionRdd: RDD[Int] = rdd.repartition(3)
    

13.sortBy():排序

  • 参数

    1. 函数(f : T => U),排序的规则
    2. ascending:Boolean = true,默认为true,升序,false则为降序
  • 功能说明:

    用于数据排序,数据通过f函数进行处理,然后按照结果进行排序,默认为升序。

    排序后的分区数与原来的分区数量一致

  • 代码说明:

     val rdd: RDD[Int] = sc.makeRDD(List(2, 1, 3, 4, 6, 5))
    
    // 默认是升序排
    val sortRdd: RDD[Int] = rdd.sortBy(num => num)
    sortRdd.collect().foreach(println)
    //  配置为倒序排
    val sortRdd2: RDD[Int] = rdd.sortBy(num => num, false)
    sortRdd2.collect().foreach(println)
    

14.pipe():调用脚本

  • 编写一个shell脚本在linux系统中启动spark并执行脚本

2)双value类型算子

  • 对2个RDD使用的转换算子

1.intersection()交集

  • 参数: (other:RDD [T] )

  • **返回值:**RDD [T]

  • 功能说明:

    传入一个RDD与自己求交集(即共同的元素)返回一个新的RDD

  • 代码说明:

    // 创建第一个RDD
    val rdd1: RDD[Int] = sc.makeRDD(1 to 5)
    
    // 创建第二个RDD
    val rdd2: RDD[Int] = sc.makeRDD(3 to 8)
    
    // 计算第一个RDD与第二个RDD的交集并打印
    rdd1.intersection(rdd2).collect().foreach(println)
    
    //打印结果:
    3 
    4
    5
    
  • 如果两个RDD分区数不同,返回RDD分区数 = 分区数较大RDD分区数

2.union()并集:不去重

  • 参数:(other:RDD [T] )

  • 返回值: RDD [T]

  • 功能说明: 对源RDD和参数RDD求并集(所有元素且不去重)后返回新RDD

  • 代码说明:

        // 创建第一个RDD
        val rdd1: RDD[Int] = sc.makeRDD(1 to 4)
    
        // 创建第二个RDD
        val rdd2: RDD[Int] = sc.makeRDD(4 to 8)
    
        // 计算两个RDD的并集
        rdd1.union(rdd2).collect().foreach(x => print(x+" "))
    
    	//打印结果:
    	1 2 3 4 4 5 6 7 8 
    
  • union后的RDD分区数等于两个RDD分区数之和

3.subtract()差集

  • 参数: (other:RDD [ T ] )

  • 返回值: RDD [ T ]

  • 功能说明:

    返回源RDD中元素减去参数RDD中元素,形成新RDD

  • 代码说明:

        // 创建第一个RDD
        val rdd: RDD[Int] = sc.makeRDD(1 to 4)
    
        // 创建第二个RDD
        val rdd1: RDD[Int] = sc.makeRDD(4 to 8)
    
        // 计算第一个RDD与第二个RDD的差集并打印
        rdd.subtract(rdd1).collect().foreach(x => print(x+" "))
    
    //打印结果:
    1 2 3
    
    • 返回RDD分区数 = 源RDD的分区数

4.zip()拉链

  • 参数: (other:RDD [ U ])

  • 返回值: RDD[ (T , U) ]

  • 功能说明:

    将两个RDD中的元素,以二元组键值对的形式进行合并。其中,key为源RDD中元素,value为参数RDD中的元素。

    两个RDD的分区数量和元素数量必须都相同才能进行zip,否则抛出异常。

  • 代码说明:

        // 创建第一个RDD
        val rdd1: RDD[Int] = sc.makeRDD(Array(1,2,3),3)
    
        // 创建第二个RDD
        val rdd2: RDD[String] = sc.makeRDD(Array("a","b","c"),3)
    
        // 第一个RDD组合第二个RDD并打印
        rdd1.zip(rdd2).collect().foreach(x => print(x+" "))
        println()
        // 第二个RDD组合第一个RDD并打印
        rdd2.zip(rdd1).collect().foreach(x => print(x+" "))
    
    	//打印结果:
    	(1,a) (2,b) (3,c) 
    	(a,1) (b,2) (c,3) 
    
    • 拉链后返回RDD分区数和计算的两个RDD分区数相同

3)(Key,Value)类型

  • 作用于元素为(key,value)类型的RDD的转换算子

1.partitionBy():按照key将元素重新分区

  • 参数: (partitioner:Partitioner):参数为一个分区器

  • 返回值: RDD [ (K,V) ]

  • 功能说明:

    将RDD中的K按照指定Partitioner重新进行分区

    如果原有的RDD和新RDD是一致的话,就不进行分区,否则会产生Shuffle过程

  • 代码说明: 自定义一个分区器

      /**
       * 自定义分区器的步骤:
       *  1.继承org.apache.spark.Partitioner类
       *  2.实现抽象方法:
       *            numPartitions:Int   返回自定义分区器的分区个数
       *            getPartition(key:Any):Int   返回给定Key的分区编号(0到numpartitions-1)
       *            equals():判断分区器对象和其他分区器是否相等。
       */
      
      //继承Partitioner类,重写抽象方法
      class MyPartitoner(num:Int) extends Partitioner {
        
        //返回分区个数
        override def numPartitions: Int = num
        
        //将key进行匹配,小于等于2的进入0号分区,其余进入1号分区
        override def getPartition(key: Any): Int = {
          key match {
            case i:Int if i<=2 => 0
            case _ => 1
          }
        }
      }
    

2.reduceByKey():按照key聚合value

  • 参数: 参数为一个函数,或者函数+分区数

    ​ (func : (V,V) => V )

    ​ (func : (V,V) => V , numPartitions:Int)

  • 返回值: RDD [ (K , V ) ]

  • 功能说明:

    • 传递一个函数func,将RDD[K,V] 中的元素按照相同K的元素对V进行聚合。返回新RDD元素为KV形式二元组,K为聚合的K,V为源RDD的V经过参数(函数)聚合结果值。

    • 如:(res,value)=> (res+value)作为参数,相同K的元素中,第一个元素的V值为res,遍历相同K元素的所有V,执行 res+=value 返回res。实现将所有K相同的元素将value相加求和的计算。

    • 可以通过numPartitions的值设置返回新RDD的分区个数

  • 代码说明:

    	// 创建第一个RDD
        val rdd = sc.makeRDD(List(("a",1),("b",5),("a",5),("b",2)))
    
        // 计算相同key对应值的相加结果
        val reduce: RDD[(String, Int)] = rdd.reduceByKey((v1,v2) => v1+v2)
    
        // 打印结果
        reduce.collect().foreach(println)
    
    	//打印输出:
    	(a,6)
    	(b,7)
    
    • reduceByKey()会在一个分区内先进行相同K的元素聚合。将每个分区结果再同其他分区进行计算。即会预先进行聚合(预聚合)再执行Shuffle。

3.groupByKey():按照key重新分组

  • 参数: 空参数或指定分区数或分区器

  • 返回值: RDD[ (K, Iterable[V]) ]

  • 功能说明:

    对源RDD总K相同的元素进行分组,分组后返回一个KV二元组,K为源RDD的K,V为一个集合Seq。

    即Itertable,元素为所有源RDD中相同K的元素的V的集合。

  • 代码说明:

        // 创建第一个RDD
        val rdd = sc.makeRDD(List(("a",1),("b",5),("a",5),("b",2)))
    
        // 将相同key对应值聚合到一个Seq中
        val group: RDD[(String, Iterable[Int])] = rdd.groupByKey()
    
        // 打印结果
        group.collect().foreach(println)
    
        // 计算相同key对应值的相加结果
        group.map(t=>(t._1,t._2.sum)).collect().foreach(println)
    
    	// 打印输出:
    	(a,CompactBuffer(1, 5))
    	(b,CompactBuffer(5, 2))
    	(a,6)
    	(b,7)
    

4.reduceByKey()和groupBykey()的区别

  1. reduceByKey()按照相同K的元素进行聚合。先在分区内进行预聚合。而gouupByKey()没有预聚合,直接Shuffle取出所有分区元素进行计算。

  2. 运算效果相同时,优选reduceByKey()。因为其预聚合功能减少Shuffle任务量。提高执行效率。

5.aggregateByKey():按照key处理分区内和分区间逻辑

  • 参数:(ZeroValue:U)(seqOp:(U,V=>U),combOp(U,U)=> U)

    ​ zeroValue:初始值,每个分区中相同K计算的初始值

    ​ seqOp:分区内计算函数

    ​ combOp:分区间计算函数

  • **返回值:**RDD [ (K,U) ]

  • 功能说明:

    可以设置分区内V计算的初始值进行区内相同K的元素之间V的计算,且初始值对每个分区内都生效。区间也可以设置单独计算函数

  • 代码说明:

 //创建第一两个分区个RDD,区内计算最大值,区间进行求和
 val rdd: RDD[(String, Int)] = sc.makeRDD(
    List(("a",1),("a",3),("a",5),("b",7),("b",2),("b",4),("b",6),("a",7)), 2)

// 取出每个分区相同key对应值的最大值,然后相加
    val rdd1: RDD[(String, Int)] = rdd.aggregateByKey(0)(
      (res, elem) => math.max(res, elem),
      (res, elem) => res + elem
    )
    rdd1.foreach(println)
//打印输出:
(b,13)
(a,12)

6.foldByKey():分区内和分区间逻辑相同的aggregateByKey()

  • 参数: (zeroValue:V)(func:(V,V)=>V)

    ​ zeroValue:每个区计算的初始值

    ​ func:区内和区间计算的函数,即区内和区间计算逻辑相同

  • 返回值: RDD [ (K,V) ]

  • 功能说明:

    是aggregateByKey在区内和区间计算逻辑相同时候的简化操作。

  • 代码说明:

        // 创建第一个RDD
        val list: List[(String, Int)] = 
         List(("a",1),("a",3),("a",5),("b",7),("b",2),("b",4),("b",6),("a",7))
        val rdd = sc.makeRDD(list,2)
    
        // 求wordcount
        //rdd.aggregateByKey(0)(_+_,_+_).collect().foreach(println)
    
        rdd.foldByKey(0)(_+_).collect().foreach(println)
    
    	//打印输出:
    	(b,19)
    	(a,16)
    

7.combineByKey():转换结构后分区内和分区间操作

  • 参数:

    def combineByKey[C](
      createCombiner: V => C,  // 参数1:可将分区内所有元素转换数据结构
    
      mergeValue: (C, V) => C,   //参数2:分区内计算逻辑,需申明数据类型
    
      mergeCombiners: (C, C) => C   //参数3: 分区间逻辑,需申明数据类型
    ): RDD[(K, C)]
    
  • **返回值:**RDD[(K, C)]

  • 功能说明:

    可以将每个元素的V转换数据结构再参与分区内和分区间对相同K的元素中,通过转换后V值进行逻辑计算。

  • 代码说明:

// 创建第一个RDD
val list: List[(String, Int)] = 
	List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))
val rdd: RDD[(String, Int)] = sc.makeRDD(list, 2)

// 将相同key对应的值相加,同时记录该key出现的次数,放入一个二元组
val combineRdd: RDD[(String, (Int, Int))] = rdd.combineByKey(
  (_, 1),
  (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)

// 打印合并后的结果
combineRdd.collect().foreach(println)

// 计算平均值
combineRdd.map {
  case (key, value) => {
    (key, value._1 / value._2.toDouble)
  }
}.collect().foreach(println)
// 打印输出:
(b,(286,3))
(a,(274,3))
(b,95.33333333333333)
(a,91.33333333333333)

8.reduceByKey,foldByKey,aggregateByKey,combineByKey

  • reduceByKey():没有初始值,分区内和分区间计算逻辑相同

  • foldByKey():有初始值,分区内和分区间计算逻辑相同

  • aggregateByKey():有初始值,分区内和分区间计算逻辑可以不同

  • combineByKey():

    ​ 有初始值,且可以转换初始值数据结构。分区内和分区间计算逻辑可以不同

9.sortByKey():按照key进行排序

  • 参数:

    def sortByKey(
         ascending: Boolean = true, // 参数1:默认为true,升序,false则为降序
         numPartitions: Int = self.partitions.length //参数2:默认分区数保持不变,											可以设置排序后返回RDD的分区数
    )  : RDD[(K, V)]
    
  • **返回值:**RDD[(K, V)]:排序后RDD

  • 功能说明:

    按照指定规则对源RDD中元素按照K进行排序,返回排序后的新RDD

  • 代码说明:

        // 创建第一个RDD
        val rdd: RDD[(Int, String)] =
          sc.makeRDD(Array((3, "aa"), (6, "cc"), (2, "bb"), (1, "dd")))
    
        // 按照key的正序(默认顺序)
        rdd.sortByKey(true).collect().foreach(elem => print(elem+" "))
      	println()
        // 按照key的倒序
        rdd.sortByKey(false).collect().foreach(elem => print(elem+" "))
    
    	//打印输出:
    	(1,dd) (2,bb) (3,aa) (6,cc) //正序
    	(6,cc) (3,aa) (2,bb) (1,dd)	//倒序
    

10.mapValues():只对value进行操作

  • 参数: (f: V => U) :函数,对源RDD所有元素中V进行逻辑计算

  • **返回值:**RDD[(K, U)] :返回值K不变,V为经过函数f计算后结果

  • 功能说明:

    针对源RDD中元素(K,V)只进行V的操作,返回K不变V为计算后结果

  • 代码说明:

        // 创建第一个RDD
        val rdd: RDD[(Int, String)] = 
    		sc.makeRDD(Array((1, "a"), (1, "d"), (2, "b"), (3, "c")))
    
        // 对value添加字符串"|||"
        rdd.mapValues(_ + "|||").collect().foreach(println)
    
    	//打印输出:
    	(1,a|||)
    	(1,d|||)
    	(2,b|||)
    	(3,c|||)
    

11.join():等同于SQL中内连接

  • 参数: (other: RDD[(K, W)]) :参数为一个KV结构的

  • 返回值: RDD[(K, (V, W))] :返回K不变,V为两个相同K的V组合的二元组

  • 功能说明:

    在类型均为(K,V)结构的RDD上进行join,返回一个相同K对应所有元素V组成的元组

  • 代码说明:

    val rdd1: RDD[(Int, String)] = 
    				sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c"),(1, "z")))
    val rdd2: RDD[(Int, Int)] =
    				sc.makeRDD(Array((1, 4), (2, 5), (4, 6),(1, 7)))
    
    
    val result: RDD[(Int, (String, Int))] = rdd1.join(rdd2)
    
    result.collect().foreach(println)
    
    //打印输出:
    (1,(a,4))
    (1,(a,7))
    (1,(z,4))
    (1,(z,7))
    (2,(b,5))
    
    • 对源RDD中出现相同K情况,会分别与参数RDD中所有相同K的元素依次join。形成K不变V为组合的二元组
    • 若源RDD和参数RDD中只有一个RDD有的K值,不会进行关联

12.cogroup():类似SQL中全连接,但在同一个RDD中对key进行聚合

  • 参数: (other: RDD[(K, W)]):参数为一个KV结构的RDD

  • 返回值: RDD[(K, (Iterable[V], Iterable[W]))] :返回值为一个KV结构,且KV均为集

  • **功能说明: **

    • 在类型均为KV结构的RDD上进行cogroup(),返回一个KV均为集合的RDD,

      K为作用的两个RDD相同的K,V为每个RDD中K相同元素的V的集合。

    • 对于仅有一个RDD中的K。形成K不变,value为一个RDD中vlaue的集合和一个空集合。

  • **代码说明: **

val rdd1: RDD[(Int, String)] = 
					sc.makeRDD(Array((1,"a"),(2,"b"),(3,"c"),(1,"z")))
val rdd2: RDD[(Int, Int)] =
					sc.makeRDD(Array((1,4),(2,5),(4,6),(1,100)))

val result: RDD[(Int, (Iterable[String], Iterable[Int]))] = rdd1.cogroup(rdd2)

result.collect().foreach(println)
//打印输出:
(1,(CompactBuffer(a, z),CompactBuffer(4, 100)))
(2,(CompactBuffer(b),CompactBuffer(5)))
(3,(CompactBuffer(c),CompactBuffer()))
(4,(CompactBuffer(),CompactBuffer(6)))

五、Action行动算子

  • 行动算子是触发整个作业的执行。原因在于转换算子都是懒加载,不会立即执行,等到遇到行动算子才开始执行整个流程。

1.reduce():聚合

  • 参数: (f: (T, T) => T):参数为一个函数

  • 返回值: T:返回经过函数计算后的结果

  • **功能说明: **

    函数f作用于RDD中的所有元素,先聚合分区内数据,再聚合分区间的数据

  • **代码说明: **

        // 创建第一个RDD
        val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
    
        // 聚合数据
        val reduceResult: Int = rdd.reduce(_+_)
        println(reduceResult)
    
    
    	//打印输出:
    	10
    

2.collect():以数组的形式返回数据集

  • 参数: 无参数

  • 返回值: Array[T]:将RDD数据以数组形式返回

  • **功能说明: **

    • 以数据Array的形式返回RDD的所有元素
    • collect()会将所有元素拉取到Driver端,谨慎使用
  • **代码说明: **

        // 创建第一个RDD
        val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
    
        // 收集数据到Driver
        rdd.collect().foreach(println)
    
    //打印输出:
    1
    2
    3
    4
    

3.count():返回RDD中元素的个数

  • 参数: 无参数

  • 返回值: Long:RDD中元素个数

  • **功能说明: **

    返回RDD中元素的个数

  • **代码说明: **

        // 创建第一个RDD
        val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
    
        // 返回RDD中元素的个数
        val countResult: Long = rdd.count()
        println(countResult)
    
    	//打印输出:
    	4
    

4.fitst():返回RDD中第一个元素

  • 参数: 无参数

  • 返回值: T:RDD中第一个元素

  • **功能说明: **

    返回RDD中第一个元素

  • **代码说明: **

        // 创建第一个RDD
        val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
    
        // 返回RDD中元素的个数
        val firstResult: Int = rdd.first()
        println(firstResult)
    
    	//打印输出:
    	1
    

5.take():返回由RDD前n个元素组成的数据

  • 参数: (num: Int):元素的个数

  • 返回值: Array[T]:返回参数个数的元素

  • **功能说明: **

    返回一个由RDD前n个元素组成的数组

  • **代码说明: **

        // 创建第一个RDD
        val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
    
        // 返回RDD中前2个元素
        val takeResult: Array[Int] = rdd.take(2)
        println(takeResult.mkString(","))
    
    	//打印输出:
    	1,2
    

6.takeOrdered():返回RDD排序后前n个元素组成的数组

  • 参数: (num: Int)(implicit ord: Ordering[T])

    ​ num:取出元素的个数

    ​ implicit ord:排序的方式,默认为增序

  • 返回值: Array[T] :以数据形式返回

  • **功能说明: **

    返回RDD排序后前n个元素组成的数据

  • **代码说明: **

        val rdd = sc.parallelize(List(999,1,3,5,7,9,2,4,6,8,10),5)
    	
    	//倒序取出前4个元素
        val ints: Array[Int] = rdd.takeOrdered(4)(Ordering[Int].reverse)
    
        println(ints.mkString(","))
    
    	//打印输出:
    	999,10,9,8
    

7.aggregate()

  • 参数:(ZeroValue:U)(seqOp:(U,V=>U),combOp(U,U)=> U)

    ​ zeroValue:初始值,每个分区中相同K计算的初始值

    ​ seqOp:分区内计算函数

    ​ combOp:分区间计算函数

  • 返回值: U:经过函数计算后结果

  • **功能说明: **

    aggregate将每个分区内按照seqOP函数计算,分区间按照combOP计算。聚合时每个分区内计算使用初始值ZeroValue,分区间计算也使用一次ZeroValue,这点与aggregateByKey()不同。

  • **代码说明: **

        val rdd: RDD[Int] = sc.parallelize(List(1,2,3,4),8)
    
        //与aggregateByKey不同,分区内每次计算使用初始值,分区间计算时也使用一次初始值
        //11+12+13+14+10*4+10
        val i: Int = rdd.aggregate(10)((res,elem)=>res+elem,(res,elem)=> res+elem)
    
        println(i)
    
    	//打印输出:
    	100
    

8.fold()

  • 参数: (zeroValue:V)(func:(V,V)=>V)

    ​ zeroValue:每个区计算的初始值

    ​ func:区内和区间计算的函数,即区内和区间计算逻辑相同

  • 返回值: V:返元素经过func计算结果

  • **功能说明: **

    aggregate在分区内和分区间计算逻辑相同时的简化操作。ZeroValue使用和aggregate相同。

  • **代码说明: **

        val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4),8)
    	
    	//分区内和分区间计算逻辑相同
        val i: Int = rdd.fold(10)(_+_)
    
        println(i)
    
    	//打印输出:
    	100
    

9.countByKey():统计每种Key的个数

  • 参数: 无参数

  • 返回值: Map[K, Long]:

  • **功能说明: **

    统计相同K的元素的个数

  • **代码说明: **

    val rdd: RDD[(Int, String)] =
       sc.makeRDD(List((1, "a"), (1, "a"), (1, "a"), (2, "b"), (3, "c"), (3,"c")))
    
    val map: collection.Map[Int, Long] = rdd.countByKey()
    
    println(map)
    
    //打印输出:
    Map(1 -> 3, 2 -> 1, 3 -> 2)
    

10.save相关算子

  • 参数:

    1. saveAsTextFile(path)保存成Text文件
    2. saveAsSequenceFile(path) 保存成Sequencefile文件
    3. saveAsObjectFile(path) 序列化成对象保存到文件
  • 返回值:

  • **功能说明: **

    1. 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
    2. 将数据集中的元素以Hadoop Sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。(只有kv类型RDD有该操作,单值的没有)
    3. 用于将RDD中的元素序列化成对象,存储到文件中。
  • **代码说明: **

            // 创建第一个RDD
            val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4), 2)
    
            // 保存成Text文件
            rdd.saveAsTextFile("output")
    
            // 序列化成对象保存到文件
            rdd.saveAsObjectFile("output1")
    
            // 保存成Sequencefile文件
            rdd.map((_,1)).saveAsSequenceFile("output2")
    

11.foreach(f):遍历RDD中每一个元素

  • 参数: (f:T=>Unit):对RDD中元素遍历后操作

  • 返回值: Unit

  • **功能说明: **

    • 遍历RDD中每一个元素,一次应用函数f
    • 与collect()后在foreach不同在于foreach算子会多线程并行遍历,输出结果是无序的。
  • **代码说明: **

        // 创建第一个RDD
        val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
    
        // 收集后打印
        rdd.collect().foreach(elem => print(elem+" "))
    
      	println()
        println("****************")
    
        // foreach分布式打印
        rdd.foreach(elem => print(elem+" "))
    
    	//打印输出:
    	1 2 3 4 
    	****************
    	1 3 4 2 
    
  • 参数:

    1. saveAsTextFile(path)保存成Text文件
    2. saveAsSequenceFile(path) 保存成Sequencefile文件
    3. saveAsObjectFile(path) 序列化成对象保存到文件
  • 返回值:

  • **功能说明: **

    1. 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
    2. 将数据集中的元素以Hadoop Sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。(只有kv类型RDD有该操作,单值的没有)
    3. 用于将RDD中的元素序列化成对象,存储到文件中。
  • **代码说明: **

            // 创建第一个RDD
            val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4), 2)
    
            // 保存成Text文件
            rdd.saveAsTextFile("output")
    
            // 序列化成对象保存到文件
            rdd.saveAsObjectFile("output1")
    
            // 保存成Sequencefile文件
            rdd.map((_,1)).saveAsSequenceFile("output2")
    

11.foreach(f):遍历RDD中每一个元素

  • 参数: (f:T=>Unit):对RDD中元素遍历后操作

  • 返回值: Unit

  • **功能说明: **

    • 遍历RDD中每一个元素,一次应用函数f
    • 与collect()后在foreach不同在于foreach算子会多线程并行遍历,输出结果是无序的。
  • **代码说明: **

        // 创建第一个RDD
        val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
    
        // 收集后打印
        rdd.collect().foreach(elem => print(elem+" "))
    
      	println()
        println("****************")
    
        // foreach分布式打印
        rdd.foreach(elem => print(elem+" "))
    
    	//打印输出:
    	1 2 3 4 
    	****************
    	1 3 4 2 
    

分享:

低价透明

统一报价,无隐形消费

金牌服务

一对一专属顾问7*24小时金牌服务

信息保密

个人信息安全有保障

售后无忧

服务出问题客服经理全程跟进