ShardingJDBC-分表,按时间分表及自动建表
前言
本文使用springboot集成shardingJDBC,实现动态分表(自动按时间分表及自动建表)。
一、引入依赖
<!-- Sharding-JDBC Spring Boot starter -->
<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>sharding-jdbc-spring-boot-starter</artifactId>
    <version>4.1.1</version>
</dependency>
二、yml配置文件
spring:
  # Sharding-JDBC sharding configuration
  shardingsphere:
    # Data sources: name them g1, g2, ... Several may be listed here;
    # this example does not shard across databases, so only g1 is defined.
    datasource:
      names: g1
      g1:
        type: "com.alibaba.druid.pool.DruidDataSource"
        driver-class-name: "com.mysql.cj.jdbc.Driver"
        # TODO: the original URL was truncated after "jdbc:mysql:"; host, port and
        # schema below are placeholders and must be replaced with real values.
        url: jdbc:mysql://localhost:3306/your_database
        username: root
        password: abc
    # Table distribution and sharding strategies
    sharding:
      tables:
        # Logic table name -- the first table in the project that is sharded
        sharding_test:
          # Initial data node; the real monthly tables are loaded dynamically at runtime
          actual-data-nodes: g1.sharding_test
          key-generator:
            # Generate the primary key "id" with the SNOWFLAKE strategy
            column: id
            type: SNOWFLAKE
          table-strategy:
            standard:
              # Sharding column
              sharding-column: time
              # Fully qualified class name of the precise sharding algorithm
              precise-algorithm-class-name: com.cn.xiaonuo.thirdusers.entity.sharding.PreciseAlgorithmCustomer
              # Fully qualified class name of the range sharding algorithm
              range-algorithm-class-name: com.cn.xiaonuo.thirdusers.entity.sharding.RangeAlgorithmCustomer
        # Logic table name -- another table in the project that is sharded
        sharding_order:
          # Initial data node; the real monthly tables are loaded dynamically at runtime
          actual-data-nodes: g1.sharding_order
          key-generator:
            # Generate the primary key "id" with the SNOWFLAKE strategy
            column: id
            type: SNOWFLAKE
          table-strategy:
            standard:
              sharding-column: create_time
              precise-algorithm-class-name: com.cn.xiaonuo.thirdusers.entity.sharding.PreciseAlgorithmCustomer
              range-algorithm-class-name: com.cn.xiaonuo.thirdusers.entity.sharding.RangeAlgorithmCustomer
    # Log the rewritten SQL -- handy while debugging to see which physical table is hit
    props:
      sql:
        show: true
三、分片算法实现类
1.精确算法-PreciseShardingAlgorithm
import org.apache.commons.lang3.StringUtils;
import org.apache.shardingsphere.api.sharding.standard.PreciseShardingAlgorithm;
import org.apache.shardingsphere.api.sharding.standard.PreciseShardingValue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Date;
@Component
public class PreciseAlgorithmCustomer implements PreciseShardingAlgorithm<Date>{
private static ShardingAlgorithmReload shardingAlgorithmReload;
@Autowired
public voidsetShardingAlgorithmReload(ShardingAlgorithmReload shardingAlgorithmReload){
PreciseAlgorithmCustomer.shardingAlgorithmReload = shardingAlgorithmReload;}
@Override
public String doSharding(Collection<String> collection, PreciseShardingValue<Date> preciseShardingValue){
String suffix = ShardingDateUtil.getYearMonth(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(preciseShardingValue.getValue()));
String preciseTable = preciseShardingValue.getLogicTableName()+"_"+ suffix;if(collection.contains(preciseTable)){return preciseTable;}else{
String s = shardingAlgorithmReload.tryCreateShardingTable(preciseShardingValue.getLogicTableName(), suffix);if(StringUtils.isNotBlank(s)){return s;}else{
throw new IllegalArgumentException("未找到匹配的数据表");}}}}
2.范围算法-RangeShardingAlgorithm
import com.google.common.collect.Range;
import org.apache.shardingsphere.api.sharding.standard.RangeShardingAlgorithm;
import org.apache.shardingsphere.api.sharding.standard.RangeShardingValue;
import java.text.SimpleDateFormat;
import java.util.*;
/**
 * Range sharding algorithm that selects every monthly table
 * {@code <logicTable>_yyyyMM} whose month falls inside the queried date range.
 *
 * <p>Months are compared as {@code yyyyMM} strings, which order the same way as
 * the months themselves. The result therefore does not depend on the iteration
 * order of the candidate tables, nor on whether the table for the first or last
 * month of the range exists.
 */
public class RangeAlgorithmCustomer implements RangeShardingAlgorithm<Date> {

    /** Length of the {@code yyyyMM} suffix appended to the logic table name. */
    private static final int SUFFIX_LENGTH = 6;

    /**
     * Selects the physical tables that may hold rows inside the given range.
     *
     * @param collection         names of the physical tables currently known for the logic table
     * @param rangeShardingValue logic table name plus the (possibly open-ended) date range
     * @return the matching table names; empty when no known table falls inside the range
     */
    @Override
    public Collection<String> doSharding(Collection<String> collection, RangeShardingValue<Date> rangeShardingValue) {
        Range<Date> valueRange = rangeShardingValue.getValueRange();
        // An open-ended range (e.g. "time > x") has no endpoint on one side; asking for it would throw.
        String lowerSuffix = valueRange.hasLowerBound()
                ? ShardingDateUtil.getYearMonth(valueRange.lowerEndpoint().getTime())
                : null;
        String upperSuffix = valueRange.hasUpperBound()
                ? ShardingDateUtil.getYearMonth(valueRange.upperEndpoint().getTime())
                : null;

        String prefix = rangeShardingValue.getLogicTableName() + "_";
        List<String> list = new ArrayList<>();
        for (String tableName : collection) {
            String suffix = monthSuffixOf(tableName, prefix);
            if (suffix == null) {
                // Not a monthly shard of this logic table (e.g. the bare logic table itself).
                continue;
            }
            if (lowerSuffix != null && suffix.compareTo(lowerSuffix) < 0) {
                continue;
            }
            if (upperSuffix != null && suffix.compareTo(upperSuffix) > 0) {
                continue;
            }
            list.add(tableName);
        }
        return list;
    }

    /**
     * Extracts the {@code yyyyMM} suffix of a monthly shard table.
     *
     * @param tableName physical table name
     * @param prefix    logic table name followed by an underscore
     * @return the six-digit suffix, or {@code null} if the name is not {@code prefix + six digits}
     */
    private static String monthSuffixOf(String tableName, String prefix) {
        if (tableName == null || !tableName.startsWith(prefix)) {
            return null;
        }
        String suffix = tableName.substring(prefix.length());
        if (suffix.length() != SUFFIX_LENGTH) {
            return null;
        }
        for (int i = 0; i < suffix.length(); i++) {
            if (!Character.isDigit(suffix.charAt(i))) {
                return null;
            }
        }
        return suffix;
    }
}
四、节点重载及自动建表
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.api.sharding.ShardingAlgorithm;
import org.apache.shardingsphere.core.rule.TableRule;
import org.apache.shardingsphere.core.strategy.route.ShardingStrategy;
import org.apache.shardingsphere.core.strategy.route.complex.ComplexShardingStrategy;
import org.apache.shardingsphere.core.strategy.route.hint.HintShardingStrategy;
import org.apache.shardingsphere.core.strategy.route.standard.StandardShardingStrategy;
import org.apache.shardingsphere.shardingjdbc.jdbc.core.context.ShardingRuntimeContext;
import org.apache.shardingsphere.shardingjdbc.jdbc.core.datasource.ShardingDataSource;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.*;
@Slf4j
@Component
public class ShardingAlgorithmReload {
@Resource
private ShardingDataSource shardingDataSource;
private ShardingRuntimeContext runtimeContext;/**
* 重载表缓存
*/
public voidtableNameCacheReloadAll(){
ShardingRuntimeContext runtimeContext =getRuntimeContext();
List<TableRule> tableRuleList =(List<TableRule>) runtimeContext.getRule().getTableRules();for(TableRule tableRule : tableRuleList){
String nodeName = tableRule.getActualDatasourceNames().stream().findFirst().get();
Set<String> tablesInDBSet =queryTables(tableRule.getLogicTable());refreshTableRule(tableRule, nodeName, tablesInDBSet);//用不到 refreshShardingAlgorithm(tableRule, null);}}
protected voidrefreshTableRule(TableRule tableRule, String nodeName, Set<String> tablesInDBSet){// sharding缓存的表名
Set<String> tableSets =getActualTables(tableRule);// 刷新if(!tableContrast(tableSets, tablesInDBSet)){
List<String> tableList = new ArrayList<>(tablesInDBSet);setDatasourceToTablesMap(tableRule, nodeName, tableList);}}
private boolean tableContrast(Set<String> actualTableSets, Set<String> tablesInDBSet){if(actualTableSets == null || tablesInDBSet == null){return false;}if(actualTableSets.size()!= tablesInDBSet.size()){return false;}return actualTableSets.containsAll(tablesInDBSet);}
protected voidrefreshShardingAlgorithm(TableRule tableRule, String nodeName){// 获取分库分表时真正使用的表名
Map<String, Set<String>> datasourceToTablesMap =getDatasourceToTablesMap(tableRule);
Set<String> tables = datasourceToTablesMap.get(nodeName);
ShardingStrategy shardingStrategy = tableRule.getTableShardingStrategy();if(shardingStrategy instanceof ComplexShardingStrategy){
ShardingAlgorithm algorithm =getObjectField(shardingStrategy,"shardingAlgorithm");setValueToBaseAlgorithm(tableRule, algorithm, nodeName, tables);}elseif(shardingStrategy instanceof HintShardingStrategy){
ShardingAlgorithm algorithm =getObjectField(shardingStrategy,"shardingAlgorithm");setValueToBaseAlgorithm(tableRule, algorithm, nodeName, tables);}elseif(shardingStrategy instanceof StandardShardingStrategy){
ShardingAlgorithm preciseAlgorithm =getObjectField(shardingStrategy,"preciseShardingAlgorithm");setValueToBaseAlgorithm(tableRule, preciseAlgorithm, nodeName, tables);
ShardingAlgorithm rangeAlgorithm =getObjectField(shardingStrategy,"rangeShardingAlgorithm");setValueToBaseAlgorithm(tableRule, rangeAlgorithm, nodeName, tables);}}
private voidsetValueToBaseAlgorithm(TableRule tableRule, ShardingAlgorithm algorithm, String nodeName, Set<String> tables){if(algorithm != null && algorithm instanceof BaseShardingAlgorithm){
BaseShardingAlgorithm baseShardingAlgorithm =(BaseShardingAlgorithm) algorithm;
baseShardingAlgorithm.setLogicTable(tableRule.getLogicTable());
baseShardingAlgorithm.setTables(tables);
baseShardingAlgorithm.setTableRule(tableRule);
baseShardingAlgorithm.setNodeName(nodeName);}}
protected ShardingRuntimeContext getRuntimeContext(){
try {if(runtimeContext == null){
Method getRuntimeContextMethod = shardingDataSource.getClass().getDeclaredMethod("getRuntimeContext");
getRuntimeContextMethod.setAccessible(true);
runtimeContext =(ShardingRuntimeContext) getRuntimeContextMethod.invoke(shardingDataSource, null);}}catch(IllegalAccessException e){
log.error("因为sharding版本问题", e);}catch(InvocationTargetException e){
log.error("因为sharding版本问题", e);}catch(NoSuchMethodException e){
log.error("因为sharding版本问题", e);}return runtimeContext;}
protected Set<String>getActualTables(TableRule tableRule){
Set<String> tables =getObjectField(tableRule,"actualTables");return tables == null ? new LinkedHashSet<>(): tables;}
protected voidsetDatasourceToTablesMap(TableRule tableRule, String nodeName, List<String> newTableList){synchronized(tableRule){// 获取分库分表时真正使用的表名
Map<String, Set<String>> datasourceToTablesMap =getDatasourceToTablesMap(tableRule);
Set<String> tables = datasourceToTablesMap.get(nodeName);
Collections.sort(newTableList);
tables.clear();
tables.addAll(newTableList);}}
protected Map<String, Set<String>>getDatasourceToTablesMap(TableRule tableRule){
Map<String, Set<String>> tablesMap =getObjectField(tableRule,"datasourceToTablesMap");return tablesMap == null ? new HashMap<>(0): tablesMap;}
protected static<T> T getObjectField(Object object, String fieldName){
try {
Field field = object.getClass().getDeclaredField(fieldName);
field.setAccessible(true);return(T) field.get(object);}catch(NoSuchFieldException e){
log.error("因为sharding版本问题", e);}catch(IllegalAccessException e){
log.error("因为sharding版本问题", e);}return null;}
protected Set<String>queryTables(String tableName){
Connection conn = null;
Statement statement = null;
ResultSet rs = null;
Set<String> tables = null;
try {
conn = shardingDataSource.getConnection();
statement = conn.createStatement();
rs = statement.executeQuery("select table_name from information_schema.tables where table_schema ='数据库名' and table_name LIKE '"+tableName+"_%'");
tables = new LinkedHashSet<>();while(rs.next()){
tables.add(rs.getString(1));}}catch(SQLException e){
log.error("获取数据库连接失败!", e);} finally {
try {if(rs != null){
rs.close();}if(statement != null){
statement.close();}if(conn != null){
conn.close();}}catch(SQLException e){
log.error("关闭数据连接失败", e);}}return tables;}
protected voidcreateTable(String tableName,String suffix){
String tableAllName=tableName+"_"+suffix;
String sql=null;//根据不同表的建表语句--也可通告数据库自行配置(这里写死)if(tableName.equals("sharding_order")){
sql="CREATE TABLE `"+tableAllName+"` ( `id` bigint NOT NULL, `create_time` datetime NOT NULL, `user_name` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;";}elseif(tableName.equals("sharding_test")){
sql="CREATE TABLE `"+tableAllName+"` ( `id` bigint NOT NULL, `time` datetime NOT NULL, `name` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;";}
Connection conn = null;
Statement statement = null;
try {
conn = shardingDataSource.getConnection();
statement = conn.createStatement();
statement.executeUpdate(sql);}catch(SQLException e){
log.error("获取数据库连接失败!", e);} finally {
try {if(statement != null){
statement.close();}if(conn != null){
conn.close();}}catch(SQLException e){
log.error("关闭数据连接失败", e);}}}
public String tryCreateShardingTable(String tableName, String suffix){
String resTable = tableName +"_"+ suffix;//建表(业务处理) mysql,DDL操作会触发MDL x锁(排他锁),就不加分布式自旋锁了createTable(tableName,suffix);//重载tableNameCacheReloadAll();return resTable;}}
五、初始化-获取现有表
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Order(value =1)
@Component
public class ShardingTablesLoadRunner implements CommandLineRunner {
@Resource
private ShardingAlgorithmReload shardingAlgorithmReload;
@Override
public voidrun(String... args) throws Exception {
shardingAlgorithmReload.tableNameCacheReloadAll();}}
六、其他工具类
import org.apache.commons.lang3.StringUtils;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * Date helpers used to derive the {@code yyyyMM} suffix of monthly shard tables.
 *
 * <p>Every call creates its own {@link SimpleDateFormat}, so no formatter is
 * shared between threads. Formatting and parsing use the JVM default time zone.
 */
public class ShardingDateUtil {
    public static final String DATE_FORMAT_DEFAULT = "yyyy-MM-dd HH:mm:ss";
    public static final String DATE_FORMAT_NUMBER = "yyyyMMddHHmmss";
    public static final String YEAR_MONTH_DAY_NUMBER = "yyyyMMdd";
    public static final String YEAR_MONTH_NUMBER = "yyyyMM";
    public static final String DATE_FORMAT_DAY_PATTERN = "yyyy-MM-dd";
    public static final String YEAR_MONTH_DAY_EN_SECOND = "yyyy/MM/dd HH:mm:ss";
    public static final String YEAR_MONTH_DAY_CN_SECOND = "yyyy年MM月dd日 HH时mm分ss秒";
    public static final String YEAR_MONTH_DAY_CN = "yyyy年MM月dd日";
    public static final String MONTH_DAY = "MM-dd";

    /**
     * Formats an epoch-millisecond timestamp as {@code yyyyMM}.
     *
     * @param date milliseconds since the epoch; may be {@code null}
     * @return the year-month string, or {@code null} if {@code date} is {@code null}
     */
    public static String getYearMonth(Long date) {
        if (date == null) {
            return null;
        }
        return new SimpleDateFormat(YEAR_MONTH_NUMBER).format(new Date(date));
    }

    /**
     * Extracts {@code yyyyMM} from a date string in {@link #DATE_FORMAT_DEFAULT} format.
     *
     * @param date date string; may be {@code null}
     * @return the year-month string, or {@code null} if {@code date} is {@code null}
     * @throws IllegalArgumentException if {@code date} cannot be parsed; falling back to
     *         the current date would silently route data to the wrong month
     */
    public static String getYearMonth(String date) {
        return getYearMonth(date, DATE_FORMAT_DEFAULT);
    }

    /**
     * Extracts {@code yyyyMM} from a date string written in the given pattern.
     *
     * @param date   date string; may be {@code null}
     * @param format {@link SimpleDateFormat} pattern of {@code date}; a {@code null} or
     *               blank value selects {@link #DATE_FORMAT_DEFAULT}
     * @return the year-month string, or {@code null} if {@code date} is {@code null}
     * @throws IllegalArgumentException if {@code date} does not match the pattern
     */
    public static String getYearMonth(String date, String format) {
        if (date == null) {
            return null;
        }
        String pattern = (format == null || format.trim().isEmpty()) ? DATE_FORMAT_DEFAULT : format;
        Date parsed;
        try {
            // The input is a String, so it has to be parsed before it can be re-formatted.
            parsed = new SimpleDateFormat(pattern).parse(date);
        } catch (ParseException e) {
            throw new IllegalArgumentException(
                    "Cannot parse date '" + date + "' with pattern '" + pattern + "'", e);
        }
        return new SimpleDateFormat(YEAR_MONTH_NUMBER).format(parsed);
    }
}
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.core.rule.TableRule;
import java.util.Set;
/**
 * Holder for the routing data that {@code ShardingAlgorithmReload} copies into
 * a sharding algorithm: table names, data node, logic table and table rule.
 * Accessors are generated by Lombok, except {@link #getTables()}, which is
 * declared explicitly with {@code protected} visibility.
 */
@Getter
@Setter
@Slf4j
public class BaseShardingAlgorithm {

    /** All tables in the database. */
    private Set<String> tables;

    /** Name of the data node. */
    private String nodeName;

    /** Logic table name. */
    private String logicTable;

    /** Cached table rule (the original comment reads "table permission cache"). */
    private TableRule tableRule;

    /**
     * @return the table names currently held by this algorithm
     */
    protected Set<String> getTables() {
        return this.tables;
    }
}
七、测试-总结
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
class SlUserServiceImplTest {
@Resource
private SharingTestMapper sharingTestMapper;
@Resource
private SharingOrderMapper sharingorderMapper;
@Test
voidsharingTestInsert(){
try {/*String dateString = "2024-04-10 09:23:14";
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
ShardingTest shardingTest = new ShardingTest();
Date date = format.parse(dateString);
shardingTest.setName("你好");
shardingTest.setTime(date);
sharingTestMapper.insert(shardingTest);
ShardingOrder shardingOrder=new ShardingOrder();
shardingOrder.setUserName("niasdjfas");
shardingOrder.setCreateTime(date);
sharingorderMapper.insert(shardingOrder);*/
QueryWrapper<ShardingTest> queryWrapper=new QueryWrapper<>();
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date = format.parse("2024-01-9 11:23:14");
Date dateEnd = format.parse("2024-02-10 11:23:14");
queryWrapper.between("time",date,dateEnd);
List<ShardingTest> shardingTests = sharingTestMapper.selectList(queryWrapper);
System.out.println(shardingTests);/*ShardingOrder shardingOrder = new ShardingOrder();
shardingOrder.setUserName("测试");
shardingOrder.setId(1767427382494257153l);
sharingorderMapper.updateById(shardingOrder);*/// 处理日期date}catch(Exception e){
e.printStackTrace();}}}
通过测试可以发现
1.新增时,如果没有当前表则自动建表
2.查询、修改时,若条件中不包含分片字段,则会路由到所有分表(全路由);若包含分片字段,则根据条件类型匹配精确算法(=、IN)或范围算法(BETWEEN、>、< 等)定位到对应的分表,最后合并各分表的结果返回。
版权归原作者 任凭本心 所有, 如有侵权,请联系我们删除。