0


SpringBoot整合ShardingJDBC按时间分表及自动建表

ShardingJDBC-分表,按时间分表及自动建表


前言

本文使用springboot集成shardingJDBC,实现动态分表(自动按时间分表及自动建表)。


一、引入依赖

<!--shardingJDBC--><dependency><groupId>org.apache.shardingsphere</groupId><artifactId>sharding-jdbc-spring-boot-starter</artifactId><version>4.1.1</version></dependency>

二、yml配置文件

spring
  # 配置Sharding-JDBC的分片策略
  shardingsphere:
    # 配置数据源,给数据源起名g1,g2...此处可配置多数据源    本示例暂不分库
    datasource:
      names: g1
      g1:
        type:"com.alibaba.druid.pool.DruidDataSource"
        driver-class-name:"com.mysql.cj.jdbc.Driver"
        url: jdbc:mysql:
        username: root
        password: abc
    # 配置表的分布,表的策略
    sharding:
      tables:
        # 表名--项目中第一张需要分的表
        sharding_test:
          # 动态配置-分表节点
          actual-data-nodes: g1.sharding_test
          key-generator:
            # 指定表 主键id 生成策略为 SNOWFLAKE
            column: id
            type: SNOWFLAKE
          table-strategy:
            standard:
              # 分片字段
              sharding-column: time
              # 精确算法实现类路径
              precise-algorithm-class-name: com.cn.xiaonuo.thirdusers.entity.sharding.PreciseAlgorithmCustomer
              # 范围算法实现类路径
              range-algorithm-class-name: com.cn.xiaonuo.thirdusers.entity.sharding.RangeAlgorithmCustomer
        # 表名-项目中其他需要分的表
        sharding_order:
          # 动态配置
          actual-data-nodes: g1.sharding_order
          key-generator:
            # 指定表 主键id 生成策略为 SNOWFLAKE
            column: id
            type: SNOWFLAKE
          table-strategy:
            standard:
              sharding-column: create_time
              precise-algorithm-class-name: com.cn.xiaonuo.thirdusers.entity.sharding.PreciseAlgorithmCustomer
              range-algorithm-class-name: com.cn.xiaonuo.thirdusers.entity.sharding.RangeAlgorithmCustomer
    # 打开ShardingSphere-sql输出日志---调试时方便查看具体哪张表
    props:
      sql:
        show: true

三、分片算法实现类

1.精确算法-PreciseShardingAlgorithm

import org.apache.commons.lang3.StringUtils;
import org.apache.shardingsphere.api.sharding.standard.PreciseShardingAlgorithm;
import org.apache.shardingsphere.api.sharding.standard.PreciseShardingValue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Date;

@Component
public class PreciseAlgorithmCustomer implements PreciseShardingAlgorithm<Date>{
    private static ShardingAlgorithmReload shardingAlgorithmReload;
    @Autowired
    public voidsetShardingAlgorithmReload(ShardingAlgorithmReload shardingAlgorithmReload){
        PreciseAlgorithmCustomer.shardingAlgorithmReload = shardingAlgorithmReload;}

    @Override
    public String doSharding(Collection<String> collection, PreciseShardingValue<Date> preciseShardingValue){
        String suffix = ShardingDateUtil.getYearMonth(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(preciseShardingValue.getValue()));
        String preciseTable = preciseShardingValue.getLogicTableName()+"_"+ suffix;if(collection.contains(preciseTable)){return preciseTable;}else{
            String s = shardingAlgorithmReload.tryCreateShardingTable(preciseShardingValue.getLogicTableName(), suffix);if(StringUtils.isNotBlank(s)){return s;}else{
                throw new IllegalArgumentException("未找到匹配的数据表");}}}}

2.范围算法-RangeShardingAlgorithm

import com.google.common.collect.Range;
import org.apache.shardingsphere.api.sharding.standard.RangeShardingAlgorithm;
import org.apache.shardingsphere.api.sharding.standard.RangeShardingValue;

import java.text.SimpleDateFormat;
import java.util.*;

public class RangeAlgorithmCustomer implements RangeShardingAlgorithm<Date>{
    @Override
    public Collection<String>doSharding(Collection<String> collection, RangeShardingValue<Date> rangeShardingValue){
        List<String> list = new ArrayList<>();
        Range<Date> valueRange = rangeShardingValue.getValueRange();
        Date lowerData = valueRange.lowerEndpoint();
        Date upperData = valueRange.upperEndpoint();
        String lowerSuffix = ShardingDateUtil.getYearMonth(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(lowerData));
        String upperSuffix = ShardingDateUtil.getYearMonth(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(upperData));if(lowerSuffix.equals(upperSuffix)){for(String tableName : collection){if(tableName.endsWith(lowerSuffix)){
                    list.add(tableName);break;}}}else{
            list =rangTableCheck(lowerSuffix, upperSuffix, collection);}return list;}

    private List<String>rangTableCheck(String lowerSuffix, String upperSuffix, Collection<String> collection){
        ArrayList<String> tableList = new ArrayList<>();
        Boolean addFlag = false;for(String tableName : collection){if(tableName.endsWith(lowerSuffix)){
                addFlag=true;}if(tableName.endsWith(upperSuffix)){
                tableList.add(tableName);break;}if(addFlag){
                tableList.add(tableName);}}return tableList;}}

四、节点重载及自动建表

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.api.sharding.ShardingAlgorithm;
import org.apache.shardingsphere.core.rule.TableRule;
import org.apache.shardingsphere.core.strategy.route.ShardingStrategy;
import org.apache.shardingsphere.core.strategy.route.complex.ComplexShardingStrategy;
import org.apache.shardingsphere.core.strategy.route.hint.HintShardingStrategy;
import org.apache.shardingsphere.core.strategy.route.standard.StandardShardingStrategy;
import org.apache.shardingsphere.shardingjdbc.jdbc.core.context.ShardingRuntimeContext;
import org.apache.shardingsphere.shardingjdbc.jdbc.core.datasource.ShardingDataSource;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.*;

@Slf4j
@Component
public class ShardingAlgorithmReload {

    @Resource
    private ShardingDataSource shardingDataSource;

    private ShardingRuntimeContext runtimeContext;/**
     * 重载表缓存
     */
    public  voidtableNameCacheReloadAll(){
        ShardingRuntimeContext runtimeContext =getRuntimeContext();

        List<TableRule> tableRuleList =(List<TableRule>) runtimeContext.getRule().getTableRules();for(TableRule tableRule : tableRuleList){
            String nodeName = tableRule.getActualDatasourceNames().stream().findFirst().get();
            Set<String> tablesInDBSet =queryTables(tableRule.getLogicTable());refreshTableRule(tableRule, nodeName, tablesInDBSet);//用不到            refreshShardingAlgorithm(tableRule, null);}}

    protected voidrefreshTableRule(TableRule tableRule, String nodeName, Set<String> tablesInDBSet){// sharding缓存的表名
        Set<String> tableSets =getActualTables(tableRule);// 刷新if(!tableContrast(tableSets, tablesInDBSet)){
            List<String> tableList = new ArrayList<>(tablesInDBSet);setDatasourceToTablesMap(tableRule, nodeName, tableList);}}

    private boolean tableContrast(Set<String> actualTableSets, Set<String> tablesInDBSet){if(actualTableSets == null || tablesInDBSet == null){return false;}if(actualTableSets.size()!= tablesInDBSet.size()){return false;}return actualTableSets.containsAll(tablesInDBSet);}

    protected voidrefreshShardingAlgorithm(TableRule tableRule, String nodeName){// 获取分库分表时真正使用的表名
        Map<String, Set<String>> datasourceToTablesMap =getDatasourceToTablesMap(tableRule);
        Set<String> tables = datasourceToTablesMap.get(nodeName);
        ShardingStrategy shardingStrategy = tableRule.getTableShardingStrategy();if(shardingStrategy instanceof ComplexShardingStrategy){
            ShardingAlgorithm algorithm =getObjectField(shardingStrategy,"shardingAlgorithm");setValueToBaseAlgorithm(tableRule, algorithm, nodeName, tables);}elseif(shardingStrategy instanceof HintShardingStrategy){
            ShardingAlgorithm algorithm =getObjectField(shardingStrategy,"shardingAlgorithm");setValueToBaseAlgorithm(tableRule, algorithm, nodeName, tables);}elseif(shardingStrategy instanceof StandardShardingStrategy){
            ShardingAlgorithm preciseAlgorithm =getObjectField(shardingStrategy,"preciseShardingAlgorithm");setValueToBaseAlgorithm(tableRule, preciseAlgorithm, nodeName, tables);
            ShardingAlgorithm rangeAlgorithm =getObjectField(shardingStrategy,"rangeShardingAlgorithm");setValueToBaseAlgorithm(tableRule, rangeAlgorithm, nodeName, tables);}}

    private voidsetValueToBaseAlgorithm(TableRule tableRule, ShardingAlgorithm algorithm, String nodeName, Set<String> tables){if(algorithm != null && algorithm instanceof BaseShardingAlgorithm){
            BaseShardingAlgorithm baseShardingAlgorithm =(BaseShardingAlgorithm) algorithm;
            baseShardingAlgorithm.setLogicTable(tableRule.getLogicTable());
            baseShardingAlgorithm.setTables(tables);
            baseShardingAlgorithm.setTableRule(tableRule);
            baseShardingAlgorithm.setNodeName(nodeName);}}

    protected ShardingRuntimeContext getRuntimeContext(){
        try {if(runtimeContext == null){
                Method getRuntimeContextMethod = shardingDataSource.getClass().getDeclaredMethod("getRuntimeContext");
                getRuntimeContextMethod.setAccessible(true);
                runtimeContext =(ShardingRuntimeContext) getRuntimeContextMethod.invoke(shardingDataSource, null);}}catch(IllegalAccessException e){
            log.error("因为sharding版本问题", e);}catch(InvocationTargetException e){
            log.error("因为sharding版本问题", e);}catch(NoSuchMethodException e){
            log.error("因为sharding版本问题", e);}return runtimeContext;}

    protected Set<String>getActualTables(TableRule tableRule){
        Set<String> tables =getObjectField(tableRule,"actualTables");return tables == null ? new LinkedHashSet<>(): tables;}

    protected voidsetDatasourceToTablesMap(TableRule tableRule, String nodeName, List<String> newTableList){synchronized(tableRule){// 获取分库分表时真正使用的表名
            Map<String, Set<String>> datasourceToTablesMap =getDatasourceToTablesMap(tableRule);
            Set<String> tables = datasourceToTablesMap.get(nodeName);
            Collections.sort(newTableList);
            tables.clear();
            tables.addAll(newTableList);}}

    protected Map<String, Set<String>>getDatasourceToTablesMap(TableRule tableRule){
        Map<String, Set<String>> tablesMap =getObjectField(tableRule,"datasourceToTablesMap");return tablesMap == null ? new HashMap<>(0): tablesMap;}

    protected static<T> T getObjectField(Object object, String fieldName){
        try {
            Field field = object.getClass().getDeclaredField(fieldName);
            field.setAccessible(true);return(T) field.get(object);}catch(NoSuchFieldException e){
            log.error("因为sharding版本问题", e);}catch(IllegalAccessException e){
            log.error("因为sharding版本问题", e);}return null;}

    protected Set<String>queryTables(String tableName){
        Connection conn = null;
        Statement statement = null;
        ResultSet rs = null;
        Set<String> tables = null;
        try {
            conn = shardingDataSource.getConnection();
            statement = conn.createStatement();
            rs = statement.executeQuery("select table_name from information_schema.tables where table_schema ='数据库名' and table_name LIKE '"+tableName+"_%'");
            tables = new LinkedHashSet<>();while(rs.next()){
                tables.add(rs.getString(1));}}catch(SQLException e){
            log.error("获取数据库连接失败!", e);} finally {
            try {if(rs != null){
                    rs.close();}if(statement != null){
                    statement.close();}if(conn != null){
                    conn.close();}}catch(SQLException e){
                log.error("关闭数据连接失败", e);}}return tables;}

    protected voidcreateTable(String tableName,String suffix){
        String tableAllName=tableName+"_"+suffix;
        String sql=null;//根据不同表的建表语句--也可通告数据库自行配置(这里写死)if(tableName.equals("sharding_order")){
            sql="CREATE TABLE `"+tableAllName+"` ( `id` bigint NOT NULL, `create_time` datetime NOT NULL, `user_name` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;";}elseif(tableName.equals("sharding_test")){
            sql="CREATE TABLE `"+tableAllName+"` ( `id` bigint NOT NULL, `time` datetime NOT NULL, `name` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;";}
        Connection conn = null;
        Statement statement = null;
        try {
            conn = shardingDataSource.getConnection();
            statement = conn.createStatement();
            statement.executeUpdate(sql);}catch(SQLException e){
            log.error("获取数据库连接失败!", e);} finally {
            try {if(statement != null){
                    statement.close();}if(conn != null){
                    conn.close();}}catch(SQLException e){
                log.error("关闭数据连接失败", e);}}}

    public String tryCreateShardingTable(String tableName, String suffix){
        String resTable = tableName +"_"+ suffix;//建表(业务处理) mysql,DDL操作会触发MDL x锁(排他锁),就不加分布式自旋锁了createTable(tableName,suffix);//重载tableNameCacheReloadAll();return resTable;}}

五、初始化-获取现有表

import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Order(value =1)
@Component
public class ShardingTablesLoadRunner implements CommandLineRunner {
    @Resource
    private ShardingAlgorithmReload shardingAlgorithmReload;
    @Override
    public voidrun(String... args) throws Exception {
        shardingAlgorithmReload.tableNameCacheReloadAll();}}

六、其他工具类

import org.apache.commons.lang3.StringUtils;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class ShardingDateUtil {

    public static final String DATE_FORMAT_DEFAULT ="yyyy-MM-dd HH:mm:ss";
    public static final String DATE_FORMAT_NUMBER ="yyyyMMddHHmmss";
    public static final String YEAR_MONTH_DAY_NUMBER ="yyyyMMdd";
    public static final String YEAR_MONTH_NUMBER ="yyyyMM";
    public static final String DATE_FORMAT_DAY_PATTERN ="yyyy-MM-dd";
    public static final String YEAR_MONTH_DAY_EN_SECOND ="yyyy/MM/dd HH:mm:ss";
    public static final String YEAR_MONTH_DAY_CN_SECOND ="yyyy年MM月dd日 HH时mm分ss秒";
    public static final String YEAR_MONTH_DAY_CN ="yyyy年MM月dd日";
    public static final String MONTH_DAY ="MM-dd";
    public static String getYearMonth(Long date){if(date == null){return null;}return new SimpleDateFormat(YEAR_MONTH_NUMBER).format(date);}
    public static String getYearMonth(String date){if(date == null){return null;}
        String format = DATE_FORMAT_DEFAULT;
        Date parse = new Date();
        try {
            parse = new SimpleDateFormat(format).parse(date);}catch(ParseException e){
            e.printStackTrace();}return new SimpleDateFormat(YEAR_MONTH_NUMBER).format(parse);}
    public static String getYearMonth(String date, String format){if(date == null){return null;}if(StringUtils.isBlank(format)){
            format = DATE_FORMAT_DEFAULT;}
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat(format);return simpleDateFormat.format(date);}}
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.core.rule.TableRule;
import java.util.Set;

@Setter
@Getter
@Slf4j
public class BaseShardingAlgorithm {/**
     * 数据库内所有表
     */
    private Set<String> tables;/**
     * 数据节点名称
     */
    private String nodeName;/**
     * 逻辑表名
     */
    private String logicTable;/**
     * 表的权限缓存
     */
    private TableRule tableRule;

    protected Set<String>getTables(){return tables;}}

七、测试-总结

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
class SlUserServiceImplTest {

    @Resource
    private SharingTestMapper sharingTestMapper;
    @Resource
    private SharingOrderMapper sharingorderMapper;

    @Test
    voidsharingTestInsert(){

        try {/*String dateString = "2024-04-10 09:23:14";
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            ShardingTest shardingTest = new ShardingTest();
            Date date = format.parse(dateString);
            shardingTest.setName("你好");
            shardingTest.setTime(date);
            sharingTestMapper.insert(shardingTest);
            ShardingOrder shardingOrder=new ShardingOrder();
            shardingOrder.setUserName("niasdjfas");
            shardingOrder.setCreateTime(date);
            sharingorderMapper.insert(shardingOrder);*/
            
            QueryWrapper<ShardingTest> queryWrapper=new QueryWrapper<>();
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Date date = format.parse("2024-01-9 11:23:14");
            Date dateEnd = format.parse("2024-02-10 11:23:14");
            queryWrapper.between("time",date,dateEnd);
            List<ShardingTest> shardingTests = sharingTestMapper.selectList(queryWrapper);
            System.out.println(shardingTests);/*ShardingOrder shardingOrder = new ShardingOrder();
            shardingOrder.setUserName("测试");
            shardingOrder.setId(1767427382494257153l);
            sharingorderMapper.updateById(shardingOrder);*/// 处理日期date}catch(Exception e){
            e.printStackTrace();}}}

在这里插入图片描述通过测试可以发现
1.新增时,如果没有当前表则自动建表
2.查询修改时,若条件没有分片字段,则全局搜索;反之匹配精确或范围算法找到对应表,最后综合结果返回数据

标签: 中间件 mysql spring

本文转载自: https://blog.csdn.net/m0_54323419/article/details/136651676
版权归原作者 任凭本心 所有, 如有侵权,请联系我们删除。

“SpringBoot整合ShardingJDBC按时间分表及自动建表”的评论:

还没有评论