案例一:分析tomcat的访问日志,求访问量最高的两个网页
1、对每个jps的访问量求和
2、排序
3、取前两条记录
结果:ArrayBuffer((oracle.jsp,9), (hadoop.jsp,9))
代码:
package log
import org.apache.spark.{SparkConf, SparkContext}
object AnalizeLog {
def main(args: Array[String]): Unit = {
//创建一个Context对象
val conf = new SparkConf().setAppName("MyTomcatLogCount").setMaster("local");
val sc = new SparkContext(conf)
/**
*
* 读入日志,解析每行日志,找到访问的jsp网页
*
* */
val rdd1 = sc.textFile("D:\\BigData\\sc\\localhost_access_log.2017-07-30.txt").map(
line => {
//解析字符串,找到jsp的名字
//1、得到两个双引号的位置
//192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] "GET /MyDemoWeb/ HTTP/1.1" 200 259
//192.168.88.1 - - [30/Jul/2017:12:54:37 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
val index1 = line.indexOf("\"") //需要转义
val index2 = line.lastIndexOf("\"")
val line1 = line.substring(index1+1,index2) //得到两个空格的位置 GET /MyDemoWeb/oracle.jsp HTTP/1.1
val index3 = line1.indexOf(" ")
val index4 = line1.lastIndexOf(" ")
val line2 = line1.substring(index3+1,index4) // /MyDemoWeb/oracle.jsp
//得到jsp的名字
val jspName = line2.substring(line2.lastIndexOf("/")+1) //得到xxx.jsp
//返回
(jspName,1)
//("hadoop.jsp",1)
}
)
//按照jsp的名字进行聚合操作 类似wordcount
val rdd2 = rdd1.reduceByKey(_+_)
//排序,按照value进行排序
val rdd3 = rdd2.sortBy(_._2,false)
//取出访问量最大的两个网页
println("访问量最大的两个网页是:"+rdd3.take(2).toBuffer)
sc.stop()
}
}
案例二:分析tomcat的访问日志,根据网页的名字进行分区(类似MapReduce中的自定义分区)
结果: 网页的名字 访问日志
oracle.jsp 192.168.88.1 - - [30/Jul/2017:12:54:37 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
oracle.jsp 192.168.88.1 - - [30/Jul/2017:12:54:53 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
代码
package log
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import scala.collection.mutable.HashMap
object AnalizeLog {
def main(args: Array[String]): Unit = {
//创建一个Context对象
val conf = new SparkConf().setAppName("MyTomcatLogCount").setMaster("local");
val sc = new SparkContext(conf)
/**
*
* 读入日志,解析每行日志,找到访问的jsp网页
*
* */
val rdd1 = sc.textFile("D:\\BigData\\sc\\localhost_access_log.2017-07-30.txt").map(
line => {
//解析字符串,找到jsp的名字
//1、得到两个双引号的位置
//192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] "GET /MyDemoWeb/ HTTP/1.1" 200 259
//192.168.88.1 - - [30/Jul/2017:12:54:37 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
val index1 = line.indexOf("\"") //需要转义
val index2 = line.lastIndexOf("\"")
val line1 = line.substring(index1+1,index2) //得到两个空格的位置 GET /MyDemoWeb/oracle.jsp HTTP/1.1
val index3 = line1.indexOf(" ")
val index4 = line1.lastIndexOf(" ")
val line2 = line1.substring(index3+1,index4) // /MyDemoWeb/oracle.jsp
//得到jsp的名字
val jspName = line2.substring(line2.lastIndexOf("/")+1) //得到xxx.jsp
//返回
(jspName,line)
}
)
val rdd2 = rdd1.map(_._1).distinct().collect()//去重,聚合
//创建分区规则
val myPartitioner = new MyWebPartitioner(rdd2)
//对rdd1 进行分区
val rdd3 = rdd1.partitionBy(myPartitioner)
//输出
rdd3.saveAsTextFile("D:\\BigData\\sc\\tmp")
sc.stop()
}
class MyWebPartitioner(jspList:Array[String]) extends Partitioner{
//定义一个集合来保存分区的条件
//String :代表jsp的名字,Int 代表对应的分区号
val partitionMap = new HashMap[String ,Int]()
var partID = 0 //分区号
for (jsp <- jspList){
//有一个jsp 建立一个分区
partitionMap.put(jsp,partID)
partID += 1 //分区号
}
//返回多少个分区
override def numPartitions: Int = partitionMap.size
//根据jsp的名字,得到对应的分区
override def getPartition(key: Any): Int = {
partitionMap.getOrElse(key.toString,0)
}
}
}
案例三:把上面分析的结果,保存到Oracle中(知识点:在哪里建立Connection?): 对于非序列化的对象,如何处理?