0


【开源项目】消息推送平台austin介绍

项目介绍

核心功能:统一的接口发送各种类型消息,对消息生命周期全链路追踪。

意义:只要公司内部有发送消息的需求,都应该要有类似

austin

的项目。消息推送平台对各类消息进行统一发送处理,这有利于对功能的收拢,以及提高业务需求开发的效率。

项目地址

https://github.com/ZhongFuCheng3y/austin

项目拆解

下发消息接口,分为群发和单发。接口参数主要有模板id(发送消息的内容模板),参数[用来替换模板参数,接收人],api可以存在多个,但是具体处理的方法只需要批量参数就可以。

单发的接口请求实体类

publicclassSendRequest{/**
     * 执行业务类型
     * send:发送消息
     * recall:撤回消息
     */privateString code;/**
     * 消息模板Id
     * 【必填】
     */privateLong messageTemplateId;/**
     * 消息相关的参数
     * 当业务类型为"send",必传
     */privateMessageParam messageParam;}

群发的接口实体类

publicclassBatchSendRequest{/**
     * 执行业务类型
     * 必传,参考 BusinessCode枚举
     */privateString code;/**
     * 消息模板Id
     * 必传
     */privateLong messageTemplateId;/**
     * 消息相关的参数
     * 必传
     */privateList<MessageParam> messageParamList;}

单个消息的实体

publicclassMessageParam{/**
     * @Description: 接收者
     * 多个用,逗号号分隔开
     * 【不能大于100个】
     * 必传
     */privateString receiver;/**
     * @Description: 消息内容中的可变部分(占位符替换)
     * 可选
     */privateMap<String,String> variables;/**
     * @Description: 扩展参数
     * 可选
     */privateMap<String,String> extra;}

消息模板中定义了发送渠道,消息渠道决定消息的处理器。

AssembleAction

转换实体类。核心逻辑有

  • 将模板中的可变参数转化成文本内容
  • 根据消息渠道获取消息实体类
  • 生成业务编码
AssembleAction#getContentModelValue

ContentHolderUtil.replacePlaceHolder(originValue, variables);

用来处理可替换变量。

privatestaticContentModelgetContentModelValue(MessageTemplate messageTemplate,MessageParam messageParam){// 得到真正的ContentModel 类型Integer sendChannel = messageTemplate.getSendChannel();Class<?extendsContentModel> contentModelClass =ChannelType.getChanelModelClassByCode(sendChannel);// 得到模板的 msgContent 和 入参Map<String,String> variables = messageParam.getVariables();JSONObject jsonObject = JSON.parseObject(messageTemplate.getMsgContent());// 通过反射 组装出 contentModelField[] fields =ReflectUtil.getFields(contentModelClass);ContentModel contentModel =ReflectUtil.newInstance(contentModelClass);for(Field field : fields){String originValue = jsonObject.getString(field.getName());if(StrUtil.isNotBlank(originValue)){String resultValue =ContentHolderUtil.replacePlaceHolder(originValue, variables);Object resultObj =JSONUtil.isJsonObj(resultValue)?JSONUtil.toBean(resultValue, field.getType()): resultValue;ReflectUtil.setFieldValue(contentModel, field, resultObj);}}// 如果 url 字段存在,则在url拼接对应的埋点参数String url =(String)ReflectUtil.getFieldValue(contentModel, LINK_NAME);if(StrUtil.isNotBlank(url)){String resultUrl =TaskInfoUtils.generateUrl(url, messageTemplate.getId(), messageTemplate.getTemplateType());ReflectUtil.setFieldValue(contentModel, LINK_NAME, resultUrl);}return contentModel;}

SendMqAction

负责发送消息,根据

austin.mq.pipeline

的值获取不同的消息发送者,有MQ发消息,有EventBus,有SpringEvent。

@Slf4j@ServicepublicclassSendMqActionimplementsBusinessProcess<SendTaskModel>{@AutowiredprivateSendMqService sendMqService;@Value("${austin.business.topic.name}")privateString sendMessageTopic;@Value("${austin.business.recall.topic.name}")privateString austinRecall;@Value("${austin.business.tagId.value}")privateString tagId;@Value("${austin.mq.pipeline}")privateString mqPipeline;@Overridepublicvoidprocess(ProcessContext<SendTaskModel> context){SendTaskModel sendTaskModel = context.getProcessModel();try{if(BusinessCode.COMMON_SEND.getCode().equals(context.getCode())){String message = JSON.toJSONString(sendTaskModel.getTaskInfo(),newSerializerFeature[]{SerializerFeature.WriteClassName});
                sendMqService.send(sendMessageTopic, message, tagId);}elseif(BusinessCode.RECALL.getCode().equals(context.getCode())){String message = JSON.toJSONString(sendTaskModel.getMessageTemplate(),newSerializerFeature[]{SerializerFeature.WriteClassName});
                sendMqService.send(austinRecall, message, tagId);}}catch(Exception e){
            context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR));
            log.error("send {} fail! e:{},params:{}", mqPipeline,Throwables.getStackTraceAsString(e), JSON.toJSONString(CollUtil.getFirst(sendTaskModel.getTaskInfo().listIterator())));}}}
ConsumeServiceImpl#consume2Send

,负责处理消息

@Overridepublicvoidconsume2Send(List<TaskInfo> taskInfoLists){String topicGroupId =GroupIdMappingUtils.getGroupIdByTaskInfo(CollUtil.getFirst(taskInfoLists.iterator()));for(TaskInfo taskInfo : taskInfoLists){
            logUtils.print(LogParam.builder().bizType(LOG_BIZ_TYPE).object(taskInfo).build(),AnchorInfo.builder().ids(taskInfo.getReceiver()).businessId(taskInfo.getBusinessId()).state(AnchorState.RECEIVE.getCode()).build());Task task = context.getBean(Task.class).setTaskInfo(taskInfo);
            taskPendingHolder.route(topicGroupId).execute(task);}}
Task#run

,负责任务的具体处理

@Overridepublicvoidrun(){// 0. 丢弃消息if(discardMessageService.isDiscard(taskInfo)){return;}// 1. 屏蔽消息
        shieldService.shield(taskInfo);// 2.平台通用去重if(CollUtil.isNotEmpty(taskInfo.getReceiver())){
            deduplicationRuleService.duplication(taskInfo);}// 3. 真正发送消息if(CollUtil.isNotEmpty(taskInfo.getReceiver())){
            handlerHolder.route(taskInfo.getSendChannel()).doHandler(taskInfo);}}
EmailHandler#handler

,邮件处理器专门处理邮件的消息

@Overridepublicbooleanhandler(TaskInfo taskInfo){EmailContentModel emailContentModel =(EmailContentModel) taskInfo.getContentModel();MailAccount account =getAccountConfig(taskInfo.getSendAccount());try{File file =StrUtil.isNotBlank(emailContentModel.getUrl())?AustinFileUtils.getRemoteUrl2File(dataPath, emailContentModel.getUrl()):null;String result =Objects.isNull(file)?MailUtil.send(account, taskInfo.getReceiver(), emailContentModel.getTitle(), emailContentModel.getContent(),true):MailUtil.send(account, taskInfo.getReceiver(), emailContentModel.getTitle(), emailContentModel.getContent(),true, file);}catch(Exception e){
            log.error("EmailHandler#handler fail!{},params:{}",Throwables.getStackTraceAsString(e), taskInfo);returnfalse;}returntrue;}
ShieldServiceImpl#shield

,发送消息判断是否需要白天屏蔽。将消息存储到Redis中,开启xxl-job从redis中获取数据。

@Overridepublicvoidshield(TaskInfo taskInfo){if(ShieldType.NIGHT_NO_SHIELD.getCode().equals(taskInfo.getShieldType())){return;}/**
         * example:当消息下发至austin平台时,已经是凌晨1点,业务希望此类消息在次日的早上9点推送
         * (配合 分布式任务定时任务框架搞掂)
         */if(isNight()){if(ShieldType.NIGHT_SHIELD.getCode().equals(taskInfo.getShieldType())){
                logUtils.print(AnchorInfo.builder().state(AnchorState.NIGHT_SHIELD.getCode()).businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build());}if(ShieldType.NIGHT_SHIELD_BUT_NEXT_DAY_SEND.getCode().equals(taskInfo.getShieldType())){
                redisUtils.lPush(NIGHT_SHIELD_BUT_NEXT_DAY_SEND_KEY, JSON.toJSONString(taskInfo,SerializerFeature.WriteClassName),(DateUtil.offsetDay(newDate(),1).getTime()/1000)-DateUtil.currentSeconds());
                logUtils.print(AnchorInfo.builder().state(AnchorState.NIGHT_SHIELD_NEXT_SEND.getCode()).businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build());}
            taskInfo.setReceiver(newHashSet<>());}}/**
     * 小时 < 8 默认就认为是凌晨(夜晚)
     *
     * @return
     */privatebooleanisNight(){returnLocalDateTime.now().getHour()<8;}

总结一下

  1. 下发消息接口,分为群发和单发。接口参数主要有模板id(发送消息的内容模板),模板中定义了消息渠道(消息类型决定消息的处理器),参数[用来替换模板参数,接收人],api可以存在多个,但是具体处理的方法只需要批量参数就可以。
  2. 根据模板组装数据,替换变量。
  3. 消息发送,使用监听器或者消息队列,进行异步解耦。消息发送的业务和消息接收的业务拆解开来。
  4. 根据发送渠道获取对应的消息处理器(用邮件还是短信),使用策略模式进行不同的消息渠道拆分,用具体的消息处理器进行处理消息。

在这里插入图片描述

标签: 开源 java servlet

本文转载自: https://blog.csdn.net/qq_42985872/article/details/129930921
版权归原作者 秋装什么 所有, 如有侵权,请联系我们删除。

“【开源项目】消息推送平台austin介绍”的评论:

还没有评论