第1关:单模式
编程要求
根据提示,在右侧编辑器补充代码,匹配数据集中两个连续的 "alert" 事件,代码中已经为你提供数据集。
测试说明
平台会对你编写的代码进行测试:
测试输入:无;
预期输出:
Pattern found: {start=[alert], end=[alert]}
答案代码
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.time.Duration;
import java.util.List;
import java.util.Map;
/**
 * Level 1 (single pattern): finds two strictly consecutive {@code "alert"} events in a small
 * in-memory stream with Flink CEP, printing one line per match and writing the matches to a file.
 */
public class sign_mode {

    /**
     * Builds and runs the CEP job.
     *
     * @param args unused
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // One parallel instance so the matches are emitted in a single, ordered stream.
        env.setParallelism(1);

        // Each element is stamped with the wall-clock time at which it is assigned; the
        // strategy tolerates up to 5 seconds of out-of-order timestamps.
        final WatermarkStrategy<String> watermarks = WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, timestamp) -> System.currentTimeMillis());
        final DataStream<String> events = env
                .fromElements("event1", "alert", "event2", "alert", "event3", "alert", "alert")
                .assignTimestampsAndWatermarks(watermarks);

        // ********** Begin **********
        // "start" is an alert and "end" must be the very next event (strict contiguity via
        // next()), which must also be an alert.
        final Pattern<String, ?> twoAlerts = Pattern.<String>begin("start")
                .where(matching("alert"))
                .next("end")
                .where(matching("alert"));

        // Render each match as a single line naming both matched events.
        final DataStream<String> matches = CEP.pattern(events, twoAlerts)
                .select(new PatternSelectFunction<String, String>() {
                    @Override
                    public String select(Map<String, List<String>> match) {
                        final String first = match.get("start").get(0);
                        final String second = match.get("end").get(0);
                        return "Pattern found: {start=[" + first + "], end=[" + second + "]}";
                    }
                });
        matches.print();
        // ********** End **********
        matches.writeAsText("/root/files/result.csv");
        env.execute();
    }

    /**
     * Creates a condition that accepts exactly the events equal to {@code expected}.
     *
     * @param expected the event value to accept (non-null)
     * @return a fresh, serializable CEP condition
     */
    private static SimpleCondition<String> matching(final String expected) {
        return new SimpleCondition<String>() {
            @Override
            public boolean filter(String value) {
                return expected.equals(value);
            }
        };
    }
}
第2关:模式序列
编程要求
根据提示,在右侧编辑器补充代码,进行组合模式匹配:A 后面跟着一个或多个 B(B 与 B 之间为非确定性宽松连续,即允许任意组合),然后跟着一个 C,题目已提供数据集。
测试说明
平台会对你编写的代码进行测试:
测试输入:无;
预期输出:
[B]
[B, B, B]
[B, B]
[B, B]
[B, B]
[B]
答案代码
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.time.Duration;
import java.util.List;
import java.util.Map;
/**
 * Level 2 (pattern sequence): matches an {@code A}, then one or more {@code B}s under relaxed,
 * non-deterministic contiguity, then a {@code C}. Only the matched {@code B} events are emitted.
 */
public class more_mode {

    /**
     * Builds and runs the combination-pattern CEP job.
     *
     * @param args unused
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // One parallel instance so all matches come out of a single ordered stream.
        env.setParallelism(1);

        // Input source; replace with a real source as needed. Every element is stamped with
        // the wall-clock time at assignment, tolerating 5 seconds of disorder.
        final WatermarkStrategy<String> watermarks = WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, timestamp) -> System.currentTimeMillis());
        final DataStream<String> events = env
                .fromElements("A", "B", "C", "A", "B", "B", "C", "A", "C")
                .assignTimestampsAndWatermarks(watermarks);

        // "middle" loops over one or more Bs; allowCombinations() makes the loop
        // non-deterministic, so every combination of Bs between an A and a C is reported.
        final Pattern<String, ?> aThenBsThenC = Pattern.<String>begin("start")
                .where(matching("A"))
                .followedBy("middle")
                .where(matching("B"))
                .oneOrMore()
                .allowCombinations()
                .followedBy("end")
                .where(matching("C"));

        // Keep only the list of matched B events for each match.
        final DataStream<List<String>> middles = CEP.pattern(events, aThenBsThenC)
                .select(new PatternSelectFunction<String, List<String>>() {
                    @Override
                    public List<String> select(Map<String, List<String>> match) {
                        return match.get("middle");
                    }
                });

        middles.print();
        middles.writeAsText("/root/files/result.csv");
        env.execute("Flink CEP Combination Pattern Matching");
    }

    /**
     * Creates a condition that accepts exactly the events equal to {@code expected}.
     *
     * @param expected the event value to accept (non-null)
     * @return a fresh, serializable CEP condition
     */
    private static SimpleCondition<String> matching(final String expected) {
        return new SimpleCondition<String>() {
            @Override
            public boolean filter(String value) {
                return expected.equals(value);
            }
        };
    }
}
第3关:模式组
编程要求
根据提示,在右侧编辑器补充代码,在给定的数据流中,有一个订单事件类型,每个订单事件包含以下属性:
orderId:订单 ID,唯一标识一个订单
productId:产品 ID,唯一标识一个产品
quantity:数量,表示购买的产品数量
price:价格,表示产品的单价。现在需要使用 Flink CEP 来检测以下模式:当一个订单购买的数量大于 10 并且价格大于 100 时,输出该订单的 orderId、productId 和总价(quantity * price)。
测试说明
平台会对你编写的代码进行测试:
测试输入:无;
预期输出:
Pattern 1 Match: OrderId=1, ProductId=product1, TotalPrice=1800.0
Pattern 1 Match: OrderId=3, ProductId=product1, TotalPrice=1320.0
Pattern 1 Match: OrderId=4, ProductId=product3, TotalPrice=3000.0
Pattern 1 Match: OrderId=6, ProductId=product2, TotalPrice=1155.0
Pattern 1 Match: OrderId=7, ProductId=product1, TotalPrice=1690.0
答案代码
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.List;
import java.util.Map;
/**
 * Level 3: uses a single-state CEP pattern to flag every order whose quantity is greater than
 * 10 and whose unit price is greater than 100, and reports its id, product and total price.
 */
public class GroupMode {

    /**
     * One order record.
     *
     * <p>Besides its public fields, the class has a public no-argument constructor so that
     * Flink's type extraction can treat it as a POJO. Without that constructor Flink falls back
     * to generic (Kryo) serialization for this type.
     */
    public static class OrderEvent {
        /** Order identifier. */
        public int orderId;
        /** Product identifier. */
        public String productId;
        /** Number of units bought. */
        public int quantity;
        /** Unit price. */
        public double price;

        /** No-argument constructor required by Flink's POJO type rules. */
        public OrderEvent() {
        }

        public OrderEvent(int orderId, String productId, int quantity, double price) {
            this.orderId = orderId;
            this.productId = productId;
            this.quantity = quantity;
            this.price = price;
        }

        /** Returns {@code quantity * price}. */
        public double getTotalPrice() {
            return quantity * price;
        }

        public int getQuantity() {
            return quantity;
        }

        public double getPrice() {
            return price;
        }

        /** Returns the order id rendered as a string (the underlying field is an {@code int}). */
        public String getOrderId() {
            return String.valueOf(orderId);
        }

        public String getProductId() {
            return productId;
        }

        @Override
        public String toString() {
            return "OrderEvent{" +
                    "orderId=" + orderId +
                    ", productId='" + productId + '\'' +
                    ", quantity=" + quantity +
                    ", price=" + price +
                    '}';
        }
    }

    /**
     * Builds and runs the order-detection job.
     *
     * @param args unused
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        // Set up the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Order event stream; replace with a real source as needed. Events are stamped with
        // the wall-clock time at assignment.
        DataStream<OrderEvent> orderEvents = env.fromElements(
                new OrderEvent(1, "product1", 15, 120.0),
                new OrderEvent(2, "product2", 8, 90.0),
                new OrderEvent(3, "product1", 12, 110.0),
                new OrderEvent(4, "product3", 20, 150.0),
                new OrderEvent(5, "product1", 9, 95.0),
                new OrderEvent(6, "product2", 11, 105.0),
                new OrderEvent(7, "product1", 13, 130.0)
        ).assignTimestampsAndWatermarks(
                WatermarkStrategy.<OrderEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((event, timestamp) -> System.currentTimeMillis())
        );
        // ********** Begin **********
        // Single-state pattern: a large order is quantity > 10 and unit price > 100.
        Pattern<OrderEvent, ?> pattern = Pattern.<OrderEvent>begin("start")
                .where(new SimpleCondition<OrderEvent>() {
                    @Override
                    public boolean filter(OrderEvent value) {
                        return value.getQuantity() > 10 && value.getPrice() > 100;
                    }
                });
        // Apply the pattern to the stream.
        PatternStream<OrderEvent> patternStream = CEP.pattern(orderEvents, pattern);
        // Turn each match into a report line.
        DataStream<String> result = patternStream.flatSelect(new MyPatternFlatSelectFunction());
        // ********** End **********
        // Print to stdout.
        result.print();
        // Write the results to a file.
        result.writeAsText("/root/files/result.csv");
        // Run the job.
        env.execute("Order CEP Example");
    }

    /** Emits one report line per order captured under the {@code "start"} state of a match. */
    public static class MyPatternFlatSelectFunction implements PatternFlatSelectFunction<OrderEvent, String> {
        @Override
        public void flatSelect(Map<String, List<OrderEvent>> pattern, Collector<String> out) {
            List<OrderEvent> startEvents = pattern.get("start");
            for (OrderEvent event : startEvents) {
                out.collect("Pattern 1 Match: OrderId=" + event.getOrderId() + ", ProductId=" + event.getProductId() + ", TotalPrice=" + event.getTotalPrice());
            }
        }
    }
}
第4关:检测模式
编程要求
根据提示,在右侧编辑器补充代码,在给定的数据流中,有一个传感器数据事件类型,每个传感器数据事件包含以下属性:
sensorId:传感器 ID,唯一标识一个传感器。
timestamp:时间戳,表示传感器数据被采集的时间。
temperature:温度,表示传感器测量到的环境温度值。现在需要使用 Flink CEP 来检测以下模式:当一条传感器数据的温度超过 60 度,并且紧随其后的下一条传感器数据的温度也超过 60 度时,输出这两条传感器数据的时间戳和 temperature 值。
数据集已经在代码文件中提供了。
测试说明
平台会对你编写的代码进行测试:
测试输入:无;
预期输出:
Two Consecutive High Temperatures: First Sensor - SensorId=sensor2, Timestamp=2, Temperature=65.0 | Second Sensor - SensorId=sensor1, Timestamp=3, Temperature=70.0
Two Consecutive High Temperatures: First Sensor - SensorId=sensor1, Timestamp=3, Temperature=70.0 | Second Sensor - SensorId=sensor3, Timestamp=4, Temperature=62.0
Two Consecutive High Temperatures: First Sensor - SensorId=sensor3, Timestamp=4, Temperature=62.0 | Second Sensor - SensorId=sensor2, Timestamp=5, Temperature=75.0
Two Consecutive High Temperatures: First Sensor - SensorId=sensor2, Timestamp=5, Temperature=75.0 | Second Sensor - SensorId=sensor1, Timestamp=6, Temperature=80.0
Two Consecutive High Temperatures: First Sensor - SensorId=sensor1, Timestamp=6, Temperature=80.0 | Second Sensor - SensorId=sensor2, Timestamp=7, Temperature=68.0
代码如下:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.List;
import java.util.Map;
/**
 * Level 4: detects pairs of back-to-back sensor readings (strict contiguity) whose
 * temperatures are both above 60 degrees, and reports both readings.
 */
public class check_mode {

    /**
     * One sensor reading.
     *
     * <p>Has a public no-argument constructor and public fields so that Flink's type extraction
     * can treat it as a POJO instead of falling back to generic (Kryo) serialization.
     */
    public static class SensorEvent {
        /** Sensor identifier. */
        public String sensorId;
        /** Collection time; also used as the event-time timestamp below. */
        public long timestamp;
        /** Measured temperature. */
        public double temperature;

        /** No-argument constructor required by Flink's POJO type rules. */
        public SensorEvent() {
        }

        public SensorEvent(String sensorId, long timestamp, double temperature) {
            this.sensorId = sensorId;
            this.timestamp = timestamp;
            this.temperature = temperature;
        }

        @Override
        public String toString() {
            return "SensorEvent{" +
                    "sensorId='" + sensorId + '\'' +
                    ", timestamp=" + timestamp +
                    ", temperature=" + temperature +
                    '}';
        }
    }

    /**
     * Builds and runs the high-temperature detection job.
     *
     * @param args unused
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Event time comes from the reading's own timestamp field, tolerating 1 second of disorder.
        DataStream<SensorEvent> sensorEvents = env.fromElements(
                new SensorEvent("sensor1", 1L, 55.0),
                new SensorEvent("sensor2", 2L, 65.0),
                new SensorEvent("sensor1", 3L, 70.0),
                new SensorEvent("sensor3", 4L, 62.0),
                new SensorEvent("sensor2", 5L, 75.0),
                new SensorEvent("sensor1", 6L, 80.0),
                new SensorEvent("sensor2", 7L, 68.0)
        ).assignTimestampsAndWatermarks(
                WatermarkStrategy.<SensorEvent>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                        .withTimestampAssigner((event, timestamp) -> event.timestamp)
        );
        // ********** Begin **********
        // "first" is a reading above 60; "second" must be the very next reading (next()),
        // also above 60.
        Pattern<SensorEvent, ?> pattern = Pattern.<SensorEvent>begin("first")
                .where(new SimpleCondition<SensorEvent>() {
                    @Override
                    public boolean filter(SensorEvent value) {
                        return value.temperature > 60.0;
                    }
                })
                .next("second")
                .where(new SimpleCondition<SensorEvent>() {
                    @Override
                    public boolean filter(SensorEvent value) {
                        return value.temperature > 60.0;
                    }
                });
        // Apply the pattern to the stream.
        PatternStream<SensorEvent> patternStream = CEP.pattern(sensorEvents, pattern);
        // Turn each match into a report line.
        DataStream<String> result = patternStream.select(new MySecondPatternSelectFunction());
        // ********** End **********
        // Print to stdout.
        result.print();
        // Write the results to a file.
        result.writeAsText("/root/files/result.csv");
        // Run the job.
        env.execute();
    }

    /** Formats a {@code first}/{@code second} match as a single line describing both readings. */
    public static class MySecondPatternSelectFunction implements PatternSelectFunction<SensorEvent, String> {
        @Override
        public String select(Map<String, List<SensorEvent>> pattern) throws Exception {
            SensorEvent firstEvent = pattern.get("first").get(0);
            SensorEvent secondEvent = pattern.get("second").get(0);
            return "Two Consecutive High Temperatures: First Sensor - SensorId=" + firstEvent.sensorId + ", Timestamp=" + firstEvent.timestamp + ", Temperature=" + firstEvent.temperature +
                    " | Second Sensor - SensorId=" + secondEvent.sensorId + ", Timestamp=" + secondEvent.timestamp + ", Temperature=" + secondEvent.temperature;
        }
    }
}
第5关:检测用户行为实例
编程要求
根据提示,在右侧编辑器 Begin-End 区间内补充代码,读取文件中的数据,找出同一用户连续三次登陆失败的情况,并输出这三次登陆失败的时间。
输出格式为:
user_1 连续三次登陆失败!登陆时间:2000, 3000, 5000
数据格式为:
user_1, 192.168.0.1, fail, 2000L
user_1, 192.168.0.1, fail, 3000L
user_2, 192.168.0.2, fail, 4000L
user_1, 192.168.0.1, success, 5000L
user_3, 192.168.0.3, success, 6000L
....
文件路径为:/data/workspace/myshixun/step1/Data/User.txt
最后需要使用 .writeAsText("/root/files/result.csv") 将结果写入到文件内。
测试说明
平台会对你编写的代码进行测试:
测试输入:无;
预期输出:
user_1 连续三次登陆失败!登陆时间:11000, 12000, 14000
user_2 连续三次登陆失败!登陆时间:4000, 7000, 8000
user_2 连续三次登陆失败!登陆时间:7000, 8000, 10000
user_2 连续三次登陆失败!登陆时间:8000, 10000, 13000
代码如下:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.util.Collector;
import java.util.List;
import java.util.Map;
/**
 * A single login attempt: who tried to log in, from which address, the outcome, and when.
 * All fields are public so callers read them directly.
 */
class LoginEvent {
    /** User identifier. */
    public String userID;
    /** Address the attempt came from. */
    public String ipAddress;
    /** Outcome label of the attempt as read from the input. */
    public String eventType;
    /** Timestamp value of the attempt as read from the input. */
    public long timeStamp;

    /**
     * Creates a login event from its four components.
     *
     * @param userID user identifier
     * @param ipAddress source address
     * @param eventType outcome label
     * @param timeStamp timestamp value
     */
    public LoginEvent(String userID, String ipAddress, String eventType, long timeStamp) {
        this.timeStamp = timeStamp;
        this.eventType = eventType;
        this.ipAddress = ipAddress;
        this.userID = userID;
    }

    /** @return the user identifier of this attempt */
    public String getUserID() {
        return this.userID;
    }
}
/**
 * Level 5: reads login records from a text file, keys them by user, and reports every run of
 * three login failures that are consecutive for that user.
 */
public class one {

    /**
     * Builds and runs the login-failure detection job.
     *
     * @param args unused
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // ********** Begin **********
        // Read /data/workspace/myshixun/step1/Data/User.txt, parse each record, assign event
        // time from the record's timestamp and key the stream by user.
        DataStream<LoginEvent> loginEvents = env.readTextFile("/data/workspace/myshixun/step1/Data/User.txt")
                // Blank lines (e.g. a trailing newline) cannot be parsed; skip them.
                .filter(line -> !line.trim().isEmpty())
                .map(line -> parseLine(line))
                // Timestamps are expected to be non-decreasing; the recorded value is used
                // directly as the event-time timestamp.
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<LoginEvent>forMonotonousTimestamps()
                                .withTimestampAssigner((event, previous) -> event.timeStamp))
                .keyBy(LoginEvent::getUserID);

        // Three failures in a row for the same user: next() enforces strict contiguity within
        // each user's keyed sub-stream.
        Pattern<LoginEvent, ?> pattern = Pattern.<LoginEvent>begin("first")
                .where(isFailure())
                .next("second")
                .where(isFailure())
                .next("third")
                .where(isFailure());

        // Apply the pattern and format each match as one line with the three failure times.
        DataStream<String> alerts = CEP.pattern(loginEvents, pattern)
                .process(new PatternProcessFunction<LoginEvent, String>() {
                    @Override
                    public void processMatch(Map<String, List<LoginEvent>> map, Context ctx, Collector<String> out) throws Exception {
                        LoginEvent first = map.get("first").get(0);
                        LoginEvent second = map.get("second").get(0);
                        LoginEvent third = map.get("third").get(0);
                        out.collect(first.userID + " 连续三次登陆失败!登陆时间:" +
                                first.timeStamp + ", " + second.timeStamp + ", " + third.timeStamp);
                    }
                });
        // ********** End **********
        // Write the results to the output file.
        alerts.writeAsText("/root/files/result.csv");
        env.execute();
    }

    /**
     * Parses one record of the form {@code userId, ip, eventType, timestamp}.
     *
     * <p>Each field is trimmed, so "{@code user_1, 192.168.0.1, fail, 2000}" and
     * "{@code user_1,192.168.0.1,fail,2000}" parse the same way. A trailing Java-style long
     * suffix on the timestamp ({@code 2000L}) is also accepted, matching the documented format.
     *
     * @param line a non-blank input line
     * @return the parsed event
     * @throws IllegalArgumentException if the line has fewer than four comma-separated fields
     * @throws NumberFormatException if the timestamp field is not a valid long
     */
    private static LoginEvent parseLine(String line) {
        String[] parts = line.split(",");
        if (parts.length < 4) {
            throw new IllegalArgumentException("Malformed login record: " + line);
        }
        String rawTimestamp = parts[3].trim();
        if (rawTimestamp.endsWith("L") || rawTimestamp.endsWith("l")) {
            rawTimestamp = rawTimestamp.substring(0, rawTimestamp.length() - 1);
        }
        return new LoginEvent(parts[0].trim(), parts[1].trim(), parts[2].trim(), Long.parseLong(rawTimestamp));
    }

    /**
     * Creates a condition that accepts login events whose type is exactly {@code "fail"}.
     *
     * @return a fresh, serializable CEP condition
     */
    private static SimpleCondition<LoginEvent> isFailure() {
        return new SimpleCondition<LoginEvent>() {
            @Override
            public boolean filter(LoginEvent event) {
                return "fail".equals(event.eventType);
            }
        };
    }
}
版权归原作者 TomSmith001 所有, 如有侵权,请联系我们删除。