什么是 WebClient?
在 Spring Boot 中,WebClient 是 Spring WebFlux 提供的一个非阻塞、响应式的 HTTP 客户端,用于与 RESTful 服务或其他 HTTP 服务交互。相比于传统的 RestTemplate,WebClient 更加现代化,具有异步和非阻塞的特点,适合高性能、高并发的应用场景。
WebClient 的特点
非阻塞 I/O:适用于响应式编程模型,能高效处理大量并发请求。
功能强大:支持同步和异步调用,处理复杂的 HTTP 请求和响应,包括流式数据。
灵活的配置:可自定义超时、请求拦截器、认证方式等。
响应式编程支持:返回 Mono 或 Flux,与 Spring WebFlux 的响应式编程模型无缝集成。
引入依赖
在使用 WebClient 之前,需要确保 Spring Boot 项目已包含相关依赖。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
配置及使用 WebClient
现在有以下服务
- service1服务:http://localhost:8081/
- service2服务:http://localhost:8082/
- common服务:http://localhost:8079/
创建 WebClientConfig 配置类,为 service1 和 service2 配置独立的 WebClient。
package com.example.common.config;
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.ExchangeStrategies;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.tcp.TcpClient;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
/**
* 配置 WebClient,支持基础功能(独立 WebClient 实例)和高级特性(超时、拦截器、内存限制)。
*/
@Configuration
public class WebClientConfig {
/**
* 配置 WebClient,用于调用 service1(http://localhost:8081)
*
* @param builder WebClient.Builder 实例
* @return 针对 service1 的 WebClient 实例
*/
@Bean(name = "service1WebClient")
public WebClient service1WebClient(WebClient.Builder builder) {
return builder
.baseUrl("http://localhost:8081") // 配置 service1 的基本 URL
.defaultHeader("Content-Type", "application/json") // 设置默认请求头
.exchangeStrategies(
ExchangeStrategies.builder()
.codecs(configurer -> configurer
.defaultCodecs()
.maxInMemorySize(16 * 1024 * 1024)) // 设置最大内存限制为 16MB
.build())
.filter(logRequest()) // 添加请求日志拦截器
.filter(logResponse()) // 添加响应日志拦截器
.build();
}
/**
* 配置 WebClient,用于调用 service2(http://localhost:8082)
*
* @param builder WebClient.Builder 实例
* @return 针对 service2 的 WebClient 实例
*/
@Bean(name = "service2WebClient")
public WebClient service2WebClient(WebClient.Builder builder) {
return builder
.baseUrl("http://localhost:8082") // 配置 service2 的基本 URL
.defaultHeader("Content-Type", "application/json") // 设置默认请求头
.filter(logRequest()) // 添加请求日志拦截器
.filter(logResponse()) // 添加响应日志拦截器
.build();
}
/**
* 提供全局的 WebClient.Builder 配置,支持超时和高级功能。
*
* @return 配置好的 WebClient.Builder
*/
@Bean
public WebClient.Builder webClientBuilder() {
// 配置 TCP 客户端,设置连接超时、读超时和写超时
TcpClient tcpClient = TcpClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) // 连接超时 5秒
.doOnConnected(connection ->
connection.addHandlerLast(new ReadTimeoutHandler(5)) // 读超时 5秒
.addHandlerLast(new WriteTimeoutHandler(5))); // 写超时 5秒
// 使用配置的 TcpClient 创建 HttpClient
HttpClient httpClient = HttpClient.from(tcpClient);
// 创建 WebClient.Builder 并配置 HttpClient 和拦截器
return WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient)) // 配置 HttpClient
.filter(logRequest()) // 请求日志拦截器
.filter(logResponse()); // 响应日志拦截器
}
/**
* 请求日志拦截器:记录请求的详细信息(方法和 URL)
*
* @return ExchangeFilterFunction 拦截器
*/
private ExchangeFilterFunction logRequest() {
return ExchangeFilterFunction.ofRequestProcessor(request -> {
System.out.println("Request: " + request.method() + " " + request.url());
return Mono.just(request);
});
}
/**
* 响应日志拦截器:记录响应的状态码
*
* @return ExchangeFilterFunction 拦截器
*/
private ExchangeFilterFunction logResponse() {
return ExchangeFilterFunction.ofResponseProcessor(response -> {
System.out.println("Response status: " + response.statusCode());
return Mono.just(response);
});
}
}
service1相应的接口
package cloud.service1.controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;
/**
* Service1 的控制器类,用于处理与API相关的请求.
* 该类被Spring框架管理,作为处理HTTP请求的一部分.
*/
@RestController
@RequestMapping("/api/service1")
public class Service1Controller {
/**
* 获取Service1的数据信息.
*
* @return 包含服务信息的映射,包括服务名称和问候消息.
*/
@GetMapping("/data")
public Map<String, String> getData() {
// 返回一个不可变的映射,包含服务名称和问候消息
return Map.of("service", "service1", "message", "Hello from Service1");
}
}
service2相应的接口
package cloud.service2.controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;
/**
* Service2的控制器类,用于处理与Service2相关的HTTP请求.
* 该类被Spring框架管理,作为处理RESTful请求的控制器.
*/
@RestController
@RequestMapping("/api/service2")
public class Service2Controller {
/**
* 处理GET请求到/api/service2/info,返回Service2的信息.
*
* @return 包含服务信息的Map,包括服务名称和欢迎消息.
*/
@GetMapping("/info")
public Map<String, String> getInfo() {
return Map.of("service", "service2", "message", "Hello from Service2");
}
}
服务调用实现
package com.example.common.service;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
/**
* CommonService 类提供了对其他服务进行调用的方法
* 它通过 WebClient 实例与 service1 和 service2 进行通信
*/
@Service
public class CommonService {
// 用于与 service1 通信的 WebClient 实例
private final WebClient service1WebClient;
// 用于与 service2 通信的 WebClient 实例
private final WebClient service2WebClient;
/**
* 构造函数注入两个 WebClient 实例
*
* @param service1WebClient 用于 service1 的 WebClient
* @param service2WebClient 用于 service2 的 WebClient
*/
public CommonService(
@Qualifier("service1WebClient") WebClient service1WebClient,
@Qualifier("service2WebClient") WebClient service2WebClient) {
this.service1WebClient = service1WebClient;
this.service2WebClient = service2WebClient;
}
/**
* 调用 service1 的接口
*
* @return 来自 service1 的数据
*/
public Mono<String> callService1() {
// 通过 service1WebClient 调用 service1 的 API,并处理可能的错误
return service1WebClient.get()
.uri("/api/service1/data")
.retrieve()
.bodyToMono(String.class)
.onErrorResume(e -> {
// 错误处理:打印错误信息并返回错误提示
System.err.println("Error calling service1: " + e.getMessage());
return Mono.just("Error calling service1");
});
}
/**
* 调用 service2 的接口
*
* @return 来自 service2 的数据
*/
public Mono<String> callService2() {
// 通过 service2WebClient 调用 service2 的 API,并处理可能的错误
return service2WebClient.get()
.uri("/api/service2/info")
.retrieve()
.bodyToMono(String.class)
.onErrorResume(e -> {
// 错误处理:打印错误信息并返回错误提示
System.err.println("Error calling service2: " + e.getMessage());
return Mono.just("Error calling service2");
});
}
}
package com.example.common.controller;
import com.example.common.service.CommonService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
/**
* 通用控制器类,处理与通用服务相关的API请求
*/
@RestController
@RequestMapping("/api/common")
public class CommonController {
// 注入通用服务接口,用于调用具体的服务方法
private final CommonService commonService;
/**
* 构造函数注入CommonService实例
*
* @param commonService 通用服务接口实例
*/
public CommonController(CommonService commonService) {
this.commonService = commonService;
}
/**
* 调用 service1 的接口
*
* @return service1 的响应数据
*/
@GetMapping("/service1")
public Mono<String> getService1Data() {
return commonService.callService1();
}
/**
* 调用 service2 的接口
*
* @return service2 的响应数据
*/
@GetMapping("/service2")
public Mono<String> getService2Info() {
return commonService.callService2();
}
}
测试接口
优化实践
将上述代码进一步优化和整合以确保代码可维护性和高效性。
package com.example.common.config;
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.ExchangeStrategies;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.tcp.TcpClient;
/**
* 配置 WebClient 的各类设置和日志记录
*/
@Configuration
public class WebClientConfig {
/**
* 全局 WebClient.Builder 配置
*
* @return 配置好的 WebClient.Builder
*/
@Bean
public WebClient.Builder webClientBuilder() {
// 配置 TCP 客户端的连接、读取、写入超时时间
TcpClient tcpClient = TcpClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) // 连接超时
.doOnConnected(conn -> conn
.addHandlerLast(new ReadTimeoutHandler(5)) // 读超时
.addHandlerLast(new WriteTimeoutHandler(5))); // 写超时
// 将 TCP 客户端配置应用到 HTTP 客户端
HttpClient httpClient = HttpClient.from(tcpClient);
// 配置 WebClient 构建器,包括 HTTP 连接器、交换策略、请求和响应日志
return WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.exchangeStrategies(ExchangeStrategies.builder()
.codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(16 * 1024 * 1024)) // 内存限制
.build())
.filter(logRequest()) // 请求日志
.filter(logResponse()); // 响应日志
}
/**
* 针对 service1 的 WebClient 配置
*
* @param builder 全局配置的 WebClient.Builder
* @return 配置好的 WebClient 实例
*/
@Bean(name = "service1WebClient")
public WebClient service1WebClient(WebClient.Builder builder) {
// 为 service1 配置特定的 base URL 和默认头部
return builder
.baseUrl("http://localhost:8081")
.defaultHeader("Content-Type", "application/json")
.build();
}
/**
* 针对 service2 的 WebClient 配置
*
* @param builder 全局配置的 WebClient.Builder
* @return 配置好的 WebClient 实例
*/
@Bean(name = "service2WebClient")
public WebClient service2WebClient(WebClient.Builder builder) {
// 为 service2 配置特定的 base URL 和默认头部
return builder
.baseUrl("http://localhost:8082")
.defaultHeader("Content-Type", "application/json")
.build();
}
/**
* 请求日志拦截器
*
* @return 记录请求日志的 ExchangeFilterFunction
*/
private ExchangeFilterFunction logRequest() {
// 拦截请求并打印请求方法和URL
return ExchangeFilterFunction.ofRequestProcessor(request -> {
System.out.println("Request: " + request.method() + " " + request.url());
return Mono.just(request);
});
}
/**
* 响应日志拦截器
*
* @return 记录响应日志的 ExchangeFilterFunction
*/
private ExchangeFilterFunction logResponse() {
// 拦截响应并打印响应状态码
return ExchangeFilterFunction.ofResponseProcessor(response -> {
System.out.println("Response status: " + response.statusCode());
return Mono.just(response);
});
}
}
package com.example.common.service;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import java.util.Map;
/**
* CommonService 类提供了调用两个不同服务的公共方法,并合并其结果
*/
@Service
public class CommonService {
// service1 的 WebClient 实例
private final WebClient service1WebClient;
// service2 的 WebClient 实例
private final WebClient service2WebClient;
/**
* 构造函数注入 WebClient 实例
*
* @param service1WebClient service1 的 WebClient
* @param service2WebClient service2 的 WebClient
*/
public CommonService(WebClient service1WebClient, WebClient service2WebClient) {
this.service1WebClient = service1WebClient;
this.service2WebClient = service2WebClient;
}
/**
* 异步调用 service1 和 service2,并返回合并结果(JSON 格式)
*
* @return 包含两个服务响应的 Mono 对象
*/
public Mono<Map<String, Map<String, String>>> callServicesAsync() {
// 调用 service1,返回 Map 响应
Mono<Map<String, String>> service1Response = service1WebClient.get()
// 设置请求的URI
.uri("/api/service1/data")
// 检索响应
.retrieve()
// 处理错误状态
.onStatus(
// 检查状态是否为4xx或5xx
status -> status.is4xxClientError() || status.is5xxServerError(),
// 如果是,创建一个运行时异常
response -> Mono.error(new RuntimeException("Service1 Error: " + response.statusCode()))
)
// 将响应体转换为Mono<Map<String, String>>
.bodyToMono(new ParameterizedTypeReference<Map<String, String>>() {})
// 处理错误
.onErrorResume(e -> {
// 打印错误信息
System.err.println("Error calling service1: " + e.getMessage());
// 返回一个包含错误信息的Map
return Mono.just(Map.of("error", "Fallback response for service1"));
});
// 调用 service2,返回 Map 响应
Mono<Map<String, String>> service2Response = service2WebClient.get()
// 设置请求的URI
.uri("/api/service2/info")
// 检索响应
.retrieve()
// 处理错误状态
.onStatus(
// 检查状态是否为4xx或5xx
status -> status.is4xxClientError() || status.is5xxServerError(),
// 如果是,创建一个运行时异常
response -> Mono.error(new RuntimeException("Service2 Error: " + response.statusCode()))
)
// 将响应体转换为Mono<Map<String, String>>
.bodyToMono(new ParameterizedTypeReference<Map<String, String>>() {})
// 处理错误
.onErrorResume(e -> {
// 打印错误信息
System.err.println("Error calling service2: " + e.getMessage());
// 返回一个包含错误信息的Map
return Mono.just(Map.of("error", "Fallback response for service2"));
});
// 合并两个响应
return Mono.zip(service1Response, service2Response, (response1, response2) -> Map.of(
"service1", response1,
"service2", response2
))
// 处理合并过程中的错误
.onErrorResume(e -> {
// 打印错误信息
System.err.println("Error combining responses: " + e.getMessage());
// 返回一个包含错误信息的Map
return Mono.just(Map.of(
"error", Map.of(
"status", "error",
"message", e.getMessage() // 捕获异常并输出信息
)
));
});
}
}
package com.example.common.controller;
import com.example.common.service.CommonService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
import java.util.Map;
@RestController
@RequestMapping("/api/common")
public class CommonController {
private final CommonService commonService;
public CommonController(CommonService commonService) {
this.commonService = commonService;
}
/**
* 提供异步调用的 REST 接口,返回 JSON 格式的数据
*/
@GetMapping("/service")
public Mono<Map<String, Map<String, String>>> getServicesData() {
System.out.println("Received request for combined service data");
return commonService.callServicesAsync()
.doOnSuccess(response -> System.out.println("Successfully retrieved data: " + response))
.doOnError(error -> System.err.println("Error occurred while fetching service data: " + error.getMessage()));
}
}
测试接口
结语
WebClient 是一个功能强大且灵活的非阻塞 HTTP 客户端,特别适合在高并发和响应式编程场景下使用,是替代传统 RestTemplate 的优秀选择。在实际项目中,通过合理配置(如超时、连接池)和优化(如负载均衡、重试机制),可以显著提高服务间通信的效率和可靠性,降低延迟和资源消耗。
同时,结合 Spring WebFlux 提供的响应式编程支持,WebClient 能够更好地应对微服务架构中复杂的通信需求,成为开发现代分布式系统的重要工具。
版权归原作者 Onlooker﹒ 所有, 如有侵权,请联系我们删除。