大数据学习29:Spark Core编程案例

案例一:分析tomcat的访问日志,求访问量最高的两个网页

	1、对每个jps的访问量求和
	2、排序
	3、取前两条记录
	结果:ArrayBuffer((oracle.jsp,9), (hadoop.jsp,9))

代码:

package log

import org.apache.spark.{SparkConf, SparkContext}


object AnalizeLog {
  def main(args: Array[String]): Unit = {
    //创建一个Context对象
    val conf = new SparkConf().setAppName("MyTomcatLogCount").setMaster("local");
    val sc = new SparkContext(conf)

    /**
      *
      * 读入日志,解析每行日志,找到访问的jsp网页
      *
      * */
    val rdd1 =  sc.textFile("D:\\BigData\\sc\\localhost_access_log.2017-07-30.txt").map(
      line => {
        //解析字符串,找到jsp的名字
        //1、得到两个双引号的位置
        //192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] "GET /MyDemoWeb/ HTTP/1.1" 200 259
        //192.168.88.1 - - [30/Jul/2017:12:54:37 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
        val index1 = line.indexOf("\"") //需要转义
        val index2 = line.lastIndexOf("\"")
        val line1 = line.substring(index1+1,index2)  //得到两个空格的位置 GET /MyDemoWeb/oracle.jsp HTTP/1.1
        val index3 = line1.indexOf(" ")
        val index4 = line1.lastIndexOf(" ")
        val line2 = line1.substring(index3+1,index4) //  /MyDemoWeb/oracle.jsp

        //得到jsp的名字
        val jspName = line2.substring(line2.lastIndexOf("/")+1) //得到xxx.jsp

        //返回
        (jspName,1)

        //("hadoop.jsp",1)
      }
    )
    //按照jsp的名字进行聚合操作 类似wordcount
    val rdd2 = rdd1.reduceByKey(_+_)

    //排序,按照value进行排序
    val rdd3 = rdd2.sortBy(_._2,false)
    //取出访问量最大的两个网页
    println("访问量最大的两个网页是:"+rdd3.take(2).toBuffer)
    sc.stop()
  }
}

案例二:分析tomcat的访问日志,根据网页的名字进行分区(类似MapReduce中的自定义分区)

	结果: 网页的名字    访问日志
	       oracle.jsp    192.168.88.1 - - [30/Jul/2017:12:54:37 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
           oracle.jsp    192.168.88.1 - - [30/Jul/2017:12:54:53 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242

代码

package log
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import scala.collection.mutable.HashMap

object AnalizeLog {
  def main(args: Array[String]): Unit = {
    //创建一个Context对象
    val conf = new SparkConf().setAppName("MyTomcatLogCount").setMaster("local");
    val sc = new SparkContext(conf)

    /**
      *
      * 读入日志,解析每行日志,找到访问的jsp网页
      *
      * */
    val rdd1 =  sc.textFile("D:\\BigData\\sc\\localhost_access_log.2017-07-30.txt").map(
      line => {
        //解析字符串,找到jsp的名字
        //1、得到两个双引号的位置
        //192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] "GET /MyDemoWeb/ HTTP/1.1" 200 259
        //192.168.88.1 - - [30/Jul/2017:12:54:37 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
        val index1 = line.indexOf("\"") //需要转义
        val index2 = line.lastIndexOf("\"")
        val line1 = line.substring(index1+1,index2)  //得到两个空格的位置 GET /MyDemoWeb/oracle.jsp HTTP/1.1
        val index3 = line1.indexOf(" ")
        val index4 = line1.lastIndexOf(" ")
        val line2 = line1.substring(index3+1,index4) //  /MyDemoWeb/oracle.jsp

        //得到jsp的名字
        val jspName = line2.substring(line2.lastIndexOf("/")+1) //得到xxx.jsp

        //返回
        (jspName,line)
      }
    )
    
    val rdd2 = rdd1.map(_._1).distinct().collect()//去重,聚合

    //创建分区规则
    val myPartitioner = new MyWebPartitioner(rdd2)

    //对rdd1 进行分区
    val rdd3 = rdd1.partitionBy(myPartitioner)

    //输出
    rdd3.saveAsTextFile("D:\\BigData\\sc\\tmp")
    sc.stop()

  }

  class  MyWebPartitioner(jspList:Array[String]) extends Partitioner{
    //定义一个集合来保存分区的条件
    //String :代表jsp的名字,Int 代表对应的分区号
    val partitionMap = new HashMap[String ,Int]()
    var partID = 0 //分区号
    for (jsp <- jspList){
      //有一个jsp 建立一个分区
      partitionMap.put(jsp,partID)

      partID += 1 //分区号
    }

    //返回多少个分区
    override def numPartitions: Int = partitionMap.size

    //根据jsp的名字,得到对应的分区
    override def getPartition(key: Any): Int = {
      partitionMap.getOrElse(key.toString,0)
    }
  }
}

案例三:把上面分析的结果,保存到Oracle中(知识点:在哪里建立Connection?): 对于非序列化的对象,如何处理?


更多精彩内容