0


springboot集成socket服务

1,先在配置文件中配置Socket监听的端口,由于JDK1.8中已经整合了Socket服务到java.net包中,因此不需要引入其他依赖了

在application.yml中配置下

#Socket配置
socket:
  port: 8503

2,配置Socket连接类,这一步类似配置一个控制器(Controller),主要用于接收客户端的连接请求,我这里将它写在了控制器类中

package com.linzhuo.app.controller;import com.linzhuo.app.service.IBusinessSocketService;import org.springframework.beans.factory.annotation.Value;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;import javax.annotation.Resource;import java.io.BufferedReader;import java.io.InputStreamReader;import java.io.PrintWriter;import java.net.ServerSocket;import java.net.Socket;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;

/**
 * @author: jss
 * Date: 2023/7/25 上午10:58
 */
@Slf4j
@Component
public class SocketServers {

    //注入被开放的端口
    @Value("${socket.port}")
    private Integer port;

    //这个是业务处理的接口
    @Resource
    private IBusinessSocketService socketService;

    @PostConstruct
    public void socketStart(){
        //直接另起一个线程挂起Socket服务
        new Thread(this::socketServer).start();}

    private void socketServer(){
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        try (ServerSocket serverSocket = new ServerSocket(port)){
            log.info("socket端口在:{}中开启并持续监听=====>", port);while(Boolean.TRUE){
                Socket clientSocket = serverSocket.accept();
                //设置流读取的超时时间,这里设置为10s。超时后自动断开连接
                clientSocket.setSoTimeout(10000);
                //是否与客户端保持持续连接,这行代码在本示例中,并没有作用,因为后面的逻辑处理完成后,会自动断开连接.
                clientSocket.setKeepAlive(Boolean.TRUE);
                log.info("发现客户端连接Socket:{}:{}===========>", clientSocket.getInetAddress().getHostAddress(),
                        clientSocket.getPort());
                //这里通过线程池启动ClientHandler方法中的run方法.
                executorService.execute(new ClientHandler(clientSocket, socketService));}} catch (Exception e){
            log.error("Socket服务启动异常!", e);}}

    public static void main(String[] args) throws Exception {
        requestInfoToSocketServer();}

    private static void requestInfoToSocketServer(){
        try (Socket socket = new Socket("127.0.0.1", 8503);
             PrintWriter out = new PrintWriter(socket.getOutputStream(), true)){
//            Object phone ="15210161743";
            out.write("15210161743");
            out.flush();
            //记得调用shutdownOutput()方法,否则服务端的流读取会一直等待
            socket.shutdownOutput();
            //开始接收服务端的消息
            BufferedReader in= new BufferedReader(new InputStreamReader(socket.getInputStream()));
            System.out.println("接收到服务端的回复:" + in.readLine());} catch (Exception e){
            System.out.println("Socket传输数据异常!" + e.getMessage());}}}

3,配置自定义客户端类,这里也将写在了控制器类中,也可以自己去写在其他文件夹里

package com.linzhuo.app.controller;import com.linzhuo.app.service.IBusinessSocketService;import com.linzhuo.app.util.SnowFlakeUtil;import com.linzhuo.app.util.StringUtil;import lombok.SneakyThrows;import lombok.extern.slf4j.Slf4j;import org.springframework.util.StringUtils;import java.io.IOException;import java.io.PrintWriter;import java.net.Socket;import java.util.concurrent.TimeUnit;

/**
 * @author: jss
 * Date: 2023/7/25 上午11:06
 */
@Slf4j
public class ClientHandler implements Runnable {

    private final Socket clientSocket;

    private final IBusinessSocketService socketService;

    public ClientHandler(Socket clientSocket, IBusinessSocketService socketService){
        this.clientSocket = clientSocket;
        this.socketService = socketService;}

    @Override
    @SneakyThrows
    public void run(){
        //SnowFlakeUtil 雪花ID生成工具类,后面会统一给出
        String logId = SnowFlakeUtil.getId();
        String hostIp = clientSocket.getInetAddress().getHostAddress();
        String port = String.valueOf(clientSocket.getPort());
        try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), Boolean.TRUE)){
            //这里的StringUtil是自己写的工具类,后面会统一给出
            String requestInfo = StringUtil.readToStreamToString(clientSocket.getInputStream());if(requestInfo != null){
                log.info("监听到客户端消息:{},监听日志ID为:{}", requestInfo, logId);
                socketService.executeBusinessCode(requestInfo, logId, out);
                clientSocket.shutdownOutput();
                TimeUnit.SECONDS.sleep(1L);}} catch (IOException e){
            log.error("与客户端:[{}:{}]通信异常!错误信息:{}", hostIp, port, e.getMessage());} finally {
            log.info("客户端:[{}:{}]Socket连接已关闭,日志ID为:{}========>", hostIp, port, logId);}}}

4,定义业务接口,service类

package com.linzhuo.app.service;import java.io.PrintWriter;

/**
 * @author: jss
 * Date: 2023/7/25 上午11:11
 */
public interface IBusinessSocketService {
    /**
     * 从Socket中接受消息并处理
     *
     * @param requestInfo 请求报文
     * @param logId       日志ID
     * @param writer      回写给客户端消息的回写类(Socket自带)
     */
    void executeBusinessCode(String requestInfo, String logId, PrintWriter writer);}

5,定义业务实现类,serviceImpl

package com.linzhuo.app.service.impl;import com.linzhuo.app.service.IBusinessSocketService;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Service;import org.springframework.util.StringUtils;import javax.annotation.Resource;import java.io.PrintWriter;

/**
 * @author: jss
 * Date: 2023/7/25 上午11:12
 */
@Slf4j
@Service
public class IBusinessSocketServiceImpl implements IBusinessSocketService {

    @Resource
    LzLoginServiceImpl loginService;
    @Override
    public void executeBusinessCode(String requestInfo, String logId, PrintWriter writer){
        Object responseMsg;
        boolean isSuccess = Boolean.TRUE;
        try {if(StringUtils.isEmpty(requestInfo)){return;}
            //执行你的业务操作
            log.info("接收到的手机为:{}", requestInfo);
//            responseMsg ="回填给客户端的信息,可以是任何格式的对象34";
//这里调用了自己的其他接口,接收客户端发送的手机号 ,进行发送验证码
            Object phoneCode = loginService.getPhoneCode(requestInfo, "1");
            responseMsg = phoneCode;} catch (Exception e){
            isSuccess = Boolean.FALSE;
            e.printStackTrace();
            responseMsg ="回填给客户端的信息(业务处理错误的情况下)";}
        try {
            //将响应报文通过PrintWriter回写给客户端
            writer.println(responseMsg);} catch (Exception e){
            log.error("Socket客户端数据返回异常!当前日志ID:[{}],异常信息:{}", logId, e);}}}

6,两个相关的工具类

第一个SnowFlakeUtil

package com.linzhuo.app.util;

/**************************************************
 * copyright (c)2022
 * created by jss
 * date:       2022-9-18
 * description: 雪花算法工具类
 *
 **************************************************/
public class SnowFlakeUtil {

    private long workerId;
    private long datacenterId;
    private long sequence = 0L;
    private long twepoch = 1288834974657L;                              //  Thu, 04 Nov 2010 01:42:54 GMT 标记时间 用来计算偏移量,距离当前时间不同,得到的数据的位数也不同
    private long workerIdBits = 5L;                                     //  物理节点ID长度
    private long datacenterIdBits = 5L;                                 //  数据中心ID长度
    private long maxWorkerId =-1L ^ (-1L << workerIdBits);             //  最大支持机器节点数0~31,一共32个
    private long maxDatacenterId =-1L ^ (-1L << datacenterIdBits);     //  最大支持数据中心节点数0~31,一共32个
    private long sequenceBits = 12L;                                    //  序列号12位, 4095,同毫秒内生成不同id的最大个数
    private long workerIdShift = sequenceBits;                          //  机器节点左移12位
    private long datacenterIdShift = sequenceBits + workerIdBits;       //  数据中心节点左移17位
    private long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits; //  时间毫秒数左移22位
    private long sequenceMask =-1L ^ (-1L << sequenceBits);                          // 用于和当前时间戳做比较,以获取最新时间
    private long lastTimestamp = -1L;

    //成员类,SnowFlakeUtil的实例对象的保存域
    private static class IdGenHolder {
        private static final SnowFlakeUtil instance = new SnowFlakeUtil();}

    //外部调用获取SnowFlakeUtil的实例对象,确保不可变
    public static SnowFlakeUtil get(){return IdGenHolder.instance;}

    //初始化构造,无参构造有参函数,默认节点都是0
    public SnowFlakeUtil(){
        this(0L, 0L);}

    //设置机器节点和数据中心节点数,都是 0-31
    public SnowFlakeUtil(long workerId, long datacenterId){if(workerId > maxWorkerId || workerId <0){
            throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));}if(datacenterId > maxDatacenterId || datacenterId <0){
            throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));}
        this.workerId = workerId;
        this.datacenterId = datacenterId;}

    //线程安全的id生成方法
    @SuppressWarnings("all")
    public synchronized long nextId(){
        //获取当前毫秒数
        long timestamp = timeGen();
        //如果服务器时间有问题(时钟后退) 报错。
        if(timestamp < lastTimestamp){
            throw new RuntimeException(String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));}
        //如果上次生成时间和当前时间相同,在同一毫秒内
        if(lastTimestamp == timestamp){
            //sequence自增,因为sequence只有12bit,所以和sequenceMask相与一下,去掉高位
            sequence =(sequence + 1)& sequenceMask;
            //判断是否溢出,也就是每毫秒内超过4095,当为4096时,与sequenceMask相与,sequence就等于0
            if(sequence ==0){
                //自旋等待到下一毫秒
                timestamp = tilNextMillis(lastTimestamp);}}else{
            //如果和上次生成时间不同,重置sequence,就是下一毫秒开始,sequence计数重新从0开始累加,每个毫秒时间内,都是从0开始计数,最大4095
            sequence = 0L;}
        lastTimestamp = timestamp;
        // 最后按照规则拼出ID 64位
        // 000000000000000000000000000000000000000000  00000            00000       000000000000
        //1位固定整数   time                                       datacenterId   workerId    sequence
        return((timestamp - twepoch)<< timestampLeftShift)|(datacenterId << datacenterIdShift)|(workerId << workerIdShift)| sequence;}

    //比较当前时间和过去时间,防止时钟回退(机器问题),保证给的都是最新时间/最大时间
    protected long tilNextMillis(long lastTimestamp){
        long timestamp = timeGen();while(timestamp <= lastTimestamp){
            timestamp = timeGen();}return timestamp;}

    //获取当前的时间戳(毫秒)
    protected long timeGen(){return System.currentTimeMillis();}

    /**
     * 获取全局唯一编码
     */
    public static String getId(){
        long id= SnowFlakeUtil.get().nextId();return Long.toString(id);}}

第二个工具类为StringUtil

package com.linzhuo.app.util;import lombok.SneakyThrows;import java.io.ByteArrayOutputStream;import java.io.InputStream;

/**
 * <Description> ******
 *
 * @author jss
 * @version 1.0
 * @date 2022/04/17
 */
public class StringUtil {

    @SneakyThrows
    public static String readToStreamToString(InputStream is){
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        byte[] buffer = new byte[2048];
        int bytesRead;
        /*
        这一步可能会卡住,因为客户端如果传输完数据以后,
        如果没有调用socket.shutdownOutput()方法,会导致服务端不知道流是否已传输完毕,
        等待我们之前设置的10S流读取时间后,连接就会被自动关掉
        如果出现这种情况,服务端可以通过其它方式判断。例如换行符或者特殊字符等,只需要在
        while条件中加一个&&判断即可.例如我这里的业务结束标记是字符: ">",那么判断逻辑如下
        若客户端调用了shutdownOutput()方法,则不需要这个判断
        */
        while(!bos.toString().contains(">")&&(bytesRead = is.read(buffer))!= -1){
            bos.write(buffer, 0, bytesRead);}return bos.toString();}}

7,最后测试一下,用Main方法或者另外一个Springboot项目

public static void main(String[] args) throws Exception {
        requestInfoToSocketServer();}

    private static void requestInfoToSocketServer(){
        try (Socket socket = new Socket("127.0.0.1", 8503);
             PrintWriter out = new PrintWriter(socket.getOutputStream(), true)){
//            Object phone ="15210161743";
            out.write("15210161743");
            out.flush();
            //记得调用shutdownOutput()方法,否则服务端的流读取会一直等待
            socket.shutdownOutput();
            //开始接收服务端的消息
            BufferedReader in= new BufferedReader(new InputStreamReader(socket.getInputStream()));
            System.out.println("接收到服务端的回复:" + in.readLine());} catch (Exception e){
            System.out.println("Socket传输数据异常!" + e.getMessage());}}

out.flush();这里需要手动刷新下,才会发送到服务器端,自动刷新不生效 需要手动刷新
自己测试了可以使用

标签: spring boot java 后端

本文转载自: https://blog.csdn.net/qq_36189997/article/details/131916216
版权归原作者 shan~~ 所有, 如有侵权,请联系我们删除。

“springboot集成socket服务”的评论:

还没有评论