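I. Overview
In the previous article, "JAVA TCP协议初体验" (A First Look at TCP in Java), we built a rudimentary TCP client and server in plain Java. Most real-world projects now run on Spring Boot, so how do we integrate TCP into a Spring Boot application? More specifically, how can a server-side controller push a command over TCP to a TCP client for execution, and then obtain the execution result inside that same controller?
II. Implementation approach
For ease of demonstration, this article keeps TcpClient and TcpServer in the same project. The steps are:
- Copy the TcpClient and TcpServer code from the previous article
- Extract the server and client startup code into a method annotated with @PostConstruct
- Write a controller that exposes endpoints for sending messages from the client and from the server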
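To make the "server and client in one process" idea concrete, here is a minimal plain-Java sketch with no Spring involved. It is not the project's TcpServer or TcpClient: the class name `SameProcessTcpDemo` and the reply text are made up for illustration, and the assumption that the server reads the client's name first is inferred from the client code, which writes its name right after connecting.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: a server and a client sharing one JVM, as in this article's setup.
class SameProcessTcpDemo {

    /** Starts a loopback server and client, sends one command, and returns what the server saw. */
    static String exchange(String command) throws Exception {
        try (ServerSocket serverSocket = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            int port = serverSocket.getLocalPort(); // ephemeral port chosen by the OS

            // Server side: accept one client, read its name, push a command, wait for the reply.
            CompletableFuture<String> reply = CompletableFuture.supplyAsync(() -> {
                try (Socket peer = serverSocket.accept();
                     DataInputStream in = new DataInputStream(peer.getInputStream());
                     DataOutputStream out = new DataOutputStream(peer.getOutputStream())) {
                    String clientName = in.readUTF();
                    out.writeUTF(command);
                    out.flush();
                    return clientName + " -> " + in.readUTF();
                } catch (IOException e) {
                    throw new IllegalStateException(e);
                }
            });

            // Client side: connect, announce the client name, answer the first command received.
            try (Socket socket = new Socket(InetAddress.getLoopbackAddress(), port);
                 DataOutputStream out = new DataOutputStream(socket.getOutputStream());
                 DataInputStream in = new DataInputStream(socket.getInputStream())) {
                out.writeUTF("CLIENT_1");
                out.flush();
                String received = in.readUTF();
                out.writeUTF("done: " + received);
                out.flush();
                return reply.get(5, TimeUnit.SECONDS);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(exchange("ping")); // prints CLIENT_1 -> done: ping
    }
}
```

In the Spring Boot project the same two startup steps live in a @PostConstruct method, so both ends come up as soon as the bean is created.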
III. Code structure

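IV. Code walkthrough
1. Sending a command from the server
See lines 67-68 of DemoController: the code wraps the message in a Command object, serializes it to a JSON string, and sends it to the client.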
Flow: wrap the command -> convert it to a string -> send it to the client
Command command = new Command().setClientId(initManager.getClient().getClientName()).setText(msg);
initManager.getServer().sendMsg(initManager.getClient().getClientName(), JsonBeanUtils.beanToJson(command));
2. Client-side handling
if (msg.contains("commandId"))
{
    Command cmd = JsonBeanUtils.jsonToBean(msg, Command.class);
    sendMsg("hello Command!" + JsonBeanUtils.beanToJson(cmd));
}
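The client's reply is simply the fixed prefix "hello Command!" followed by the command JSON, sent with DataOutputStream.writeUTF. The sketch below shows how that reply format round-trips. The helper class `ReplyFormatDemo` and the sample JSON values are hypothetical, and in-memory streams stand in for the socket.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical helper illustrating the reply format; not part of the project.
class ReplyFormatDemo {
    static final String PREFIX = "hello Command!";

    /** Builds the client's reply: fixed prefix followed by the command JSON. */
    static String buildReply(String commandJson) {
        return PREFIX + commandJson;
    }

    /** Recovers the JSON part, mirroring StringUtils.substringAfter(reply, PREFIX). */
    static String extractJson(String reply) {
        int idx = reply.indexOf(PREFIX);
        return idx < 0 ? "" : reply.substring(idx + PREFIX.length());
    }

    /** Sends a message through writeUTF/readUTF using in-memory streams instead of a socket. */
    static String roundTrip(String msg) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeUTF(msg); // 2-byte length header followed by the modified UTF-8 payload
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
            return in.readUTF(); // reads exactly one message
        }
    }

    public static void main(String[] args) throws IOException {
        String json = "{\"commandId\":\"42\",\"clientId\":\"CLIENT_1\",\"text\":\"hi\"}";
        String reply = roundTrip(buildReply(json));
        System.out.println(extractJson(reply).equals(json)); // prints true
    }
}
```

Because writeUTF prefixes each message with its length, the receiver never has to guess where a message ends. The price is a size cap: a message whose encoded form exceeds 65535 bytes makes writeUTF throw UTFDataFormatException.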
3. Result callback
The observer pattern is used here.
Flow: define the observable -> register the observer -> the observer runs the business logic
Define the observable
class NewClient extends Observable
Register the observer
// register the observer
addObserver(SpringContextUtils.getBean(ResultCallBack.class));
Observer business logic
/**
 * Result callback handling
 */
@Slf4j
@Service
public class ResultCallBack implements Observer
{
    Map<String, String> result = new ConcurrentHashMap<>();

    ExecutorService executorService = Executors.newFixedThreadPool(10);

    /**
     * Handles the data received by the observer
     */
    @Override
    public void update(Observable observable, Object msg)
    {
        log.info("### accept #### {}", msg);
        try
        {
            if (msg instanceof String)
            {
                String json = StringUtils.substringAfter((String)msg, "hello Command!");
                Command command = JsonBeanUtils.jsonToBean(json, Command.class);
                log.info("{}", command);
                result.put(command.getCommandId(), json);
            }
        }
        catch (IOException e)
        {
            log.error(e.getMessage());
        }
    }
    ...
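Stripped of Spring and JSON, the observer wiring looks roughly like the sketch below. The classes `Connection` and `ResultStore` are hypothetical stand-ins for NewClient and ResultCallBack, and the "commandId=payload" message format is a toy assumption made only to keep the example short.

```java
import java.util.Map;
import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-ins for NewClient and ResultCallBack, reduced to the observer wiring.
class ObserverSketch {

    /** Observable side: publishes every message it receives. */
    static class Connection extends Observable {
        void onMessage(String msg) {
            setChanged();          // without this call, notifyObservers() does nothing
            notifyObservers(msg);  // invokes update(this, msg) on each registered observer
        }
    }

    /** Observer side: stores each result under its command id. */
    static class ResultStore implements Observer {
        final Map<String, String> results = new ConcurrentHashMap<>();

        @Override
        public void update(Observable source, Object msg) {
            if (msg instanceof String) {
                // Assumed toy format "commandId=payload"; the project parses JSON instead.
                String[] parts = ((String) msg).split("=", 2);
                if (parts.length == 2) {
                    results.put(parts[0], parts[1]);
                }
            }
        }
    }

    public static void main(String[] args) {
        Connection connection = new Connection();
        ResultStore store = new ResultStore();
        connection.addObserver(store);
        connection.onMessage("cmd-1=ok");
        System.out.println(store.results.get("cmd-1")); // prints ok
    }
}
```

Note that java.util.Observable and java.util.Observer have been deprecated since Java 9. They still work on the Java 8 target this project uses, but on newer JDKs a plain listener interface is the usual replacement.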
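4. Retrieving the result in the endpoint
DemoController handles this in three different ways (Callable, WebAsyncTask, and DeferredResult); compare them to see where they are alike and where they differ.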
/**
 * Client-side processing results are mostly obtained through callbacks
 *
 * @param msg
 * @return
 * @throws IOException
 */
@ApiOperation("server发消息Callable")
@ApiImplicitParam(name = "msg", example = "千树万树梨花开", required = true)
@PostMapping("/server/sendMsg/callable")
public Callable<JsonResult<?>> sendMsgFromServer01(String msg)
    throws IOException
{
    return () -> {
        Command command = new Command().setClientId(initManager.getClient().getClientName()).setText(msg);
        initManager.getServer().sendMsg(initManager.getClient().getClientName(), JsonBeanUtils.beanToJson(command));
        String result;
        for (int i = 0; i < 50; i++)
        {
            result = resultCallBack.queryResult(command.getCommandId());
            if (StringUtils.isNotBlank(result))
            {
                return JsonResult.success(result, "获取处理结果成功");
            }
            else
            {
                TimeUnit.MILLISECONDS.sleep(100);
            }
        }
        return JsonResult.error("响应超时,请重试");
    };
}

/**
 * Client-side processing results are mostly obtained through callbacks (WebAsyncTask is an enhanced Callable that adds handling for timeouts, errors, and so on)
 *
 * @param msg
 * @return
 * @throws IOException
 */
@ApiOperation("server发消息webAsyncTask")
@ApiImplicitParam(name = "msg", example = "千树万树梨花开", required = true)
@PostMapping("/server/sendMsg/webAsyncTask")
public WebAsyncTask<String> sendMsgFromServer02(String msg)
    throws IOException
{
    WebAsyncTask<String> webAsyncTask = new WebAsyncTask<String>(2000L, executor, () -> {
        Command command = new Command().setClientId(initManager.getClient().getClientName()).setText(msg);
        initManager.getServer().sendMsg(initManager.getClient().getClientName(), JsonBeanUtils.beanToJson(command));
        String result;
        for (int i = 0; i < 100; i++)
        {
            result = resultCallBack.queryResult(command.getCommandId());
            if (StringUtils.isNotBlank(result))
            {
                return result;
            }
            else
            {
                TimeUnit.MILLISECONDS.sleep(50);
            }
        }
        return "响应超时,请重试";
    });
    webAsyncTask.onCompletion(() -> log.info("调用完成"));
    webAsyncTask.onError(() -> {
        log.error("业务处理出错");
        return "error";
    });
    webAsyncTask.onTimeout(() -> {
        log.info("业务处理超时");
        return "Time Out";
    });
    return webAsyncTask;
}

/**
 * Client-side processing results are mostly obtained through callbacks
 *
 * @param msg
 * @return
 * @throws IOException
 * @throws TimeoutException
 * @throws ExecutionException
 * @throws InterruptedException
 */
@ApiOperation("server发消息")
@ApiImplicitParam(name = "msg", example = "千树万树梨花开", required = true)
@PostMapping("/server/sendMsg")
public DeferredResult<String> sendMsgFromServer(String msg)
    throws IOException, InterruptedException, ExecutionException, TimeoutException
{
    String clientName = initManager.getClient().getClientName();
    Command command = new Command().setClientId(clientName).setText(msg);
    initManager.getServer().sendMsg(clientName, JsonBeanUtils.beanToJson(command));
    // return the result asynchronously
    DeferredResult<String> deferredResult = new DeferredResult<>(20000L, "失败");
    deferredResult.onCompletion(() -> log.info("调用完成"));
    deferredResult.onTimeout(() -> {
        log.info("调用超时");
        deferredResult.setResult("调用超时");
    });
    resultCallBack.processResult(deferredResult, command.getCommandId());
    return deferredResult;
}
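The first two variants poll the result map every 50 to 100 ms, while the third hands a DeferredResult to the callback. For comparison, here is a sketch of a different technique that this project does not use: keeping one CompletableFuture per commandId and completing it when the reply arrives, so the waiting thread wakes up immediately instead of polling. The class `PendingResults` and its method names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical registry that pairs each commandId with a future completed by the callback.
class PendingResults {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Called from the result callback when the client's reply arrives. */
    void complete(String commandId, String result) {
        CompletableFuture<String> future = pending.remove(commandId);
        if (future != null) {
            future.complete(result);
        }
    }

    /** Registers the command, sends it, then waits for the reply or returns the fallback on timeout. */
    String sendAndWait(String commandId, Runnable send, long timeoutMillis, String fallback) throws Exception {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(commandId, future); // register first so a fast reply cannot be missed
        send.run();                     // stands in for writing the command to the socket
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return fallback;
        } finally {
            pending.remove(commandId);  // keep the map from growing when replies never come
        }
    }

    public static void main(String[] args) throws Exception {
        PendingResults results = new PendingResults();
        // Simulated client: replies on another thread as soon as the command is "sent".
        String ok = results.sendAndWait("cmd-1",
            () -> new Thread(() -> results.complete("cmd-1", "done")).start(), 1000, "timeout");
        String late = results.sendAndWait("cmd-2", () -> { }, 50, "timeout");
        System.out.println(ok);   // prints done
        System.out.println(late); // prints timeout
    }
}
```

Whichever variant is used, the key is the same: the commandId generated on the server side is what ties the reply coming back over TCP to the HTTP request that is still waiting.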
V. Source code
https://gitcode.com/00fly/tcp-show/tree/main/springboot-tcp
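Alternatively, restore the original project code from the backup file below.
For instructions on restoring it, see: 神奇代码恢复工具 (Magic Code Recovery Tool)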
//goto docker\docker-compose.yml
version: '3.7'
services:
  springboot-tcp:
    image: registry.cn-shanghai.aliyuncs.com/00fly/springboot-tcp:1.0.0
    container_name: springboot-tcp
    deploy:
      resources:
        limits:
          cpus: '1.0'
          memory: 200M
        reservations:
          cpus: '0.05'
          memory: 200M
    ports:
    - 8080:8081
    restart: on-failure
    logging:
      driver: json-file
      options:
        max-size: '5m'
        max-file: '1'
//goto docker\restart.sh
#!/bin/bash
docker-compose down && docker-compose up -d && docker stats
//goto docker\stop.sh
#!/bin/bash
docker-compose down
//goto docker\wait-for.sh
#!/bin/sh
TIMEOUT=15
QUIET=0
echoerr() {
  if [ "$QUIET" -ne 1 ]; then printf "%s\n" "$*" 1>&2; fi
}
usage() {
  exitcode="$1"
  cat << USAGE >&2
Usage:
  $cmdname host:port [-t timeout] [-- command args]
  -q | --quiet                        Do not output any status messages
  -t TIMEOUT | --timeout=timeout      Timeout in seconds, zero for no timeout
  -- COMMAND ARGS                     Execute command with args after the test finishes
USAGE
  exit "$exitcode"
}
wait_for() {
  for i in `seq $TIMEOUT` ; do
    nc -z "$HOST" "$PORT" > /dev/null 2>&1
    result=$?
    if [ $result -eq 0 ] ; then
      if [ $# -gt 0 ] ; then
        exec "$@"
      fi
      exit 0
    fi
    sleep 1
  done
  echo "Operation timed out" >&2
  exit 1
}
while [ $# -gt 0 ]
do
  case "$1" in
    *:* )
    HOST=$(printf "%s\n" "$1"| cut -d : -f 1)
    PORT=$(printf "%s\n" "$1"| cut -d : -f 2)
    shift 1
    ;;
    -q | --quiet)
    QUIET=1
    shift 1
    ;;
    -t)
    TIMEOUT="$2"
    if [ "$TIMEOUT" = "" ]; then break; fi
    shift 2
    ;;
    --timeout=*)
    TIMEOUT="${1#*=}"
    shift 1
    ;;
    --)
    shift
    break
    ;;
    --help)
    usage 0
    ;;
    *)
    echoerr "Unknown argument: $1"
    usage 1
    ;;
  esac
done
if [ "$HOST" = "" -o "$PORT" = "" ]; then
  echoerr "Error: you need to provide a host and port to test."
  usage 2
fi
wait_for "$@"
//goto Dockerfile
# Base image
FROM adoptopenjdk/openjdk8-openj9:alpine-slim
COPY docker/wait-for.sh /
RUN chmod +x /wait-for.sh && \
    ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && \
    echo 'Asia/Shanghai' >/etc/timezone
# Copy in the application jar
COPY target/*.jar /app.jar
# Port exposed by the container
EXPOSE 8081
CMD ["--server.port=8081"]
# How the application is started
ENTRYPOINT ["java", "-Djava.security.egd=file:/dev/./urandom", "-Xshareclasses", "-Xquickstart", "-jar", "/app.jar"]
//goto pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.fly</groupId>
<artifactId>springboot-tcp</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.4.RELEASE</version>
<relativePath />
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<docker.hub>registry.cn-shanghai.aliyuncs.com</docker.hub>
<java.version>1.8</java.version>
<skipTests>true</skipTests>
</properties>
<dependencies>
<!-- Compile -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jetty</artifactId>
</dependency>
<dependency>
<groupId>com.github.xiaoymin</groupId>
<artifactId>knife4j-spring-boot-starter</artifactId>
<version>2.0.8</version>
</dependency>
<!-- Asynchronous logging requires the disruptor dependency -->
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.2</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<!-- Test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
<!-- Aliyun Maven repository -->
<repositories>
<repository>
<id>public</id>
<name>aliyun nexus</name>
<url>https://maven.aliyun.com/repository/public/</url>
<releases>
<enabled>true</enabled>
</releases>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>public</id>
<name>aliyun nexus</name>
<url>https://maven.aliyun.com/repository/public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
<build>
<finalName>${project.artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<!-- Add the docker-maven plugin -->
<plugin>
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
<version>0.40.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>build</goal>
<!--<goal>push</goal>-->
<!--<goal>remove</goal>-->
</goals>
</execution>
</executions>
<configuration>
<!-- Connect to a Linux server that has Docker installed to build the image -->
<!-- <dockerHost>http://192.168.182.10:2375</dockerHost> -->
<!-- Registry address the Docker image is pushed to -->
<pushRegistry>${docker.hub}</pushRegistry>
<images>
<image>
<name>
${docker.hub}/00fly/${project.artifactId}:${project.version}</name>
<build>
<dockerFileDir>${project.basedir}</dockerFileDir>
</build>
</image>
</images>
</configuration>
</plugin>
</plugins>
<resources>
<resource>
<directory>src/main/java</directory>
<excludes>
<exclude>**/*.java</exclude>
</excludes>
</resource>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/**</include>
</includes>
<filtering>false</filtering>
</resource>
</resources>
</build>
</project>
//goto src\main\java\com\fly\BootApplication.java
package com.fly;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class BootApplication
{
public static void main(String[] args)
{
SpringApplication.run(BootApplication.class, args);
}
}
//goto src\main\java\com\fly\core\config\AsyncThreadPoolConfig.java
package com.fly.core.config;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import lombok.extern.slf4j.Slf4j;
/**
 *
 * Asynchronous thread pool configuration
 *
 * @author 00fly
 * @version [version, 2023-10-22]
 * @see [related classes/methods]
 * @since [product/module version]
 */
@Slf4j
@Configuration
@EnableAsync
public class AsyncThreadPoolConfig implements AsyncConfigurer
{
    @Bean
    ThreadPoolTaskExecutor taskExecutor()
    {
        int processors = Runtime.getRuntime().availableProcessors();
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(Math.max(processors, 5));
        executor.setMaxPoolSize(Math.max(processors, 5) * 2);
        executor.setQueueCapacity(10000);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("asyncTask-");

        // ThreadPoolExecutor ships with several built-in handlers for rejected tasks:
        // AbortPolicy: rejects the task and throws a runtime exception
        // CallerRunsPolicy: runs the task in the calling thread
        // DiscardPolicy: silently drops the task
        // DiscardOldestPolicy: drops the oldest task in the queue (the one at the head), then retries
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        return executor;
    }

    @Override
    public Executor getAsyncExecutor()
    {
        return taskExecutor();
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler()
    {
        return (ex, method, params) -> {
            log.info("Exception message - {}", ex.getMessage());
            log.info("Method name - {}", method.getName());
            for (Object param : params)
            {
                log.info("Parameter value - {}", param);
            }
        };
    }
}
//goto src\main\java\com\fly\core\config\Knife4jConfig.java
package com.fly.core.config;
import java.util.Collections;
import java.util.List;
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import com.github.xiaoymin.knife4j.spring.annotations.EnableKnife4j;
import io.swagger.annotations.ApiOperation;
import springfox.bean.validators.configuration.BeanValidatorPluginsConfiguration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.ApiKey;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2WebMvc;
/**
 * Knife4jConfig
 *
 */
@EnableKnife4j
@Configuration
@EnableSwagger2WebMvc
@ConditionalOnWebApplication
@Import(BeanValidatorPluginsConfiguration.class)
public class Knife4jConfig
{
    /**
     * Enables the API documentation for development and test environments
     *
     * @return
     * @see [class, class#method, class#member]
     */
    @Bean
    Docket createRestApi()
    {
        return new Docket(DocumentationType.SWAGGER_2).apiInfo(apiInfo())
            .enable(true)
            .select()
            .apis(RequestHandlerSelectors.withMethodAnnotation(ApiOperation.class))
            .paths(PathSelectors.any())
            // generate API documentation for the classes in the package
            .build()
            .securitySchemes(security());
    }

    private ApiInfo apiInfo()
    {
        return new ApiInfoBuilder().title("数据接口API").description("接口文档").termsOfServiceUrl("http://00fly.online/").version("1.0.0").build();
    }

    private List<ApiKey> security()
    {
        return Collections.singletonList(new ApiKey("token", "token", "header"));
    }
}
//goto src\main\java\com\fly\core\config\WebClientConfig.java
package com.fly.core.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;
/**
 * Configures WebClient
 */
@Configuration
public class WebClientConfig
{
    @Bean
    WebClient webClient()
    {
        return WebClient.builder().codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(-1)).build();
    }
}
//goto src\main\java\com\fly\core\config\WebMvcConfig.java
package com.fly.core.config;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.StringHttpMessageConverter;
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
import org.springframework.web.servlet.config.annotation.ContentNegotiationConfigurer;
import org.springframework.web.servlet.config.annotation.ViewControllerRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
/**
 *
 * MVC configuration
 *
 * @author 00fly
 * @version [version, 2021-04-23]
 * @see [related classes/methods]
 * @since [product/module version]
 */
@Configuration
@ConditionalOnWebApplication
public class WebMvcConfig implements WebMvcConfigurer
{
    @Override
    public void configureMessageConverters(final List<HttpMessageConverter<?>> converters)
    {
        converters.add(stringHttpMessageConverter());
        converters.add(mappingJackson2HttpMessageConverter());
    }

    @Override
    public void configureContentNegotiation(final ContentNegotiationConfigurer configurer)
    {
        configurer.defaultContentType(MediaType.APPLICATION_JSON);
        configurer.ignoreUnknownPathExtensions(false);
        configurer.favorPathExtension(true);
        configurer.favorParameter(false);
        final Map<String, MediaType> mediaTypes = new ConcurrentHashMap<>(3);
        mediaTypes.put("atom", MediaType.APPLICATION_ATOM_XML);
        mediaTypes.put("html", MediaType.TEXT_HTML);
        mediaTypes.put("json", MediaType.APPLICATION_JSON);
        configurer.mediaTypes(mediaTypes);
    }

    @Bean
    StringHttpMessageConverter stringHttpMessageConverter()
    {
        return new StringHttpMessageConverter();
    }

    @Bean
    MappingJackson2HttpMessageConverter mappingJackson2HttpMessageConverter()
    {
        final MappingJackson2HttpMessageConverter messageConverter = new MappingJackson2HttpMessageConverter();
        final List<MediaType> list = new ArrayList<>();
        list.add(MediaType.APPLICATION_JSON);
        list.add(MediaType.APPLICATION_XML);
        list.add(MediaType.TEXT_PLAIN);
        list.add(MediaType.TEXT_HTML);
        list.add(MediaType.TEXT_XML);
        messageConverter.setSupportedMediaTypes(list);
        return messageConverter;
    }

    /**
     * Equivalent to <mvc:view-controller path="/" view-name="redirect:index" /> in Spring MVC XML configuration<br>
     * Equivalent to <mvc:view-controller path="/index" view-name="index.html" /> in Spring MVC XML configuration
     *
     * @param registry
     */
    @Override
    public void addViewControllers(final ViewControllerRegistry registry)
    {
        registry.addViewController("/").setViewName("redirect:index");
        registry.addViewController("/index").setViewName("index.html");
    }
}
//goto src\main\java\com\fly\core\exception\GlobalExceptionHandler.java
package com.fly.core.exception;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.springframework.validation.BindException;
import org.springframework.validation.BindingResult;
import org.springframework.web.bind.MethodArgumentNotValidException;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice;
import com.fly.core.JsonResult;
import lombok.extern.slf4j.Slf4j;
/**
 * Global exception handler
 *
 * @author 00fly
 * @version [version, 2018-09-11]
 * @see [related classes/methods]
 * @since [product/module version]
 */
@Slf4j
@RestControllerAdvice
public class GlobalExceptionHandler
{
    @ExceptionHandler(Exception.class)
    public JsonResult<?> handleBadRequest(Exception exception)
    {
        // JSR-303 parameter validation exception
        if (exception instanceof BindException)
        {
            BindingResult bindingResult = ((BindException)exception).getBindingResult();
            if (null != bindingResult && bindingResult.hasErrors())
            {
                List<String> errMsg = new ArrayList<>();
                bindingResult.getFieldErrors().stream().forEach(fieldError -> {
                    errMsg.add(fieldError.getDefaultMessage());
                });
                Collections.sort(errMsg);
                return JsonResult.error(StringUtils.join(errMsg, ","));
            }
        }
        if (exception instanceof MethodArgumentNotValidException)
        {
            BindingResult bindingResult = ((MethodArgumentNotValidException)exception).getBindingResult();
            if (null != bindingResult && bindingResult.hasErrors())
            {
                // the same logic written more compactly with streams
                return JsonResult.error(bindingResult.getFieldErrors().stream().map(e -> e.getDefaultMessage()).sorted().collect(Collectors.joining(",")));
            }
        }
        // all other cases
        log.error("Error: handleBadRequest StackTrace : {}", exception);
        return JsonResult.error(StringUtils.defaultString(exception.getMessage(), "系统异常,请联系管理员"));
    }
}
//goto src\main\java\com\fly\core\JsonResult.java
package com.fly.core;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.experimental.Accessors;
/**
 *
 * Result object
 *
 * @author 00fly
 * @version [version, 2021-05-02]
 * @see [related classes/methods]
 * @since [product/module version]
 */
@Data
@Accessors(chain = true)
@ApiModel(description = "Json格式消息体")
public class JsonResult<T>
{
    @ApiModelProperty(value = "数据对象")
    private T data;

    @ApiModelProperty(value = "是否成功", required = true, example = "true")
    private boolean success;

    @ApiModelProperty(value = "错误码")
    private String errorCode;

    @ApiModelProperty(value = "提示信息")
    private String message;

    public JsonResult()
    {
        super();
    }

    public static <T> JsonResult<T> success(T data)
    {
        JsonResult<T> r = new JsonResult<>();
        r.setData(data);
        r.setSuccess(true);
        return r;
    }

    public static <T> JsonResult<T> success(T data, String msg)
    {
        JsonResult<T> r = new JsonResult<>();
        r.setData(data);
        r.setMessage(msg);
        r.setSuccess(true);
        return r;
    }

    public static JsonResult<?> success()
    {
        JsonResult<Object> r = new JsonResult<>();
        r.setSuccess(true);
        return r;
    }

    public static JsonResult<Object> error(String code, String msg)
    {
        JsonResult<Object> r = new JsonResult<>();
        r.setSuccess(false);
        r.setErrorCode(code);
        r.setMessage(msg);
        return r;
    }

    public static JsonResult<Object> error(String msg)
    {
        return error("500", msg);
    }
}
//goto src\main\java\com\fly\core\runner\WebStartedRunner.java
package com.fly.core.runner;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import com.fly.core.utils.SpringContextUtils;
@Component
@Configuration
@ConditionalOnWebApplication
public class WebStartedRunner
{
    @Bean
    @ConditionalOnWebApplication
    CommandLineRunner init()
    {
        return args -> {
            if (SystemUtils.IS_OS_WINDOWS) // guard so that non-Windows systems do not fail at startup
            {
                String url = SpringContextUtils.getServerBaseURL();
                if (StringUtils.containsNone(url, "-")) // junit port:-1
                {
                    Runtime.getRuntime().exec("cmd /c start /min " + url);
                    Runtime.getRuntime().exec("cmd /c start /min " + url + "/doc.html");
                }
            }
        };
    }
}
//goto src\main\java\com\fly\core\utils\JsonBeanUtils.java
package com.fly.core.utils;
import java.io.IOException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
 * JSON/bean conversion utility
 *
 * @author 00fly
 *
 */
public class JsonBeanUtils
{
    private static ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Converts a bean to a JSON string
     *
     * @param bean
     * @return
     * @throws IOException
     */
    public static String beanToJson(Object bean)
        throws IOException
    {
        return beanToJson(bean, false);
    }

    /**
     * Converts a bean to a JSON string
     *
     * @param bean
     * @param pretty whether to pretty-print the output
     * @return
     * @throws IOException
     */
    public static String beanToJson(Object bean, boolean pretty)
        throws IOException
    {
        String jsonText = objectMapper.writeValueAsString(bean);
        if (pretty)
        {
            return objectMapper.readTree(jsonText).toPrettyString();
        }
        return objectMapper.readTree(jsonText).toString();
    }

    /**
     * Converts a JSON string to a bean
     *
     * @param jsonText
     * @return
     * @throws IOException
     */
    public static <T> T jsonToBean(String jsonText, Class<T> clazz)
        throws IOException
    {
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true);
        return objectMapper.readValue(jsonText, clazz);
    }

    /**
     * Converts a JSON string to a bean
     *
     * @param jsonText
     * @param clazz
     * @param ingoreError whether to ignore unrecognized fields
     * @return
     * @throws IOException
     */
    public static <T> T jsonToBean(String jsonText, Class<T> clazz, boolean ingoreError)
        throws IOException
    {
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, !ingoreError);
        return objectMapper.readValue(jsonText, clazz);
    }

    /**
     * Converts a JSON string to a bean
     *
     * @param jsonText
     * @return
     * @throws IOException
     */
    public static <T> T jsonToBean(String jsonText, JavaType javaType)
        throws IOException
    {
        return objectMapper.readValue(jsonText, javaType);
    }

    /**
     * Converts a JSON string to a bean
     *
     * @param jsonText
     * @return
     * @throws IOException
     */
    public static <T> T jsonToBean(String jsonText, TypeReference<T> typeRef)
        throws IOException
    {
        return objectMapper.readValue(jsonText, typeRef);
    }
}
//goto src\main\java\com\fly\core\utils\SpringContextUtils.java
package com.fly.core.utils;
import java.net.InetAddress;
import java.net.UnknownHostException;
import javax.servlet.ServletContext;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import lombok.extern.slf4j.Slf4j;
/**
 * Spring context utility class
 */
@Slf4j
@Component
public class SpringContextUtils implements ApplicationContextAware
{
    private static ApplicationContext applicationContext;

    private static String SERVER_BASE_URL = null;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext)
        throws BeansException
    {
        log.info("###### execute setApplicationContext ######");
        SpringContextUtils.applicationContext = applicationContext;
    }

    public static ApplicationContext getApplicationContext()
    {
        return applicationContext;
    }

    public static <T> T getBean(Class<T> clazz)
    {
        Assert.notNull(applicationContext, "applicationContext is null");
        return applicationContext.getBean(clazz);
    }

    /**
     * If called from a @PostConstruct method, SpringContextUtils may not be initialized yet and the call fails
     *
     * @return
     */
    public static String getActiveProfile()
    {
        Assert.notNull(applicationContext, "applicationContext is null");
        String[] profiles = applicationContext.getEnvironment().getActiveProfiles();
        return StringUtils.join(profiles, ",");
    }

    /**
     * Safe to use in a @PostConstruct method
     *
     * @param context
     * @return
     */
    public static String getActiveProfile(ApplicationContext context)
    {
        Assert.notNull(context, "context is null");
        String[] profiles = context.getEnvironment().getActiveProfiles();
        return StringUtils.join(profiles, ",");
    }

    /**
     * Returns the base URL of the web service, typically http://${ip}:${port}/${contentPath}
     *
     * @return
     * @throws UnknownHostException
     * @see [class, class#method, class#member]
     */
    public static String getServerBaseURL()
        throws UnknownHostException
    {
        ServletContext servletContext = getBean(ServletContext.class);
        Assert.notNull(servletContext, "servletContext is null");
        if (SERVER_BASE_URL == null)
        {
            String ip = InetAddress.getLocalHost().getHostAddress();
            SERVER_BASE_URL = "http://" + ip + ":" + getProperty("server.port") + servletContext.getContextPath();
        }
        return SERVER_BASE_URL;
    }

    /**
     * getProperty
     *
     * @param key eg:server.port
     * @return
     * @see [class, class#method, class#member]
     */
    public static String getProperty(String key)
    {
        return applicationContext.getEnvironment().getProperty(key, "");
    }
}
//goto src\main\java\com\fly\tcp\base\InitManager.java
package com.fly.tcp.base;
import javax.annotation.PostConstruct;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
/**
 * Initialization manager
 */
@Service
public class InitManager
{
    TcpServer server = new TcpServer();

    TcpClient client = new TcpClient("CLIENT_1");

    public TcpServer getServer()
    {
        return server;
    }

    public TcpClient getClient()
    {
        return client;
    }

    /**
     * Starts TcpServer and TcpClient
     */
    @PostConstruct
    private void init()
    {
        if (server.startServer("0.0.0.0", 8000))
        {
            new Thread(server).start();
        }
        // in a Docker environment, prefer the value from the environment section of docker-compose
        String serverIp = StringUtils.defaultIfBlank(System.getenv().get("TCP_SERVER"), "127.0.0.1");
        if (client.connectServer(serverIp, 8000))
        {
            new Thread(client).start();
        }
    }
}
//goto src\main\java\com\fly\tcp\base\TcpClient.java
package com.fly.tcp.base;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.fly.core.utils.JsonBeanUtils;
import com.fly.tcp.entity.Command;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TcpClient implements Runnable
{
    private String ip;

    private int port;

    private Socket socket;

    private DataOutputStream dataOutputStream;

    private String clientName;

    private boolean isClientCoreRun = false;

    private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

    private ExecutorService executor = Executors.newFixedThreadPool(2);

    public TcpClient(String clientName)
    {
        super();
        this.clientName = clientName;
    }

    public String getClientName()
    {
        return clientName;
    }

    /**
     * Connect to the server and send the client name as the first frame
     *
     * @param ip server IP
     * @param port server port
     * @return true if the connection was established
     */
    public boolean connectServer(String ip, int port) {
        try {
            this.ip = ip;
            this.port = port;
            socket = new Socket(InetAddress.getByName(ip), port);
            log.info("****** TcpClient will connect to Server {}:{}", ip, port);
            scheduler.scheduleAtFixedRate(this::checkConnection, 0, 10, TimeUnit.SECONDS);
            isClientCoreRun = true;
            dataOutputStream = new DataOutputStream(socket.getOutputStream());
            dataOutputStream.writeUTF(clientName);
            dataOutputStream.flush();
        } catch (IOException e) {
            log.error(e.getMessage());
            isClientCoreRun = false;
        }
        return isClientCoreRun;
    }

    /**
     * Check the TCP connection
     */
    private void checkConnection() {
        if (socket == null || socket.isClosed()) {
            log.error("Connection lost, attempting to reconnect");
            reconnect();
        }
    }

    private void reconnect() {
        try {
            socket = new Socket(InetAddress.getByName(ip), port);
            log.info("****** TcpClient will connect to Server {}:{}", ip, port);
            isClientCoreRun = true;
            executor.execute(new ReceiveMsg());
            dataOutputStream = new DataOutputStream(socket.getOutputStream());
            dataOutputStream.writeUTF(clientName);
            dataOutputStream.flush();
        } catch (IOException e) {
            log.error(e.getMessage());
            isClientCoreRun = false;
        }
    }

    /**
     * Send a message
     */
    public void sendMsg(String msg) {
        try {
            dataOutputStream.writeUTF(msg);
            dataOutputStream.flush();
        } catch (IOException e) {
            log.error(e.getMessage());
            closeClientConnect();
        }
    }

    /**
     * Close the connection between the client and the server
     */
    public void closeClientConnect() {
        if (dataOutputStream != null) {
            try {
                dataOutputStream.close();
                isClientCoreRun = false;
                if (socket != null) {
                    socket.close();
                }
            } catch (IOException e) {
                log.error(e.getMessage());
            }
        }
    }

    @Override
    public void run() {
        executor.execute(new ReceiveMsg());
    }

    class ReceiveMsg implements Runnable {
        private DataInputStream dataInputStream;

        public ReceiveMsg() {
            try {
                // Data input stream
                dataInputStream = new DataInputStream(socket.getInputStream());
            } catch (IOException e) {
                log.error(e.getMessage());
            }
        }

        @Override
        public void run() {
            try {
                // Once the server stops, readUTF fails and this receiving loop ends
                while (isClientCoreRun) {
                    String msg = dataInputStream.readUTF();
                    log.info("{} get msg: {}", clientName, msg);
                    if (msg.contains("commandId")) {
                        Command cmd = JsonBeanUtils.jsonToBean(msg, Command.class);
                        sendMsg("hello Command!" + JsonBeanUtils.beanToJson(cmd));
                    }
                }
            } catch (IOException e) {
                log.error(e.getMessage());
                // Close the socket so that checkConnection() sees it as closed and reconnects
                closeClientConnect();
            }
        }
    }
}
//goto src\main\java\com\fly\tcp\base\TcpServer.java
package com.fly.tcp.base;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.Observable;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;

import com.fly.core.utils.SpringContextUtils;
import com.fly.tcp.service.ResultCallBack;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TcpServer implements Runnable {
    private ServerSocket serverSocket;

    private boolean isServerCoreRun = false;

    // Accessed from the accept thread, the per-client threads and the request threads,
    // so a concurrent map is used instead of a plain HashMap
    private Map<String, NewClient> allClient = new ConcurrentHashMap<>();

    public boolean startServer(String ip, int port) {
        try {
            serverSocket = new ServerSocket();
            serverSocket.bind(new InetSocketAddress(ip, port));
            isServerCoreRun = true;
        } catch (IOException e) {
            log.error(e.getMessage());
            isServerCoreRun = false;
        }
        return isServerCoreRun;
    }

    /**
     * Shut down the server
     *
     * #1 Disconnect all clients and clear the container of connected clients. #2 Close the server socket.
     */
    public void closeServer() {
        try {
            isServerCoreRun = false;
            for (Map.Entry<String, NewClient> all : this.allClient.entrySet()) {
                all.getValue().isNewClientRun = false;
                all.getValue().socket.close();
            }
            allClient.clear();
            serverSocket.close();
        } catch (IOException e) {
            log.error(e.getMessage());
        }
    }

    /**
     * Send a message to a client
     */
    public void sendMsg(String clientName, String msg) {
        if (allClient.containsKey(clientName)) {
            allClient.get(clientName).sendMsg(msg);
        }
    }

    @Override
    public void run() {
        try {
            log.info("TcpServer will start");
            while (isServerCoreRun) {
                // Block until a client connects
                Socket socket = serverSocket.accept();
                // The first frame sent by a client is its name
                String clientName = new DataInputStream(socket.getInputStream()).readUTF();
                String clientIP = socket.getInetAddress().getHostAddress();
                int clientPort = socket.getPort();
                String clientConnectDateTime = DateFormatUtils.format(System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss");
                NewClient newClient = new NewClient(socket, clientName, clientIP, clientPort, clientConnectDateTime);
                allClient.put(clientName, newClient);
                log.info("**** add new client ===> {}", allClient.keySet());
                new Thread(newClient).start();
            }
        } catch (IOException e) {
            log.error(e.getMessage());
        }
    }

    /**
     * Client thread (the observable)
     */
    class NewClient extends Observable implements Runnable {
        // Client socket
        private Socket socket;

        // Data input stream
        private DataInputStream dataInputStream;

        // Data output stream
        private DataOutputStream dataOutputStream;

        // Whether the client is running (receiving and sending messages)
        private boolean isNewClientRun = true;

        // Client name
        private String clientName;

        // Client IP address
        private String clientIP;

        // Constructor: initialize the member fields
        public NewClient(Socket socket, String clientName, String clientIP, int clientPort, String clientConnectDateTime) {
            this.socket = socket;
            this.clientName = clientName;
            this.clientIP = clientIP;
            try {
                // Register the observer
                addObserver(SpringContextUtils.getBean(ResultCallBack.class));
                // Create the client's data input and output streams
                dataInputStream = new DataInputStream(socket.getInputStream());
                dataOutputStream = new DataOutputStream(socket.getOutputStream());
            } catch (IOException e) {
                log.error(e.getMessage());
                closeCurrentClient();
            }
        }

        @Override
        public void run() {
            try {
                // Messages can only be exchanged while the client is running
                while (this.isNewClientRun) {
                    // Read the message sent by the client
                    String msg = dataInputStream.readUTF();
                    if (StringUtils.isNotBlank(msg)) {
                        log.info("clientName: {}, clientIP: {}, send msg ===> {}", clientName, clientIP, msg);
                    }
                    // A command reply: notify the observer and skip the echo below
                    if (StringUtils.startsWith(msg, "hello Command")) {
                        setChanged();
                        notifyObservers(msg);
                        continue;
                    }
                    // Echo the data back to the client
                    int index = 0;
                    for (String key : allClient.keySet()) {
                        index++;
                        if (StringUtils.equals(key, clientName)) {
                            allClient.get(key).sendMsg("from server " + msg + StringUtils.repeat("-----", index));
                        }
                    }
                }
            } catch (IOException e) {
                log.error(e.getMessage());
                closeCurrentClient();
            }
        }

        /**
         * Disconnect the current client and release its resources
         */
        public void closeCurrentClient() {
            try {
                // Stop the client's running state
                isNewClientRun = false;
                // Close the data output stream
                if (dataOutputStream != null) {
                    dataOutputStream.close();
                }
                // Close the data input stream
                if (dataInputStream != null) {
                    dataInputStream.close();
                }
                // Close the client socket
                if (socket != null) {
                    socket.close();
                }
                // Remove this client from the client container
                allClient.remove(clientName);
                log.info("**** remove client ===> {}", allClient.keySet());
            } catch (IOException e) {
                log.error(e.getMessage());
            }
        }

        /**
         * Send a message
         */
        public void sendMsg(String msg) {
            try {
                // Write the message
                dataOutputStream.writeUTF(msg);
                // Flush the output buffer
                dataOutputStream.flush();
            } catch (IOException e) {
                log.error(e.getMessage());
                closeCurrentClient();
            }
        }
    }
}
//goto src\main\java\com\fly\tcp\DemoController.java
package com.fly.tcp;

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import org.springframework.web.context.request.async.WebAsyncTask;

import com.fly.core.JsonResult;
import com.fly.core.utils.JsonBeanUtils;
import com.fly.tcp.base.InitManager;
import com.fly.tcp.entity.Command;
import com.fly.tcp.service.ResultCallBack;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Api(tags = "TCP application API")
@RestController
@RequestMapping("/tcp")
public class DemoController {
    @Autowired
    InitManager initManager;

    @Autowired
    ResultCallBack resultCallBack;

    @Autowired
    ThreadPoolTaskExecutor executor;

    @ApiOperation("Client sends a message")
    @ApiImplicitParam(name = "msg", example = "忽如一夜春风来", required = true)
    @PostMapping("/client/sendMsg")
    public String sendMsgFromClient(String msg) {
        initManager.getClient().sendMsg(msg);
        return msg;
    }

    /**
     * The client's processing result is usually obtained through a callback
     *
     * @param msg command text to send to the client
     * @return a Callable that polls for the result for up to about 5 seconds
     * @throws IOException
     */
    @ApiOperation("Server sends a message (Callable)")
    @ApiImplicitParam(name = "msg", example = "千树万树梨花开", required = true)
    @PostMapping("/server/sendMsg/callable")
    public Callable<JsonResult<?>> sendMsgFromServer01(String msg) throws IOException {
        return () -> {
            Command command = new Command().setClientId(initManager.getClient().getClientName()).setText(msg);
            initManager.getServer().sendMsg(initManager.getClient().getClientName(), JsonBeanUtils.beanToJson(command));
            String result;
            // Poll 50 times at 100 ms intervals
            for (int i = 0; i < 50; i++) {
                result = resultCallBack.queryResult(command.getCommandId());
                if (StringUtils.isNotBlank(result)) {
                    return JsonResult.success(result, "Result retrieved successfully");
                } else {
                    TimeUnit.MILLISECONDS.sleep(100);
                }
            }
            return JsonResult.error("Response timed out, please retry");
        };
    }

    /**
     * The client's processing result is usually obtained through a callback
     * (WebAsyncTask is an enhanced Callable that adds timeout and error handling)
     *
     * @param msg command text to send to the client
     * @return a WebAsyncTask with a 2000 ms timeout
     * @throws IOException
     */
    @ApiOperation("Server sends a message (WebAsyncTask)")
    @ApiImplicitParam(name = "msg", example = "千树万树梨花开", required = true)
    @PostMapping("/server/sendMsg/webAsyncTask")
    public WebAsyncTask<String> sendMsgFromServer02(String msg) throws IOException {
        WebAsyncTask<String> webAsyncTask = new WebAsyncTask<String>(2000L, executor, () -> {
            Command command = new Command().setClientId(initManager.getClient().getClientName()).setText(msg);
            initManager.getServer().sendMsg(initManager.getClient().getClientName(), JsonBeanUtils.beanToJson(command));
            String result;
            // Poll at 50 ms intervals; the 2000 ms task timeout above fires before the loop is exhausted
            for (int i = 0; i < 100; i++) {
                result = resultCallBack.queryResult(command.getCommandId());
                if (StringUtils.isNotBlank(result)) {
                    return result;
                } else {
                    TimeUnit.MILLISECONDS.sleep(50);
                }
            }
            return "Response timed out, please retry";
        });
        webAsyncTask.onCompletion(() -> log.info("Call completed"));
        webAsyncTask.onError(() -> {
            log.error("Business processing failed");
            return "error";
        });
        webAsyncTask.onTimeout(() -> {
            log.info("Business processing timed out");
            return "Time Out";
        });
        return webAsyncTask;
    }

    /**
     * The client's processing result is usually obtained through a callback
     *
     * @param msg command text to send to the client
     * @return a DeferredResult that is completed once the client's reply arrives
     * @throws IOException
     * @throws TimeoutException
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @ApiOperation("Server sends a message")
    @ApiImplicitParam(name = "msg", example = "千树万树梨花开", required = true)
    @PostMapping("/server/sendMsg")
    public DeferredResult<String> sendMsgFromServer(String msg) throws IOException, InterruptedException, ExecutionException, TimeoutException {
        String clientName = initManager.getClient().getClientName();
        Command command = new Command().setClientId(clientName).setText(msg);
        initManager.getServer().sendMsg(clientName, JsonBeanUtils.beanToJson(command));
        // Return the result asynchronously
        DeferredResult<String> deferredResult = new DeferredResult<>(20000L, "Failed");
        deferredResult.onCompletion(() -> log.info("Call completed"));
        deferredResult.onTimeout(() -> {
            log.info("Call timed out");
            deferredResult.setResult("Call timed out");
        });
        resultCallBack.processResult(deferredResult, command.getCommandId());
        return deferredResult;
    }
}
//goto src\main\java\com\fly\tcp\entity\Command.java
package com.fly.tcp.entity;

import java.util.UUID;

import lombok.Data;
import lombok.experimental.Accessors;

@Data
@Accessors(chain = true)
public class Command {
    /**
     * Command id
     */
    private String commandId = UUID.randomUUID().toString();

    /**
     * Client id
     */
    private String clientId;

    /**
     * Command content
     */
    private String text;
}
//goto src\main\java\com\fly\tcp\service\ResultCallBack.java
package com.fly.tcp.service;

import java.io.IOException;
import java.util.Map;
import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import org.springframework.web.context.request.async.DeferredResult;

import com.fly.core.utils.JsonBeanUtils;
import com.fly.tcp.entity.Command;

import lombok.extern.slf4j.Slf4j;

/**
 * Result callback handling
 */
@Slf4j
@Service
public class ResultCallBack implements Observer {
    Map<String, String> result = new ConcurrentHashMap<>();

    ExecutorService executorService = Executors.newFixedThreadPool(10);

    /**
     * Observer callback: receive and process the data
     */
    @Override
    public void update(Observable observable, Object msg) {
        log.info("### accept #### {}", msg);
        try {
            if (msg instanceof String) {
                String json = StringUtils.substringAfter((String)msg, "hello Command!");
                Command command = JsonBeanUtils.jsonToBean(json, Command.class);
                log.info("{}", command);
                result.put(command.getCommandId(), json);
            }
        } catch (IOException e) {
            log.error(e.getMessage());
        }
    }

    /**
     * Get the processing result of a command
     *
     * @param commandId
     * @return the result JSON, or null if no result has arrived yet
     */
    public String queryResult(String commandId) {
        return result.get(commandId);
    }

    /**
     * Process the business logic on a worker thread; a DeferredResult can be completed from any thread
     *
     * @param deferredResult
     * @param commandId
     * @throws TimeoutException
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public void processResult(DeferredResult<String> deferredResult, String commandId) throws InterruptedException, ExecutionException, TimeoutException {
        processResult02(deferredResult, commandId);
    }

    /**
     * No timeout; not recommended
     *
     * @param deferredResult
     * @param commandId
     */
    void processResult01(DeferredResult<String> deferredResult, String commandId) {
        executorService.execute(() -> {
            while (!result.containsKey(commandId)) {
                try {
                    log.info("waiting......");
                    TimeUnit.MILLISECONDS.sleep(20);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop polling instead of swallowing the interrupt
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            deferredResult.setResult(result.get(commandId));
        });
    }

    /**
     * With a timeout (recommended)
     *
     * Note: future.get blocks the calling thread for up to 1000 ms.
     *
     * @param deferredResult
     * @param commandId
     * @throws InterruptedException
     * @throws ExecutionException
     * @throws TimeoutException
     */
    private void processResult02(DeferredResult<String> deferredResult, String commandId) throws InterruptedException, ExecutionException, TimeoutException {
        Future<String> future = executorService.submit(() -> {
            while (!result.containsKey(commandId)) {
                log.info("waiting......");
                TimeUnit.MILLISECONDS.sleep(20);
            }
            return result.get(commandId);
        });
        try {
            String value = future.get(1000L, TimeUnit.MILLISECONDS);
            deferredResult.setResult(value);
        } catch (TimeoutException e) {
            // Cancel the polling task; otherwise it would keep occupying a pool thread indefinitely
            future.cancel(true);
            throw e;
        }
    }
}
//goto src\main\resources\application-dev.yml
# Set the log level
logging:
  level:
    org:
      springframework:
        web: INFO
//goto src\main\resources\application.yml
server:
  port: 8081
  servlet:
    context-path: /
    session:
      timeout: 1800
spring:
  servlet:
    multipart:
      max-file-size: 10MB
      max-request-size: 100MB
  profiles:
    active:
      - test
# The following must be enabled, otherwise API ordering does not take effect
knife4j:
  enable: true
# Set the log level
logging:
  level:
    root: INFO
//goto src\test\java\com\fly\controller\ApiControllerTest.java
package com.fly.controller;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.Test;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ApiControllerTest {
    private static RestTemplate restTemplate;

    static {
        // Configure parameters such as proxy, connectTimeout and readTimeout
        SimpleClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory();
        requestFactory.setConnectTimeout(1000);
        requestFactory.setReadTimeout(1000);
        restTemplate = new RestTemplate(requestFactory);
    }

    @Test
    public void testJsonRequestBody2() throws IOException {
        String url = "http://192.168.114.250:8080/api/post";
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        headers.setAccept(Arrays.asList(MediaType.APPLICATION_JSON));
        Resource resource = new ClassPathResource("data/json");
        String text = IOUtils.toString(resource.getInputStream(), StandardCharsets.UTF_8);
        HttpEntity<String> requestEntity = new HttpEntity<>(text, headers);
        ResponseEntity<String> responseEntity = restTemplate.postForEntity(url, requestEntity, String.class);
        log.info("****** ResponseEntity body: {}", responseEntity.getBody());
    }
}
//goto src\test\java\com\fly\executor\ExecutorServiceTest.java
package com.fly.executor;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.junit.jupiter.api.Test;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ExecutorServiceTest {
    private WebClient webClient = WebClient.builder().codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(-1)).build();

    private ExecutorService executorService = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
        new BasicThreadFactory.Builder().namingPattern("t-%03d").daemon(true).priority(Thread.MAX_PRIORITY).build());

    /**
     * Thread-pool call with a timeout
     *
     * @throws TimeoutException
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @Test
    public void testTimeout() throws InterruptedException, ExecutionException, TimeoutException {
        String ip = "192.168.0.1";
        Future<String> future = executorService.submit(() -> {
            return webClient.get()
                .uri(uriBuilder -> uriBuilder.scheme("http").host(ip).port("2375").path("/_ping").build()) // URI
                .acceptCharset(StandardCharsets.UTF_8)
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .bodyToMono(String.class)
                .block();
        });
        log.info("return: {}", future.get(1000L, TimeUnit.MILLISECONDS));
    }
}
//goto src\test\resources\log4j2.xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
    <Properties>
        <Property name="LOG_EXCEPTION_CONVERSION_WORD">%xwEx</Property>
        <Property name="LOG_LEVEL_PATTERN">%5p</Property>
        <Property name="LOG_DATEFORMAT_PATTERN">yyyy-MM-dd HH:mm:ss.SSS</Property>
        <Property name="CONSOLE_LOG_PATTERN">%clr{%d{${LOG_DATEFORMAT_PATTERN}}}{faint} %clr{${LOG_LEVEL_PATTERN}} %clr{%pid}{magenta} %clr{---}{faint} %clr{[%15.15t]}{faint} %clr{%-40.40c{1.}}{cyan} %clr{:}{faint} %m%n${sys:LOG_EXCEPTION_CONVERSION_WORD}</Property>
        <Property name="FILE_LOG_PATTERN">%d{${LOG_DATEFORMAT_PATTERN}} ${LOG_LEVEL_PATTERN} %pid --- [%t] %-40.40c{1.} : %m%n${sys:LOG_EXCEPTION_CONVERSION_WORD}</Property>
    </Properties>
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT" follow="true">
            <PatternLayout pattern="${sys:CONSOLE_LOG_PATTERN}"/>
        </Console>
    </Appenders>
    <Loggers>
        <Logger name="org.apache.catalina.startup.DigesterFactory" level="error"/>
        <Logger name="org.apache.catalina.util.LifecycleBase" level="error"/>
        <Logger name="org.apache.coyote.http11.Http11NioProtocol" level="warn"/>
        <logger name="org.apache.sshd.common.util.SecurityUtils" level="warn"/>
        <Logger name="org.apache.tomcat.util.net.NioSelectorPool" level="warn"/>
        <Logger name="org.eclipse.jetty.util.component.AbstractLifeCycle" level="error"/>
        <Logger name="org.hibernate.validator.internal.util.Version" level="warn"/>
        <logger name="org.springframework.boot.actuate.endpoint.jmx" level="warn"/>
        <Root level="info">
            <AppenderRef ref="Console"/>
        </Root>
    </Loggers>
</Configuration>
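The TcpClient and TcpServer listings above exchange every message with `writeUTF`/`readUTF`, and the client sends its name as the first frame right after connecting. The following is a minimal loopback sketch of that handshake using only the JDK. The class name `HandshakeSketch` and the method `roundTrip` are hypothetical and not part of the original project; the server here handles a single connection and a single message, which is a simplification of the listing.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo class; not part of the original project. */
class HandshakeSketch {

    /**
     * Starts a one-shot server on an ephemeral loopback port, connects a client that
     * announces its name and sends one message, and returns the server's reply.
     */
    static String roundTrip(String clientName, String msg) throws Exception {
        try (ServerSocket serverSocket = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            // Server side: read the name frame first, then one message frame, then reply.
            CompletableFuture<Void> server = CompletableFuture.runAsync(() -> {
                try (Socket s = serverSocket.accept();
                     DataInputStream in = new DataInputStream(s.getInputStream());
                     DataOutputStream out = new DataOutputStream(s.getOutputStream())) {
                    String name = in.readUTF();
                    String received = in.readUTF();
                    out.writeUTF("from server [" + name + "] " + received);
                    out.flush();
                } catch (IOException e) {
                    throw new IllegalStateException(e);
                }
            });

            // Client side: the name goes out as the first frame, as in connectServer().
            try (Socket socket = new Socket(InetAddress.getLoopbackAddress(), serverSocket.getLocalPort());
                 DataOutputStream out = new DataOutputStream(socket.getOutputStream());
                 DataInputStream in = new DataInputStream(socket.getInputStream())) {
                out.writeUTF(clientName);
                out.writeUTF(msg);
                out.flush();
                String reply = in.readUTF();
                // Wait for the server task so that its failures are not silently lost.
                server.get(2, TimeUnit.SECONDS);
                return reply;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip("CLIENT_1", "hello")); // prints "from server [CLIENT_1] hello"
    }
}
```

Because `writeUTF` prefixes each string with a two-byte length, each call is one self-delimiting frame, which is why the listing needs no extra delimiter. The same prefix limits a single frame to 65535 encoded bytes, so larger payloads would need a different framing.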
VI. Running the Application


Deployment and testing are not described in detail here.
VII. Key Techniques
- Observer pattern
- Command pattern
- Thread pool usage
- Asynchronous request handling with Callable, WebAsyncTask and DeferredResult
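The controller in this article waits for a command's reply by polling a map keyed by `commandId`. As an alternative way to combine the observer callback with asynchronous waiting, the sketch below correlates each command id with a `CompletableFuture` that the receiving thread completes. This is an illustration under assumptions, not the article's implementation: the class `PendingResults` and its methods are hypothetical names, and the `Runnable send` parameter stands in for the call that writes the command to the TCP client.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper; not part of the original project. */
class PendingResults {

    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /**
     * Registers a future for the command id, runs the send action, and waits for the reply.
     * The entry is always removed afterwards, so the map does not grow without bound.
     */
    String sendAndAwait(String commandId, Runnable send, long timeoutMillis)
            throws InterruptedException, ExecutionException, TimeoutException {
        CompletableFuture<String> future = new CompletableFuture<>();
        // Register before sending, so that a very fast reply is not lost.
        pending.put(commandId, future);
        try {
            send.run();
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            pending.remove(commandId);
        }
    }

    /**
     * Called by the receiving side (for example from an observer callback) when a reply arrives.
     * Returns false if nobody is waiting for this command id.
     */
    boolean complete(String commandId, String result) {
        CompletableFuture<String> future = pending.remove(commandId);
        return future != null && future.complete(result);
    }

    public static void main(String[] args) throws Exception {
        PendingResults results = new PendingResults();
        // Simulate the TCP receive thread delivering the reply from another thread.
        String reply = results.sendAndAwait("cmd-1",
            () -> CompletableFuture.runAsync(() -> results.complete("cmd-1", "done")), 1000);
        System.out.println(reply); // prints "done"
        try {
            results.sendAndAwait("cmd-2", () -> { }, 50);
        } catch (TimeoutException e) {
            System.out.println("timeout"); // no reply was delivered for cmd-2
        }
    }
}
```

With this shape the waiting thread is woken as soon as the reply arrives instead of sleeping between polls, and a timed-out command leaves nothing behind in the map.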
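**If you have any questions or suggestions, feel free to ask and discuss them with me so that we can all improve together. Thank you!**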
-over-
Copyright belongs to the original author, 爱码少年 (00fly.online). If there is any infringement, please contact us for removal.