

Spring Boot 3 + R2DBC: Reactive Programming in Practice


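Spring Boot 3 has reached M1, and people in my chat group have recently started itching to try Reactive + Spring Boot 3. Following their lead, I am writing an introductory project walkthrough of my own, built on the Java 17 + Spring Boot 3 + R2DBC + Reactive stack. Comments and discussion are welcome. (For background on reactive programming, see my earlier articles, which cover it in detail.)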

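R2DBC

Between Reactor, the Spring WebFlux framework built on top of it, and other reactive technologies such as Vert.x and RxJava, the application layer already has plenty of excellent reactive frameworks.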

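The problem is that every one of these frameworks eventually has to fetch data from a lower layer, and reads and writes against relational databases are, for the most part, still synchronous.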
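To make the problem concrete, here is a minimal sketch in plain Java (JDK only, no database involved). `blockingQuery` is a hypothetical stand-in for a JDBC-style call, and the 50 ms sleep simulates I/O latency; both are assumptions made for illustration. It shows the usual workaround of moving a blocking call onto a dedicated thread pool.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative only: simulates a synchronous driver call with Thread.sleep.
final class BlockingDriverSketch {

    /** Hypothetical stand-in for a JDBC-style query: the calling thread waits for the rows. */
    static List<String> blockingQuery() {
        try {
            Thread.sleep(50); // simulated network and disk latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return List.of("Jacky", "Doudou", "Maimai");
    }

    /** Common workaround: run the blocking call on a dedicated pool and expose a future. */
    static CompletableFuture<List<String>> queryAsync(ExecutorService pool) {
        return CompletableFuture.supplyAsync(BlockingDriverSketch::blockingQuery, pool);
    }

    /** Runs one query through the workaround and returns its rows. */
    static List<String> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            return queryAsync(pool).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The caller gets a future back immediately, but a pool thread is still parked for the whole duration of the query, so concurrency is capped by the pool size. A driver that is non-blocking all the way down avoids that cost.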

Two standards emerged to address this: ADBA (Asynchronous Database Access API), proposed by Oracle, and R2DBC (Reactive Relational Database Connectivity), proposed by Pivotal.

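R2DBC is designed around the Reactive Streams standard, so with R2DBC you can work with data through a reactive API.

R2DBC itself is only an open specification; each database driver has to provide its own implementation of it.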
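The Reactive Streams contract mentioned above comes down to a publisher, a subscriber, and a subscription through which the subscriber signals demand. The sketch below uses the JDK's `java.util.concurrent.Flow` interfaces, which mirror the Reactive Streams interfaces; it is not R2DBC code, and the class names and the row values are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Illustrative only: uses the JDK Flow interfaces, not an R2DBC driver.
final class FlowBackpressureDemo {

    /** Subscriber that requests one item at a time and records what it receives. */
    static final class OneAtATimeSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            this.subscription = s;
            s.request(1);            // signal demand for the first row
        }

        @Override
        public void onNext(String row) {
            received.add(row);
            subscription.request(1); // ask for the next row only when ready
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes the given rows and returns what the subscriber saw, in order. */
    static List<String> publish(List<String> rows) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            rows.forEach(publisher::submit);
        } // close() signals onComplete once the submitted items are delivered
        try {
            subscriber.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(publish(List.of("Jacky", "Doudou", "Maimai")));
    }
}
```

The `request(1)` calls are the backpressure part: the publisher hands over a row only after the subscriber has asked for it. An R2DBC driver exposes query results as publishers of this kind, and Reactor's `Flux` and `Mono` are subscribers and operators built on the same contract.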

Today we use r2dbc-h2 as the example to show how R2DBC is used with Spring WebFlux.

Project dependencies

The pom.xml listing is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.0.0-M1</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>wang.datahub</groupId>
    <artifactId>springboot3demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot3demo</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-r2dbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-rest</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-groovy-templates</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-hateoas</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
        </dependency>
        <dependency>
            <groupId>io.r2dbc</groupId>
            <artifactId>r2dbc-h2</artifactId>
        </dependency>
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <!--            <version>3.4.14</version>-->
            <!--            <scope>compile</scope>-->
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
    <repositories>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <releases>
                <enabled>false</enabled>
            </releases>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
        <pluginRepository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <releases>
                <enabled>false</enabled>
            </releases>
        </pluginRepository>
    </pluginRepositories>
</project>

Configuration file

Here we configure only the R2DBC connection information:

r2dbc:
  url: r2dbc:h2:mem:///test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
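The connection URL packs several pieces of information into one string. As a rough illustration of its shape (`r2dbc:driver[:protocol]://[host][/database][?option=value]`), the sketch below splits the URL above by hand. It is not the parser that `ConnectionFactoryOptions.parse` uses, it ignores credentials and ports, and the map keys it returns are names chosen for this example only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: a hand-rolled splitter, not the parser inside r2dbc-spi.
final class R2dbcUrlSketch {

    private static final String PREFIX = "r2dbc:";

    /** Splits an R2DBC-style URL into driver, protocol, host, database and query options. */
    static Map<String, String> parse(String url) {
        int schemeEnd = url.indexOf("://");
        if (!url.startsWith(PREFIX) || schemeEnd <= PREFIX.length()) {
            throw new IllegalArgumentException("not an R2DBC URL: " + url);
        }
        Map<String, String> parts = new LinkedHashMap<>();

        // "h2:mem" becomes driver "h2" and (optional) protocol "mem"
        String[] scheme = url.substring(PREFIX.length(), schemeEnd).split(":", 2);
        parts.put("driver", scheme[0]);
        if (scheme.length == 2) {
            parts.put("protocol", scheme[1]);
        }

        // Everything after "://" is host, path and query
        String rest = url.substring(schemeEnd + 3);
        int q = rest.indexOf('?');
        String location = q < 0 ? rest : rest.substring(0, q);
        String query = q < 0 ? "" : rest.substring(q + 1);

        int slash = location.indexOf('/');
        String host = slash < 0 ? location : location.substring(0, slash);
        if (!host.isEmpty()) {
            parts.put("host", host);
        }
        if (slash >= 0 && slash + 1 < location.length()) {
            parts.put("database", location.substring(slash + 1));
        }

        // Query options are split on '&'; only the first '=' separates key from value
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                parts.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return parts;
    }

    public static void main(String[] args) {
        System.out.println(parse(
                "r2dbc:h2:mem:///test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE"));
    }
}
```

For the URL in this article, the sketch yields driver `h2`, protocol `mem`, database `test`, and a single `options` entry whose value is the semicolon-separated H2 settings.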

Configuration class

This class configures the default connection and creates the initial data:

package wang.datahub.springboot3demo.config;

import io.netty.util.internal.StringUtil;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Flux;

import static io.r2dbc.spi.ConnectionFactoryOptions.*;

@Configuration
@ConfigurationProperties(prefix = "r2dbc")
public class DBConfig {

    private String url;
    private String user;
    private String password;

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        System.out.println("url ==> " + url);

        ConnectionFactoryOptions baseOptions = ConnectionFactoryOptions.parse(url);
        ConnectionFactoryOptions.Builder ob = ConnectionFactoryOptions.builder().from(baseOptions);
        if (!StringUtil.isNullOrEmpty(user)) {
            ob = ob.option(USER, user);
        }
        if (!StringUtil.isNullOrEmpty(password)) {
            ob = ob.option(PASSWORD, password);
        }
        return ConnectionFactories.get(ob.build());
    }

    @Bean
    public CommandLineRunner initDatabase(ConnectionFactory cf) {
        return (args) ->
                Flux.from(cf.create())
                        .flatMap(c ->
                                Flux.from(c.createBatch()
                                                .add("drop table if exists Users")
                                                .add("create table Users(" +
                                                        "id IDENTITY(1,1)," +
                                                        "firstname varchar(80) not null," +
                                                        "lastname varchar(80) not null)")
                                                .add("insert into Users(firstname,lastname)" +
                                                        "values('Jacky','Li')")
                                                .add("insert into Users(firstname,lastname)" +
                                                        "values('Doudou','Li')")
                                                .add("insert into Users(firstname,lastname)" +
                                                        "values('Maimai','Li')")
                                                .execute())
                                        .doFinally((st) -> c.close())
                        )
                        .log()
                        .blockLast();
    }
}

Bean

Create the user bean:

package wang.datahub.springboot3demo.bean;

import org.springframework.data.annotation.Id;

public class Users {
    @Id
    private Long id;
    private String firstname;
    private String lastname;

    public Users(){}

    public Users(Long id, String firstname, String lastname) {
        this.id = id;
        this.firstname = firstname;
        this.lastname = lastname;
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getFirstname() {
        return firstname;
    }

    public void setFirstname(String firstname) {
        this.firstname = firstname;
    }

    public String getLastname() {
        return lastname;
    }

    public void setLastname(String lastname) {
        this.lastname = lastname;
    }

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", firstname='" + firstname + '\'' +
                ", lastname='" + lastname + '\'' +
                '}';
    }
}

DAO

The DAO listing follows. It covers listing all users, looking up a user by id, and creating a user.

package wang.datahub.springboot3demo.dao;

import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import org.springframework.data.r2dbc.core.R2dbcEntityTemplate;
import org.springframework.data.relational.core.query.Query;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import wang.datahub.springboot3demo.bean.Users;

import static org.springframework.data.r2dbc.query.Criteria.where;
import static org.springframework.data.relational.core.query.Query.query;

@Component
public class UsersDao {

    private ConnectionFactory connectionFactory;
    private R2dbcEntityTemplate template;

    public UsersDao(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
        this.template = new R2dbcEntityTemplate(connectionFactory);
    }

    public Mono<Users> findById(long id) {
        return this.template.selectOne(query(where("id").is(id)), Users.class);

//        return Mono.from(connectionFactory.create())
//                .flatMap(c -> Mono.from(c.createStatement("select id,firstname,lastname from Users where id = $1")
//                                .bind("$1", id)
//                                .execute())
//                        .doFinally((st) -> close(c)))
//                .map(result -> result.map((row, meta) ->
//                        new Users(row.get("id", Long.class),
//                                row.get("firstname", String.class),
//                                row.get("lastname", String.class))))
//                .flatMap( p -> Mono.from(p));
    }

    public Flux<Users> findAll() {
        return this.template.select(Users.class).all();

//        return Mono.from(connectionFactory.create())
//                .flatMap((c) -> Mono.from(c.createStatement("select id,firstname,lastname from users")
//                                .execute())
//                        .doFinally((st) -> close(c)))
//                .flatMapMany(result -> Flux.from(result.map((row, meta) -> {
//                    Users acc = new Users();
//                    acc.setId(row.get("id", Long.class));
//                    acc.setFirstname(row.get("firstname", String.class));
//                    acc.setLastname(row.get("lastname", String.class));
//                    return acc;
//                })));
    }

    public Mono<Users> createAccount(Users account) {
        return Mono.from(connectionFactory.create())
                .flatMap(c -> Mono.from(c.beginTransaction())
                        .then(Mono.from(c.createStatement("insert into Users(firstname,lastname) values($1,$2)")
                                .bind("$1", account.getFirstname())
                                .bind("$2", account.getLastname())
                                .returnGeneratedValues("id")
                                .execute()))
                        .map(result -> result.map((row, meta) ->
                                new Users(row.get("id", Long.class),
                                        account.getFirstname(),
                                        account.getLastname())))
                        .flatMap(pub -> Mono.from(pub))
                        .delayUntil(r -> c.commitTransaction())
                        .doFinally((st) -> c.close()));
    }

    private <T> Mono<T> close(Connection connection) {
        return Mono.from(connection.close())
                .then(Mono.empty());
    }
}

Controller

The controller listing follows. It exposes the same operations: listing all users, looking up a user by id, and creating a user.

package wang.datahub.springboot3demo.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import wang.datahub.springboot3demo.bean.Users;
import wang.datahub.springboot3demo.dao.UsersDao;

@RestController
public class UsersController {

    @Autowired
    private final UsersDao usersDao;

    public UsersController(UsersDao usersDao) {
        this.usersDao = usersDao;
    }

    @GetMapping("/users/{id}")
    public Mono<ResponseEntity<Users>> getUsers(@PathVariable("id") Long id) {
        return usersDao.findById(id)
                .map(acc -> new ResponseEntity<>(acc, HttpStatus.OK))
                .switchIfEmpty(Mono.just(new ResponseEntity<>(null, HttpStatus.NOT_FOUND)));
    }

    @GetMapping("/users")
    public Flux<Users> getAllAccounts() {
        return usersDao.findAll();
    }

    @PostMapping("/createUser")
    public Mono<ResponseEntity<Users>> createUser(@RequestBody Users user) {
        return usersDao.createAccount(user)
                .map(acc -> new ResponseEntity<>(acc, HttpStatus.CREATED))
                .log();
    }
}
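The 200-or-404 branch in `getUsers` may be easier to picture with `java.util.Optional`, which, like `Mono`, carries zero or one value. The sketch below is an analogy in plain Java, not Reactor code: the `Response` class, the in-memory `USERS` map, and the ids used are hypothetical stand-ins for `ResponseEntity`, `UsersDao`, and the rows created at startup.

```java
import java.util.Map;
import java.util.Optional;

// Analogy only: Optional stands in for Mono, which also carries zero or one value.
final class NotFoundMappingSketch {

    /** Hypothetical stand-in for ResponseEntity: a status code plus an optional body. */
    static final class Response {
        final int status;
        final String body;

        Response(int status, String body) {
            this.status = status;
            this.body = body;
        }
    }

    // Hypothetical in-memory data, loosely mirroring the three inserted users.
    private static final Map<Long, String> USERS =
            Map.of(1L, "Jacky Li", 2L, "Doudou Li", 3L, "Maimai Li");

    /** Stand-in for UsersDao.findById: empty when the id is unknown. */
    static Optional<String> findById(long id) {
        return Optional.ofNullable(USERS.get(id));
    }

    /** Plays the role of map(...) followed by switchIfEmpty(...) in the controller. */
    static Response getUser(long id) {
        return findById(id)
                .map(name -> new Response(200, name))      // value present: 200 OK
                .orElseGet(() -> new Response(404, null)); // empty: 404 Not Found
    }

    public static void main(String[] args) {
        System.out.println(getUser(1L).status);
        System.out.println(getUser(99L).status);
    }
}
```

The difference in the real controller is that nothing is evaluated until WebFlux subscribes to the returned `Mono`, whereas `Optional` is resolved immediately on the calling thread.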

Application class listing:

package wang.datahub.springboot3demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import wang.datahub.springboot3demo.config.DBConfig;

@SpringBootApplication
@EnableConfigurationProperties(DBConfig.class)
public class WebFluxR2dbcApp {

    public static void main(String[] args) {
        SpringApplication.run(WebFluxR2dbcApp.class, args);
    }
}

And with that, the whole demo is complete.

Reference:

https://zhuanlan.zhihu.com/p/299069835


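Reposted from: https://blog.csdn.net/dafei1288/article/details/122875058
Copyright belongs to the original author, 麒思妙想. If this repost infringes your rights, please contact us and we will remove it.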
