Using the TCP Protocol in a Spring Boot Project

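I. Overview

In the previous article, JAVA TCP协议初体验 (a first look at TCP in Java), we built a rough prototype of TCP communication in plain Java. Most real projects now run on Spring Boot, so how do we integrate TCP into a Spring Boot application? Specifically, how can a server-side controller push a command over TCP to a TCP client for execution, and then obtain the execution result inside the controller?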

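II. Implementation Approach

To keep the demo simple, this article places TcpClient and TcpServer in the same project. The steps are:

  1. Copy the TcpClient and TcpServer code from the previous article.
  2. Extract the server and client startup code into methods annotated with @PostConstruct.
  3. Write a controller that exposes endpoints for sending messages from the client and from the server.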

III. Code Structure

[Figure: project code structure]

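IV. Code Walkthrough

1. Sending a command from the server

See DemoController, lines 67-68: the command is wrapped in a Command object, serialized to a JSON string, and sent to the client.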

Flow: wrap the command → convert it to a string → send it to the client
Command command = new Command().setClientId(initManager.getClient().getClientName()).setText(msg);
initManager.getServer().sendMsg(initManager.getClient().getClientName(), JsonBeanUtils.beanToJson(command));

2. Client-side handling

if (msg.contains("commandId"))
{
    Command cmd = JsonBeanUtils.jsonToBean(msg, Command.class);
    sendMsg("hello Command!" + JsonBeanUtils.beanToJson(cmd));
}
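The server-to-client round trip described above can be tried without Spring. The following is a minimal sketch using only JDK sockets, not the project's TcpServer/TcpClient: the class name LoopbackCommandDemo, the one-line-per-message framing, and the hand-written JSON string are all assumptions made for illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; it only mirrors the message flow of the article.
class LoopbackCommandDemo {

    /** Sends one command line to a locally started client and returns the client's reply. */
    public static String roundTrip(String commandJson) {
        try (ServerSocket server = new ServerSocket(0)) { // port 0: let the OS pick a free port
            int port = server.getLocalPort();

            // Client side: connect, read one line, reply only if it looks like a command.
            Thread client = new Thread(() -> {
                try (Socket socket = new Socket("127.0.0.1", port);
                     BufferedReader in = reader(socket);
                     PrintWriter out = writer(socket)) {
                    String msg = in.readLine();
                    if (msg != null && msg.contains("commandId")) {
                        out.println("hello Command!" + msg);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            client.start();

            // Server side: accept the connection, push the command, wait for the reply.
            try (Socket socket = server.accept();
                 BufferedReader in = reader(socket);
                 PrintWriter out = writer(socket)) {
                out.println(commandJson);
                String reply = in.readLine();
                client.join();
                return reply;
            }
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    private static BufferedReader reader(Socket s) throws IOException {
        return new BufferedReader(new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
    }

    private static PrintWriter writer(Socket s) throws IOException {
        // autoFlush = true so that println() sends the line immediately
        return new PrintWriter(new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8), true);
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("{\"commandId\":\"c-1\",\"text\":\"ping\"}"));
    }
}
```

The reply is the prefix "hello Command!" followed by the original command line, which is the same shape that the result callback in the next section parses.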

3. Result callback

The observer pattern is used here.

Flow: define the observable → register the observer → handle the result in the observer

Define the observable

class NewClient extends Observable

Register the observer

// Register the observer
addObserver(SpringContextUtils.getBean(ResultCallBack.class));

Observer business logic

/**
 * Result callback handling
 */
@Slf4j
@Service
public class ResultCallBack implements Observer
{
    Map<String, String> result = new ConcurrentHashMap<>();
    
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    
    /**
     * Handles data received by the observer
     */
    @Override
    public void update(Observable observable, Object msg)
    {
        log.info("### accept #### {}", msg);
        try
        {
            if (msg instanceof String)
            {
                String json = StringUtils.substringAfter((String)msg, "hello Command!");
                Command command = JsonBeanUtils.jsonToBean(json, Command.class);
                log.info("{}", command);
                result.put(command.getCommandId(), json);
            }
        }
        catch (IOException e)
        {
            log.error(e.getMessage());
        }
    }
    ...
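To see the observer wiring in isolation, here is a small JDK-only sketch. The names ObserverSketch, ReplySource, and ReplyStore are hypothetical stand-ins for NewClient and ResultCallBack, and the reply is passed as a two-element array rather than a JSON string to avoid a JSON library. Note that java.util.Observable and Observer have been deprecated since Java 9; they still work and match the Java 8 target of this project.

```java
import java.util.Map;
import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical names; a sketch of the pattern, not the project's classes.
class ObserverSketch {

    /** Observable side: stands in for the TCP client that receives replies. */
    static class ReplySource extends Observable {
        void onReply(String commandId, String payload) {
            setChanged(); // without this call, notifyObservers() does nothing
            notifyObservers(new String[] {commandId, payload});
        }
    }

    /** Observer side: stores each reply under its command id. */
    static class ReplyStore implements Observer {
        final Map<String, String> results = new ConcurrentHashMap<>();

        @Override
        public void update(Observable source, Object arg) {
            if (arg instanceof String[]) {
                String[] reply = (String[]) arg;
                results.put(reply[0], reply[1]);
            }
        }
    }

    /** Wires one observer to one observable and pushes a single reply through. */
    public static String demo() {
        ReplySource source = new ReplySource();
        ReplyStore store = new ReplyStore();
        source.addObserver(store);
        source.onReply("c-1", "done");
        return store.results.get("c-1");
    }
}
```

Keying the stored result by command id is what later lets a controller look up the reply that belongs to the command it sent.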

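4. Retrieving the result in the endpoint

DemoController handles this in three different ways (Callable, WebAsyncTask, and DeferredResult); compare how they are alike and how they differ.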

    /**
     * Client-side processing results are mostly obtained through callbacks
     * 
     * @param msg
     * @return
     * @throws IOException
     */
    @ApiOperation("Server sends message (Callable)")
    @ApiImplicitParam(name = "msg", example = "千树万树梨花开", required = true)
    @PostMapping("/server/sendMsg/callable")
    public Callable<JsonResult<?>> sendMsgFromServer01(String msg)
        throws IOException
    {
        return () -> {
            Command command = new Command().setClientId(initManager.getClient().getClientName()).setText(msg);
            initManager.getServer().sendMsg(initManager.getClient().getClientName(), JsonBeanUtils.beanToJson(command));
            String result;
            for (int i = 0; i < 50; i++)
            {
                result = resultCallBack.queryResult(command.getCommandId());
                if (StringUtils.isNotBlank(result))
                {
                    return JsonResult.success(result, "Result retrieved successfully");
                }
                else
                {
                    TimeUnit.MILLISECONDS.sleep(100);
                }
            }
            return JsonResult.error("Response timed out, please retry");
        };
    }
    
    /**
     * Client-side processing results are mostly obtained through callbacks (WebAsyncTask is an enhanced Callable that adds timeout and error handling)
     * 
     * @param msg
     * @return
     * @throws IOException
     */
    @ApiOperation("Server sends message (WebAsyncTask)")
    @ApiImplicitParam(name = "msg", example = "千树万树梨花开", required = true)
    @PostMapping("/server/sendMsg/webAsyncTask")
    public WebAsyncTask<String> sendMsgFromServer02(String msg)
        throws IOException
    {
        WebAsyncTask<String> webAsyncTask = new WebAsyncTask<String>(2000L, executor, () -> {
            Command command = new Command().setClientId(initManager.getClient().getClientName()).setText(msg);
            initManager.getServer().sendMsg(initManager.getClient().getClientName(), JsonBeanUtils.beanToJson(command));
            String result;
            for (int i = 0; i < 100; i++)
            {
                result = resultCallBack.queryResult(command.getCommandId());
                if (StringUtils.isNotBlank(result))
                {
                    return result;
                }
                else
                {
                    TimeUnit.MILLISECONDS.sleep(50);
                }
            }
            return "Response timed out, please retry";
        });
        webAsyncTask.onCompletion(() -> log.info("Call completed"));
        webAsyncTask.onError(() -> {
            log.error("Business processing failed");
            return "error";
        });
        webAsyncTask.onTimeout(() -> {
            log.info("Business processing timed out");
            return "Time Out";
        });
        return webAsyncTask;
    }
    
    /**
     * Client-side processing results are mostly obtained through callbacks
     * 
     * @param msg
     * @return
     * @throws IOException
     * @throws TimeoutException
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @ApiOperation("Server sends message")
    @ApiImplicitParam(name = "msg", example = "千树万树梨花开", required = true)
    @PostMapping("/server/sendMsg")
    public DeferredResult<String> sendMsgFromServer(String msg)
        throws IOException, InterruptedException, ExecutionException, TimeoutException
    {
        String clientName = initManager.getClient().getClientName();
        Command command = new Command().setClientId(clientName).setText(msg);
        initManager.getServer().sendMsg(clientName, JsonBeanUtils.beanToJson(command));
        
        // Return the result asynchronously
        DeferredResult<String> deferredResult = new DeferredResult<>(20000L, "Failed");
        deferredResult.onCompletion(() -> log.info("Call completed"));
        deferredResult.onTimeout(() -> {
            log.info("Call timed out");
            deferredResult.setResult("Call timed out");
        });
        resultCallBack.processResult(deferredResult, command.getCommandId());
        return deferredResult;
    }
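The first two endpoints poll a result map in a sleep loop, while the third hands a DeferredResult to the callback. The article does not show how queryResult or processResult are implemented, so the following is only a sketch of one possible alternative: correlating replies by command id with CompletableFuture instead of polling. The class PendingResults and its methods are hypothetical and use only the JDK.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper; not part of the article's project.
class PendingResults {

    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Call before sending the command so that a fast reply cannot be missed. */
    public CompletableFuture<String> register(String commandId) {
        return pending.computeIfAbsent(commandId, id -> new CompletableFuture<>());
    }

    /** Call from the reply handler; returns false if nobody registered this id. */
    public boolean complete(String commandId, String result) {
        CompletableFuture<String> future = pending.get(commandId);
        return future != null && future.complete(result);
    }

    /** Waits at most timeoutMillis for the reply and returns fallback otherwise. */
    public String await(String commandId, long timeoutMillis, String fallback) {
        CompletableFuture<String> future = register(commandId);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return fallback;
        } finally {
            pending.remove(commandId); // entries are cleaned up by the waiter
        }
    }
}
```

In this sketch the waiter wakes up as soon as the reply arrives instead of after the next sleep interval. A registered id that is never awaited stays in the map, so a real implementation would also need some form of expiry.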

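V. Source Code

https://gitcode.com/00fly/tcp-show/tree/main/springboot-tcp

Alternatively, restore the original project from the backup file below.

For restore instructions, see the article 神奇代码恢复工具 (the code recovery tool).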

//goto docker\docker-compose.yml
version: '3.7'
services:
  springboot-tcp:
    image: registry.cn-shanghai.aliyuncs.com/00fly/springboot-tcp:1.0.0
    container_name: springboot-tcp
    deploy:
      resources:
        limits:
          cpus: '1.0'
          memory: 200M
        reservations:
          cpus: '0.05'
          memory: 200M
    ports:
      - 8080:8081
    restart: on-failure
    logging:
      driver: json-file
      options:
        max-size: '5m'
        max-file: '1'
//goto docker\restart.sh
#!/bin/bash
docker-compose down && docker-compose up -d && docker stats
//goto docker\stop.sh
#!/bin/bash
docker-compose down
//goto docker\wait-for.sh
#!/bin/sh

TIMEOUT=15
QUIET=0

echoerr() {
  if [ "$QUIET" -ne 1 ]; then printf "%s\n" "$*" 1>&2; fi
}

usage() {
  exitcode="$1"
  cat << USAGE >&2
Usage:
  $cmdname host:port [-t timeout] [-- command args]
  -q | --quiet                        Do not output any status messages
  -t TIMEOUT | --timeout=timeout      Timeout in seconds, zero for no timeout
  -- COMMAND ARGS                     Execute command with args after the test finishes
USAGE
  exit "$exitcode"
}

wait_for() {
  for i in `seq $TIMEOUT` ; do
    nc -z "$HOST" "$PORT" > /dev/null 2>&1

    result=$?
    if [ $result -eq 0 ]; then
      if [ $# -gt 0 ]; then
        exec "$@"
      fi
      exit 0
    fi
    sleep 1
  done
  echo "Operation timed out" >&2
  exit 1
}

while [ $# -gt 0 ]
do
  case "$1" in
    *:*)
    HOST=$(printf "%s\n" "$1" | cut -d : -f 1)
    PORT=$(printf "%s\n" "$1" | cut -d : -f 2)
    shift 1
    ;;
    -q | --quiet)
    QUIET=1
    shift 1
    ;;
    -t)
    TIMEOUT="$2"
    if [ "$TIMEOUT" = "" ]; then break; fi
    shift 2
    ;;
    --timeout=*)
    TIMEOUT="${1#*=}"
    shift 1
    ;;
    --)
    shift
    break
    ;;
    --help)
    usage 0
    ;;
    *)
    echoerr "Unknown argument: $1"
    usage 1
    ;;
  esac
done

if [ "$HOST" = "" -o "$PORT" = "" ]; then
  echoerr "Error: you need to provide a host and port to test."
  usage 2
fi

wait_for "$@"
//goto Dockerfile
# Base image
FROM adoptopenjdk/openjdk8-openj9:alpine-slim

COPY docker/wait-for.sh /
RUN chmod +x /wait-for.sh && \
    ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && \
    echo 'Asia/Shanghai' >/etc/timezone

# Add the application jar
COPY target/*.jar /app.jar

# Exposed port
EXPOSE 8081

CMD ["--server.port=8081"]

# How the application is started
ENTRYPOINT ["java", "-Djava.security.egd=file:/dev/./urandom", "-Xshareclasses", "-Xquickstart", "-jar", "/app.jar"]
//goto pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.fly</groupId>
    <artifactId>springboot-tcp</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.4.RELEASE</version>
        <relativePath />
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <docker.hub>registry.cn-shanghai.aliyuncs.com</docker.hub>
        <java.version>1.8</java.version>
        <skipTests>true</skipTests>
    </properties>

    <dependencies>
        <!-- Compile -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-tomcat</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jetty</artifactId>
        </dependency>
        <dependency>
            <groupId>com.github.xiaoymin</groupId>
            <artifactId>knife4j-spring-boot-starter</artifactId>
            <version>2.0.8</version>
        </dependency>

        <!-- Asynchronous logging requires the disruptor dependency -->
        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.4.2</version>
        </dependency>

        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>

        <!-- Test -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

    <!-- Aliyun Maven repository -->
    <repositories>
        <repository>
            <id>public</id>
            <name>aliyun nexus</name>
            <url>https://maven.aliyun.com/repository/public/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>public</id>
            <name>aliyun nexus</name>
            <url>https://maven.aliyun.com/repository/public/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>

    <build>
        <finalName>${project.artifactId}</finalName>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>

            <!-- Add the docker-maven plugin -->
            <plugin>
                <groupId>io.fabric8</groupId>
                <artifactId>docker-maven-plugin</artifactId>
                <version>0.40.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>build</goal>
                            <!--<goal>push</goal>-->
                            <!--<goal>remove</goal>-->
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <!-- Connect to a Linux server with a Docker environment to build the image -->
                    <!-- <dockerHost>http://192.168.182.10:2375</dockerHost> -->

                    <!-- Registry address for pushing Docker images -->
                    <pushRegistry>${docker.hub}</pushRegistry>
                    <images>
                        <image>
                            <name>
                                ${docker.hub}/00fly/${project.artifactId}:${project.version}</name>
                            <build>
                                <dockerFileDir>${project.basedir}</dockerFileDir>
                            </build>
                        </image>
                    </images>
                </configuration>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <excludes>
                    <exclude>**/*.java</exclude>
                </excludes>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/**</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
    </build>
</project>
//goto src\main\java\com\fly\BootApplication.java
package com.fly;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BootApplication
{
    public static void main(String[] args)
    {
        SpringApplication.run(BootApplication.class, args);
    }
}
//goto src\main\java\com\fly\core\config\AsyncThreadPoolConfig.java
package com.fly.core.config;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import lombok.extern.slf4j.Slf4j;

/**
 * 
 * Asynchronous thread pool configuration
 * 
 * @author 00fly
 * @version [version, 2023-10-22]
 * @see [related classes/methods]
 * @since [product/module version]
 */
@Slf4j
@Configuration
@EnableAsync
public class AsyncThreadPoolConfig implements AsyncConfigurer
{
    @Bean
    ThreadPoolTaskExecutor taskExecutor()
    {
        int processors = Runtime.getRuntime().availableProcessors();
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(Math.max(processors, 5));
        executor.setMaxPoolSize(Math.max(processors, 5) * 2);
        executor.setQueueCapacity(10000);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("asyncTask-");
        
        // ThreadPoolExecutor provides several built-in handlers for rejected tasks:
        // AbortPolicy: rejects the task and throws a runtime exception
        // CallerRunsPolicy: runs the task in the calling thread
        // DiscardPolicy: silently drops the task, nothing else happens
        // DiscardOldestPolicy: drops the oldest task in the queue (the next one due to run) and retries the new task
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        return executor;
    }
    
    @Override
    public Executor getAsyncExecutor()
    {
        return taskExecutor();
    }
    
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler()
    {
        return (ex, method, params) -> {
            log.info("Exception message - {}", ex.getMessage());
            log.info("Method name - {}", method.getName());
            for (Object param : params)
            {
                log.info("Parameter value - {}", param);
            }
        };
    }
}
//goto src\main\java\com\fly\core\config\Knife4jConfig.java
package com.fly.core.config;

import java.util.Collections;
import java.util.List;

import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

import com.github.xiaoymin.knife4j.spring.annotations.EnableKnife4j;

import io.swagger.annotations.ApiOperation;
import springfox.bean.validators.configuration.BeanValidatorPluginsConfiguration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.ApiKey;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2WebMvc;

/**
 * Knife4jConfig
 *
 */
@EnableKnife4j
@Configuration
@EnableSwagger2WebMvc
@ConditionalOnWebApplication
@Import(BeanValidatorPluginsConfiguration.class)
public class Knife4jConfig
{
    /**
     * Enables the API documentation for development and test environments
     * 
     * @return
     * @see [class, class#method, class#member]
     */
    @Bean
    Docket createRestApi()
    {
        return new Docket(DocumentationType.SWAGGER_2).apiInfo(apiInfo())
            .enable(true)
            .select()
            .apis(RequestHandlerSelectors.withMethodAnnotation(ApiOperation.class))
            .paths(PathSelectors.any()) // classes under the package; generate API documentation
            .build()
            .securitySchemes(security());
    }
    
    private ApiInfo apiInfo()
    {
        return new ApiInfoBuilder().title("Data API").description("API documentation").termsOfServiceUrl("http://00fly.online/").version("1.0.0").build();
    }
    
    private List<ApiKey> security()
    {
        return Collections.singletonList(new ApiKey("token", "token", "header"));
    }
}
//goto src\main\java\com\fly\core\config\WebClientConfig.java
package com.fly.core.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;

/**
 * Configures WebClient
 */
@Configuration
public class WebClientConfig
{
    @Bean
    WebClient webClient()
    {
        return WebClient.builder().codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(-1)).build();
    }
}
//goto src\main\java\com\fly\core\config\WebMvcConfig.java
package com.fly.core.config;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.StringHttpMessageConverter;
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
import org.springframework.web.servlet.config.annotation.ContentNegotiationConfigurer;
import org.springframework.web.servlet.config.annotation.ViewControllerRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

/**
 * 
 * MVC configuration
 * 
 * @author 00fly
 * @version [version, 2021-04-23]
 * @see [related classes/methods]
 * @since [product/module version]
 */
@Configuration
@ConditionalOnWebApplication
public class WebMvcConfig implements WebMvcConfigurer
{
    @Override
    public void configureMessageConverters(final List<HttpMessageConverter<?>> converters)
    {
        converters.add(stringHttpMessageConverter());
        converters.add(mappingJackson2HttpMessageConverter());
    }
    
    @Override
    public void configureContentNegotiation(final ContentNegotiationConfigurer configurer)
    {
        configurer.defaultContentType(MediaType.APPLICATION_JSON);
        configurer.ignoreUnknownPathExtensions(false);
        configurer.favorPathExtension(true);
        configurer.favorParameter(false);
        final Map<String, MediaType> mediaTypes = new ConcurrentHashMap<>(3);
        mediaTypes.put("atom", MediaType.APPLICATION_ATOM_XML);
        mediaTypes.put("html", MediaType.TEXT_HTML);
        mediaTypes.put("json", MediaType.APPLICATION_JSON);
        configurer.mediaTypes(mediaTypes);
    }
    
    @Bean
    StringHttpMessageConverter stringHttpMessageConverter()
    {
        return new StringHttpMessageConverter();
    }
    
    @Bean
    MappingJackson2HttpMessageConverter mappingJackson2HttpMessageConverter()
    {
        final MappingJackson2HttpMessageConverter messageConverter = new MappingJackson2HttpMessageConverter();
        final List<MediaType> list = new ArrayList<>();
        list.add(MediaType.APPLICATION_JSON);
        list.add(MediaType.APPLICATION_XML);
        list.add(MediaType.TEXT_PLAIN);
        list.add(MediaType.TEXT_HTML);
        list.add(MediaType.TEXT_XML);
        messageConverter.setSupportedMediaTypes(list);
        return messageConverter;
    }
    
    /**
     * Equivalent to <mvc:view-controller path="/" view-name="redirect:index" /> in MVC XML configuration<br>
     * Equivalent to <mvc:view-controller path="/index" view-name="index.html" /> in MVC XML configuration
     * 
     * @param registry
     */
    @Override
    public void addViewControllers(final ViewControllerRegistry registry)
    {
        registry.addViewController("/").setViewName("redirect:index");
        registry.addViewController("/index").setViewName("index.html");
    }
}
//goto src\main\java\com\fly\core\exception\GlobalExceptionHandler.java
package com.fly.core.exception;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.springframework.validation.BindException;
import org.springframework.validation.BindingResult;
import org.springframework.web.bind.MethodArgumentNotValidException;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice;

import com.fly.core.JsonResult;

import lombok.extern.slf4j.Slf4j;

/**
 * Global exception handler
 * 
 * @author 00fly
 * @version [version, 2018-09-11]
 * @see [related classes/methods]
 * @since [product/module version]
 */
@Slf4j
@RestControllerAdvice
public class GlobalExceptionHandler
{
    @ExceptionHandler(Exception.class)
    public JsonResult<?> handleBadRequest(Exception exception)
    {
        // JSR303 parameter validation exception
        if (exception instanceof BindException)
        {
            BindingResult bindingResult = ((BindException)exception).getBindingResult();
            if (null != bindingResult && bindingResult.hasErrors())
            {
                List<String> errMsg = new ArrayList<>();
                bindingResult.getFieldErrors().stream().forEach(fieldError -> {
                    errMsg.add(fieldError.getDefaultMessage());
                });
                Collections.sort(errMsg);
                return JsonResult.error(StringUtils.join(errMsg, ","));
            }
        }
        if (exception instanceof MethodArgumentNotValidException)
        {
            BindingResult bindingResult = ((MethodArgumentNotValidException)exception).getBindingResult();
            if (null != bindingResult && bindingResult.hasErrors())
            {
                // Same logic, written with streams
                return JsonResult.error(bindingResult.getFieldErrors().stream().map(e -> e.getDefaultMessage()).sorted().collect(Collectors.joining(",")));
            }
        }
        // All other cases
        log.error("Error: handleBadRequest StackTrace : {}", exception);
        return JsonResult.error(StringUtils.defaultString(exception.getMessage(), "System error, please contact the administrator"));
    }
}
//goto src\main\java\com\fly\core\JsonResult.java
package com.fly.core;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.experimental.Accessors;

/**
 * 
 * Result object
 * 
 * @author 00fly
 * @version [version, 2021-05-02]
 * @see [related classes/methods]
 * @since [product/module version]
 */
@Data
@Accessors(chain = true)
@ApiModel(description = "JSON message body")
public class JsonResult<T>
{
    @ApiModelProperty(value = "Data object")
    private T data;
    
    @ApiModelProperty(value = "Whether the call succeeded", required = true, example = "true")
    private boolean success;
    
    @ApiModelProperty(value = "Error code")
    private String errorCode;
    
    @ApiModelProperty(value = "Message")
    private String message;
    
    public JsonResult()
    {
        super();
    }
    
    public static <T> JsonResult<T> success(T data)
    {
        JsonResult<T> r = new JsonResult<>();
        r.setData(data);
        r.setSuccess(true);
        return r;
    }
    
    public static <T> JsonResult<T> success(T data, String msg)
    {
        JsonResult<T> r = new JsonResult<>();
        r.setData(data);
        r.setMessage(msg);
        r.setSuccess(true);
        return r;
    }
    
    public static JsonResult<?> success()
    {
        JsonResult<Object> r = new JsonResult<>();
        r.setSuccess(true);
        return r;
    }
    
    public static JsonResult<Object> error(String code, String msg)
    {
        JsonResult<Object> r = new JsonResult<>();
        r.setSuccess(false);
        r.setErrorCode(code);
        r.setMessage(msg);
        return r;
    }
    
    public static JsonResult<Object> error(String msg)
    {
        return error("500", msg);
    }
}
//goto src\main\java\com\fly\core\runner\WebStartedRunner.java
package com.fly.core.runner;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import com.fly.core.utils.SpringContextUtils;

@Component
@Configuration
@ConditionalOnWebApplication
public class WebStartedRunner
{
    @Bean
    @ConditionalOnWebApplication
    CommandLineRunner init()
    {
        return args -> {
            if (SystemUtils.IS_OS_WINDOWS) // Avoids errors and startup failure on non-Windows systems
            {
                String url = SpringContextUtils.getServerBaseURL();
                if (StringUtils.containsNone(url, "-")) // junit port:-1
                {
                    Runtime.getRuntime().exec("cmd /c start /min " + url);
                    Runtime.getRuntime().exec("cmd /c start /min " + url + "/doc.html");
                }
            }
        };
    }
}
//goto src\main\java\com\fly\core\utils\JsonBeanUtils.java
package com.fly.core.utils;

import java.io.IOException;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * JsonBean转换工具
 * 
 * @author 00fly
 *
 */publicclassJsonBeanUtils{privatestaticObjectMapper objectMapper =newObjectMapper();/**
     * bean转json字符串
     * 
     * @param bean
     * @return
     * @throws IOException
     */publicstaticStringbeanToJson(Object bean)throwsIOException{returnbeanToJson(bean,false);}/**
     * bean转json字符串
     * 
     * @param bean
     * @param pretty 是否格式美化
     * @return
     * @throws IOException
     */publicstaticStringbeanToJson(Object bean,boolean pretty)throwsIOException{String jsonText = objectMapper.writeValueAsString(bean);if(pretty){return objectMapper.readTree(jsonText).toPrettyString();}return objectMapper.readTree(jsonText).toString();}/**
     * json字符串转bean
     * 
     * @param jsonText
     * @return
     * @throws IOException
     */publicstatic<T>TjsonToBean(String jsonText,Class<T> clazz)throwsIOException{
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,true);return objectMapper.readValue(jsonText, clazz);}/**
     * json字符串转bean
     * 
     * @param jsonText
     * @param clazz
     * @param ingoreError 是否忽略无法识别字段
     * @return
     * @throws IOException
     */publicstatic<T>TjsonToBean(String jsonText,Class<T> clazz,boolean ingoreError)throwsIOException{
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,!ingoreError);return objectMapper.readValue(jsonText, clazz);}/**
     * json字符串转bean
     * 
     * @param jsonText
     * @return
     * @throws IOException
     */publicstatic<T>TjsonToBean(String jsonText,JavaType javaType)throwsIOException{return objectMapper.readValue(jsonText, javaType);}/**
     * Converts a JSON string to a bean described by the given TypeReference
     * 
     * @param jsonText
     * @return
     * @throws IOException
     */publicstatic<T>TjsonToBean(String jsonText,TypeReference<T> typeRef)throwsIOException{return objectMapper.readValue(jsonText, typeRef);}}//goto src\main\java\com\fly\core\utils\SpringContextUtils.javapackagecom.fly.core.utils;importjava.net.InetAddress;importjava.net.UnknownHostException;importjavax.servlet.ServletContext;importorg.apache.commons.lang3.StringUtils;importorg.springframework.beans.BeansException;importorg.springframework.context.ApplicationContext;importorg.springframework.context.ApplicationContextAware;importorg.springframework.stereotype.Component;importorg.springframework.util.Assert;importlombok.extern.slf4j.Slf4j;/**
 * Spring Context utility class
 */@Slf4j@ComponentpublicclassSpringContextUtilsimplementsApplicationContextAware{privatestaticApplicationContext applicationContext;privatestaticString SERVER_BASE_URL =null;@OverridepublicvoidsetApplicationContext(ApplicationContext applicationContext)throwsBeansException{
        log.info("###### execute setApplicationContext ######");SpringContextUtils.applicationContext = applicationContext;}publicstaticApplicationContextgetApplicationContext(){return applicationContext;}publicstatic<T>TgetBean(Class<T> clazz){Assert.notNull(applicationContext,"applicationContext is null");return applicationContext.getBean(clazz);}/**
     * If called from a @PostConstruct method, SpringContextUtils may not be initialized yet, in which case the assertion below fails with an IllegalArgumentException
     * 
     * @return
     */publicstaticStringgetActiveProfile(){Assert.notNull(applicationContext,"applicationContext is null");String[] profiles = applicationContext.getEnvironment().getActiveProfiles();returnStringUtils.join(profiles,",");}/**
     * Can be used in @PostConstruct, because the context is passed in explicitly
     * 
     * @param context
     * @return
     */publicstaticStringgetActiveProfile(ApplicationContext context){Assert.notNull(context,"context is null");String[] profiles = context.getEnvironment().getActiveProfiles();returnStringUtils.join(profiles,",");}/**
     * Returns the base URL of the web service, normally http://${ip}:${port}/${contextPath}
     * 
     * @return
     * @throws UnknownHostException
     */publicstaticStringgetServerBaseURL()throwsUnknownHostException{ServletContext servletContext =getBean(ServletContext.class);Assert.notNull(servletContext,"servletContext is null");if(SERVER_BASE_URL ==null){String ip =InetAddress.getLocalHost().getHostAddress();
            SERVER_BASE_URL ="http://"+ ip +":"+getProperty("server.port")+ servletContext.getContextPath();}return SERVER_BASE_URL;}/**
     * Returns a property value from the environment, or an empty string if it is not set
     * 
     * @param key e.g. server.port
     * @return
     */
    public static String getProperty(String key) {
        return applicationContext.getEnvironment().getProperty(key, "");
    }
}
//goto src\main\java\com\fly\tcp\base\InitManager.java
package com.fly.tcp.base;

import javax.annotation.PostConstruct;

import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;

/**
 * Initialization manager
 */
@Service
public class InitManager {
    TcpServer server = new TcpServer();

    TcpClient client = new TcpClient("CLIENT_1");

    public TcpServer getServer() {
        return server;
    }

    public TcpClient getClient() {
        return client;
    }

    /**
     * Starts the TcpServer and the TcpClient
     */
    @PostConstruct
    private void init() {
        if (server.startServer("0.0.0.0", 8000)) {
            new Thread(server).start();
        }
        // In a Docker environment, prefer the TCP_SERVER value set under environment in docker-compose
        String serverIp = StringUtils.defaultIfBlank(System.getenv().get("TCP_SERVER"), "127.0.0.1");
        if (client.connectServer(serverIp, 8000)) {
            new Thread(client).start();
        }
    }
}
//goto src\main\java\com\fly\tcp\base\TcpClient.java
package com.fly.tcp.base;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.fly.core.utils.JsonBeanUtils;
import com.fly.tcp.entity.Command;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TcpClient implements Runnable {
    private String ip;

    private int port;

    private Socket socket;

    private DataOutputStream dataOutputStream;

    private String clientName;

    // volatile: written by the connect/close paths and read by the receiving thread
    private volatile boolean isClientCoreRun = false;

    private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

    private ExecutorService executor = Executors.newFixedThreadPool(2);

    public TcpClient(String clientName) {
        super();
        this.clientName = clientName;
    }

    public String getClientName() {
        return clientName;
    }

    /**
     * Connects to the server and sends the client name as the first message
     * 
     * @param ip server IP
     * @param port server port
     * @return true if the connection was established
     */
    public boolean connectServer(String ip, int port) {
        try {
            this.ip = ip;
            this.port = port;
            socket = new Socket(InetAddress.getByName(ip), port);
            log.info("****** TcpClient will connect to Server {}:{}", ip, port);
            scheduler.scheduleAtFixedRate(this::checkConnection, 0, 10, TimeUnit.SECONDS);
            isClientCoreRun = true;
            dataOutputStream = new DataOutputStream(socket.getOutputStream());
            dataOutputStream.writeUTF(clientName);
            dataOutputStream.flush();
        } catch (IOException e) {
            log.error(e.getMessage());
            isClientCoreRun = false;
        }
        return isClientCoreRun;
    }

    /**
     * Checks the TCP connection and reconnects if the socket is missing or closed
     */
    private void checkConnection() {
        if (socket == null || socket.isClosed()) {
            log.error("Connection lost, attempting to reconnect");
            reconnect();
        }
    }

    private void reconnect() {
        try {
            socket = new Socket(InetAddress.getByName(ip), port);
            log.info("****** TcpClient will connect to Server {}:{}", ip, port);
            isClientCoreRun = true;
            executor.execute(new ReceiveMsg());
            dataOutputStream = new DataOutputStream(socket.getOutputStream());
            dataOutputStream.writeUTF(clientName);
            dataOutputStream.flush();
        } catch (IOException e) {
            log.error(e.getMessage());
            isClientCoreRun = false;
        }
    }

    /**
     * Sends a message
     */
    public void sendMsg(String msg) {
        try {
            dataOutputStream.writeUTF(msg);
            dataOutputStream.flush();
        } catch (IOException e) {
            log.error(e.getMessage());
            closeClientConnect();
        }
    }

    /**
     * Closes the connection between the client and the server
     */
    public void closeClientConnect() {
        if (dataOutputStream != null) {
            try {
                dataOutputStream.close();
                isClientCoreRun = false;
                if (socket != null) {
                    socket.close();
                }
            } catch (IOException e) {
                log.error(e.getMessage());
            }
        }
    }

    @Override
    public void run() {
        executor.execute(new ReceiveMsg());
    }

    class ReceiveMsg implements Runnable {
        private DataInputStream dataInputStream;

        public ReceiveMsg() {
            try {
                // data input stream
                dataInputStream = new DataInputStream(socket.getInputStream());
            } catch (IOException e) {
                log.error(e.getMessage());
            }
        }

        @Override
        public void run() {
            try {
                // once the server stops, the message-receiving thread is affected
                while (isClientCoreRun) {
                    String msg = dataInputStream.readUTF();
                    log.info("{} get msg: {}", clientName, msg);
                    if (msg.contains("commandId")) {
                        Command cmd = JsonBeanUtils.jsonToBean(msg, Command.class);
                        sendMsg("hello Command!" + JsonBeanUtils.beanToJson(cmd));
                    }
                }
            } catch (IOException e) {
                log.error(e.getMessage());
                // close the connection so that a later reconnect does not fail
                closeClientConnect();
            }
        }
    }
}
//goto src\main\java\com\fly\tcp\base\TcpServer.java
package com.fly.tcp.base;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.Observable;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;

import com.fly.core.utils.SpringContextUtils;
import com.fly.tcp.service.ResultCallBack;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TcpServer implements Runnable {
    private ServerSocket serverSocket;

    // volatile: written by closeServer() and read by the accept loop
    private volatile boolean isServerCoreRun = false;

    // ConcurrentHashMap instead of HashMap: the map is touched by the accept thread,
    // the per-client threads and the HTTP request threads
    private Map<String, NewClient> allClient = new ConcurrentHashMap<>();

    public boolean startServer(String ip, int port) {
        try {
            serverSocket = new ServerSocket();
            serverSocket.bind(new InetSocketAddress(ip, port));
            isServerCoreRun = true;
        } catch (IOException e) {
            log.error(e.getMessage());
            isServerCoreRun = false;
        }
        return isServerCoreRun;
    }

    /**
     * Shuts down the server
     *
     * #1 Disconnects all clients and clears the container of connected clients. #2 Closes the server socket.
     */
    public void closeServer() {
        try {
            isServerCoreRun = false;
            for (Map.Entry<String, NewClient> all : this.allClient.entrySet()) {
                all.getValue().isNewClientRun = false;
                all.getValue().socket.close();
            }
            allClient.clear();
            serverSocket.close();
        } catch (IOException e) {
            log.error(e.getMessage());
        }
    }

    /**
     * Sends a message to the named client
     */
    public void sendMsg(String clientName, String msg) {
        // single lookup, so a client removed in between cannot cause a NullPointerException
        NewClient target = allClient.get(clientName);
        if (target != null) {
            target.sendMsg(msg);
        }
    }

    @Override
    public void run() {
        try {
            log.info("TcpServer will start");
            while (isServerCoreRun) {
                // block until a client connects
                Socket socket = serverSocket.accept();
                String clientName = new DataInputStream(socket.getInputStream()).readUTF();
                String clientIP = socket.getInetAddress().getHostAddress();
                int clientPort = socket.getPort();
                String clientConnectDateTime = DateFormatUtils.format(System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss");
                NewClient newClient = new NewClient(socket, clientName, clientIP, clientPort, clientConnectDateTime);
                allClient.put(clientName, newClient);
                log.info("**** add new client ===> {}", allClient.keySet());
                new Thread(newClient).start();
            }
        } catch (IOException e) {
            log.error(e.getMessage());
        }
    }

    /**
     * Client thread (the observed subject)
     */
    class NewClient extends Observable implements Runnable {
        // client socket
        private Socket socket;

        // data input stream
        private DataInputStream dataInputStream;

        // data output stream
        private DataOutputStream dataOutputStream;

        // whether this client is running (receiving and sending messages);
        // volatile because closeServer() clears it from another thread
        private volatile boolean isNewClientRun = true;

        // client name
        private String clientName;

        // client IP address
        private String clientIP;

        // the constructor initializes the member fields
        public NewClient(Socket socket, String clientName, String clientIP, int clientPort, String clientConnectDateTime) {
            this.socket = socket;
            this.clientName = clientName;
            this.clientIP = clientIP;
            try {
                // register the observer
                addObserver(SpringContextUtils.getBean(ResultCallBack.class));
                // create the client's data input and output streams
                dataInputStream = new DataInputStream(socket.getInputStream());
                dataOutputStream = new DataOutputStream(socket.getOutputStream());
            } catch (IOException e) {
                log.error(e.getMessage());
                closeCurrentClient();
            }
        }

        @Override
        public void run() {
            try {
                // messages can only be exchanged while the client is running
                while (this.isNewClientRun) {
                    // read a message sent by the client
                    String msg = dataInputStream.readUTF();
                    if (StringUtils.isNotBlank(msg)) {
                        log.info("clientName: {}, clientIP: {}, send msg ===> {}", clientName, clientIP, msg);
                    }
                    // notify the observer to handle command results
                    if (StringUtils.startsWith(msg, "hello Command")) {
                        setChanged();
                        notifyObservers(msg);
                        continue;
                    }
                    // send the data back to the client
                    int index = 0;
                    for (String key : allClient.keySet()) {
                        index++;
                        if (StringUtils.equals(key, clientName)) {
                            allClient.get(key).sendMsg("from server " + msg + StringUtils.repeat("-----", index));
                        }
                    }
                }
            } catch (IOException e) {
                log.error(e.getMessage());
                closeCurrentClient();
            }
        }

        /**
         * Disconnects the current client and releases its resources
         */
        public void closeCurrentClient() {
            try {
                // stop the client's running state
                isNewClientRun = false;
                // close the data output stream
                if (dataOutputStream != null) {
                    dataOutputStream.close();
                }
                // close the data input stream
                if (dataInputStream != null) {
                    dataInputStream.close();
                }
                // close the client socket
                if (socket != null) {
                    socket.close();
                }
                // remove this client from the client container
                allClient.remove(clientName);
                log.info("**** remove client ===> {}", allClient.keySet());
            } catch (IOException e) {
                log.error(e.getMessage());
            }
        }

        /**
         * Sends a message
         */
        public void sendMsg(String msg) {
            try {
                // send the message
                dataOutputStream.writeUTF(msg);
                // flush the output buffer
                dataOutputStream.flush();
            } catch (IOException e) {
                log.error(e.getMessage());
                closeCurrentClient();
            }
        }
    }
}
//goto src\main\java\com\fly\tcp\DemoController.java
package com.fly.tcp;

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import org.springframework.web.context.request.async.WebAsyncTask;

import com.fly.core.JsonResult;
import com.fly.core.utils.JsonBeanUtils;
import com.fly.tcp.base.InitManager;
import com.fly.tcp.entity.Command;
import com.fly.tcp.service.ResultCallBack;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Api(tags = "TCP application API")
@RestController
@RequestMapping("/tcp")
public class DemoController {
    @Autowired
    InitManager initManager;

    @Autowired
    ResultCallBack resultCallBack;

    @Autowired
    ThreadPoolTaskExecutor executor;

    @ApiOperation("client sends a message")
    @ApiImplicitParam(name = "msg", example = "忽如一夜春风来", required = true)
    @PostMapping("/client/sendMsg")
    public String sendMsgFromClient(String msg) {
        initManager.getClient().sendMsg(msg);
        return msg;
    }

    /**
     * Client-side processing results are mostly obtained through callbacks
     * 
     * @param msg
     * @return
     * @throws IOException
     */
    @ApiOperation("server sends a message (Callable)")
    @ApiImplicitParam(name = "msg", example = "千树万树梨花开", required = true)
    @PostMapping("/server/sendMsg/callable")
    public Callable<JsonResult<?>> sendMsgFromServer01(String msg) throws IOException {
        return () -> {
            Command command = new Command().setClientId(initManager.getClient().getClientName()).setText(msg);
            initManager.getServer().sendMsg(initManager.getClient().getClientName(), JsonBeanUtils.beanToJson(command));
            String result;
            // poll for the result every 100 ms, up to 50 times (about 5 seconds)
            for (int i = 0; i < 50; i++) {
                result = resultCallBack.queryResult(command.getCommandId());
                if (StringUtils.isNotBlank(result)) {
                    return JsonResult.success(result, "Result retrieved successfully");
                } else {
                    TimeUnit.MILLISECONDS.sleep(100);
                }
            }
            return JsonResult.error("Response timed out, please retry");
        };
    }

    /**
     * Client-side processing results are mostly obtained through callbacks (WebAsyncTask is an upgraded Callable that adds handling for timeouts, errors, and so on)
     * 
     * @param msg
     * @return
     * @throws IOException
     */
    @ApiOperation("server sends a message (WebAsyncTask)")
    @ApiImplicitParam(name = "msg", example = "千树万树梨花开", required = true)
    @PostMapping("/server/sendMsg/webAsyncTask")
    public WebAsyncTask<String> sendMsgFromServer02(String msg) throws IOException {
        WebAsyncTask<String> webAsyncTask = new WebAsyncTask<String>(2000L, executor, () -> {
            Command command = new Command().setClientId(initManager.getClient().getClientName()).setText(msg);
            initManager.getServer().sendMsg(initManager.getClient().getClientName(), JsonBeanUtils.beanToJson(command));
            String result;
            // poll every 50 ms, up to 100 times (about 5 seconds, longer than the 2000 ms task timeout)
            for (int i = 0; i < 100; i++) {
                result = resultCallBack.queryResult(command.getCommandId());
                if (StringUtils.isNotBlank(result)) {
                    return result;
                } else {
                    TimeUnit.MILLISECONDS.sleep(50);
                }
            }
            return "Response timed out, please retry";
        });
        webAsyncTask.onCompletion(() -> log.info("call completed"));
        webAsyncTask.onError(() -> {
            log.error("business processing failed");
            return "error";
        });
        webAsyncTask.onTimeout(() -> {
            log.info("business processing timed out");
            return "Time Out";
        });
        return webAsyncTask;
    }

    /**
     * Client-side processing results are mostly obtained through callbacks
     * 
     * @param msg
     * @return
     * @throws IOException
     * @throws TimeoutException
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @ApiOperation("server sends a message")
    @ApiImplicitParam(name = "msg", example = "千树万树梨花开", required = true)
    @PostMapping("/server/sendMsg")
    public DeferredResult<String> sendMsgFromServer(String msg) throws IOException, InterruptedException, ExecutionException, TimeoutException {
        String clientName = initManager.getClient().getClientName();
        Command command = new Command().setClientId(clientName).setText(msg);
        initManager.getServer().sendMsg(clientName, JsonBeanUtils.beanToJson(command));
        // return the result asynchronously
        DeferredResult<String> deferredResult = new DeferredResult<>(20000L, "failed");
        deferredResult.onCompletion(() -> log.info("call completed"));
        deferredResult.onTimeout(() -> {
            log.info("call timed out");
            deferredResult.setResult("call timed out");
        });
        resultCallBack.processResult(deferredResult, command.getCommandId());
        return deferredResult;
    }
}
//goto src\main\java\com\fly\tcp\entity\Command.java
package com.fly.tcp.entity;

import java.util.UUID;

import lombok.Data;
import lombok.experimental.Accessors;

@Data
@Accessors(chain = true)
public class Command {
    /**
     * Command ID
     */
    private String commandId = UUID.randomUUID().toString();

    /**
     * Client ID
     */
    private String clientId;

    /**
     * Command content
     */
    private String text;
}
//goto src\main\java\com\fly\tcp\service\ResultCallBack.java
package com.fly.tcp.service;

import java.io.IOException;
import java.util.Map;
import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import org.springframework.web.context.request.async.DeferredResult;

import com.fly.core.utils.JsonBeanUtils;
import com.fly.tcp.entity.Command;

import lombok.extern.slf4j.Slf4j;

/**
 * Result callback handling
 */
@Slf4j
@Service
public class ResultCallBack implements Observer {
    Map<String, String> result = new ConcurrentHashMap<>();

    ExecutorService executorService = Executors.newFixedThreadPool(10);

    /**
     * Handles the data received as an observer
     */
    @Override
    public void update(Observable observable, Object msg) {
        log.info("### accept #### {}", msg);
        try {
            if (msg instanceof String) {
                String json = StringUtils.substringAfter((String) msg, "hello Command!");
                Command command = JsonBeanUtils.jsonToBean(json, Command.class);
                log.info("{}", command);
                result.put(command.getCommandId(), json);
            }
        } catch (IOException e) {
            log.error(e.getMessage());
        }
    }

    /**
     * Returns the processing result for a command
     * 
     * @param commandId
     * @return
     */
    public String queryResult(String commandId) {
        return result.get(commandId);
    }

    /**
     * A business thread does the work; a DeferredResult can be completed with a result from any thread
     * 
     * @param deferredResult
     * @param commandId
     * @throws TimeoutException
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public void processResult(DeferredResult<String> deferredResult, String commandId) throws InterruptedException, ExecutionException, TimeoutException {
        processResult02(deferredResult, commandId);
    }

    /**
     * No timeout, not recommended
     * 
     * @param deferredResult
     * @param commandId
     */
    void processResult01(DeferredResult<String> deferredResult, String commandId) {
        executorService.execute(() -> {
            while (!result.containsKey(commandId)) {
                try {
                    log.info("waiting......");
                    TimeUnit.MILLISECONDS.sleep(20);
                } catch (InterruptedException e) {
                    // restore the interrupt flag and stop waiting instead of swallowing the exception
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            deferredResult.setResult(result.get(commandId));
        });
    }

    /**
     * With a timeout (recommended)
     * 
     * 
     * @param deferredResult
     * @param commandId
     * @throws InterruptedException
     * @throws ExecutionException
     * @throws TimeoutException
     */
    private void processResult02(DeferredResult<String> deferredResult, String commandId) throws InterruptedException, ExecutionException, TimeoutException {
        Future<String> future = executorService.submit(() -> {
            while (!result.containsKey(commandId)) {
                log.info("waiting......");
                TimeUnit.MILLISECONDS.sleep(20);
            }
            return result.get(commandId);
        });
        try {
            // note: this blocks the calling thread for up to 1 second
            String value = future.get(1000L, TimeUnit.MILLISECONDS);
            deferredResult.setResult(value);
        } catch (TimeoutException e) {
            // cancel the polling task so it does not keep occupying a pool thread
            future.cancel(true);
            throw e;
        }
    }
}
//goto src\main\resources\application-dev.yml
# Set the log level
logging:
  level:
    org:
      springframework:
        web: INFO
//goto src\main\resources\application.yml
server:
  port: 8081
  servlet:
    context-path: /
    session:
      timeout: 1800
spring:
  servlet:
    multipart:
      max-file-size: 10MB
      max-request-size: 100MB
  profiles:
    active:
      - test

# The configuration below must be enabled, otherwise API ordering does not take effect
knife4j:
  enable: true

# Set the log level
logging:
  level:
    root: INFO
//goto src\test\java\com\fly\controller\ApiControllerTest.javapackagecom.fly.controller;importjava.io.IOException;importjava.nio.charset.StandardCharsets;importjava.util.Arrays;importorg.apache.commons.io.IOUtils;importorg.junit.jupiter.api.Test;importorg.springframework.core.io.ClassPathResource;importorg.springframework.core.io.Resource;importorg.springframework.http.HttpEntity;importorg.springframework.http.HttpHeaders;importorg.springframework.http.MediaType;importorg.springframework.http.ResponseEntity;importorg.springframework.http.client.SimpleClientHttpRequestFactory;importorg.springframework.web.client.RestTemplate;importlombok.extern.slf4j.Slf4j;@Slf4jpublicclassApiControllerTest{privatestaticRestTemplate restTemplate;static{// 配置proxy,connectTimeout,readTimeout等参数SimpleClientHttpRequestFactory requestFactory =newSimpleClientHttpRequestFactory();
        requestFactory.setConnectTimeout(1000);
        requestFactory.setReadTimeout(1000);
        restTemplate =newRestTemplate(requestFactory);}@TestpublicvoidtestJsonRequestBody2()throwsIOException{String url ="http://192.168.114.250:8080/api/post";HttpHeaders headers =newHttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        headers.setAccept(Arrays.asList(MediaType.APPLICATION_JSON));Resource resource =newClassPathResource("data/json");String text =IOUtils.toString(resource.getInputStream(),StandardCharsets.UTF_8);HttpEntity<String> requestEntity =newHttpEntity<>(text, headers);ResponseEntity<String> responseEntity = restTemplate.postForEntity(url, requestEntity,String.class);
        log.info("******  ResponseEntity body: {}", responseEntity.getBody());}}//goto src\test\java\com\fly\executor\ExecutorServiceTest.javapackagecom.fly.executor;importjava.io.IOException;importjava.nio.charset.StandardCharsets;importjava.util.concurrent.ExecutionException;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Future;importjava.util.concurrent.LinkedBlockingQueue;importjava.util.concurrent.ThreadPoolExecutor;importjava.util.concurrent.TimeUnit;importjava.util.concurrent.TimeoutException;importorg.apache.commons.lang3.concurrent.BasicThreadFactory;importorg.junit.jupiter.api.Test;importorg.springframework.http.MediaType;importorg.springframework.web.reactive.function.client.WebClient;importlombok.extern.slf4j.Slf4j;@Slf4jpublicclassExecutorServiceTest{privateWebClient webClient =WebClient.builder().codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(-1)).build();privateExecutorService executorService =newThreadPoolExecutor(10,10,0L,TimeUnit.MILLISECONDS,newLinkedBlockingQueue<Runnable>(),newBasicThreadFactory.Builder().namingPattern("t-%03d").daemon(true).priority(Thread.MAX_PRIORITY).build());/**
     * Thread pool call with a timeout
     * 
     * @throws TimeoutException
     * @throws ExecutionException
     * @throws InterruptedException
     * 
     * @throws IOException
     */@TestpublicvoidtestTimeout()throwsInterruptedException,ExecutionException,TimeoutException{String ip ="192.168.0.1";Future<String> future = executorService.submit(()->{return webClient.get().uri(uriBuilder -> uriBuilder.scheme("http").host(ip).port("2375").path("/_ping").build())// URI.acceptCharset(StandardCharsets.UTF_8).accept(MediaType.APPLICATION_JSON).retrieve().bodyToMono(String.class).block();});
        log.info("return: {}", future.get(1000L,TimeUnit.MILLISECONDS));}}//goto src\test\resources\log4j2.xml<?xml version="1.0" encoding="UTF-8"?><Configuration status="WARN"><Properties><Property name="LOG_EXCEPTION_CONVERSION_WORD">%xwEx</Property><Property name="LOG_LEVEL_PATTERN">%5p</Property><Property name="LOG_DATEFORMAT_PATTERN">yyyy-MM-dd HH:mm:ss.SSS</Property><Property name="CONSOLE_LOG_PATTERN">%clr{%d{${LOG_DATEFORMAT_PATTERN}}}{faint}%clr{${LOG_LEVEL_PATTERN}}%clr{%pid}{magenta}%clr{---}{faint}%clr{[%15.15t]}{faint}%clr{%-40.40c{1.}}{cyan}%clr{:}{faint}%m%n${sys:LOG_EXCEPTION_CONVERSION_WORD}</Property><Property name="FILE_LOG_PATTERN">%d{${LOG_DATEFORMAT_PATTERN}} ${LOG_LEVEL_PATTERN}%pid ---[%t]%-40.40c{1.}:%m%n${sys:LOG_EXCEPTION_CONVERSION_WORD}</Property></Properties><Appenders><Console name="Console" target="SYSTEM_OUT" follow="true"><PatternLayout pattern="${sys:CONSOLE_LOG_PATTERN}"/></Console></Appenders><Loggers><Logger name="org.apache.catalina.startup.DigesterFactory" level="error"/><Logger name="org.apache.catalina.util.LifecycleBase" level="error"/><Logger name="org.apache.coyote.http11.Http11NioProtocol" level="warn"/><logger name="org.apache.sshd.common.util.SecurityUtils" level="warn"/><Logger name="org.apache.tomcat.util.net.NioSelectorPool" level="warn"/><Logger name="org.eclipse.jetty.util.component.AbstractLifeCycle" level="error"/><Logger name="org.hibernate.validator.internal.util.Version" level="warn"/><logger name="org.springframework.boot.actuate.endpoint.jmx" level="warn"/><Root level="info"><AppenderRef ref="Console"/></Root></Loggers></Configuration>
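
The TcpClient and TcpServer classes above exchange every message with DataOutputStream.writeUTF and DataInputStream.readUTF. writeUTF puts a 2-byte length in front of the modified UTF-8 bytes, which is what lets the receiver split the byte stream back into messages, and it also means one message cannot exceed 65535 encoded bytes. The following is a minimal in-memory sketch of that framing; the class name WriteUtfFramingDemo and its helper methods are hypothetical and not part of the project.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of the article's project
class WriteUtfFramingDemo {
    /** Encodes each message with writeUTF, the same call the sendMsg methods use. */
    static byte[] encode(String... messages) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            for (String message : messages) {
                out.writeUTF(message); // 2-byte length prefix + modified UTF-8 bytes
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Reads back count messages with readUTF, as the receiving loops do. */
    static List<String> decode(byte[] bytes, int count) {
        List<String> messages = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            for (int i = 0; i < count; i++) {
                messages.add(in.readUTF());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return messages;
    }

    public static void main(String[] args) {
        // first the client name, then a message, as in connectServer + sendMsg
        byte[] wire = encode("CLIENT_1", "hello");
        System.out.println(wire.length);     // 17 = (2 + 8) + (2 + 5)
        System.out.println(decode(wire, 2)); // [CLIENT_1, hello]
    }
}
```

Because the length prefix is only two bytes, a large JSON command would need to be split or sent with a different framing; the article's code does not address that case.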

6. Runtime screens

[Screenshot]
[Screenshot]
Deployment and testing are not covered in detail here.
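
As a rough starting point for manual testing, the sketch below posts a message to the /tcp/server/sendMsg endpoint defined in DemoController. It assumes the application is running locally; the port 8081 and context path / are taken from application.yml, while the host 127.0.0.1, the class name SendMsgSmokeTest and the timeouts are assumptions made for illustration.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

// Hypothetical helper, not part of the article's project
class SendMsgSmokeTest {
    // Assumption: the app runs on this machine; 8081 and "/" come from application.yml
    static final String BASE_URL = "http://127.0.0.1:8081";

    /** Builds the form body "msg=<url-encoded text>" bound to the controller's msg parameter. */
    static String formBody(String msg) {
        try {
            return "msg=" + URLEncoder.encode(msg, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always supported
        }
    }

    /** Sends a form-encoded POST and returns the response body as text. */
    static String post(String path, String msg) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(BASE_URL + path).openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setConnectTimeout(1000);
        conn.setReadTimeout(25000); // longer than the controller's 20000 ms DeferredResult timeout
        conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
        try (OutputStream out = conn.getOutputStream()) {
            out.write(formBody(msg).getBytes(StandardCharsets.UTF_8));
        }
        try (InputStream in = conn.getInputStream();
            Scanner scanner = new Scanner(in, "UTF-8").useDelimiter("\\A")) {
            return scanner.hasNext() ? scanner.next() : "";
        }
    }

    public static void main(String[] args) {
        try {
            System.out.println(post("/tcp/server/sendMsg", "hello"));
        } catch (IOException e) {
            System.out.println("request failed: " + e.getMessage());
        }
    }
}
```

The same helper can be pointed at /tcp/server/sendMsg/callable or /tcp/server/sendMsg/webAsyncTask to compare the three asynchronous variants.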

7. Key techniques

  1. Observer pattern
  2. Command pattern
  3. Thread pool usage
  4. Asynchronous handling with Callable, WebAsyncTask, and DeferredResult
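
The controller variants in this article wait for a reply by polling ResultCallBack in a sleep loop. As an alternative sketch, and not the author's implementation, the correlation between a commandId and its reply could be held as a CompletableFuture that the observer completes when the reply arrives. The class PendingResults and all of its method names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the article's project
class PendingResults {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Called before the command is sent: registers a future keyed by commandId. */
    CompletableFuture<String> register(String commandId) {
        return pending.computeIfAbsent(commandId, id -> new CompletableFuture<>());
    }

    /** Called when the client's reply arrives; returns false if nobody is waiting. */
    boolean complete(String commandId, String payload) {
        CompletableFuture<String> future = pending.remove(commandId);
        return future != null && future.complete(payload);
    }

    /** Drops a registration, for example after the caller has timed out. */
    void discard(String commandId) {
        pending.remove(commandId);
    }

    /** Number of commands still waiting for a reply. */
    int size() {
        return pending.size();
    }

    public static void main(String[] args) throws Exception {
        PendingResults results = new PendingResults();
        CompletableFuture<String> future = results.register("cmd-1");
        // simulate the TCP reply arriving on another thread
        new Thread(() -> results.complete("cmd-1", "{\"commandId\":\"cmd-1\"}")).start();
        // wait with a timeout instead of polling in a sleep loop
        System.out.println(future.get(1, TimeUnit.SECONDS));
    }
}
```

In this design the observer's update method would call complete, and a controller could attach a callback to the returned future to set a DeferredResult, so no thread has to sleep while waiting. Whether that fits the project depends on details the article does not show.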

**Questions and suggestions are welcome. Feel free to ask and discuss so that we can all improve together. Thank you!**

-over-


This article is reposted from: https://blog.csdn.net/qq_16127313/article/details/142731263
Copyright belongs to the original author, 爱码少年 00fly.online. In case of infringement, please contact us for removal.
