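1. Background:
All of our company's databases used to run on MySQL. To meet domestic-software (localization) requirements, we now also need to support PostgreSQL (pg) and DM (Dameng). DM ships a data migration tool that migrates data directly and generates scripts that can then be run in other environments. For pg, I could not find a tool that worked out of the box. Navicat is not an option at our company because of licensing. pgloader never worked for me: the pgloader command ran without reporting any error, yet no table structures were migrated. So I wrote a utility class that reads the MySQL table structures and generates the corresponding pg DDL, which is then run in psql through scripts.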
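2. Difficulties:
1. MySQL and pg use different CREATE TABLE syntax, so the pg table structure has to be built from the MySQL metadata.
2. MySQL column types cannot be used as-is; they have to be mapped to pg types (unless you use unusual types, mapping tables are easy to find online).
3. Going through the plain `DriverManager`/JDBC metadata route, I could not conveniently obtain the other column attributes I needed, such as indexes. (JDBC's `DatabaseMetaData` does expose index information through `getIndexInfo`, but querying the MySQL metadata tables directly turned out to be simpler.)
4. Some MySQL features have no direct equivalent in pg. For example, MySQL marks an auto-increment column with `auto_increment`, while pg declares the column as `bigserial`. MySQL can also auto-update a timestamp column (`ON UPDATE CURRENT_TIMESTAMP`), while pg needs another mechanism, such as a function.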
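To make difficulty 4 concrete, here is a minimal sketch of choosing a pg serial type for a MySQL `auto_increment` column. The class and method names (`SerialTypeSketch`, `serialTypeFor`) are hypothetical and not part of the tool in this post, which simply uses `bigserial` for every auto-increment primary key. The sketch assumes the `DATA_TYPE` and `EXTRA` values come from `INFORMATION_SCHEMA.COLUMNS`.

```java
import java.util.Locale;

class SerialTypeSketch {

    /**
     * Picks a pg serial type for a MySQL auto_increment column.
     * Returns null for columns that are not auto_increment, so the caller
     * can fall back to the normal type mapping.
     */
    static String serialTypeFor(String mysqlDataType, String extra) {
        if (extra == null || !extra.toLowerCase(Locale.ROOT).contains("auto_increment")) {
            return null;
        }
        switch (mysqlDataType.toLowerCase(Locale.ROOT)) {
            case "tinyint":
            case "smallint":
                return "smallserial";
            case "mediumint":
            case "int":
            case "integer":
                return "serial";
            default:
                // bigint and anything unexpected: use the widest serial type
                return "bigserial";
        }
    }

    public static void main(String[] args) {
        System.out.println(serialTypeFor("int", "auto_increment"));
        System.out.println(serialTypeFor("bigint", "auto_increment"));
    }
}
```

Always using `bigserial` is the safer choice when MySQL columns are declared `unsigned`, since an unsigned `int` can hold values that do not fit into a pg `integer`.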
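3. Approach:
1. Collect as many MySQL-to-pg type mappings as possible, then fill in the gaps as testing uncovers problems. There is no better way.
2. Instead of the metadata API provided by JDBC, query the MySQL metadata with SQL, then parse that metadata to generate the pg CREATE TABLE statements.
3. For MySQL-specific features such as automatically updated timestamps, change the application source code and set the creation and update times at the code level.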
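As an alternative to point 3, pg can emulate `ON UPDATE CURRENT_TIMESTAMP` with a trigger. This is not what the tool in this post does (it sets the times in application code); the sketch below only shows how such DDL could be generated. The names `UpdateTimeTriggerSketch`, `buildUpdateTimeTrigger`, and the generated function and trigger names are all hypothetical.

```java
class UpdateTimeTriggerSketch {

    /**
     * Builds a plpgsql trigger function plus a BEFORE UPDATE trigger that
     * sets the given column to now() whenever a row is updated.
     */
    static String buildUpdateTimeTrigger(String tableName, String columnName) {
        String functionName = tableName + "_set_" + columnName;
        String triggerName = "trg_" + functionName;
        return "create or replace function " + functionName + "() returns trigger as $$\n"
                + "begin\n"
                + "\tnew.\"" + columnName + "\" := now();\n"
                + "\treturn new;\n"
                + "end;\n"
                + "$$ language plpgsql;\n"
                + "create trigger " + triggerName + "\n"
                + "\tbefore update on " + tableName + "\n"
                // "execute procedure" is the older spelling; it is also accepted by newer pg versions
                + "\tfor each row execute procedure " + functionName + "();";
    }

    public static void main(String[] args) {
        System.out.println(buildUpdateTimeTrigger("t_user", "update_time"));
    }
}
```

The trade-off is that a trigger keeps the behavior inside the database, while the code-level approach keeps the schema free of database-specific objects, which matters when the same application also has to run on MySQL and DM.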
4. Implementation:
(1) Entity classes for MySQL columns and indexes
package com.example.canaltest.entry;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* @author Shaon
* @version 1.0
* @title MysqlTableColEntity
* @description
* @create 2024/9/2 10:30
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MysqlTableColEntity implements Serializable {
    /**
     * Schema (database) the table belongs to
     */
    private String tableSchema;
    /**
     * Table name
     */
    private String tableName;
    /**
     * Column name
     */
    private String columnName;
    /**
     * Column data type
     */
    private String dataType;
    /**
     * Column length
     */
    private String dataLength;
    /**
     * Key type of the column: PRI, UNI, or MUL
     */
    private String columnKey;
    /**
     * Whether the column is nullable. NO: not nullable; YES: nullable
     */
    private String isNullable;
    /**
     * Column default value
     */
    private String columnDefault;
    /**
     * Column comment
     */
    private String columnComment;
    /**
     * Extra column information: auto-increment (auto_increment) or time functions (which other databases may not support)
     */
    private String extra;
}
package com.example.canaltest.entry;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* @author Shaon
* @version 1.0
* @title MysqlTableIndexEntity
* @description NON_UNIQUE, INDEX_NAME, SEQ_IN_INDEX, COLUMN_NAME
* @create 2024/9/2 17:11
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MysqlTableIndexEntity implements Serializable {
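    /**
     * Whether the index is non-unique. 0: unique index; 1: non-unique index
     */
    private int nonUnique;
    /**
     * Index name
     */
    private String indexName;
    /**
     * Position of the column within the index, starting at 1 and increasing for multi-column indexes
     */
    private int seqInIndex;
    /**
     * Name of the column that is part of the index
     */
    private String columnName;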
}
(2) Generate the pg DDL and write it to files. One file is generated per database.
package com.example.canaltest.utils;
import com.example.canaltest.entry.MysqlTableColEntity;
import com.example.canaltest.entry.MysqlTableIndexEntity;
import com.google.common.base.Joiner;
import com.google.common.collect.Maps;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.springframework.util.CollectionUtils;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.*;
import java.util.stream.Collectors;
/**
* @author Shaon
* @version 1.0
* @title MysqlTransformPostgresql
* @description
* @create 2024/9/2 9:30
*/
public class MysqlTransformPostgresql {
private static final String MYSQL_URL = "jdbc:mysql://192.168.16.240:3306/dbName";
private static final String MYSQL_USER = "username";
private static final String MYSQL_PASSWORD = "password";
/**
* MySQL databases to migrate
*/
private static final List<String> databaseList = Arrays.asList(
"dbName"
);
private static final Map<String, String> typeMapping = Maps.newHashMap();
static {
// Initialize the type mapping table
typeMapping.put("tinyint", "smallint");
typeMapping.put("smallint", "smallint");
typeMapping.put("mediumint", "integer");
typeMapping.put("int", "integer");
typeMapping.put("integer", "integer");
typeMapping.put("bigint", "bigint");
typeMapping.put("decimal", "numeric");
typeMapping.put("numeric", "numeric");
typeMapping.put("float", "real");
typeMapping.put("double", "double precision");
typeMapping.put("real", "double precision");
typeMapping.put("date", "date");
typeMapping.put("time", "time without time zone");
typeMapping.put("datetime", "timestamp without time zone");
typeMapping.put("timestamp", "timestamp without time zone");
typeMapping.put("year", "smallint");
typeMapping.put("char", "character");
typeMapping.put("varchar", "varchar");
typeMapping.put("tinytext", "text");
typeMapping.put("text", "text");
typeMapping.put("mediumtext", "text");
typeMapping.put("longtext", "text");
typeMapping.put("blob", "bytea");
typeMapping.put("enum", "varchar"); // Mapped to varchar for simplicity; pg only supports enums as named types created with CREATE TYPE ... AS ENUM
typeMapping.put("set", "varchar"); // pg has no SET type; varchar is used instead
typeMapping.put("json", "jsonb");
typeMapping.put("binary", "bytea");
typeMapping.put("varbinary", "bytea");
typeMapping.put("bit", "boolean");
typeMapping.put("bool", "boolean");
typeMapping.put("boolean", "boolean");
typeMapping.put("point", "point");
typeMapping.put("linestring", "line");
typeMapping.put("polygon", "polygon");
typeMapping.put("geometry", "geometry");
typeMapping.put("geography", "geography");
typeMapping.put("inet", "inet");
typeMapping.put("cidr", "cidr");
typeMapping.put("macaddr", "macaddr");
typeMapping.put("uuid", "uuid");
typeMapping.put("hstore", "hstore");
}
private static final String sql = "select TABLE_SCHEMA,\n" +
" TABLE_NAME,\n" +
" COLUMN_NAME,\n" +
" COLUMN_DEFAULT,\n" +
" IS_NULLABLE,\n" +
" DATA_TYPE,\n" +
" CHARACTER_MAXIMUM_LENGTH,\n" +
" COLUMN_KEY,\n" +
" EXTRA,\n" +
" COLUMN_COMMENT\n" +
"from INFORMATION_SCHEMA.COLUMNS\n" +
"where TABLE_SCHEMA = ?";
private static final String indexSql = "select NON_UNIQUE,INDEX_NAME,SEQ_IN_INDEX,COLUMN_NAME\n" +
"from information_schema.STATISTICS\n" +
"where TABLE_SCHEMA = ? and TABLE_NAME = ?;";
/**
* Path prefix of the directory the files are written to
*/
private static final String filePathPrefix = "E:\\mysql-table\\";
public static void mysqlDDL2File() {
try (Connection connection = DriverManager.getConnection(MYSQL_URL, MYSQL_USER, MYSQL_PASSWORD);
PreparedStatement statement = connection.prepareStatement(sql)) {
for (String dbName : databaseList) {
Path path = Paths.get(filePathPrefix + dbName + ".sql");
StringJoiner joiner = new StringJoiner("\n");
List<MysqlTableColEntity> tableColEntities = Lists.newArrayList();
statement.setString(1, dbName);
ResultSet resultSet = statement.executeQuery();
while (resultSet.next()) {
String tableSchema = resultSet.getString("TABLE_SCHEMA");
String tableName = resultSet.getString("TABLE_NAME");
String columnName = resultSet.getString("COLUMN_NAME");
String columnDefault = resultSet.getString("COLUMN_DEFAULT");
String isNullable = resultSet.getString("IS_NULLABLE");
String dataType = resultSet.getString("DATA_TYPE");
String dataLength = resultSet.getString("CHARACTER_MAXIMUM_LENGTH");
String columnKey = resultSet.getString("COLUMN_KEY");
String extra = resultSet.getString("EXTRA");
String columnComment = resultSet.getString("COLUMN_COMMENT");
MysqlTableColEntity build = MysqlTableColEntity.builder()
.tableSchema(tableSchema)
.tableName(tableName)
.columnName(columnName)
.columnDefault(columnDefault)
.isNullable(isNullable)
.dataType(dataType)
.dataLength(dataLength)
.columnKey(columnKey)
.extra(extra)
.columnComment(columnComment)
.build();
tableColEntities.add(build);
}
Map<String, List<MysqlTableColEntity>> groupByTableName = groupByTableName(tableColEntities);
for (String tableName : groupByTableName.keySet()) {
String tableSql = buildTableSql(connection, dbName, tableName, groupByTableName.get(tableName));
joiner.add(tableSql);
}
System.out.println(joiner);
writeSqlFile(path, joiner.toString());
}
// Generate the create-database script
createDatabaseSql(databaseList);
} catch (Exception e) {
e.printStackTrace();
}
}
private static void createDatabaseSql(List<String> databaseList) throws IOException {
Path path = Paths.get(filePathPrefix + "pg_create_database.sql");
StringJoiner joiner = new StringJoiner("\n");
for (String dbName : databaseList) {
joiner.add("create database " + dbName + ";");
joiner.add("\n");
}
writeSqlFile(path, joiner.toString());
}
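/**
* Writes the content to a file.
*
* @param path target file
* @param context file content
* @throws IOException if the file cannot be written
*/
private static void writeSqlFile(Path path, String context) throws IOException {
// Write UTF-8 explicitly (the platform default charset may differ) and overwrite the file so that reruns do not append duplicate DDL
Files.write(path, context.getBytes(java.nio.charset.StandardCharsets.UTF_8),
StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
}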
private static Map<String, List<MysqlTableColEntity>> groupByTableName(List<MysqlTableColEntity> tableColEntities) {
return tableColEntities.stream()
.collect(
Collectors.groupingBy(MysqlTableColEntity::getTableName)
);
}
private static String buildTableSql(Connection connection, String dbName, String tableName, List<MysqlTableColEntity> tableColEntities) {
String colComment = "comment on column " + tableName + ".%s is '%s';";
StringJoiner colJoin = new StringJoiner(",\n");
StringJoiner commentJoin = new StringJoiner("\n");
List<MysqlTableIndexEntity> mysqlIndexList = getMysqlIndexList(connection, dbName, tableName);
for (MysqlTableColEntity tableColEntity : tableColEntities) {
StringJoiner col = new StringJoiner(" ");
String columnName = tableColEntity.getColumnName();
String dataType = tableColEntity.getDataType();
String isNullable = tableColEntity.getIsNullable();
String dataLength = tableColEntity.getDataLength();
String columnKey = tableColEntity.getColumnKey();
String columnDefault = tableColEntity.getColumnDefault();
String columnComment = tableColEntity.getColumnComment();
String extra = tableColEntity.getExtra();
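String pgType = typeMapping.get(dataType);
if (pgType == null) {
// Unmapped MySQL type (e.g. tinyblob, mediumblob, longblob): fail fast instead of writing "null" into the DDL
throw new IllegalStateException("No pg type mapping for MySQL type '" + dataType
+ "' (" + tableName + "." + columnName + ")");
}
// Only character types take a length in pg; text and bytea do not
if (StringUtils.isNotBlank(dataLength)
&& (Objects.equals("varchar", pgType) || Objects.equals("character", pgType))) {
pgType += ("(" + dataLength + ")");
}
if (Objects.equals("PRI", columnKey)
&& Objects.equals("auto_increment", extra)) {
pgType = "bigserial";
}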
col.add("\t");
col.add("\"" + columnName + "\"");
col.add(pgType);
if (StringUtils.isNotBlank(columnDefault)
&& !columnDefault.contains("CURRENT_TIMESTAMP")) {
if (Objects.equals(dataType, "bit")) {
if (columnDefault.contains("0")) {
columnDefault = "false";
} else {
columnDefault = "true";
}
}
col.add("default " + "'" + columnDefault + "'");
}
if (Objects.equals("NO", isNullable)) {
col.add("not null");
}
colJoin.add(col.toString());
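// Escape single quotes so that a comment containing ' does not break the generated statement
String safeComment = columnComment == null ? "" : columnComment.replace("'", "''");
String format = String.format(colComment, "\"" + columnName + "\"", safeComment);
commentJoin.add(format);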
}
if (StringUtils.isNotBlank(buildPkIndex(mysqlIndexList))) {
String constraint = "\tconstraint " + tableName + "_pk" + "\n";
String pk = "\t\tprimary key" + " (" + buildPkIndex(mysqlIndexList) + ")";
colJoin.add(constraint + pk);
}
StringBuilder builder = new StringBuilder();
builder.append("create table if not exists " + tableName + "\n")
.append("(\n")
.append(colJoin)
.append("\n")
.append(");\n")
.append(commentJoin)
.append("\n")
.append("alter table " + tableName + " owner to postgres;")
.append("\n");
String indexSql = buildCreateIndexSql(tableName, mysqlIndexList);
// Handle the non-primary-key indexes
if (StringUtils.isNotBlank(indexSql)) {
builder.append(indexSql).append("\n");
}
return builder.toString();
}
private static String buildCreateIndexSql(String tableName, List<MysqlTableIndexEntity> mysqlIndexList) {
StringJoiner joiner = new StringJoiner("\n");
joiner.setEmptyValue("");
// 1: index type, either index or unique index
// 2: index name
// 3: indexed columns, separated by comma + space when there are several
String indexSql = "create %s if not exists %s\n" +
"\ton " + tableName + " (%s);";
Map<String, List<MysqlTableIndexEntity>> indexMapping = mysqlIndexList.stream()
.filter(item -> !Objects.equals("PRIMARY", item.getIndexName()))
.sorted(Comparator.comparingInt(MysqlTableIndexEntity::getSeqInIndex))
.collect(
Collectors.groupingBy(MysqlTableIndexEntity::getIndexName)
);
for (String indexName : indexMapping.keySet()) {
List<MysqlTableIndexEntity> mysqlTableIndexEntities = indexMapping.get(indexName);
int nonUnique = mysqlTableIndexEntities.get(0).getNonUnique();
String indexType = "index";
if (nonUnique == 0) {
indexType = "unique index";
}
List<String> colNameList = mysqlTableIndexEntities.stream()
.map(item -> "\"" + item.getColumnName() + "\"")
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(colNameList)) {
continue;
}
String colName = Joiner.on(", ").join(colNameList);
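// Index names are unique per table in MySQL but per schema in pg, so prefix the table name to avoid collisions
joiner.add(String.format(indexSql, indexType, tableName + "_" + indexName, colName));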
}
return joiner.toString();
}
private static String buildPkIndex(List<MysqlTableIndexEntity> indexEntityList) {
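List<MysqlTableIndexEntity> primary = indexEntityList.stream()
.filter(item -> Objects.equals("PRIMARY", item.getIndexName()))
// Keep the columns of a composite primary key in their defined order
.sorted(Comparator.comparingInt(MysqlTableIndexEntity::getSeqInIndex))
.collect(Collectors.toList());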
List<String> colNameList = primary.stream()
.map(item -> "\"" + item.getColumnName() + "\"")
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(colNameList)) {
return null;
}
return Joiner.on(", ").join(colNameList);
}
private static List<MysqlTableIndexEntity> getMysqlIndexList(Connection connection, String dbName, String tableName) {
List<MysqlTableIndexEntity> list = Lists.newArrayList();
try (PreparedStatement statement = connection.prepareStatement(indexSql)) {
statement.setString(1, dbName);
statement.setString(2, tableName);
ResultSet resultSet = statement.executeQuery();
while (resultSet.next()) {
// NON_UNIQUE, INDEX_NAME, SEQ_IN_INDEX, COLUMN_NAME
int nonUnique = resultSet.getInt("NON_UNIQUE");
String indexName = resultSet.getString("INDEX_NAME");
int seqInIndex = resultSet.getInt("SEQ_IN_INDEX");
String columnName = resultSet.getString("COLUMN_NAME");
MysqlTableIndexEntity build = MysqlTableIndexEntity.builder()
.nonUnique(nonUnique)
.indexName(indexName)
.seqInIndex(seqInIndex)
.columnName(columnName)
.build();
list.add(build);
}
} catch (Exception e) {
e.printStackTrace();
}
return list;
}
public static void main(String[] args) {
mysqlDDL2File();
}
}
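One known gap in the class above: the column query reads only `CHARACTER_MAXIMUM_LENGTH`, which MySQL leaves NULL for numeric columns, so a `decimal(10,2)` column comes out as plain `numeric`. A minimal sketch of a fix, assuming the query is extended to also select `NUMERIC_PRECISION` and `NUMERIC_SCALE` from `INFORMATION_SCHEMA.COLUMNS`. The class and method names (`NumericTypeSketch`, `numericType`) are hypothetical.

```java
class NumericTypeSketch {

    /**
     * Builds the pg numeric type for a MySQL decimal/numeric column.
     * precision and scale are the NUMERIC_PRECISION and NUMERIC_SCALE values
     * from INFORMATION_SCHEMA.COLUMNS; either may be null.
     */
    static String numericType(Integer precision, Integer scale) {
        if (precision == null) {
            return "numeric";
        }
        if (scale == null || scale == 0) {
            return "numeric(" + precision + ")";
        }
        return "numeric(" + precision + ", " + scale + ")";
    }

    public static void main(String[] args) {
        System.out.println(numericType(10, 2));
        System.out.println(numericType(null, null));
    }
}
```

In `buildTableSql` this would apply only when the MySQL `DATA_TYPE` is `decimal` or `numeric`. The integer types also report a precision there, but must not receive one in pg.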
(3) Execution scripts
#!/bin/bash
# Script 1: create the databases
# Check the number of arguments
if [ "$#" -ne 3 ]; then
    echo "Usage: $0 <username> <password> <sql_file_path>"
    exit 1
fi
USERNAME=$1
PASSWORD=$2
FILEPATH=$3
# Check that the SQL file exists
if [ ! -f "$FILEPATH" ]; then
    echo "Error: File does not exist: $FILEPATH"
    exit 1
fi
export PGPASSWORD="${PASSWORD}"
psql -U "${USERNAME}" -h 127.0.0.1 -p 5432 -f "${FILEPATH}"
#!/bin/bash
# Script 2: create the tables
# Check the number of arguments
if [ "$#" -ne 3 ]; then
    echo "Usage: $0 <username> <password> <folder_path>"
    exit 1
fi
USERNAME=$1
PASSWORD=$2
FOLDER_PATH=$3
# Check that the folder exists
if [ ! -d "$FOLDER_PATH" ]; then
    echo "Error: Folder does not exist: $FOLDER_PATH"
    exit 1
fi
# psql reads the password from this environment variable
export PGPASSWORD="${PASSWORD}"
# Iterate over every file in the folder
for FILE in "$FOLDER_PATH"/*; do
    if [ -f "$FILE" ]; then
        echo "${FILE}"
        # The database name is the file name up to the first dot
        DB_NAME=$(basename "$FILE" | cut -d'.' -f1)
        SQL_SCRIPT=${DB_NAME}.sql
        echo "${DB_NAME}"
        echo "${SQL_SCRIPT}"
        psql -d "${DB_NAME}" -U "${USERNAME}" -h 127.0.0.1 -p 5432 -f "${FILE}"
    fi
done
echo "Script execution completed."
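The two scripts are, respectively:
1. Create the databases
2. Create the tables
Both scripts take three arguments: the username, the password, and the path to execute. They assemble a psql command and run it. The only difference is that for the table-creation script the path is a folder: the script iterates over every file in that folder and runs each one to create the tables.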
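The same logic can also be driven from Java, which is convenient when the DDL is generated on a machine without bash. The sketch below is an assumption-laden illustration, not part of the original tool: `PsqlCommandSketch`, `databaseNameOf`, and `buildCommand` are hypothetical names, and the host and port are the ones hard-coded in the scripts. It additionally passes `-v ON_ERROR_STOP=1`, which the scripts above do not; with it, psql stops at the first failing statement instead of continuing.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

class PsqlCommandSketch {

    /** Derives the database name from the file name, up to the first dot (like cut -d'.' -f1). */
    static String databaseNameOf(Path sqlFile) {
        String fileName = sqlFile.getFileName().toString();
        int dot = fileName.indexOf('.');
        return dot < 0 ? fileName : fileName.substring(0, dot);
    }

    /** Builds the psql argument list for running one SQL file against its database. */
    static List<String> buildCommand(String username, Path sqlFile) {
        return Arrays.asList(
                "psql",
                "-d", databaseNameOf(sqlFile),
                "-U", username,
                "-h", "127.0.0.1",
                "-p", "5432",
                "-v", "ON_ERROR_STOP=1",
                "-f", sqlFile.toString());
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            System.err.println("Usage: PsqlCommandSketch <username> <password> <sql_file_path>");
            System.exit(1);
        }
        ProcessBuilder builder = new ProcessBuilder(buildCommand(args[0], Paths.get(args[2])));
        // psql reads the password from the PGPASSWORD environment variable
        builder.environment().put("PGPASSWORD", args[1]);
        builder.inheritIO();
        System.exit(builder.start().waitFor());
    }
}
```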
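5. Known issues:
1. Individual column types may be mapped incorrectly.
2. MySQL foreign keys are not carried over to pg.
3. Stored procedures are not migrated.
4. No INSERT scripts are generated for the data.
The issues above can be fixed as needed when you use the tool. Foreign keys and stored procedures are generally best avoided at the functional design stage anyway. Data migration is still missing, but plain INSERT statements work in both databases, so you can generate the INSERTs with another tool and run them.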
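If foreign keys do need to be migrated, one possible direction is to read them from `information_schema.KEY_COLUMN_USAGE` (rows where `REFERENCED_TABLE_NAME` is not null, ordered by `ORDINAL_POSITION`) and emit one `alter table` statement per constraint after all tables exist. The sketch below covers only the statement-building step; `ForeignKeySketch` and its methods are hypothetical names, and the metadata query itself is left out.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class ForeignKeySketch {

    /** Wraps each column name in double quotes and joins them with comma + space. */
    static String quoteAll(List<String> names) {
        return names.stream()
                .map(name -> "\"" + name + "\"")
                .collect(Collectors.joining(", "));
    }

    /**
     * Builds the pg statement for one foreign key. The column lists must be
     * in constraint order (ORDINAL_POSITION in KEY_COLUMN_USAGE).
     */
    static String buildForeignKeySql(String tableName, String constraintName,
                                     List<String> columns,
                                     String referencedTable, List<String> referencedColumns) {
        return "alter table " + tableName + "\n"
                + "\tadd constraint " + constraintName + "\n"
                + "\tforeign key (" + quoteAll(columns) + ")\n"
                + "\treferences " + referencedTable + " (" + quoteAll(referencedColumns) + ");";
    }

    public static void main(String[] args) {
        System.out.println(buildForeignKeySql("t_order", "fk_order_user",
                Arrays.asList("user_id"), "t_user", Arrays.asList("id")));
    }
}
```

Running these statements in a separate pass, after every `create table`, avoids having to order the tables by dependency.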
Copyright belongs to the original author, TNT_D. If this infringes your rights, please contact us for removal.