I. Introduction

The previous posts covered the basic usage of Kafka Streams; here we put it into practice with a real-world example.

The example models a shopping-mall purchase. When a customer buys something, the mall records the transaction. The record is first masked (sensitive fields such as the bank card number are partially hidden with ***), then fanned out to the topics that need it: for example, the purchase amount is converted into reward points and credited back to the customer's account, and purchases are routed to different topics depending on the product. Concretely:
Execution flow (implemented in full in section 4 below):

- Use `split` to route each purchase record into a branch: coffee purchases go to the coffee processor, electronics to the electronics processor.
- Send the purchase pattern information (zip code, item, date, amount) to the pattern processor.
- Re-key the record with a TransactionKey, keeping the original data as the value, and send it to the purchase processor.
- Send the payment amount, converted into reward points, to the reward processor.
- Finally, write all of the original (masked) data to the data warehouse.
The entity classes involved are defined in section 2 below.
II. Code Implementation

1. Dependencies

Similar to before, mainly the Kafka-related dependencies:

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.8.20</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.7</version>
</dependency>
```
2. Entity Classes

2.1 Transaction

The purchase record that gets written:

```java
@Data
@Builder
public class Transaction {
    /** Last name */
    private String lastName;
    /** First name */
    private String firstName;
    /** Customer ID */
    private String customerId;
    /** Bank card number */
    private String creditCardNumber;
    /** Item name */
    private String itemPurchased;
    /** Store name */
    private String department;
    /** Quantity */
    private Integer quantity;
    /** Price */
    private Double price;
    /** Purchase time */
    private String purchaseDate;
    /** Zip code */
    private String zipCode;
}
```
2.2 TransactionKey

Used as a marker and converted into the record key:

```java
@Data
@Builder
public class TransactionKey {
    private String customerId;
    private String purchaseDate;
}
```
2.3 TransactionPattern

Records the purchase pattern information:

```java
@Data
@Builder
public class TransactionPattern {
    private String zipCode;
    private String item;
    private String date;
    private Double amount;
}
```
2.4 CustomerReward

```java
@Data
@Builder
public class CustomerReward {
    private String customerId;
    private String purchaseTotal;
    private Integer rewardPoints;
}
```
3. Serialization Utilities

3.1 Serializer

```java
/**
 * Serializer
 * @param <T> the type to serialize
 */
public class JsonSerializer<T> implements Serializer<T> {

    private final Gson gson = new Gson();

    @Override
    public void configure(Map<String, ?> map, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, T t) {
        // Use an explicit charset rather than the platform default
        return gson.toJson(t).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void close() {
        Serializer.super.close();
    }
}
```
3.2 Deserializer

```java
/**
 * Deserializer
 * @param <T> the type to deserialize
 */
public class JsonDeserializer<T> implements Deserializer<T> {

    private final Gson gson = new Gson();
    private Class<T> deserializeClass;

    public JsonDeserializer(Class<T> deserializeClass) {
        this.deserializeClass = deserializeClass;
    }

    public JsonDeserializer() {
    }

    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String, ?> map, boolean isKey) {
        // When the no-arg constructor is used, the target class can be supplied via config
        if (deserializeClass == null) {
            deserializeClass = (Class<T>) map.get("serializedClass");
        }
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        return gson.fromJson(new String(data, StandardCharsets.UTF_8), deserializeClass);
    }

    @Override
    public void close() {
    }
}
```
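As a quick sanity check, the two classes round-trip an object like this (a minimal sketch; the topic name and field values are arbitrary examples, not from the original article):

```java
// Round-trip: serialize a Transaction to JSON bytes and read it back
JsonSerializer<Transaction> serializer = new JsonSerializer<>();
JsonDeserializer<Transaction> deserializer = new JsonDeserializer<>(Transaction.class);

Transaction in = Transaction.builder()
        .customerId("011223")
        .itemPurchased("coffee")
        .price(12.5)
        .build();

byte[] bytes = serializer.serialize("any-topic", in);
Transaction out = deserializer.deserialize("any-topic", bytes);
// out now carries the same field values as in
```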
3.3 Serde Repository

Exposes ready-made Serde instances backed by the JSON (de)serializers above; you could also build them the way the built-in Serdes class does.

```java
/**
 * Serialization and deserialization
 */
public class JsonSerdes {

    /** Factory methods returning one Serde per type */
    public static TransactionPatternWrapSerde TransactionPattern() {
        return new TransactionPatternWrapSerde(new JsonSerializer<>(), new JsonDeserializer<>(TransactionPattern.class));
    }

    public static TransactionKeyWrapSerde TransactionKey() {
        return new TransactionKeyWrapSerde(new JsonSerializer<>(), new JsonDeserializer<>(TransactionKey.class));
    }

    public static CustomerRewardWrapSerde CustomerReward() {
        return new CustomerRewardWrapSerde(new JsonSerializer<>(), new JsonDeserializer<>(CustomerReward.class));
    }

    public static TransactionWrapSerde Transaction() {
        return new TransactionWrapSerde(new JsonSerializer<>(), new JsonDeserializer<>(Transaction.class));
    }

    /** Concrete Serde wrappers */
    private static final class TransactionPatternWrapSerde extends WrapSerde<TransactionPattern> {
        public TransactionPatternWrapSerde(Serializer<TransactionPattern> serializer, Deserializer<TransactionPattern> deserializer) {
            super(serializer, deserializer);
        }
    }

    private static final class TransactionKeyWrapSerde extends WrapSerde<TransactionKey> {
        public TransactionKeyWrapSerde(Serializer<TransactionKey> serializer, Deserializer<TransactionKey> deserializer) {
            super(serializer, deserializer);
        }
    }

    private static final class CustomerRewardWrapSerde extends WrapSerde<CustomerReward> {
        public CustomerRewardWrapSerde(Serializer<CustomerReward> serializer, Deserializer<CustomerReward> deserializer) {
            super(serializer, deserializer);
        }
    }

    private static final class TransactionWrapSerde extends WrapSerde<Transaction> {
        public TransactionWrapSerde(Serializer<Transaction> serializer, Deserializer<Transaction> deserializer) {
            super(serializer, deserializer);
        }
    }

    /** Shared WrapSerde parent */
    private static class WrapSerde<T> implements Serde<T> {
        private final Serializer<T> serializer;
        private final Deserializer<T> deserializer;

        public WrapSerde(Serializer<T> serializer, Deserializer<T> deserializer) {
            this.serializer = serializer;
            this.deserializer = deserializer;
        }

        @Override
        public Serializer<T> serializer() {
            return serializer;
        }

        @Override
        public Deserializer<T> deserializer() {
            return deserializer;
        }
    }
}
```
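As the note above mentions, the built-in Serdes helper offers an alternative to hand-writing a wrapper subclass per type: `Serdes.serdeFrom(serializer, deserializer)` produces an equivalent Serde. A minimal sketch (the `JsonSerdeFactory` name is hypothetical, not part of the original code):

```java
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

public final class JsonSerdeFactory {
    private JsonSerdeFactory() {}

    // Equivalent of the hand-written WrapSerde classes, built with the Kafka helper
    public static <T> Serde<T> forType(Class<T> type) {
        return Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(type));
    }
}

// Usage: Serde<Transaction> transactionSerde = JsonSerdeFactory.forType(Transaction.class);
```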
4. The Streams Implementation

Usage is fairly simple: the operators covered earlier implement the different processors that transform the data, which is then sent on to the different topics. Once the topology is written, we need to create the topics it uses (see section 6).
```java
@Slf4j
public class ShoppingStreams {

    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String APPLICATION_ID = "shopping-streams";
    private static final String SELL_TRANSACTION_SOURCE_TOPIC = "sell.transaction";
    private static final String SELL_TRANSACTION_PATTERN_TOPIC = "sell.pattern.transaction";
    private static final String SELL_TRANSACTION_REWARDS_TOPIC = "sell.rewards.transaction";
    private static final String SELL_TRANSACTION_COFFEE_TOPIC = "sell.coffee.transaction";
    private static final String SELL_TRANSACTION_ELECT_TOPIC = "sell.elect.transaction";
    private static final String SELL_TRANSACTION_PURCHASE_TOPIC = "sell.purchase.transaction";

    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler");
        StreamsConfig streamsConfig = new StreamsConfig(properties);
        Serde<String> stringSerde = Serdes.String();
        StreamsBuilder builder = new StreamsBuilder();

        // 1. Read the raw transactions from the source topic.
        // LATEST offset reset policy: when there is no initial offset, or the offset is
        // out of range, consumption starts from the newest records.
        KStream<String, Transaction> k0 = builder
                .stream(SELL_TRANSACTION_SOURCE_TOPIC,
                        Consumed.with(stringSerde, JsonSerdes.Transaction())
                                .withName("transaction-source")
                                .withOffsetResetPolicy(Topology.AutoOffsetReset.LATEST));

        // 2. Mask the card number, keeping only the first four characters.
        KStream<String, Transaction> k1 = k0
                .peek((k, v) -> log.info("k:{},v:{}", k, v))
                .mapValues(v -> {
                    String encryption = v.getCreditCardNumber().replaceAll("(?<=^.{4}).*", "****");
                    v.setCreditCardNumber(encryption);
                    return v;
                }, Named.as("pattern-sink"));

        // 3. Record the purchase pattern (zip code, item, date, amount).
        k1.mapValues(v -> TransactionPattern.builder()
                        .zipCode(v.getZipCode())
                        .item(v.getItemPurchased())
                        .date(v.getPurchaseDate())
                        .amount(v.getPrice())
                        .build(), Named.as("transaction-pattern"))
                .to(SELL_TRANSACTION_PATTERN_TOPIC, Produced.with(stringSerde, JsonSerdes.TransactionPattern()));

        // 4. Reward the customer with points based on the amount spent.
        k1.mapValues(v -> CustomerReward.builder()
                        .customerId(v.getCustomerId())
                        .purchaseTotal(String.valueOf(v.getPrice())) // total amount spent
                        .rewardPoints(v.getPrice().intValue())
                        .build(), Named.as("transaction-rewards"))
                .to(SELL_TRANSACTION_REWARDS_TOPIC, Produced.with(stringSerde, JsonSerdes.CustomerReward()));

        // 5. Keep purchases over 5, re-keyed with a TransactionKey, and send them on.
        k1.filter((k, v) -> v.getPrice() > 5)
                .selectKey((k, v) -> TransactionKey.builder()
                        .customerId(v.getCustomerId())
                        .purchaseDate(v.getPurchaseDate())
                        .build(), Named.as("transaction-purchase"))
                .to(SELL_TRANSACTION_PURCHASE_TOPIC, Produced.with(JsonSerdes.TransactionKey(), JsonSerdes.Transaction()));

        // 6. Route purchases to different topics by item type.
        k1.split(Named.as("branch-"))
                .branch((k, v) -> v.getItemPurchased().equalsIgnoreCase("coffee"),
                        Branched.withConsumer(ks -> ks.to(SELL_TRANSACTION_COFFEE_TOPIC,
                                Produced.with(stringSerde, JsonSerdes.Transaction()))))
                .branch((k, v) -> v.getItemPurchased().equalsIgnoreCase("elect"),
                        Branched.withConsumer(ks -> ks.to(SELL_TRANSACTION_ELECT_TOPIC,
                                Produced.with(stringSerde, JsonSerdes.Transaction()))))
                .noDefaultBranch();

        // 7. Simulate writing everything to the data warehouse.
        k1.print(Printed.<String, Transaction>toSysOut().withLabel("DW"));
        k1.foreach((k, v) -> log.info("Writing to data warehouse =========> k:{},v:{}", k, v));

        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfig);
        CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            kafkaStreams.close();
            latch.countDown();
            log.info("The Kafka Streams application has been closed!");
        }));
        kafkaStreams.start();
        log.info("kafka streams started successfully! >>>>");
        latch.await();
    }
}
```
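For reference, the masking regex `(?<=^.{4}).*` matches everything after the first four characters (the lookbehind anchors the match at position four) and collapses it into `****`. A quick illustrative check with a hypothetical card number:

```java
String masked = "4322-1223-1123-0001".replaceAll("(?<=^.{4}).*", "****");
// masked == "4322****"
```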
5. Additional Test Utilities

5.1 Producer

Create a class that produces messages and sends them to the source topic.
```java
/**
 * Produces shopping messages to Kafka
 */
@Slf4j
public class ShoppingProducer {

    private final static String TOPIC_NAME = "sell.transaction";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // Connection settings
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // Serializers
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Create the client
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // ProducerRecord(topic, partition (which one to send to), key (used to compute the partition), value)
        for (int i = 0; i < 5; i++) {
            Transaction transaction = Transaction.builder()
                    .customerId("011223")
                    .itemPurchased("elect")
                    .quantity(i)
                    .zipCode("10100")
                    .firstName("李")
                    .lastName("四")
                    .price(i * 100.0)
                    .purchaseDate(new Date().toString())
                    .creditCardNumber("4322-1223-1123-000" + i)
                    .department("体育西路35号")
                    .build();
            String json = new JSONObject(transaction).toString();
            // By default the partition count matches what was created on the broker
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, 0, "my-keyValue3", json);
            // Synchronous send
            send(producer, producerRecord);
        }
    }

    /**
     * Synchronous send
     * @param producer the client instance
     */
    private static void send(KafkaProducer<String, String> producer,
                             ProducerRecord<String, String> producerRecord)
            throws InterruptedException, ExecutionException {
        // get() blocks until the send has completed
        RecordMetadata metadata = producer.send(producerRecord).get();
        log.info("Synchronous send: topic-" + metadata.topic()
                + "====partition:" + metadata.partition()
                + "=====offset:" + metadata.offset());
    }
}
```
5.2 Logging Configuration

Kafka writes log output constantly, so we need a logging configuration that suppresses DEBUG-level output. Create a `logback.xml` file under the `resources` directory:

```xml
<configuration scan="true" scanPeriod="10 seconds">
    <include resource="org/springframework/boot/logging/logback/base.xml"/>
    <!-- Silence kafka debug logging -->
    <logger name="org.apache.kafka.clients" level="ERROR"/>
</configuration>
```
6. Creating the Topics

First you need a Kafka instance of your own; see my earlier article 🍅 on installing Kafka on Linux and Docker.

- Enter the container:

```bash
docker exec -it kafka-server /bin/bash
```

- Create the topics, one for each topic the application needs, for example:

```bash
/opt/kafka/bin/kafka-topics.sh --create --topic sell.transaction --bootstrap-server localhost:9092 --partitions 2 --replication-factor 1
```
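Repeat for each topic the topology references; a loop like the following covers all six (assuming the same partition/replication settings for every topic; adjust as needed):

```bash
for t in sell.transaction sell.pattern.transaction sell.rewards.transaction \
         sell.coffee.transaction sell.elect.transaction sell.purchase.transaction; do
  /opt/kafka/bin/kafka-topics.sh --create --topic "$t" \
      --bootstrap-server localhost:9092 --partitions 2 --replication-factor 1
done
```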
III. Test Results

- Start the application. If the startup log appears, the application started successfully.
- After sending messages, the corresponding log entries show up.
- Inspect the topics: consume from each one to verify that the expected messages arrive.

```bash
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sell.purchase.transaction
```

- Repeat for the other topics, e.g. `sell.pattern.transaction`.