0


WebSocket服务端消息推送

前言:移动互联网蓬勃发展的今天,大部分手机 APP和网站都提供了消息推送功能,如新闻客户端的热点新闻推荐,IM 工具的聊天消息提醒,电商产品促销信息,企业应用的通知和审批流程等等。推送对于提高产品活跃度、提高功能模块使用率、提升用户粘性、提升用户留存率起到了重要作用,作为 APP 和网站运营中一个关键的渠道,对消息推送的合理运用能有效促进目标的实现。

一、浅析web端的消息推送原理

股票曲线实时变化,在线IM聊天等等,Web系统里总是能见到消息推送的应用。消息推送用好了能增强用户体验,实现消息推送有N种解决方案。

1.1、什么是消息推送

消息推送(Push)指运营人员通过自己的产品或第三方工具对用户当前网页或移动设备进行的主动消息推送。用户可以在网页上或移动设备锁定屏幕和通知栏看到push消息通知。以此来实现用户的多层次需求,使得用户能够自己设定所需要的信息频道,得到即时消息,简单说就是一种定制信息的实现方式。我们平时浏览邮箱时突然弹出消息提示收到新邮件就属于web端消息推送,在手机锁屏上看到的微信消息等等都属于APP消息推送。

Web网站推送:

当我们在浏览网站观望犹豫时,突然看到了系统发来一条消息,一位神秘的神豪老板竟然爆出了麻痹戒指!!!我的天,于是我果断开始了游戏!这消息很及时!

APP移动推送:

上述两种经典场景,是生活中比较常见的场景,也引出了两大推送种类,Web端消息推送和移动端消息推送。本篇博客主要介绍Web推送,顺便提一句移动端App常见第三方推送SDK有极光推送、小米推送等等。

1.2、Web端实现消息推送的四种方式

主要介绍web端其中的四种实现方式:短轮询、Comet长轮询、Server-sent、WebSocket。

(1)短轮询

指在特定的的时间间隔(如每10秒),由浏览器对服务器发出HTTP request,然后由服务器返回最新的数据给客户端的浏览器。浏览器做处理后进行显示。无论后端此时是否有新的消息产生,都会进行响应。字面上看,这种方式是最简单的。这种方式的优点是,后端编写非常简单,逻辑不复杂。但是缺点是请求中大部分中是无用的,浪费了带宽和服务器资源。总结来说,简单粗暴,适用于小型(偷懒)应用。

(2)Comet长轮询

长轮询是客户端向服务器发送Ajax请求,服务器接到请求后hold住连接,直到有新消息才返回响应信息并关闭连接,客户端处理完响应信息后再向服务器发送新的请求;长连接是在页面中的iframe发送请求到服务端,服务端hold住请求并不断将需要返回前端的数据封装成调用javascript函数的形式响应到前端,前端不断收到响应并处理。Comet的实现原理和短轮询相比,很明显少了很多无用请求,减少了带宽压力,实现起来比短轮询复杂一丢丢。想比用短轮询的同学有梦想时,就可以用Comet来实现自己的推送。

长轮询的优点很明显,在无消息的情况下不会频繁的请求,耗费资小并且实现了服务端主动向前端推送的功能,但是服务器hold连接会消耗资源,返回数据顺序无保证,难于管理维护。WebQQ(好像挂了)就是这样实现的。

(3)Server-sent

服务器推指的是HTML5规范中提供的服务端事件EventSource,浏览器在实现了该规范的前提下创建一个EventSource连接后,便可收到服务端的发送的消息,实现一个单向通信。客户端进行监听,并对响应的信息处理显示。该种方式已经实现了服务端主动推送至前端的功能。优点是在单项传输数据的场景中完全满足需求,开发人员扩展起来基本不需要改后端代码,直接用现有框架和技术就可以集成。

(4)WebSocket

WebSocket是HTML5下一种新的协议,是基于TCP的应用层协议,只需要一次连接,便可以实现全双工通信,客户端和服务端可以相互主动发送消息。客户端进行监听,并对响应的消息处理显示。

这个技术相信基本都听说过,就算没写过代码,也大概知道干嘛的。通过名字就能知道,这是一个Socket连接,一个能在浏览器上用的Socket连接。WebSocket是HTML5标准中的一个内容,浏览器通过javascript脚本手动创建一个TCP连接与服务端进行通讯。优点是双向通信,都可以主动发送消息,既可以满足“问”+“答”的响应机制,也可以实现主动推送的功能。缺点就是编码相对来说会多点,服务端处理更复杂(我觉得当一条有情怀的咸鱼就应该用这个!)。

1.3、实现个性化的推送

上面说了很多实现方案,针对自己系统的应用场景选择合适的推送方案才是合理的,因此最后简单说一下实现个性化推送的两种方式。第一种很简单,直接使用第三方实现的推送,无需复杂的开发运维,直接可以使用。第二种就是自己封装,可以选择如今较为火热的WebSocket来实现系统的消息推送。

①直接用第三方的消息推送服务(并发量多了会收费)

在这里推荐一个第三方推送平台,GoEasy。

推荐理由是GoEasy的理念符合我们的选择(可参考http://t.cn/Ex6jg3q):

(1)更简单的方式将消息从服务器端推送至客户端
(2)更简单的方式将消息从各种客户端推送至客户端

GoEasy具体的使用方式这里不再赘述,详见官网。对于后端后端开发者,可直接使用Rest方式调用推送,对于前端或web开发者,可以从web客户端用javascript脚本进行调用推送。

②封装自己的推送服务

如果是一个老系统进行扩展,那么更推荐使用Server-sent,服务端改动量不会很大。如果是新系统,更推荐websocket,实现的功能功能更全面。

我们如果需要使用websocket技术实现自己的推送服务,需要注意哪些点,或者说需要踩哪些坑呢,本文列出几点供大家参考:

长连接的心跳激活处理;

服务端调优实现高并发量client同时在线(单机服务器可以实现百万并发长连接);

群发消息;

服务端维持多用户的状态;

从WebSocket中获取HttpSession进行用户相关操作;

等等等….


二、WebSocket简介

2.1、websocket的由来

我们经常用的是HTTP协议,而HTTP协议是一种无状态的协议,要实现有状态的会话必须借助一些外部机制如session/cookie或者Token,这或多或少会带来一些不便,尤其是服务端和客户端需要实时交换数据的时候(监控,聊天),这个问题更加明显。为了适应这种环境,websocket就产生了,目的是即时通讯,替代轮询。

2.2、websocket概述

WebSocket 是HTML5一种新的协议。它实现了浏览器与服务器全双工通信(full-duplex)。一开始的握手需要借助HTTP请求完成。WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议。WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。

websocket的特点或作用

  • 允许服务端主动向客户端推送数据
  • 在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

websocket使用的优点

  • 更强的实时性
  • 保持连接状态,创建一次连接后,之后通信时可以省略部分状态信息。较少的控制开销,在连接创建后,服务器和客户端之间交换数据时,用于协议控制的数据包头部相对较小。在不包含扩展的情况下,对于服务器到客户端的内容,此头部大小只有2至10字节(和数据包长度有关);对于客户端到服务器的内容,此头部还需要加上额外的4字节的掩码。相对于HTTP请求每次都要携带完整的头部,此项开销显著减少了

websocket使用的缺点

由于websocket使用的持久连接,与服务器一直保持连接,对服务器压力很大

2.3、websocket请求头和响应头

浏览器发送websocket请求头类似如下:

下面是对请求头部解释(比http协议多了Upgrade和Connection,是告诉服务器包协议设计ws):

  • Accept-Encoding:浏览器可以接受的数据的压缩类型。
  • Accept-Language:浏览器可以接受的语言类型。
  • Cache-Control:no-cache不使用强缓存。
  • Connection:Upgrade 通知服务器通信协议提升。
  • Host:主机名。
  • Origin:用于验证浏览器域名是否在服务器许可范围内。
  • Pragma:no-cache HTTP/1.0定义的不使用本地缓存。
  • Sec-WebSocket-Extensions:permessage-deflate; client_max_window_bits
  • Sec-WebSocket-Key:lb69kw8CsB4CrSk9tKa3 g== 握手协议密钥,base64编码的16字节的随机字符串。
  • Sec-WebSocket-Version:13 版本号。
  • Upgrade:websocket 使用websocket协议进行传输数据,而不使用HTTP/1.1。
  • User-Agent:用户代理字符串。

服务器接收到客户端请求后做出响应并返回如下:

下面是服务器返回的头部解释:

  • Connection:Upgrade 通信协议提升。
  • Date:通信时间
  • Upgrade: websocket 传输协议升级为websocket。
  • Sec-WebSocket-Extensions:permessage-deflate
  • Sec-WebSocket-Accept:q9g5u1WfIWaAjNgMmjlTQTqkS/k= 将Sec-WebSocket-Key的值进行一定的运算和该值进行比较来判断是否是目标服务器响应了WebSocket请求。
  • Upgrade: 使用websocket协议进行数据传输

2.4、WebSocket和Socket的区别

短答案:就像Java和JavaScript,并没有什么太大的关系,但又不能说完全没关系。可以这么说:

  • 命名方面,Socket是一个深入人心的概念,WebSocket借用了这一概念;
  • 使用方面,完全两个东西。

Socket是应用层与TCP/IP协议族通信的中间软件抽象层,它是一组接口(不是协议,为了方便使用TCP或UDP而抽象出来的一层,是位于应用层和传输控制层之间的一组接口)。

WebSocket是应用层协议。

2.5、向指定用户发送WebSocket消息并处理对方不在线的情况

给指定用户发送消息:

  • 如果接收者在线,则直接发送消息;
  • 否则将消息存储到redis,等用户上线后主动拉取未读消息。

2.6、WebSocket心跳机制

在使用WebSocket的过程中,有时候会遇到网络异常断开的情况,但是在网络断开的时候服务器端并没有触发onclose的事件。这样会有:服务器会继续向客户端发送多余的连接,并且这些数据还会丢失。所以就需要一种机制来检测客户端和服务端是否处于正常的连接状态,因此就有了WebSocket的心跳机制了。还有心跳,说明还活着,没有心跳说明已经挂掉了。

心跳机制

心跳机制是每隔一段时间会向服务器发送一个数据包,告诉服务器自己还活着,同时客户端会确认服务器端是否还活着,如果还活着的话,就会回传一个数据包给客户端来确定服务器端也还活着,否则的话,有可能是网络断开连接了,需要重连。

2.7、Netty可以实现WebSocket

Netty是由jboss提供的一款开源框架,常用于搭建RPC中的TCP服务器和WebSocket服务器,甚至是类似Tomcat的web服务器,反正就是各种网络服务器,在处理高并发的项目中,功能丰富且性能良好,基于Java中NIO的二次封装,具有比原生NIO更好更稳健的体验。


三、基于Netty实现WebSocket消息推送

因为产品需求,要实现服务端推送消息至客户端,并且支持客户端对用户点对点消息发送的社交功能。服务端给客户端推送消息,可以选择原生的WebSocket,或者更加高级的Netty框架实现。

在此我极力推荐netty,因为一款好的框架一般都是在原生的基础上进行包装成更加实用方便,很多我们需要自己考虑的问题都基本可以不用去考虑,不过此文不会去讲netty有多么的高深莫测,因为这些概念性的东西随处可见,而是通过实战来达到推送消息的目的。

这个小节,我们主要讲解下如何整合Netty和WebSocket。我们需要使用netty对接websocket连接,实现双向通信,这一步需要有服务端的netty程序,用来处理客户端的websocket连接操作,例如建立连接,断开连接,收发数据等。

WebSocket消息推送实现思路:

前端使用WebSocket与服务端创建连接的时候,将用户ID传给服务端,服务端将用户ID与channel关联起来存储,同时将channel放入到channel组中。

如果需要给所有用户发送消息,直接执行channel组的writeAndFlush()方法;

如果需要给指定用户发送消息,根据用户ID查询到对应的channel,然后执行writeAndFlush()方法;

前端获取到服务端推送的消息之后,将消息内容展示到文本域中

下面是具体的代码实现,基本上每一步操作都配有注释说明,配合注释看应该还是比较容易理解的。

3.1、引入Netty的依赖

netty-all包含了netty的所有封装,hutool-all封装了常用的一些依赖,如Json相关

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.33.Final</version>
</dependency>

<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.2.3</version>
</dependency>

3.2、修改配置文件application.yml

server:
  port: 8899

#netty的配置信息(端口号,webSocket路径)
webSocket:
  netty:
    port: 58080
    path: /webSocket
    readerIdleTime: 30 #读空闲超时时间设置(Netty心跳检测配置)
    writerIdleTime: 30 #写空闲超时时间设置(Netty心跳检测配置)
    allIdleTime: 30 #读写空闲超时时间设置(Netty心跳检测配置)

3.3、创建NettyConfig

在NettyConfig中定义一个单例的channel组,管理所有的channel,再定义一个map,管理用户与channel的对应关系

import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.util.concurrent.ConcurrentHashMap;

/**
 * NettyConfig类
 *
 * @author hs
 * @date 2021-09-18
 */
public class NettyConfig {

    /**
     * 定义一个channel组,管理所有的channel
     * GlobalEventExecutor.INSTANCE 是全局的事件执行器,是一个单例
     */
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * 存放用户与Chanel的对应信息,用于给指定用户发送消息
     */
    private static ConcurrentHashMap<String,Channel> userChannelMap = new ConcurrentHashMap<>();

    private NettyConfig() {}

    /**
     * 获取channel组
     * @return
     */
    public static ChannelGroup getChannelGroup() {
        return channelGroup;
    }

    /**
     * 获取用户channel map
     * @return
     */
    public static ConcurrentHashMap<String,Channel> getUserChannelMap(){
        return userChannelMap;
    }
}

3.4、创建Netty的初始化类NettyServer(重点)

定义两个EventLoopGroup,bossGroup辅助客户端的tcp连接请求, workGroup负责与客户端之间的读写操作,需要说明的是,需要开启一个新的线程来执行netty server,要不然会阻塞主线程,到时候就无法调用项目的其他controller接口了。

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

/**
 * Netty初始化服务
 *
 * @author hs
 */
@Component
public class NettyServer{

    private static final Logger log = LoggerFactory.getLogger(NettyServer.class);

    /**
     * webSocket协议名
     */
    private static final String WEBSOCKET_PROTOCOL = "WebSocket";

    /**
     * 端口号
     */
    @Value("${webSocket.netty.port}")
    private int port;

    /**
     * webSocket路径
     */
    @Value("${webSocket.netty.path}")
    private String webSocketPath;

    /**
     * 在Netty心跳检测中配置 - 读空闲超时时间设置
     */
    @Value("${webSocket.netty.readerIdleTime}")
    private long readerIdleTime;

    /**
     * 在Netty心跳检测中配置 - 写空闲超时时间设置
     */
    @Value("${webSocket.netty.writerIdleTime}")
    private long writerIdleTime;

    /**
     * 在Netty心跳检测中配置 - 读写空闲超时时间设置
     */
    @Value("${webSocket.netty.allIdleTime}")
    private long allIdleTime;

    @Autowired
    private WebSocketHandler webSocketHandler;

    private EventLoopGroup bossGroup;
    private EventLoopGroup workGroup;

    /**
     * 启动
     * @throws InterruptedException
     */
    private void start() throws InterruptedException {
        bossGroup = new NioEventLoopGroup();
        workGroup = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        // bossGroup辅助客户端的tcp连接请求, workGroup负责与客户端之前的读写操作
        bootstrap.group(bossGroup,workGroup);
        // 设置NIO类型的channel
        bootstrap.channel(NioServerSocketChannel.class);
        // 设置监听端口
        bootstrap.localAddress(new InetSocketAddress(port));
        // 连接到达时会创建一个通道
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // 心跳检测(一般情况第一个设置,如果超时了,则会调用userEventTriggered方法,且会告诉你超时的类型)
                ch.pipeline().addLast(new IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime, TimeUnit.MINUTES));
                // 流水线管理通道中的处理程序(Handler),用来处理业务
                // webSocket协议本身是基于http协议的,所以这边也要使用http编解码器
                ch.pipeline().addLast(new HttpServerCodec());
                ch.pipeline().addLast(new ObjectEncoder());
                // 以块的方式来写的处理器
                ch.pipeline().addLast(new ChunkedWriteHandler());
                /*
                    说明:
                    1、http数据在传输过程中是分段的,HttpObjectAggregator可以将多个段聚合
                    2、这就是为什么,当浏览器发送大量数据时,就会发送多次http请求
                 */
                ch.pipeline().addLast(new HttpObjectAggregator(8192));
                /*
                    说明:
                    1、对应webSocket,它的数据是以帧(frame)的形式传递
                    2、浏览器请求时 ws://localhost:58080/xxx 表示请求的uri
                    3、核心功能是将http协议升级为ws协议,保持长连接
                */
                ch.pipeline().addLast(new WebSocketServerProtocolHandler(webSocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));
                // 自定义的handler,处理业务逻辑
                ch.pipeline().addLast(webSocketHandler);
            }
        });
        // 配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功
        ChannelFuture channelFuture = bootstrap.bind().sync();
        log.info("Server started and listen on:{}",channelFuture.channel().localAddress());
        // 对关闭通道进行监听
        channelFuture.channel().closeFuture().sync();
    }

    /**
     * 释放资源
     * @throws InterruptedException
     */
    @PreDestroy
    public void destroy() throws InterruptedException {
        if(bossGroup != null){
            bossGroup.shutdownGracefully().sync();
        }
        if(workGroup != null){
            workGroup.shutdownGracefully().sync();
        }
    }

    /**
     * 初始化(新线程开启)
     */
    @PostConstruct()
    public void init() {
        //需要开启一个新的线程来执行netty server 服务器
        new Thread(() -> {
            try {
                start();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

注意:启动方法需要开启一个新线程执行netty server服务,服务中配置了IdleStateHandler心跳检测,此类要在创建一个通道的第一个设置,如果超时了,则会调用userEventTriggered方法,且会告诉你超时的类型)

3.5、具体实现业务的WebSocketHandler(重点)

创建Netty配置的操作执行类WebSocketHandler,userEventTriggered为心跳检测超时所调用的方法,超时后ctx.channel().close()执行完毕会主动调用handlerRemoved删除通道及用户信息。

import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import io.netty.channel.*;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.AttributeKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * 操作执行类
 *
 * TextWebSocketFrame类型,表示一个文本帧
 * @author hs
 */
@Component
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    private static final Logger log = LoggerFactory.getLogger(WebSocketHandler.class);

    /**
     * 一旦连接,第一个被执行
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("handlerAdded 被调用"+ctx.channel().id().asLongText());
        // 添加到channelGroup 通道组
        NettyConfig.getChannelGroup().add(ctx.channel());
    }

    /**
     * 读取数据
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        // 获取用户ID,关联channel
        JSONObject jsonObject = JSONUtil.parseObj(msg.text());
        String uid = jsonObject.getStr("uid");
        // 当用户ID已存入通道内,则不进行写入,只有第一次建立连接时才会存入,其他情况发送uid则为心跳需求
        if(!NettyConfig.getUserChannelMap().containsKey(uid)){
            log.info("服务器收到消息:{}",msg.text());
            NettyConfig.getUserChannelMap().put(uid,ctx.channel());
            // 将用户ID作为自定义属性加入到channel中,方便随时channel中获取用户ID
            AttributeKey<String> key = AttributeKey.valueOf("userId");
            ctx.channel().attr(key).setIfAbsent(uid);
            // 回复消息
            ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器连接成功!"));
        }else{
            // 前端定时请求,保持心跳连接,避免服务端误删通道
            ctx.channel().writeAndFlush(new TextWebSocketFrame("keep alive success!"));
        }
    }

    /**
     * 移除通道及关联用户
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info("handlerRemoved 被调用"+ctx.channel().id().asLongText());
        // 删除通道
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
    }

    /**
     * 异常处理
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.info("异常:{}",cause.getMessage());
        // 删除通道
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
        ctx.close();
    }

    /**
     * 心跳检测相关方法 - 会主动调用handlerRemoved
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
        if(evt instanceof IdleStateEvent){
            IdleStateEvent event = (IdleStateEvent)evt;
            if(event.state() == IdleState.ALL_IDLE){
                //清除超时会话
                ChannelFuture writeAndFlush = ctx.writeAndFlush("you will close");
                writeAndFlush.addListener(new ChannelFutureListener() {

                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        ctx.channel().close();
                    }
                });
            }
        }else{
            super.userEventTriggered(ctx, evt);
        }
    }

    /**
     * 删除用户与channel的对应关系
     * @param ctx
     */
    private void removeUserId(ChannelHandlerContext ctx){
        AttributeKey<String> key = AttributeKey.valueOf("userId");
        String userId = ctx.channel().attr(key).get();
        NettyConfig.getUserChannelMap().remove(userId);
        log.info("删除用户与channel的对应关系,uid:{}",userId);
    }
}

3.6、具体消息推送的接口

/**
 * 推送消息接口
 *
 * @author hs
 */
public interface PushService {

    /**
     * 推送给指定用户
     * @param userId 用户ID
     * @param msg 消息信息
     */
    void pushMsgToOne(String userId,String msg);

    /**
     * 推送给所有用户
     * @param msg 消息信息
     */
    void pushMsgToAll(String msg);

    /**
     * 获取当前连接数
     * @return 连接数
     */
    int getConnectCount();
}

3.7、消息推送接口实现类

import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.stereotype.Service;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 推送消息接口实现类
 *
 * @author hs
 */
@Service
public class PushServiceImpl implements PushService {

    /**
     * 推送给指定用户
     * @param userId 用户ID
     * @param msg 消息信息
     */
    @Override
    public void pushMsgToOne(String userId, String msg){
        ConcurrentHashMap<String, Channel> userChannelMap = NettyConfig.getUserChannelMap();
        Channel channel = userChannelMap.get(userId);
        channel.writeAndFlush(new TextWebSocketFrame(msg));
    }

    /**
     * 推送给所有用户
     * @param msg 消息信息
     */
    @Override
    public void pushMsgToAll(String msg){
        NettyConfig.getChannelGroup().writeAndFlush(new TextWebSocketFrame(msg));
    }

    /**
     * 获取当前连接数
     * @return 连接数
     */
    @Override
    public int getConnectCount() {
        return NettyConfig.getChannelGroup().size();
    }
}

3.8、提供消息推送服务的Controller

主要为了测试

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

/**
 * 请求Controller(用于postman测试)
 *
 * @author hs
 */
@RestController
@RequestMapping("/push")
public class PushController {

    @Autowired
    private PushService pushService;

    /**
     * 推送给所有用户
     * @param msg 消息信息
     */
    @PostMapping("/pushAll")
    public void pushToAll(@RequestParam("msg") String msg){
        pushService.pushMsgToAll(msg);
    }

    /**
     * 推送给指定用户
     * @param userId 用户ID
     * @param msg 消息信息
     */
    @PostMapping("/pushOne")
    public void pushMsgToOne(@RequestParam("userId") String userId,@RequestParam("msg") String msg){
        pushService.pushMsgToOne(userId,msg);
    }

    /**
     * 获取当前连接数
     */
    @GetMapping("/getConnectCount")
    public int getConnectCout(){
        return pushService.getConnectCount();
    }
}

3.9、Web前端通过websocket与服务端连接

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
    <script src="/static/jquery-2.2.4.min.js" charset="utf-8"></script>
</head>
<body>
<script>
    var socket;
    var userId = "123456";
    // 判断当前浏览器是否支持webSocket
    if(window.WebSocket){
        socket = new WebSocket("ws://127.0.0.1:58080/webSocket")
        // 相当于channel的read事件,ev 收到服务器回送的消息
        socket.onmessage = function (ev) {
            var rt = document.getElementById("responseText");
            rt.value = rt.value + "
" + ev.data;
        }
        // 相当于连接开启
        socket.onopen = function (ev) {
            var rt = document.getElementById("responseText");
            rt.value = "连接开启了..."
            socket.send(
                JSON.stringify({
                    // 连接成功将,用户ID传给服务端
                    uid: userId
                })
            );
        }

        //接受到服务端关闭连接时的回调方法
        socket.onclose = function (ev) {
            var rt = document.getElementById("responseText");
            rt.value = rt.value + "
" + "连接关闭了...";
        }

   // 监听窗口事件,当窗口关闭时,主动断开websocket连接,防止连接没断开就关闭窗口,server端报错
         window.onbeforeunload = function(){
            socket.close();
         }

    }
    else
    {
        alert("当前浏览器不支持webSocket")
    }

    // 如果前端需要保持连接,则需要定时往服务器针对自己发送请求,返回的参数和发送参数一致则证明时间段内有交互,服务端则不进行连接断开操作
    var int = self.setInterval("clock()",10000);
    function clock() {
        socket.send(
            JSON.stringify({
                // 连接成功将,用户ID传给服务端
                uid: userId
            })
        );
    }

</script>
<form onsubmit="return false">
    <textarea id="responseText" style="height: 150px; width: 300px;"></textarea>
    <br>
    <input type="button" value="清空内容" onclick="document.getElementById('responseText').value=''">
</form>
</body>
</html>

3.10、WebSocket断开的原因

原因有很多,最好在WebSocket断开时,将错误打印出来。

ws.onclose = function (ev) {
  console.log('websocket 断开: ' + ev.code + ' ' + ev.reason + ' ' + ev.wasClean)
  console.log(ev)
}

错误状态码:

WebSocket断开时,会触发

CloseEvent

, CloseEvent会在连接关闭时发送给使用 WebSockets 的客户端. 它在 WebSocket 对象的 onclose 事件监听器中使用。CloseEvent的code字段表示了WebSocket断开的原因。可以从该字段中分析断开的原因。

CloseEvent有三个字段需要注意, 通过分析这三个字段,一般就可以找到断开原因

  • CloseEvent.code: code是错误码,是整数类型
  • CloseEvent.reason: reason是断开原因,是字符串
  • CloseEvent.wasClean: wasClean表示是否正常断开,是布尔值。一般异常断开时,该值为false

状态码

名称

描述

0–999

保留段, 未使用.

1000

CLOSE_NORMAL

正常关闭; 无论为何目的而创建, 该链接都已成功完成任务.

1001

CLOSE_GOING_AWAY

终端离开, 可能因为服务端错误, 也可能因为浏览器正从打开连接的页面跳转离开.

1002

CLOSE_PROTOCOL_ERROR

由于协议错误而中断连接.

1003

CLOSE_UNSUPPORTED

由于接收到不允许的数据类型而断开连接 (如仅接收文本数据的终端接收到了二进制数据).

1004

保留

. 其意义可能会在未来定义.

1005

CLOSE_NO_STATUS

保留

. 表示没有收到预期的状态码.

1006

CLOSE_ABNORMAL

保留

. 用于期望收到状态码时连接非正常关闭 (也就是说, 没有发送关闭帧).

1007

Unsupported Data

由于收到了格式不符的数据而断开连接 (如文本消息中包含了非 UTF-8 数据).

1008

Policy Violation

由于收到不符合约定的数据而断开连接. 这是一个通用状态码, 用于不适合使用 1003 和 1009 状态码的场景.

1009

CLOSE_TOO_LARGE

由于收到过大的数据帧而断开连接.

1010

Missing Extension

客户端期望服务器商定一个或多个拓展, 但服务器没有处理, 因此客户端断开连接.

1011

Internal Error

客户端由于遇到没有预料的情况阻止其完成请求, 因此服务端断开连接.

1012

Service Restart

服务器由于重启而断开连接.

1013

Try Again Later

服务器由于临时原因断开连接, 如服务器过载因此断开一部分客户端连接.

1014

由 WebSocket标准保留以便未来使用.

1015

TLS Handshake

保留. 表示连接由于无法完成 TLS 握手而关闭 (例如无法验证服务器证书).

1016–1999

由 WebSocket标准保留以便未来使用.

2000–2999

由 WebSocket拓展保留使用.

3000–3999

可以由库或框架使用. 不应由应用使用. 可以在 IANA 注册, 先到先得.

4000–4999

可以由应用使用.


四、WebSocket和Http之长连接和短连接区别

4.1、HTTP1.0、HTTP1.1 和 HTTP2.0 的区别

HTTP是一个应用层协议,无状态的,端口号为80。主要的版本有1.0/1.1/2.0.

(1) HTTP/1.0

一次请求-响应,建立一个连接,用完关闭;

(2) HTTP/1.1

HTTP 1.1支持长连接(PersistentConnection)和请求的流水线(Pipelining)处理

串行化单线程处理,可以同时在同一个tcp链接上发送多个请求,但是只有响应是有顺序的,只 有上一个请求完成后,下一个才能响应。一旦有任务处理超时等,后续任务只能被阻塞(线头阻塞);

(3)HTTP/2

HTTP2支持多路复用,所以通过同一个连接实现多个http请求传输变成了可能。请求并行执行,某任务耗时严重,不会影响到任务正常执行。

4.2、什么是websocket?

Websocket是html5提出的一个协议规范,是为解决客户端与服务端实时通信。本质上是一个基于tcp,先通过HTTP/HTTPS协议发起一条特殊的http请求进行握手后创建一个用于交换数据的TCP连接。

WebSocket优势: 浏览器和服务器只需要要做一个握手的动作,在建立连接之后,双方可以在任意时刻相互推送信息。同时,服务器与客户端之间交换的头信息很小。

4.3、什么是Http长连接和短连接

在HTTP/1.0中默认使用短连接。也就是说,客户端和服务器每进行一次HTTP操作,就建立一次连接,任务结束就中断TCP连接。当客户端浏览器访问的某个HTML或其他类型的Web页中包含有其他的Web资源(如JavaScript文件、图像文件、CSS文件等),每遇到这样一个Web资源,浏览器就会重新建立一个HTTP会话。

而从HTTP/1.1起,默认使用长连接,用以保持连接特性。使用长连接的HTTP协议,会在响应头加入这行代码:

Connection:keep-alive

在使用长连接的情况下,当一个网页打开完成后,客户端和服务器之间用于传输HTTP数据的TCP连接不会关闭,客户端再次访问这个服务器时,会继续使用这一条已经建立的连接。Keep-Alive不会永久保持连接,它有一个保持时间,可以在不同的服务器软件(如Apache)中设定这个时间。实现长连接需要客户端和服务端都支持长连接。

HTTP协议的长连接和短连接,实质上是TCP协议的长连接和短连接。

4.4、http和websocket的长连接区别

HTTP1.1通过使用Connection:keep-alive进行长连接,HTTP 1.1默认进行持久连接。在一次 TCP 连接中可以完成多个 HTTP 请求,但是对每个请求仍然要单独发 header,Keep-Alive不会永久保持连接,它有一个保持时间,可以在不同的服务器软件(如Apache)中设定这个时间。这种长连接是一种“伪链接”

websocket的长连接,是一个真的全双工。长连接第一次tcp链路建立之后,后续数据可以双方都进行发送,不需要发送请求头。

keep-alive双方并没有建立正真的连接会话,服务端可以在任何一次请求完成后关闭。WebSocket 它本身就规定了是正真的、双工的长连接,两边都必须要维持住连接的状态。

4.5、HTTP2.0和HTTP1.X相比的新特性

  • 新的二进制格式(Binary Format),HTTP1.x的解析是基于文本。基于文本协议的格式解析存在天然缺陷,文本的表现形式有多样性,要做到健壮性考虑的场景必然很多,二进制则不同,只认0和1的组合。基于这种考虑HTTP2.0的协议解析决定采用二进制格式,实现方便且健壮。
  • 多路复用(MultiPlexing),即连接共享,即每一个request都是是用作连接共享机制的。一个request对应一个id,这样一个连接上可以有多个request,每个连接的request可以随机的混杂在一起,接收方可以根据request的 id将request再归属到各自不同的服务端请求里面。
  • header压缩,如上文中所言,对前面提到过HTTP1.x的header带有大量信息,而且每次都要重复发送,HTTP2.0使用encoder来减少需要传输的header大小,通讯双方各自cache一份header fields表,既避免了重复header的传输,又减小了需要传输的大小。
  • 服务端推送(server push),同SPDY一样,HTTP2.0也具有server push功能。

参考链接:

WebSocket使用

SpringBoot+WebSocket+Netty实现消息推送


本文转载自: https://blog.csdn.net/m0_67391121/article/details/125345879
版权归原作者 m0_67391121 所有, 如有侵权,请联系我们删除。

“WebSocket服务端消息推送”的评论:

还没有评论