

【2024】Kafka Streams (3): Building a Real Project Around a Case Study


I. Introduction

The previous posts covered the basics of Kafka Streams; this one applies them to a practical case study.

The scenario is shopping at a mall. When a customer makes a purchase, the mall records the transaction. First, sensitive fields such as the bank card number are masked; then the record is fanned out to the topics that need it. For example, one consumer accumulates loyalty points by converting the purchase amount into points credited to the customer's account, while others receive the record depending on which product was purchased. Concretely:
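The card-masking step can be sketched with a plain regex, independent of Kafka. This is the same pattern used later in the topology: keep the first four characters and replace the remainder with `****` (the class name `CardMasker` is illustrative, not from the original project):

```java
public class CardMasker {

    // Keep the first 4 characters, replace everything after them with "****".
    // The lookbehind (?<=^.{4}) only succeeds right after the 4th character,
    // so .* consumes the rest of the string in a single match.
    public static String mask(String cardNumber) {
        return cardNumber.replaceAll("(?<=^.{4}).*", "****");
    }

    public static void main(String[] args) {
        System.out.println(mask("4322-1223-1123-0001")); // prints 4322****
    }
}
```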

Execution flow

  1. Use split to route each purchase record to a branch by product type: coffee purchases go to the coffee processor, electronics to the electronics processor
  2. Send the purchase details (zip code, item, date, amount) to the pattern processor
  3. Re-key the record with a TransactionKey, keeping the original data as the value, and send it to the purchase processor
  4. Convert the purchase amount into points and send them to the reward processor
  5. Write all of the original data to the data warehouse
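The routing in step 1 can be illustrated outside Kafka with a plain predicate-based dispatcher. This is only a sketch of the branching logic (the class `BranchDemo` and the `route` helper are illustrative); the item names and topic names follow the topology code below:

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

public class BranchDemo {

    // Each branch pairs a predicate with a target topic name,
    // mirroring KStream#split(...).branch(predicate, Branched...).
    record Branch(Predicate<String> matches, String topic) {}

    static final List<Branch> BRANCHES = List.of(
            new Branch(item -> item.equalsIgnoreCase("coffee"), "sell.coffee.transaction"),
            new Branch(item -> item.equalsIgnoreCase("elect"), "sell.elect.transaction"));

    // Returns the topic the item would be routed to; empty if no branch
    // matches (the topology uses noDefaultBranch(), so such records are dropped).
    static Optional<String> route(String itemPurchased) {
        return BRANCHES.stream()
                .filter(b -> b.matches().test(itemPurchased))
                .map(Branch::topic)
                .findFirst();
    }

    public static void main(String[] args) {
        System.out.println(route("Coffee")); // prints Optional[sell.coffee.transaction]
        System.out.println(route("book"));   // prints Optional.empty
    }
}
```

The first matching branch wins, which is also how `split(...).branch(...)` evaluates branches in declaration order.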

(Topology and entity-class diagrams omitted; the entity classes used are listed in section 2 below.)

II. Code Implementation

1. Dependencies

As in the previous posts, these are mainly the Kafka dependencies:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.8.20</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.7</version>
</dependency>

2. Entity classes

2.1 Transaction

The purchase record written to Kafka:

@Data
@Builder
public class Transaction {
    /** Last name */
    private String lastName;
    /** First name */
    private String firstName;
    /** Customer id */
    private String customerId;
    /** Bank card number */
    private String creditCardNumber;
    /** Item name */
    private String itemPurchased;
    /** Store/department name */
    private String department;
    /** Quantity */
    private Integer quantity;
    /** Price */
    private Double price;
    /** Purchase date */
    private String purchaseDate;
    /** Zip code */
    private String zipCode;
}

2.2 TransactionKey

Used as a marker and converted into the record key:

@Data
@Builder
public class TransactionKey {
    private String customerId;
    private String purchaseDate;
}

2.3 TransactionPattern

Records the purchase details:

@Data
@Builder
public class TransactionPattern {
    private String zipCode;
    private String item;
    private String date;
    private Double amount;
}

2.4 CustomerReward

@Data
@Builder
public class CustomerReward {
    private String customerId;
    private String purchaseTotal;
    private Integer rewardPoints;
}

3. Serialization utilities

3.1 Serializer

/**
 * JSON serializer
 * @param <T> the type being serialized
 */
public class JsonSerializer<T> implements Serializer<T> {

    private Gson gson = new Gson();

    @Override
    public void configure(Map<String, ?> map, boolean b) {
    }

    @Override
    public byte[] serialize(String topic, T t) {
        return gson.toJson(t).getBytes();
    }

    @Override
    public void close() {
        Serializer.super.close();
    }
}

3.2 Deserializer

/**
 * JSON deserializer
 * @param <T> the type being deserialized
 */
public class JsonDeserializer<T> implements Deserializer<T> {

    private Gson gson = new Gson();
    private Class<T> deserializeClass;

    public JsonDeserializer(Class<T> deserializeClass) {
        this.deserializeClass = deserializeClass;
    }

    public JsonDeserializer() {
    }

    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String, ?> map, boolean b) {
        if (deserializeClass == null) {
            deserializeClass = (Class<T>) map.get("serializedClass");
        }
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        return gson.fromJson(new String(data), deserializeClass);
    }

    @Override
    public void close() {
    }
}

3.3 Serde factory

Provides ready-made Serde instances backed by the JSON serializer/deserializer above; this mirrors how the built-in Serdes class is structured.

/**
 * Serialization and deserialization factory
 */
public class JsonSerdes {

    /** Serde accessors */
    public static TransactionPatternWrapSerde TransactionPattern() {
        return new TransactionPatternWrapSerde(new JsonSerializer<>(), new JsonDeserializer<>(TransactionPattern.class));
    }

    public static TransactionKeyWrapSerde TransactionKey() {
        return new TransactionKeyWrapSerde(new JsonSerializer<>(), new JsonDeserializer<>(TransactionKey.class));
    }

    public static CustomerRewardWrapSerde CustomerReward() {
        return new CustomerRewardWrapSerde(new JsonSerializer<>(), new JsonDeserializer<>(CustomerReward.class));
    }

    public static TransactionWrapSerde Transaction() {
        return new TransactionWrapSerde(new JsonSerializer<>(), new JsonDeserializer<>(Transaction.class));
    }

    /** Concrete Serdes */
    private final static class TransactionPatternWrapSerde extends WrapSerde<TransactionPattern> {
        public TransactionPatternWrapSerde(Serializer<TransactionPattern> serializer, Deserializer<TransactionPattern> deserializer) {
            super(serializer, deserializer);
        }
    }

    private final static class TransactionKeyWrapSerde extends WrapSerde<TransactionKey> {
        public TransactionKeyWrapSerde(Serializer<TransactionKey> serializer, Deserializer<TransactionKey> deserializer) {
            super(serializer, deserializer);
        }
    }

    private final static class CustomerRewardWrapSerde extends WrapSerde<CustomerReward> {
        public CustomerRewardWrapSerde(Serializer<CustomerReward> serializer, Deserializer<CustomerReward> deserializer) {
            super(serializer, deserializer);
        }
    }

    private final static class TransactionWrapSerde extends WrapSerde<Transaction> {
        public TransactionWrapSerde(Serializer<Transaction> serializer, Deserializer<Transaction> deserializer) {
            super(serializer, deserializer);
        }
    }

    /** WrapSerde base class */
    private static class WrapSerde<T> implements Serde<T> {
        private final Serializer<T> serializer;
        private final Deserializer<T> deserializer;

        public WrapSerde(Serializer<T> serializer, Deserializer<T> deserializer) {
            this.serializer = serializer;
            this.deserializer = deserializer;
        }

        @Override
        public Serializer<T> serializer() {
            return serializer;
        }

        @Override
        public Deserializer<T> deserializer() {
            return deserializer;
        }
    }
}
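The idea behind the WrapSerde pattern is simply to hold a serializer and a deserializer together and hand them out as one unit. A minimal stdlib-only sketch of the same pattern, using toy stand-ins for Kafka's interfaces (the names `WrapSerdeDemo`, `Ser`, and `De` are illustrative, not Kafka types):

```java
import java.nio.charset.StandardCharsets;

public class WrapSerdeDemo {

    // Toy stand-ins for Kafka's Serializer/Deserializer interfaces,
    // so the wrapper pattern can be shown without the Kafka dependency.
    interface Ser<T> { byte[] serialize(T value); }
    interface De<T> { T deserialize(byte[] data); }

    // Same idea as WrapSerde in the article: pair both halves
    // and expose them through a single object.
    static class WrapSerde<T> {
        private final Ser<T> serializer;
        private final De<T> deserializer;

        WrapSerde(Ser<T> serializer, De<T> deserializer) {
            this.serializer = serializer;
            this.deserializer = deserializer;
        }

        Ser<T> serializer() { return serializer; }
        De<T> deserializer() { return deserializer; }
    }

    // A concrete String serde, analogous to JsonSerdes.Transaction().
    static WrapSerde<String> stringSerde() {
        return new WrapSerde<>(
                v -> v.getBytes(StandardCharsets.UTF_8),
                d -> new String(d, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        WrapSerde<String> serde = stringSerde();
        byte[] bytes = serde.serializer().serialize("hello");
        System.out.println(serde.deserializer().deserialize(bytes)); // prints hello
    }
}
```

The factory-method style (`JsonSerdes.Transaction()`, etc.) keeps the generic wiring in one place, so topology code never constructs serializers directly.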

4. The Streams topology

Usage is fairly simple: using the operators covered in the earlier posts, each processor transforms the data and sends it on to a different topic. After writing the topology, create the topics it needs (see section 6).

@Slf4j
public class ShoppingStreams {

    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String APPLICATION_ID = "shopping-streams";
    private static final String SELL_TRANSACTION_SOURCE_TOPIC = "sell.transaction";
    private static final String SELL_TRANSACTION_PATTERN_TOPIC = "sell.pattern.transaction";
    private static final String SELL_TRANSACTION_REWARDS_TOPIC = "sell.rewards.transaction";
    private static final String SELL_TRANSACTION_COFFEE_TOPIC = "sell.coffee.transaction";
    private static final String SELL_TRANSACTION_ELECT_TOPIC = "sell.elect.transaction";
    private static final String SELL_TRANSACTION_PURCHASE_TOPIC = "sell.purchase.transaction";

    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler");
        StreamsConfig streamsConfig = new StreamsConfig(properties);
        Serde<String> stringSerde = Serdes.String();
        StreamsBuilder builder = new StreamsBuilder();

        // 1. Read from the source topic. LATEST: when there is no initial offset,
        // or the offset is out of range, start consuming from the newest records.
        KStream<String, Transaction> k0 = builder.stream(SELL_TRANSACTION_SOURCE_TOPIC,
                Consumed.with(stringSerde, JsonSerdes.Transaction())
                        .withName("transaction-source")
                        .withOffsetResetPolicy(Topology.AutoOffsetReset.LATEST));

        // 2. Mask the credit card number (keep only the first 4 characters).
        KStream<String, Transaction> k1 = k0
                .peek((k, v) -> log.info("k:{},v:{}", k, v))
                .mapValues(v -> {
                    String masked = v.getCreditCardNumber().replaceAll("(?<=^.{4}).*", "****");
                    v.setCreditCardNumber(masked);
                    return v;
                }, Named.as("mask-credit-card"));

        // 3. Record the purchase pattern.
        k1.mapValues(v -> TransactionPattern.builder()
                        .zipCode(v.getZipCode())
                        .item(v.getItemPurchased())
                        .date(v.getPurchaseDate())
                        .amount(v.getPrice())
                        .build(), Named.as("transaction-pattern"))
                .to(SELL_TRANSACTION_PATTERN_TOPIC, Produced.with(stringSerde, JsonSerdes.TransactionPattern()));

        // 4. Reward the customer with points for the purchase amount.
        k1.mapValues(v -> CustomerReward.builder()
                        .customerId(v.getCustomerId())
                        .purchaseTotal(String.valueOf(v.getPrice()))
                        .rewardPoints(v.getPrice().intValue())
                        .build(), Named.as("transaction-rewards"))
                .to(SELL_TRANSACTION_REWARDS_TOPIC, Produced.with(stringSerde, JsonSerdes.CustomerReward()));

        // 5. Keep purchases over 5, re-keyed by TransactionKey, and send them on.
        k1.filter((k, v) -> v.getPrice() > 5)
                .selectKey((k, v) -> TransactionKey.builder()
                        .customerId(v.getCustomerId())
                        .purchaseDate(v.getPurchaseDate())
                        .build(), Named.as("transaction-purchase"))
                .to(SELL_TRANSACTION_PURCHASE_TOPIC, Produced.with(JsonSerdes.TransactionKey(), JsonSerdes.Transaction()));

        // 6. Route purchases to different topics by product type.
        k1.split(Named.as("branch-"))
                .branch((k, v) -> v.getItemPurchased().equalsIgnoreCase("coffee"),
                        Branched.withConsumer(ks -> ks.to(SELL_TRANSACTION_COFFEE_TOPIC,
                                Produced.with(stringSerde, JsonSerdes.Transaction()))))
                .branch((k, v) -> v.getItemPurchased().equalsIgnoreCase("elect"),
                        Branched.withConsumer(ks -> ks.to(SELL_TRANSACTION_ELECT_TOPIC,
                                Produced.with(stringSerde, JsonSerdes.Transaction()))))
                .noDefaultBranch();

        // Simulate writing everything to the data warehouse.
        k1.print(Printed.<String, Transaction>toSysOut().withName("DW"));
        k1.foreach((k, v) -> log.info("Writing to data warehouse =========> k:{},v:{}", k, v));

        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfig);
        CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            kafkaStreams.close();
            latch.countDown();
            log.info("Kafka Streams shut down!");
        }));

        kafkaStreams.start();
        log.info("Kafka Streams started! >>>>");
        latch.await();
    }
}

5. Test helpers

5.1 Producer

A class that produces messages to the source topic:

/**
 * Produces shopping messages to Kafka
 */
@Slf4j
public class ShoppingProducer {

    private final static String TOPIC_NAME = "sell.transaction";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // Connection settings
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // Serializers
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Create the client
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // ProducerRecord(topic, partition, key (used to pick a partition when none is given), value)
        for (int i = 0; i < 5; i++) {
            Transaction transaction = Transaction.builder()
                    .customerId("011223")
                    .itemPurchased("elect")
                    .quantity(i)
                    .zipCode("10100")
                    .firstName("李")
                    .lastName("四")
                    .price(i * 100.0)
                    .purchaseDate(new Date().toString())
                    .creditCardNumber("4322-1223-1123-000" + i)
                    .department("体育西路35号")
                    .build();
            String json = new JSONObject(transaction).toString();
            // By default the partition count matches what the broker created for the topic
            ProducerRecord<String, String> producerRecord =
                    new ProducerRecord<>(TOPIC_NAME, 0, "my-keyValue3", json);
            // Synchronous send
            send(producer, producerRecord);
        }
    }

    /**
     * Synchronous send
     * @param producer the producer client
     * @param producerRecord the record to send
     * @date 2024/3/22 17:09
     */
    private static void send(KafkaProducer<String, String> producer,
                             ProducerRecord<String, String> producerRecord)
            throws InterruptedException, ExecutionException {
        // Blocks until the send completes
        RecordMetadata metadata = producer.send(producerRecord).get();
        log.info("Sync send: topic-" + metadata.topic()
                + "====partition:" + metadata.partition()
                + "=====offset:" + metadata.offset());
    }
}

5.2 Logging configuration

Kafka clients log constantly at DEBUG level, so add a logging configuration that silences that output. Create a logback.xml file under the resources directory:

<configuration scan="true" scanPeriod="10 seconds">
    <include resource="org/springframework/boot/logging/logback/base.xml"/>
    <!-- Silence kafka client debug logging -->
    <logger name="org.apache.kafka.clients" level="ERROR"/>
</configuration>

6. Creating the topics

First you need a running Kafka instance; for setup, see my earlier article 🍅 Installing Kafka on Linux and Docker.

  • Enter the container
docker exec -it kafka-server /bin/bash
  • Create the topics. Run the command once per topic used above (sell.transaction, sell.pattern.transaction, sell.rewards.transaction, sell.coffee.transaction, sell.elect.transaction, sell.purchase.transaction)
/opt/kafka/bin/kafka-topics.sh --create --topic sell.transaction --bootstrap-server localhost:9092 --partitions 2 --replication-factor 1

III. Test Results

  • Start the application. A startup log like the one produced above indicates success.
  • After sending messages, the corresponding log output appears.
  • Consume each topic to verify that the expected messages arrive, e.g.: /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sell.purchase.transaction
  • Do the same for sell.pattern.transaction and the remaining topics.
Tags: kafka, distributed, java

Reproduced from: https://blog.csdn.net/weixin_52315708/article/details/139723857
Copyright belongs to the original author 方渐鸿. In case of infringement, please contact us for removal.
