

MapReduce Practice Problem: Web Access Log Analysis and Anomaly Detection


Assignment Description


Problem Background

You are asked to design and implement a MapReduce-based system for large-scale Web access log analysis and anomaly detection. The goal of the system is to extract useful information from millions of access log records per day and to detect potentially anomalous access behavior. The access log format is as follows:

```
127.0.0.1 - - [10/Oct/2021:13:55:36 -0700] "GET /index.html HTTP/1.1" 200 1043
192.168.0.1 - - [10/Oct/2021:13:56:12 -0700] "POST /login HTTP/1.1" 200 2326
...
```
Dataset Description
  • IP address: e.g., 127.0.0.1
  • Timestamp: e.g., [10/Oct/2021:13:55:36 -0700]
  • Request method: e.g., "GET", "POST"
  • Request URL: e.g., "/index.html"
  • HTTP status code: e.g., 200, 404, 500
  • Response size: e.g., 1043
Task Requirements
  1. Data preprocessing:
     - Parse each log record and extract the following fields: IP address, request time, request method, request URL, HTTP status code, and response size.
     - Format the parsed data into a structured format (e.g., JSON).
  2. Access statistics:
     - Count the number of requests from each IP address in one day.
     - Count the number of requests to each URL in one day.
  3. Anomaly detection:
     - Detect abnormally high access frequency: compute the mean and standard deviation of the per-IP request counts, and flag every IP address whose count exceeds the mean plus three standard deviations (see the formulas after this list).
     - Detect potentially malicious requests: find requests with 4xx and 5xx HTTP status codes, count these error requests per IP address, and flag every IP address whose error requests make up more than 20% of its total requests.
  4. Result output:
     - Output the access statistics: the request count of each IP address and of each URL.
     - Output the anomaly detection results: IP addresses with abnormally high access frequency together with their request counts, and potentially malicious IP addresses together with their error-request counts and error ratios.
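Concretely, writing $c_i$ for the request count of IP address $i$, $e_i$ for its number of 4xx/5xx requests, and $N$ for the number of distinct IP addresses, the two detection rules amount to the following (a sketch using the population standard deviation):

$$\mu = \frac{1}{N}\sum_{i=1}^{N} c_i, \qquad \sigma = \sqrt{\frac{1}{N}\sum_{i=1}^{N} (c_i - \mu)^2}$$

IP $i$ is flagged as abnormally frequent if $c_i > \mu + 3\sigma$, and as potentially malicious if $e_i / c_i > 0.2$.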
Input Data Example
```
127.0.0.1 - - [10/Oct/2021:13:55:36 -0700] "GET /index.html HTTP/1.1" 200 1043
192.168.0.1 - - [10/Oct/2021:13:56:12 -0700] "POST /login HTTP/1.1" 200 2326
...
```
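After preprocessing, the first sample line could be represented as a JSON record along these lines (the field names here are illustrative, not prescribed by the assignment):

```json
{
  "ip": "127.0.0.1",
  "timestamp": "[10/Oct/2021:13:55:36 -0700]",
  "method": "GET",
  "url": "/index.html",
  "status": 200,
  "size": 1043
}
```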
Output Data Example

Access statistics results

Requests per IP:
127.0.0.1  150
192.168.0.1  200

Requests per URL:
/index.html  300
/login  400

Anomaly detection results

Abnormally high access frequency IPs:
192.168.0.1  1200

Potentially malicious request IPs:
127.0.0.1  50  25.0%
Implementation Steps
  1. Data preprocessing Mapper:
     - Parse each log record, extract the required fields, and emit structured data.
  2. Access statistics Mapper and Reducer:
     - Mapper: emit a count for each IP address and each URL.
     - Reducer: sum the counts per IP address and per URL.
  3. Anomaly detection Mapper and Reducer:
     - Mapper: emit per-IP request counts and detect requests with 4xx and 5xx HTTP status codes.
     - Reducer: compute the mean and standard deviation of the per-IP request counts and flag abnormally frequent IP addresses; count error requests per IP address, compute the error ratio, and flag potentially malicious IP addresses.

Solution Approach

  1. Data preprocessing Mapper: parse each log record, extract the required fields, and emit structured data.
  2. Access statistics Mapper and Reducer: count the requests of each IP address and each URL.
  3. Anomaly detection Mapper and Reducer: count per-IP requests and detect requests with 4xx and 5xx HTTP status codes.
  4. Main method: chain three MapReduce jobs: data preprocessing, access statistics, and anomaly detection.

1. Data Preprocessing

**PreprocessMapper**

```java
package org.example.mapreduce.t1;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author 撕得失败的标签
 * @version 1.0
 * @description: Data preprocessing
 * @date 2024/6/22 22:35
 */
public class PreprocessMapper extends Mapper<LongWritable, Text, Text, Text> {
    /**
     * @description:
     * 1. Parse each log record and extract: IP address, request time, request method,
     *    request URL, HTTP status code, and response size.
     * 2. Emit the parsed fields as a structured record.
     * @author 撕得失败的标签
     * @date 2024/6/23 11:09
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] strings = line.split(" ");
        // A well-formed log line splits into exactly 10 space-separated tokens
        if (strings.length == 10) {
            // Extract the matched fields
            String ipAddress = strings[0];
            String timestamp = strings[3] + " " + strings[4];
            String requestMethod = strings[5];
            String requestUrl = strings[6];
            String httpStatusCode = strings[8];
            String responseSize = strings[9];
            context.write(new Text(ipAddress),
                    new Text(timestamp + "," + requestMethod + "," + requestUrl + ","
                            + httpStatusCode + "," + responseSize));
        }
    }
}
```
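Splitting on single spaces assumes each record has exactly ten tokens, which silently drops lines where, for example, the URL contains an encoded space. A regex-based parsing sketch for the combined-log-style lines shown above (the pattern and the `LogLineParser` helper are assumptions for illustration, not part of the original solution):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogLineParser {
    // One capture group per field: ip, timestamp, method, url, status, size
    private static final Pattern LOG_PATTERN = Pattern.compile(
            "^(\\S+) \\S+ \\S+ \\[([^\\]]+)\\] \"(\\S+) (\\S+)[^\"]*\" (\\d{3}) (\\d+|-)");

    /** Returns {ip, timestamp, method, url, status, size}, or null for malformed lines. */
    public static String[] parse(String line) {
        Matcher m = LOG_PATTERN.matcher(line);
        if (!m.find()) {
            return null; // malformed line: the caller can simply skip it
        }
        return new String[] {
                m.group(1), m.group(2), m.group(3), m.group(4), m.group(5), m.group(6)
        };
    }
}
```

In the mapper, a `null` return can be skipped directly instead of relying on the token-count check.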

2. Access Statistics

**AccessStatistics**

```java
package org.example.mapreduce.t1;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author 撕得失败的标签
 * @version 1.0
 * @description: Access statistics
 * @date 2024/6/22 22:55
 */
public class AccessStatistics {
    /**
     * @description:
     * 1. Count the number of requests from each IP address in one day.
     * 2. Count the number of requests to each URL in one day.
     * @author 撕得失败的标签
     * @date 2024/6/23 11:08
     */
    public static class Map extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] strings = line.split(" ");
            // Restrict to a single day, using 20/Jun/2024 as an example
            if (strings.length == 10 && strings[3].contains("20/Jun/2024")) {
                // IP address
                context.write(new Text(strings[0]), new LongWritable(1));
                // URL
                context.write(new Text(strings[6]), new LongWritable(1));
            }
        }
    }

    public static class Reduce extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable value : values) {
                sum += value.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }
}
```
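Because the reduce function is a pure associative sum, the same `Reduce` class could also serve as a combiner to shrink the shuffle volume. A one-line addition to the job setup shown in the Main class below (an optional optimization, not part of the original solution):

```java
// Pre-aggregate counts on the map side before the shuffle
accessStatisticsJob.setCombinerClass(AccessStatistics.Reduce.class);
```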

3. Anomaly Detection

**AnomalyDetection**

```java
package org.example.mapreduce.t1;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.HashMap;

/**
 * @author 撕得失败的标签
 * @version 1.0
 * @description: Anomaly detection
 * @date 2024/6/23 11:08
 */
public class AnomalyDetection {
    /**
     * @description:
     * 1. Detect abnormally high access frequency: compute the mean and standard deviation
     *    of the per-IP request counts, and flag IPs whose count exceeds mean + 3 * stddev.
     * 2. Detect potentially malicious requests: count requests with 4xx/5xx status codes
     *    per IP, and flag IPs whose error-request ratio exceeds 20%.
     * @author 撕得失败的标签
     * @date 2024/6/23 11:08
     */
    public static class Map extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] strings = value.toString().split(" ");
            if (strings.length != 10) {
                return; // skip malformed lines
            }
            String ip = strings[0];
            context.write(new Text(ip), new LongWritable(1));
            String httpStatusCode = strings[8];
            if (httpStatusCode.startsWith("4") || httpStatusCode.startsWith("5")) {
                // Prefix the key with "+" to mark it as an error-request count
                String anomaly = "+" + ip;
                context.write(new Text(anomaly), new LongWritable(1));
            }
        }
    }

    public static class Reduce extends Reducer<Text, LongWritable, Text, LongWritable> {
        private final HashMap<String, Long> ipToCount = new HashMap<String, Long>();
        private final HashMap<String, Long> ipToAnomalyCount = new HashMap<String, Long>();

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable value : values) {
                sum += value.get();
            }
            String ip = key.toString();
            if (ip.startsWith("+")) {
                // "+"-prefixed keys carry the per-IP error-request count
                ipToAnomalyCount.put(ip.substring(1), sum);
            } else {
                ipToCount.put(ip, sum);
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Mean of the per-IP request counts
            long sum = 0;
            for (String k : ipToCount.keySet()) {
                sum += ipToCount.get(k);
            }
            double avg = (double) sum / ipToCount.size();
            // Population standard deviation of the per-IP request counts
            double sumSq = 0;
            for (String k : ipToCount.keySet()) {
                sumSq += Math.pow(ipToCount.get(k) - avg, 2);
            }
            double std = Math.sqrt(sumSq / ipToCount.size());
            // IPs with abnormally high access frequency
            for (String k : ipToCount.keySet()) {
                if (ipToCount.get(k) > avg + 3 * std) {
                    context.write(new Text(k), new LongWritable(ipToCount.get(k)));
                }
            }
            // Potentially malicious request IPs
            for (String k : ipToAnomalyCount.keySet()) {
                double anomaly = (double) ipToAnomalyCount.get(k) / ipToCount.get(k);
                if (anomaly > 0.2) {
                    context.write(
                            new Text(k + "\t" + String.format("%.1f", anomaly * 100) + "%"),
                            new LongWritable(ipToAnomalyCount.get(k)));
                }
            }
        }
    }
}
```
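Note that this reducer accumulates all per-IP counts in in-memory HashMaps and only emits results in `cleanup()`, so the mean and standard deviation are only global if every key reaches the same reducer. Presumably the job should therefore be pinned to a single reduce task (this also assumes the set of distinct IPs fits in one reducer's memory):

```java
// Route all keys to one reducer so cleanup() sees every IP's counts
anomalyDetectionJob.setNumReduceTasks(1);
```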

4. Main Method

**Main**

```java
package org.example.mapreduce.t1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.LinkedList;
import java.util.List;

/**
 * @author 撕得失败的标签
 * @version 1.0
 * @description: Main method
 * @date 2024/6/22 22:34
 */
public class Main {
    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        // Create the configuration
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://hadoop102:9000");

        // 1. Data preprocessing: PreprocessMapper
        Job preprocessJob = Job.getInstance(conf, "preprocess job");
        preprocessJob.setJarByClass(Main.class);
        preprocessJob.setMapperClass(PreprocessMapper.class);
        preprocessJob.setOutputKeyClass(Text.class);
        preprocessJob.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(preprocessJob, new Path("/m1"));
        FileSystem fs = FileSystem.get(conf);
        Path outPath = new Path("/t1/preprocess");
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(preprocessJob, outPath);
        preprocessJob.waitForCompletion(true);

        // 2. Access statistics: AccessStatistics
        Job accessStatisticsJob = Job.getInstance(conf, "access statistics job");
        accessStatisticsJob.setJarByClass(Main.class);
        accessStatisticsJob.setMapperClass(AccessStatistics.Map.class);
        accessStatisticsJob.setReducerClass(AccessStatistics.Reduce.class);
        accessStatisticsJob.setOutputKeyClass(Text.class);
        accessStatisticsJob.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(accessStatisticsJob, new Path("/m1"));
        FileSystem fs1 = FileSystem.get(conf);
        Path outPath1 = new Path("/t1/statistics");
        if (fs1.exists(outPath1)) {
            fs1.delete(outPath1, true);
        }
        FileOutputFormat.setOutputPath(accessStatisticsJob, outPath1);
        accessStatisticsJob.waitForCompletion(true);

        // 3. Anomaly detection: AnomalyDetection
        Job anomalyDetectionJob = Job.getInstance(conf, "anomaly detection job");
        anomalyDetectionJob.setJarByClass(Main.class);
        anomalyDetectionJob.setMapperClass(AnomalyDetection.Map.class);
        anomalyDetectionJob.setReducerClass(AnomalyDetection.Reduce.class);
        anomalyDetectionJob.setOutputKeyClass(Text.class);
        anomalyDetectionJob.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(anomalyDetectionJob, new Path("/m1"));
        FileSystem fs2 = FileSystem.get(conf);
        Path outPath2 = new Path("/t1/anomaly");
        if (fs2.exists(outPath2)) {
            fs2.delete(outPath2, true);
        }
        FileOutputFormat.setOutputPath(anomalyDetectionJob, outPath2);
        anomalyDetectionJob.waitForCompletion(true);

        // 4. Print the results
        // Access statistics:
        FileSystem fs3 = FileSystem.get(conf);
        Path outPath3 = new Path("/t1/statistics/part-r-00000");
        BufferedReader br = new BufferedReader(new InputStreamReader(fs3.open(outPath3)));
        List<String> ip = new LinkedList<String>();
        List<String> url = new LinkedList<String>();
        String line;
        while ((line = br.readLine()) != null) {
            // URL keys start with "/", IP keys do not
            if (line.startsWith("/")) {
                url.add(line);
            } else {
                ip.add(line);
            }
        }
        System.out.println("\nRequests per IP:");
        for (String s : ip) {
            System.out.println(s);
        }
        System.out.println("\nRequests per URL:");
        for (String s : url) {
            System.out.println(s);
        }

        // Anomaly detection results:
        FileSystem fs4 = FileSystem.get(conf);
        Path outPath4 = new Path("/t1/anomaly/part-r-00000");
        BufferedReader br1 = new BufferedReader(new InputStreamReader(fs4.open(outPath4)));
        List<String> potential = new LinkedList<String>();
        List<String> anomaly = new LinkedList<String>();
        String line1;
        while ((line1 = br1.readLine()) != null) {
            // High-frequency lines have 2 tab-separated fields (ip, count);
            // potential-malicious lines have 3 (ip, ratio, count)
            String[] strings = line1.split("\t");
            if (strings.length == 2) {
                anomaly.add(line1);
            } else {
                potential.add(line1);
            }
        }
        System.out.println("\nAbnormally high access frequency IPs:");
        if (anomaly.size() == 0) {
            System.out.println("None");
        } else {
            for (String s : anomaly) {
                System.out.println(s);
            }
        }
        System.out.println("\nPotentially malicious request IPs:");
        if (potential.size() == 0) {
            System.out.println("None");
        } else {
            for (String s : potential) {
                String[] strings = s.split("\t");
                // Reorder to: ip, error count, error ratio
                System.out.println(strings[0] + "\t" + strings[2] + "\t" + strings[1]);
            }
        }
    }
}
```
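One small note on the configuration: `fs.default.name` has been deprecated since Hadoop 2.x in favor of `fs.defaultFS`. A sketch of the updated setting, assuming the same NameNode address used throughout this post:

```java
Configuration conf = new Configuration();
// "fs.default.name" still works but is deprecated; "fs.defaultFS" is the current key
conf.set("fs.defaultFS", "hdfs://hadoop102:9000");
```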

5. Program Output

```
Requests per IP:
10.0.0.1     334003
10.0.0.2     334350
10.0.0.3     333056
10.0.0.4     333947
10.0.0.5     333263
127.0.0.1    332347
127.0.0.2    333025
127.0.0.3    332450
127.0.0.4    333005
127.0.0.5    333428
192.168.0.1  334054
192.168.0.2  332883
192.168.0.3  333681
192.168.0.4  333133
192.168.0.5  333375

Requests per URL:
/cart        713975
/checkout    713453
/contact     715382
/home.html   712570
/index.html  715544
/login       714255
/products    714821

Abnormally high access frequency IPs:
None

Potentially malicious request IPs:
192.168.0.2  222498  66.8%
192.168.0.1  222165  66.5%
127.0.0.5    221778  66.5%
192.168.0.4  222096  66.7%
127.0.0.4    222156  66.7%
192.168.0.3  222227  66.6%
192.168.0.5  222070  66.6%
10.0.0.4     222243  66.6%
10.0.0.3     221966  66.6%
10.0.0.5     222347  66.7%
10.0.0.2     222664  66.6%
10.0.0.1     222493  66.6%
127.0.0.3    221464  66.6%
127.0.0.2    222197  66.7%
127.0.0.1    221702  66.7%

Process finished with exit code 0
```
Tags: mapreduce

Reprinted from: https://blog.csdn.net/qq_61828116/article/details/139904508
Copyright belongs to the original author, 撕得失败的标签. In case of infringement, please contact us for removal.
