博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
024 关于spark中日志分析案例
阅读量:7072 次
发布时间:2019-06-28

本文共 7009 字,大约阅读时间需要 23 分钟。

1.四个需求

  需求一:求contentsize的平均值、最小值、最大值

  需求二:请各个不同返回值的出现的数据 ===> wordCount程序

  需求三:获取访问次数超过N次的IP地址

  需求四:获取访问次数最多的前K个endpoint的值 ==> TopN

 

2.主程序LogAnalyzer.scala

1 package com.ibeifeng.bigdata.spark.core.log  2   3 import org.apache.spark.rdd.RDD  4 import org.apache.spark.{SparkConf, SparkContext}  5   6 /**  7   * Apache日志分析  8   * Created by ibf on 01/15.  9   */ 10 object LogAnalyzer { 11   def main(args: Array[String]): Unit = { 12     val conf = new SparkConf() 13       .setAppName("log-analyzer") 14       .setMaster("local[*]") 15       .set("spark.eventLog.enabled", "true") 16       .set("spark.eventLog.dir", "hdfs://hadoop-senior01:8020/spark-history") 17     val sc = SparkContext.getOrCreate(conf) 18  19     // ================日志分析具体代码================== 20     // HDFS上日志存储路径 21     val path = "/beifeng/spark/access/access.log" 22  23     // 创建rdd 24     val rdd = sc.textFile(path) 25  26     // rdd转换,返回进行后续操作 27     val apacheAccessLog: RDD[ApacheAccessLog] = rdd 28       // 过滤数据 29       .filter(line => ApacheAccessLog.isValidateLogLine(line)) 30       .map(line => { 31         // 对line数据进行转换操作 32         ApacheAccessLog.parseLogLine(line) 33       }) 34  35     // 对多次时候用的rdd进行cache 36     apacheAccessLog.cache() 37  38     // 需求一:求contentsize的平均值、最小值、最大值 39     /* 40     * The average, min, and max content size of responses returned from the server. 41     * */ 42     val contentSizeRDD: RDD[Long] = apacheAccessLog 43       // 提取计算需要的字段数据 44       .map(log => (log.contentSize)) 45  46     // 对重复使用的RDD进行cache 47     contentSizeRDD.cache() 48  49     // 开始计算平均值、最小值、最大值 50     val totalContentSize = contentSizeRDD.sum() 51     val totalCount = contentSizeRDD.count() 52     val avgSize = 1.0 * totalContentSize / totalCount 53     val minSize = contentSizeRDD.min() 54     val maxSize = contentSizeRDD.max() 55  56     // 当RDD不使用的时候,进行unpersist 57     contentSizeRDD.unpersist() 58  59     // 结果输出 60     println(s"ContentSize Avg:${avgSize}, Min: ${minSize}, Max: ${maxSize}") 61  62     // 需求二:请各个不同返回值的出现的数据 ===> wordCount程序 63     /* 64     * A count of response code's returned. 65     * */ 66     val responseCodeResultRDD = apacheAccessLog 67       // 提取需要的字段数据, 转换为key/value键值对,方便进行reduceByKey操作 68       // 当连续出现map或者flatMap的时候,将多个map/flatMap进行合并 69       .map(log => (log.responseCode, 1)) 70       // 使用reduceByKey函数,按照key进行分组后,计算每个key出现的次数 71       .reduceByKey(_ + _) 72  73     // 结果输出 74     println(s"""ResponseCode :${responseCodeResultRDD.collect().mkString(",")}""") 75  76     // 需求三:获取访问次数超过N次的IP地址 77     // 需求三额外:对IP地址进行限制,部分黑名单IP地址不统计 78     /* 79     * All IPAddresses that have accessed this server more than N times. 80     * 1. 计算IP地址出现的次数 ===> WordCount程序 81     * 2. 数据过滤 82     * */ 83     val blackIP = Array("200-55-104-193.dsl.prima.net.ar", "10.0.0.153", "208-38-57-205.ip.cal.radiant.net") 84     // 由于集合比较大,将集合的内容广播出去 85     val broadCastIP = sc.broadcast(blackIP) 86     val N = 10 87     val ipAddressRDD = apacheAccessLog 88       // 过滤IP地址在黑名单中的数据 89       .filter(log => !broadCastIP.value.contains(log.ipAddress)) 90       // 获取计算需要的IP地址数据,并将返回值转换为Key/Value键值对类型 91       .map(log => (log.ipAddress, 1L)) 92       // 使用reduceByKey函数进行聚合操作 93       .reduceByKey(_ + _) 94       // 过滤数据,要求IP地址必须出现N次以上 95       .filter(tuple => tuple._2 > N) 96     // 获取满足条件IP地址, 为了展示方便,将下面这行代码注释 97     //      .map(tuple => tuple._1) 98  99     // 结果输出100     println(s"""IP Address :${ipAddressRDD.collect().mkString(",")}""")101 102     // 需求四:获取访问次数最多的前K个endpoint的值 ==> TopN103     /*104     * The top endpoints requested by count.105     * 1. 先计算出每个endpoint的出现次数106     * 2. 再进行topK的一个获取操作,获取出现次数最多的前K个值107     * */108     val K = 10109     val topKValues = apacheAccessLog110       // 获取计算需要的字段信息,并返回key/value键值对111       .map(log => (log.endpoint, 1))112       // 获取每个endpoint对应的出现次数113       .reduceByKey(_ + _)114       // 获取前10个元素, 而且使用我们自定义的排序类115       .top(K)(LogSortingUtil.TupleOrdering)116     // 如果只需要endpoint的值,不需要出现的次数,那么可以通过map函数进行转换117     //      .map(_._1)118 119     // 结果输出120     println(s"""TopK values:${topKValues.mkString(",")}""")121 122 123     // 对不在使用的rdd,去除cache124     apacheAccessLog.unpersist()125 126     // ================日志分析具体代码==================127 128     sc.stop()129   }130 }

 

3.需要的辅助类一(返回匹配的日志)

1 package com.ibeifeng.bigdata.spark.core.log 2  3 import scala.util.matching.Regex 4  5 /** 6   * 64.242.88.10 - - [07/Mar/2004:16:05:49 -0800] "GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1" 401 12846 7   * Created by ibf on 01/15. 8   */ 9 case class ApacheAccessLog(10                             ipAddress: String, // IP地址11                             clientId: String, // 客户端唯一标识符12                             userId: String, // 用户唯一标识符13                             serverTime: String, // 服务器时间14                             method: String, // 请求类型/方式15                             endpoint: String, // 请求的资源16                             protocol: String, // 请求的协议名称17                             responseCode: Int, // 请求返回值:比如:200、40118                             contentSize: Long // 返回的结果数据大小19                           )20 21 /**22   * 64.242.88.10 - - [07/Mar/2004:16:05:49 -0800] "GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1" 401 1284623   * on 01/15.24   * 提供一些操作Apache Log的工具类供SparkCore使用25   */26 object ApacheAccessLog {27   // Apache日志的正则28   val PARTTERN: Regex =29   """^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)""".r30 31   /**32     * 验证一下输入的数据是否符合给定的日志正则,如果符合返回true;否则返回false33     *34     * @param line35     * @return36     */37   def isValidateLogLine(line: String): Boolean = {38     val options = PARTTERN.findFirstMatchIn(line)39 40     if (options.isEmpty) {41       false42     } else {43       true44     }45   }46 47   /**48     * 解析输入的日志数据49     *50     * @param line51     * @return52     */53   def parseLogLine(line: String): ApacheAccessLog = {54     if (!isValidateLogLine(line)) {55       throw new IllegalArgumentException("参数格式异常")56     }57 58     // 从line中获取匹配的数据59     val options = PARTTERN.findFirstMatchIn(line)60 61     // 获取matcher62     val matcher = options.get63 64     // 构建返回值65     ApacheAccessLog(66       matcher.group(1), // 获取匹配字符串中第一个小括号中的值67       matcher.group(2),68       matcher.group(3),69       matcher.group(4),70       matcher.group(5),71       matcher.group(6),72       matcher.group(7),73       matcher.group(8).toInt,74       matcher.group(9).toLong75     )76   }77 }

 

4.需要的辅助类二(自定义的一个二元组的比较器,方便进行TopN)

1 package com.ibeifeng.bigdata.spark.core.log 2  3 /** 4   * Created by ibf on 01/15. 5   */ 6 object LogSortingUtil { 7  8   /** 9     * 自定义的一个二元组的比较器10     */11   object TupleOrdering extends scala.math.Ordering[(String, Int)] {12     override def compare(x: (String, Int), y: (String, Int)): Int = {13       // 按照出现的次数进行比较,也就是按照二元组的第二个元素进行比较14       x._2.compare(y._2)15     }16   }17 18 }

 

转载地址:http://wwkml.baihongyu.com/

你可能感兴趣的文章
Oracle Goldengate Director软件截面图
查看>>
shell $$ 的详解
查看>>
重装linux服务器后开不了机
查看>>
Linq to entity 执行多个字段排序的方法
查看>>
nginx开启后主机无法访问虚拟机的nginx解决方案
查看>>
centos7 安装 keepalived
查看>>
oracle 查找OS进程id
查看>>
linux--mariadb数据库
查看>>
BlockChange | 区块链将如何颠覆金融服务业
查看>>
springmvc的@RequestMapping、@PathVariable、@RequestParam
查看>>
SQLite第二课 源码下载编译
查看>>
ibatis动态生成列时的列名无效
查看>>
通用汽车新增130辆测试无人车,配激光雷达
查看>>
python之通过“反射”实现不同的url指向不同函数进行处理(反射应用一)
查看>>
10.6 监控io性能;10.7 free;10.8 ps;10.9 查看网络状态;10.10 抓包
查看>>
delegate的用法
查看>>
Ubuntu <2TB sdb preseed示例
查看>>
Android开发之旅:组件生命周期(二)
查看>>
使用LVS+NAT搭建集群实现负载均衡
查看>>
LVM 磁盘分区扩容
查看>>