数据库SqlServer迁移PostgreSql实践

背景
公司某内部系统属于商业产品,数据库性能已出现明显问题,服务经常卡死,员工经常反馈数据无法查询或不能及时查询,该系统所使用的数据库为SqlServer,SqlServer数据库属于商业数据库,依赖厂商的维护,且维护成本高,效率低,且存在版权等问题,考虑将该系统的数据库,迁移至PostGresql数据库,属于BSD的开源数据库,不存在版本问题,公司也有部分系统采用pg,维护成本也将大大减低。

迁移原理
SqlServer属于商业数据库,不可能像Mysql等数据库一样,去解析相关的数据库binlog,从而实现增量数据的回放,结合应用属性,最后确定采用离线迁移方式,从SqlServer中将表数据全部读出,然后将数据写入到pg中,采用此种方案的弊病就是程序端需停止写入(应用可将部分数据缓存到本地),等待数据库迁移完成后,程序端再迁移至PostGresql,迁移方法如下: 

表结构迁移原理
表结构主要包含字段,索引,主键,外键等信息组成,主要采用开源工具sqlserver2pg进行表结构的转换

表结构转换
从SqlServer中读写表结构的字段信息,并对字段类型进行转换,转换核心代码如下

sub convert_type
{
 my ($sqlstype, $sqlqual, $colname, $tablename, $typname, $schemaname) =
 @_;
 my $rettype;
 if (defined $types{$sqlstype})
 {
 if ((defined $sqlqual and defined($unqual{$types{$sqlstype}}))
 or not defined $sqlqual)
 {
 # This is one of the few types that have to be unqualified (binary type)
 $rettype = $types{$sqlstype};
 # but we might add a check constraint for binary data
 if ($sqlstype =~ 'binary' and defined $sqlqual) {
 print STDERR "convert_type: $sqlstype, $sqlqual, $colname\n";
 my $constraint;
 $constraint->{TYPE} = 'CHECK_BINARY_LENGTH';
 $constraint->{TABLE} = $tablename;
 $constraint->{TEXT} = "octet_length(" . format_identifier($colname) . ") <= $sqlqual";
 push @{$objects->{SCHEMAS}->{$schemaname}->{TABLES}->{$tablename}
 ->{CONSTRAINTS}}, ($constraint);
 }
 }
 elsif (defined $sqlqual)
 {
 $rettype = ($types{$sqlstype} . "($sqlqual)");
 }
 }
 # A few special cases
 elsif ($sqlstype eq 'bit' and not defined $sqlqual)
 {
 $rettype = "boolean";
 }
 elsif ($sqlstype eq 'ntext' and not defined $sqlqual)
 {
 $rettype = "text";
 }

外键,索引,唯一键转换

主要是从sqlserver导出的表结构数据中,对相关的索引,外键等语句进行转换,转换核心代码如下

while (my ($schema, $refschema) = each %{$objects->{SCHEMAS}})
 {
 # Indexes
 # They don't have a schema qualifier. But their table has, and they are in the same schema as their table
 foreach my $table (sort keys %{$refschema->{TABLES}})
 {
 foreach
 my $index (
 sort keys %{$refschema->{TABLES}->{$table}->{INDEXES}})
 {
 my $index_created = 0;
 my $idxref =
 $refschema->{TABLES}->{$table}->{INDEXES}->{$index};
 my $idxdef .= "";
 if ($idxref->{DISABLE})
 {
 $idxdef .= "-- ";
 }
 $idxdef .= "CREATE";
 if ($idxref->{UNIQUE})
 {
 $idxdef .= " UNIQUE";
 }
 if (defined $idxref->{COLS})
 {
 $idxdef .= " INDEX " . format_identifier($index) . " ON " . format_identifier($schema) . '.' . format_identifier($table) . " ("
 . join(",", map{format_identifier_cols_index($_)} @{$idxref->{COLS}}) . ")";
 if (defined $idxref->{INCLUDE}) {
 $idxdef .= " INCLUDE (" .
 join(",", map{format_identifier_cols_index($_)} @{$idxref->{INCLUDE}})
 . ")";
 }
 if (not defined $idxref->{WHERE} and not defined $idxref->{DISABLE}) {
 $idxdef .= ";\n";
 print AFTER $idxdef;
 # the possible comment would go to after file
 $index_created = 1;
 }

数据类型转换原理

数据类型转换

函数类型转换

存储过程

视图部分需手动改造

迁移方法

表结构转换

./sqlserver2pgsql.pl -b before.sql -a after.sql -u unsure.sql -k /opt/data_migration/data-integration/ -sd test -sh 127.0.0.1 -sp 1433 -su user_only -sw 122132321 -pd test -ph 192.168.1.1 -pp 15432 -pu postgres -pw 12345678 -pi 8 -po 8 -f script.sql

表结构导入pg

/usr/local/pgsql1201/bin/psql -h 127.0.0.1 -U postgres -p 15432 <before.sql

数据迁移

cd /opt/data_migration/data-integration/
sh kitchen.sh -file=migration.kjb -level=detailed >migration.log

数据比对

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@author:jacker_zhou
@create_time: 2017-04-07
@overview: mssql pg 
"""
__author__ = 'jacker_zhou'
__version__ = '0.1'
import psycopg2,pymssql 
import types
import time
TableSpace='public.' 
class CompareDataBase(): 
 def __init__(self): 
 
 self.pgcnotallow=psycopg2.connect(database="test",host="127.0.0.1",port=15432,user="postgres",password="test") 
 
 self.mscnotallow=pymssql.connect(host="192.168.1.1",user="test",password="test",database="test") 
 def commit(self): 
 self.pgconn.commit() 
 def close(self): 
 self.pgconn.close() 
 self.msconn.close() 
 def rollback(self): 
 self.pgconn.rollback() 
 def exesyncdb(self): 
 mscursor=self.msconn.cursor() 
 sql=("SELECT COUNT(COLUMNNAME) AS CT,TABLENAME FROM (SELECT A.NAME AS COLUMNNAME,B.NAME AS TABLENAME FROM SYSCOLUMNS A RIGHT JOIN SYSOBJECTS B ON A.ID=B.ID WHERE B.TYPE='U' AND B.NAME NOT IN ('dtproperties','0626')) A GROUP BY TABLENAME ") 
 mscursor.execute(sql) 
 table=mscursor.fetchall()
 print ("total table %d"%len(table))
 if(table is None or len(table)<=0): 
 return 
 else: 
 for row in table: 
 self.executeTable(row[1],row[0]) 
 print ("%s is execute success"%row[1])
 def comparedb(self): 
 mscursor=self.msconn.cursor() 
 sql=("SELECT COUNT(COLUMNNAME) AS CT,TABLENAME FROM (SELECT A.NAME AS COLUMNNAME,B.NAME AS TABLENAME FROM SYSCOLUMNS A RIGHT JOIN SYSOBJECTS B ON A.ID=B.ID WHERE B.TYPE='U' AND B.NAME NOT IN ('dtproperties','0626')) A GROUP BY TABLENAME ") 
 mscursor.execute(sql) 
 table=mscursor.fetchall()
 print ("total table %d"%len(table))
 if(table is None or len(table)<=0): 
 return 
 else: 
 for row in table: 
 self.compareTable(row[1]) 
 def executeTable(self,tablename,count): 
 #print tablename 
 sql1="SELECT * FROM %s"%tablename 
 print (sql1)
 mscursor=self.msconn.cursor() 
 mscursor.execute(sql1) 
 table=mscursor.fetchall()
 if(table is None or len(table)<=0): 
 mscursor.close() 
 return 
 lst_result=self.initColumn(table)
 #print "column" 
 mscursor.close() 
 print ("execute sync %s data to postgresql"%tablename)
 sql2=self.initPgSql(tablename,count)
 pgcursor=self.pgconn.cursor() 
 pgcursor.executemany(sql2,lst_result) 
 pgcursor.close()
 def compareTable(self,tablename): 
 #print tablename 
 sql1="SELECT count(*) FROM %s"%tablename 
 mscursor=self.msconn.cursor() 
 mscursor.execute(sql1) 
 ms_res=mscursor.fetchall() 
 mscursor.close() 
 pgcursor=self.pgconn.cursor()
 pgcursor.execute(sql1) 
 pg_res=pgcursor.fetchall() 
 pgcursor.close()
 res =""
 if ms_res[0][0] == pg_res[0][0]:
 res ="ok"
 else:
 res = "fail"
 print ("execute compare table %s data postgresql: %s mssql:%s result: %s"%(tablename,pg_res[0][0],ms_res[0][0],res))
 if __name__=="__main__": 
 sdb= CompareDataBase()
 start_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
 print ("task start time %s"%start_time)
 try: 
 sdb.comparedb()
 except Exception as e: 
 print (e) 
 sdb.rollback() 
 else: 
 sdb.commit() 
 sdb.close() 
 end_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
 print ("task end time %s"%end_time)
 print ("ok........" )

参考

​ ​https://github.com/dalibo/sqlserver2pgsql​

作者:古道轻风原文地址:https://www.cnblogs.com/88223100/p/Database-SqlServer-Migration-PostgreSql-Practice.html

%s 个评论

要回复文章请先登录注册