Common Spark Action Operators

1、collect
collect pulls the entire result of an RDD back to the Driver as an Array. If the result is large, do not collect it: bringing everything back can cause an OOM on the Driver.

  import org.apache.spark.{SparkConf, SparkContext}

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("collect").setMaster("local")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("./data/words")
    // collect returns every element of the RDD to the Driver as an Array
    val result: Array[String] = lines.collect()
    result.foreach(println)
    sc.stop()
  }

2、collectAsMap
Retrieves a K,V-format RDD to the Driver and returns it as a Map.

def main(args: Array[String]): Unit = {
    val conf  = new SparkConf()
    conf.setMaster("local").setAppName("collectAsMap")
    val sc = new SparkContext(conf)
    val weightInfos = sc.parallelize(List[(String, Double)](("zhangsan", 78.4), ("lisi", 32.6), ("wangwu", 90.9)))
    val stringToDouble: collection.Map[String, Double] = weightInfos.collectAsMap()
    stringToDouble.foreach(tp=>{println(tp)})
    sc.stop()
  }
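
Note that collectAsMap keeps only one value per key, so if the RDD contains duplicate keys the extra values are silently dropped. A minimal sketch (the dup RDD is illustrative and would have to run before sc.stop()):

    // duplicate keys collapse into a single entry in the returned Map
    val dup = sc.parallelize(List[(String, Double)](("zhangsan", 78.4), ("zhangsan", 80.0)))
    println(dup.collectAsMap())   // only one "zhangsan" entry survives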

3、count
Returns the total number of elements in the RDD; for an RDD read from a text file, that is the number of lines.

def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("count")
    conf.setMaster("local")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("./data/sampleData.txt")
    val result: Long = lines.count()
    println(result)
    sc.stop()
  }

4、countByKey
Counts how many times each key appears in a K,V-format RDD and returns the counts to the Driver as a Map.

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("countByKey").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd = sc.makeRDD(List[(String, Int)](("a", 1), ("a", 100), ("a", 1000), ("b", 2), ("b", 200), ("c", 3)))
    val result: collection.Map[String, Long] = rdd.countByKey()
    result.foreach(println)
  }
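
Because countByKey brings the per-key counts back to the Driver, it is only appropriate when the number of distinct keys is small. With many distinct keys the same count can stay distributed; a small sketch reusing the rdd above (reduceByKey is the standard operator for this):

    // the same per-key count as an RDD on the Executors instead of a Driver-side Map
    val distributed = rdd.mapValues(_ => 1L).reduceByKey(_ + _)
    distributed.foreach(println)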

5、countByValue
Counts how many times each distinct element appears in the RDD and returns the counts to the Driver as a Map. The RDD does not have to be in K,V format; for a K,V RDD the whole (key, value) pair is treated as the element.

  def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setAppName("countByValue").setMaster("local")
      val sc = new SparkContext(conf)
      val rdd = sc.makeRDD(List[(String, Int)](("a", 1), ("a", 1), ("a", 1000), ("b", 2), ("b", 200), ("c", 3), ("c", 3)))
      val result: collection.Map[(String, Int), Long] = rdd.countByValue()
      result.foreach(println)
  }

6、first
Returns the first element of the RDD; for a text file, the first line.

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("first")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("./data/words")
    val str = lines.first()
    println(str)
  }

7、foreach
Applies the given function to every element of the RDD. The function runs on the Executors, so in cluster mode any println output appears in the Executor logs rather than on the Driver.

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("foreach").setMaster("local")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("./data/words")
    lines.foreach(println)
  }

8、foreachPartition
Like foreach, but the function is called once per partition and receives an iterator over that partition's elements. It is the right choice when per-partition setup is needed, such as opening one database connection per partition; a sketch of that pattern follows the example below.

def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("foreachPartition")
    val sc = new SparkContext(conf)
    val infos = sc.parallelize(List[String]("a","b","c","d","e","f","g"),4)
    // the function is invoked once per partition with an iterator over its elements
    infos.foreachPartition(iter => {
      println("new partition")
      iter.foreach(println)
    })
    sc.stop()
  }
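
The benefit over foreach is that expensive setup is amortized across a whole partition. A minimal sketch of that pattern, reusing the infos RDD above; the ArrayBuffer is only a stand-in for a real connection or client:

    infos.foreachPartition(iter => {
      // stand-in for opening one real connection per partition
      val buffer = scala.collection.mutable.ArrayBuffer[String]()
      iter.foreach(buffer += _)   // every element in the partition reuses the same resource
      println(s"flushed ${buffer.size} records from this partition")
      // a real connection would be closed here
    })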

9、reduce
Aggregates all elements of the RDD with the given binary function. The function should be commutative and associative, because each partition is reduced locally before the partial results are combined.

def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("reduce").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd = sc.makeRDD(Array[Int](1,2,3,4,5))
    val result: Int = rdd.reduce((v1, v2) => {
      v1 + v2
    })
    println(result)
  }
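
reduce accepts any commutative and associative function, not just addition. A small sketch reusing the same rdd to take the maximum instead of the sum:

    // same RDD, different combine function: the maximum instead of the sum
    val max: Int = rdd.reduce((v1, v2) => math.max(v1, v2))
    println(max)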

10、take
Returns the first N elements of the RDD to the Driver as an Array. Unlike collect, take only scans as many partitions as needed to gather N elements.

def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("take").setMaster("local")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("./data/words")
    val array = lines.take(3)
    array.foreach(println)
}

11、takeSample

  • takeSample(withReplacement, num, seed)
  • Randomly samples num elements from the RDD and returns them to the Driver as an Array.
  • withReplacement: whether to sample with replacement
  • num: the number of elements to sample
  • seed: random seed, so the sampling is reproducible

def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local").setAppName("takeSample")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("./data/words")
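    // without replacement, at most the RDD's total element count is returned, even if num is larger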
    val result: Array[String] = lines.takeSample(false, 300, 100)
    result.foreach(println)
  }
