文章目录
走过路过大神帮忙给个正确解决方案~~~~~
一、框架及逻辑介绍
1.背景服务介绍
微服务结构,目前有A、B、C三个服务。
- A服务:做一些工具类的功能
- B服务:类似于门户,调用A、C服务来给到前端
- C服务:基础模块,日志、权限、数据维护
2.问题逻辑介绍
- A服务:实现blast功能,该功能是异步任务,需要几分钟的时间才能执行完成,所以采用了消息队列的方式通知功能完成,可查看数据。当代码执行完成之后通过rabbitmq发送消息到B服务
- B服务:消费消息,调用C服务存储完成通知数据
- C服务:提供FeignClient接口
二、代码
1.A服务
业务代码,详细的逻辑部分就不列出代码部分了, 测试确认无问题。
@Api(tags ="blast工具")@RequiredArgsConstructor@RequestMapping("blast")@RestControllerpublicclassBlastController{privatefinalBlastService blastService;@PostMapping("uniport")@ApiOperation("蛋白质序列对比")publicApiResponse<Boolean>check(@RequestBodyBlastEntity blastEntity){boolean flag = blastService.check(blastEntity);if(flag){returnApiResponse.success(flag);}else{returnApiResponse.failure("序列比对失败");}}@GetMapping("uniportDetail")@ApiOperation("蛋白质序列对比详情")publicApiResponse<BlastLogEntity>selectDetailById(@ApiParam("主键")@RequestParam("id")Integer id){returnApiResponse.success(blastService.selectDetailById(id));}}
通过FeignClient提供功能接口给B服务(门户)
@FeignClient(name ="tool", url ="${customer.tool-url}")publicinterfaceDataClient{@PostMapping("/blast/uniport")@ApiOperation("蛋白质序列对比")ApiResponse<Boolean>checkBlastUniprot(@RequestBodyBlastEntity blastEntity);@GetMapping("/blast/uniportDetail")@ApiOperation("蛋白质序列对比详情")ApiResponse<BlastLogEntity>selectDetailById(@ApiParam("主键")@RequestParam("id")Integer id);}
通过Rabbitmq发送消息
@ConfigurationpublicclassDirectRabbitMQConfig{//创建一个名为TestDirectExchange的Direct类型的交换机@BeanDirectExchangedirectExchange(){// durable:是否持久化,默认是false,持久化交换机。// autoDelete:是否自动删除,交换机先有队列或者其他交换机绑定的时候,然后当该交换机没有队列或其他交换机绑定的时候,会自动删除。// arguments:交换机设置的参数,比如设置交换机的备用交换机(Alternate Exchange),当消息不能被路由到该交换机绑定的队列上时,会自动路由到备用交换机returnnewDirectExchange(RabbitMqConstants.EXCHANGE_NAME,true,false);}//创建一个名为insertChassisDirectQueue的队列@BeanpublicQueueinsertBlastDirectQueue(){// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,有消息者订阅本队列,然后所有消费者都解除订阅此队列,会自动删除。// arguments:队列携带的参数,比如设置队列的死信队列,消息的过期时间等等。returnnewQueue(RabbitMqConstants.INSERT_BLAST_NAME,true);}//绑定交换机和队列@BeanBindinginsertBlastDirect(){returnBindingBuilder.bind(insertBlastDirectQueue()).to(directExchange()).with(RabbitMqConstants.INSERT_BLAST_KEY);}}
2.B服务
消费消息,根据输出日志,可以看到消费消息成功。
但是走到
apiClient.addLogMessage(logMessageEntity);
不报错,C服务无响应,断点查看apiClient是可以看到C服务的信息localhost:8080的。
@RabbitHandler@RabbitListener(bindings ={@QueueBinding(
value =@Queue(value = INSERT_BLAST_NAME, durable ="true"),
exchange =@Exchange(value =RabbitMqConstants.EXCHANGE_NAME),
key = INSERT_BLAST_KEY)})publicvoidinsertBlast(Map map,Channel channel,Message message){try{
LOGGER.info("【生成的消息-blast序列比对结果】:{}", map.toString());
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);BlastLogEntity blastLogEntity =TypeSwitchUtil.toSnakeObject(JSON.toJSONString(map),BlastLogEntity.class);if(null== blastLogEntity ||StringUtils.isEmpty(blastLogEntity.getContent())){
LOGGER.warn("【生成的消息-blast序列比对结果】:{}", map.toString());}else{LogMessageEntity logMessageEntity =newLogMessageEntity();
logMessageEntity.setTargetId(blastLogEntity.getId());
logMessageEntity.setUserId(String.valueOf(blastLogEntity.getUserId()));
logMessageEntity.setMsgType(MsgStatusEnum.BLAST.getCode());
logMessageEntity.setContent(MsgStatusEnum.BLAST.getMsg());
apiClient.addLogMessage(logMessageEntity);}}catch(Exception e){
LOGGER.error(e.getMessage());}}
3.C服务
@FeignClient(name ="api", url ="${customer.url}", configuration =FeignConfig.class)publicinterfaceApiClient{@PostMapping("/cms/logMessage/add")@ApiOperation("新增系统通知发送记录")ApiResponse<Boolean>addLogMessage(@RequestBodyLogMessageEntity logMessageEntity);}
三、解决思路
1.确认B调用C服务接口是否能正常调通
在B服务中,通过Controller暴露接口,调用C服务FeignClient的
@PostMapping("/cms/logMessage/add")
接口,通过swagger测试,数据成功存储到数据库中。
——接口没有问题,服务之间通信也没有问题
2.确认B服务是否能正常调用A服务
在B服务中,调用A服务接口,测试成功。
——服务之间通信没有问题
3.确认消息能否正常消费
在B服务中,调用A服务接口,A服务输出发送消息日志。B服务消费输出日志。
——消息无问题
4.总结
- 服务之间通信没有问题
- 消息提供和消费没有问题
综合猜测可能是线程或事务问题。
四、修改代码验证
1.B服务异步调用C服务接口——失败
@RabbitHandler@RabbitListener(bindings ={@QueueBinding(
value =@Queue(value = INSERT_BLAST_NAME, durable ="true"),
exchange =@Exchange(value =RabbitMqConstants.EXCHANGE_NAME),
key = INSERT_BLAST_KEY)})publicvoidinsertBlast(Map map,Channel channel,Message message){try{
LOGGER.info("【生成的消息-blast序列比对结果】:{}", map.toString());
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);BlastLogEntity blastLogEntity =TypeSwitchUtil.toSnakeObject(JSON.toJSONString(map),BlastLogEntity.class);if(null== blastLogEntity ||StringUtils.isEmpty(blastLogEntity.getContent())){
LOGGER.warn("【生成的消息-blast序列比对结果】:{}", map.toString());}else{LogMessageEntity logMessageEntity =newLogMessageEntity();
logMessageEntity.setTargetId(blastLogEntity.getId());
logMessageEntity.setUserId(String.valueOf(blastLogEntity.getUserId()));
logMessageEntity.setMsgType(MsgStatusEnum.BLAST.getCode());
logMessageEntity.setContent(MsgStatusEnum.BLAST.getMsg());CompletableFuture.runAsync(()->{
apiClient.addLogMessage(logMessageEntity);});}}catch(Exception e){
LOGGER.error(e.getMessage());}}
经测试,未能解决问题。
2.将消费消息放到C服务中,消费时直接调用service——成功
@RabbitHandler@RabbitListener(bindings ={@QueueBinding(
value =@Queue(value ="blast.insert.queue", durable ="true"),
exchange =@Exchange(value ="tool"),
key ="blast.insert")})publicvoidinsertBlast(Map map,Channel channel,Message message){try{
LOGGER.info("【生成的消息-blast序列比对结果】:{}", map.toString());
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);if(null== map ||StringUtils.isEmpty(map.get("id").toString())){
LOGGER.warn("【生成的消息-blast序列比对结果】:{}", map.toString());}else{LogMessageEntity logMessageEntity =newLogMessageEntity();
logMessageEntity.setTargetId(Integer.valueOf(map.get("id").toString()));
logMessageEntity.setUserId(String.valueOf(map.get("user_id")));
logMessageEntity.setMsgType(MsgStatusEnum.BLAST.getCode());
logMessageEntity.setContent(MsgStatusEnum.BLAST.getMsg());
logMessageService.save(logMessageEntity);}}catch(Exception e){
LOGGER.error(e.getMessage());}}
五、结论
在使用消息队列(MQ)调用FeignClient服务时,采取以下步骤:
- 配置消息队列:首先,确保已正确配置和启动您所使用的消息队列,例如 RabbitMQ 或者 Kafka。
- 创建消息生产者:创建一个消息生产者,用于将消息发送到消息队列。您可以使用消息队列的客户端库或框架来实现此功能。
- 发送消息:在需要调用FeignClient服务的地方,将消息发送到消息队列。消息中包含请求的相关信息,例如服务名称、路径、参数等。
- 创建消息消费者:创建一个消息消费者,用于从消息队列接收消息并处理。
- 处理消息:在消息消费者中,处理接收到的消息。根据消息中的请求信息,使用FeignClient来调用相应的服务。
虽然通过直接调用service的方式成功进行数据存储了,这样做也没有违背原有的逻辑。
但是问题仍然没有一个正确的解决方式,希望走过路过的大神能指点一二,在线求助。
走过路过大神帮忙给个正确解决方案~~~~~
版权归原作者 陈年小趴菜 所有, 如有侵权,请联系我们删除。