1.四个需求
需求一:求contentsize的平均值、最小值、最大值
需求二:请各个不同返回值的出现的数据 ===> wordCount程序
需求三:获取访问次数超过N次的IP地址
需求四:获取访问次数最多的前K个endpoint的值 ==> TopN
2.主程序LogAnalyzer.scala
1 package com.ibeifeng.bigdata.spark.core.log 2 3 import org.apache.spark.rdd.RDD 4 import org.apache.spark.{SparkConf, SparkContext} 5 6 /** 7 * Apache日志分析 8 * Created by ibf on 01/15. 9 */ 10 object LogAnalyzer { 11 def main(args: Array[String]): Unit = { 12 val conf = new SparkConf() 13 .setAppName("log-analyzer") 14 .setMaster("local[*]") 15 .set("spark.eventLog.enabled", "true") 16 .set("spark.eventLog.dir", "hdfs://hadoop-senior01:8020/spark-history") 17 val sc = SparkContext.getOrCreate(conf) 18 19 // ================日志分析具体代码================== 20 // HDFS上日志存储路径 21 val path = "/beifeng/spark/access/access.log" 22 23 // 创建rdd 24 val rdd = sc.textFile(path) 25 26 // rdd转换,返回进行后续操作 27 val apacheAccessLog: RDD[ApacheAccessLog] = rdd 28 // 过滤数据 29 .filter(line => ApacheAccessLog.isValidateLogLine(line)) 30 .map(line => { 31 // 对line数据进行转换操作 32 ApacheAccessLog.parseLogLine(line) 33 }) 34 35 // 对多次时候用的rdd进行cache 36 apacheAccessLog.cache() 37 38 // 需求一:求contentsize的平均值、最小值、最大值 39 /* 40 * The average, min, and max content size of responses returned from the server. 41 * */ 42 val contentSizeRDD: RDD[Long] = apacheAccessLog 43 // 提取计算需要的字段数据 44 .map(log => (log.contentSize)) 45 46 // 对重复使用的RDD进行cache 47 contentSizeRDD.cache() 48 49 // 开始计算平均值、最小值、最大值 50 val totalContentSize = contentSizeRDD.sum() 51 val totalCount = contentSizeRDD.count() 52 val avgSize = 1.0 * totalContentSize / totalCount 53 val minSize = contentSizeRDD.min() 54 val maxSize = contentSizeRDD.max() 55 56 // 当RDD不使用的时候,进行unpersist 57 contentSizeRDD.unpersist() 58 59 // 结果输出 60 println(s"ContentSize Avg:${avgSize}, Min: ${minSize}, Max: ${maxSize}") 61 62 // 需求二:请各个不同返回值的出现的数据 ===> wordCount程序 63 /* 64 * A count of response code's returned. 65 * */ 66 val responseCodeResultRDD = apacheAccessLog 67 // 提取需要的字段数据, 转换为key/value键值对,方便进行reduceByKey操作 68 // 当连续出现map或者flatMap的时候,将多个map/flatMap进行合并 69 .map(log => (log.responseCode, 1)) 70 // 使用reduceByKey函数,按照key进行分组后,计算每个key出现的次数 71 .reduceByKey(_ + _) 72 73 // 结果输出 74 println(s"""ResponseCode :${responseCodeResultRDD.collect().mkString(",")}""") 75 76 // 需求三:获取访问次数超过N次的IP地址 77 // 需求三额外:对IP地址进行限制,部分黑名单IP地址不统计 78 /* 79 * All IPAddresses that have accessed this server more than N times. 80 * 1. 计算IP地址出现的次数 ===> WordCount程序 81 * 2. 数据过滤 82 * */ 83 val blackIP = Array("200-55-104-193.dsl.prima.net.ar", "10.0.0.153", "208-38-57-205.ip.cal.radiant.net") 84 // 由于集合比较大,将集合的内容广播出去 85 val broadCastIP = sc.broadcast(blackIP) 86 val N = 10 87 val ipAddressRDD = apacheAccessLog 88 // 过滤IP地址在黑名单中的数据 89 .filter(log => !broadCastIP.value.contains(log.ipAddress)) 90 // 获取计算需要的IP地址数据,并将返回值转换为Key/Value键值对类型 91 .map(log => (log.ipAddress, 1L)) 92 // 使用reduceByKey函数进行聚合操作 93 .reduceByKey(_ + _) 94 // 过滤数据,要求IP地址必须出现N次以上 95 .filter(tuple => tuple._2 > N) 96 // 获取满足条件IP地址, 为了展示方便,将下面这行代码注释 97 // .map(tuple => tuple._1) 98 99 // 结果输出100 println(s"""IP Address :${ipAddressRDD.collect().mkString(",")}""")101 102 // 需求四:获取访问次数最多的前K个endpoint的值 ==> TopN103 /*104 * The top endpoints requested by count.105 * 1. 先计算出每个endpoint的出现次数106 * 2. 再进行topK的一个获取操作,获取出现次数最多的前K个值107 * */108 val K = 10109 val topKValues = apacheAccessLog110 // 获取计算需要的字段信息,并返回key/value键值对111 .map(log => (log.endpoint, 1))112 // 获取每个endpoint对应的出现次数113 .reduceByKey(_ + _)114 // 获取前10个元素, 而且使用我们自定义的排序类115 .top(K)(LogSortingUtil.TupleOrdering)116 // 如果只需要endpoint的值,不需要出现的次数,那么可以通过map函数进行转换117 // .map(_._1)118 119 // 结果输出120 println(s"""TopK values:${topKValues.mkString(",")}""")121 122 123 // 对不在使用的rdd,去除cache124 apacheAccessLog.unpersist()125 126 // ================日志分析具体代码==================127 128 sc.stop()129 }130 }
3.需要的辅助类一(返回匹配的日志)
1 package com.ibeifeng.bigdata.spark.core.log 2 3 import scala.util.matching.Regex 4 5 /** 6 * 64.242.88.10 - - [07/Mar/2004:16:05:49 -0800] "GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1" 401 12846 7 * Created by ibf on 01/15. 8 */ 9 case class ApacheAccessLog(10 ipAddress: String, // IP地址11 clientId: String, // 客户端唯一标识符12 userId: String, // 用户唯一标识符13 serverTime: String, // 服务器时间14 method: String, // 请求类型/方式15 endpoint: String, // 请求的资源16 protocol: String, // 请求的协议名称17 responseCode: Int, // 请求返回值:比如:200、40118 contentSize: Long // 返回的结果数据大小19 )20 21 /**22 * 64.242.88.10 - - [07/Mar/2004:16:05:49 -0800] "GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1" 401 1284623 * on 01/15.24 * 提供一些操作Apache Log的工具类供SparkCore使用25 */26 object ApacheAccessLog {27 // Apache日志的正则28 val PARTTERN: Regex =29 """^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)""".r30 31 /**32 * 验证一下输入的数据是否符合给定的日志正则,如果符合返回true;否则返回false33 *34 * @param line35 * @return36 */37 def isValidateLogLine(line: String): Boolean = {38 val options = PARTTERN.findFirstMatchIn(line)39 40 if (options.isEmpty) {41 false42 } else {43 true44 }45 }46 47 /**48 * 解析输入的日志数据49 *50 * @param line51 * @return52 */53 def parseLogLine(line: String): ApacheAccessLog = {54 if (!isValidateLogLine(line)) {55 throw new IllegalArgumentException("参数格式异常")56 }57 58 // 从line中获取匹配的数据59 val options = PARTTERN.findFirstMatchIn(line)60 61 // 获取matcher62 val matcher = options.get63 64 // 构建返回值65 ApacheAccessLog(66 matcher.group(1), // 获取匹配字符串中第一个小括号中的值67 matcher.group(2),68 matcher.group(3),69 matcher.group(4),70 matcher.group(5),71 matcher.group(6),72 matcher.group(7),73 matcher.group(8).toInt,74 matcher.group(9).toLong75 )76 }77 }
4.需要的辅助类二(自定义的一个二元组的比较器,方便进行TopN)
1 package com.ibeifeng.bigdata.spark.core.log 2 3 /** 4 * Created by ibf on 01/15. 5 */ 6 object LogSortingUtil { 7 8 /** 9 * 自定义的一个二元组的比较器10 */11 object TupleOrdering extends scala.math.Ordering[(String, Int)] {12 override def compare(x: (String, Int), y: (String, Int)): Int = {13 // 按照出现的次数进行比较,也就是按照二元组的第二个元素进行比较14 x._2.compare(y._2)15 }16 }17 18 }