首页 > 代码库 > python--同一mysql数据库下批量迁移数据

python--同一mysql数据库下批量迁移数据

最近接手了一些MySQL数据库的维护工作,发现MySQL在批量操作方面实在是个渣渣,与MS SQL Server相比简直“不可同日而语”。

咨询了一下MySQL高手,对于数据迁移这类问题,一种处理方式是直接“一步到位”,即一次性将所有数据查询并插入到另一张表,然后再删除原表数据;另一种处理方式是使用pt-archiver工具来归档。

 

然并卵,“一步到位”法太刺激,pt-archiver工具又用不顺手。由于目前大部分表都以自增id为主键,我以此为前提自己写了个小脚本,厚着脸皮拿出来供各位参考:

技术分享
# coding: utf-8
"""
Move rows in batches between two tables of the same MySQL database.

For each id window ``[start, start + Transfer_Rows_Per_Batch)`` between the
smallest and largest id matching ``Transfer_Condition``, the matching rows are
copied from ``Transfer_Source_Table_Name`` into ``Transfer_Target_Table_Name``
and then deleted from the source. Both statements of a batch are committed
together (and rolled back together on error).

Preconditions, verified by check_env(): both tables have the same column
names in the same order, and both have a primary-key column named ``id``.
"""
import time

# common config
EXEC_DETAIL_FILE = 'exec_detail.txt'
DATETIME_FORMAT = '%Y-%m-%d %X'
Default_MySQL_Host = '192.168.166.169'
Default_MySQL_Port = 3358
Default_MySQL_User = "mysql_admin"
Default_MySQL_Password = 'mysql@Admin@Pwd'
Default_MySQL_Charset = "utf8"
Default_MySQL_Connect_TimeOut = 120

# Transfer Config
Transfer_Database_Name = "db001"
Transfer_Source_Table_Name = "tb2001"
Transfer_Target_Table_Name = "tb2001_his"
# SQL predicate pasted verbatim into the WHERE clauses below, so it must come
# from trusted configuration.
Transfer_Condition = "dt < '2016-10-01'"
Transfer_Rows_Per_Batch = 10000
Sleep_Second_Per_Batch = 0.5


def get_time_string(dt_time):
    """
    Format a time value with DATETIME_FORMAT.

    :param dt_time: a ``time.struct_time`` (as accepted by ``time.strftime``)
    :return: the formatted string
    """
    return time.strftime(DATETIME_FORMAT, dt_time)


def highlight(s):
    """Wrap *s* in ANSI escape sequences (ESC[30;2m ... ESC[1m)."""
    return "%s[30;2m%s%s[1m" % (chr(27), s, chr(27))


def print_warning_message(message):
    """
    Print *message* in red (ANSI 31).

    :param message: any object; it is converted with str()
    """
    message = str(message)
    print(highlight('') + "%s[31;1m%s%s[0m" % (chr(27), message, chr(27)))


def print_info_message(message):
    """
    Print *message* in green (ANSI 32).

    :param message: any object; it is converted with str()
    """
    message = str(message)
    print(highlight('') + "%s[32;2m%s%s[0m" % (chr(27), message, chr(27)))


def write_file(file_path, message):
    """
    Append *message* followed by a newline to *file_path*.

    The directory containing the file must already exist.

    :param file_path: path of the file to append to
    :param message: text to write
    """
    with open(file_path, 'a') as file_handle:
        file_handle.write(message)
        # One message per line for easier reading (the original wrote a bare
        # carriage return, chr(13), which is not a line break on most systems).
        file_handle.write('\n')


def get_mysql_connection():
    """
    Open a new connection using the Default_MySQL_* settings and
    Transfer_Database_Name.

    :return: a MySQLdb connection
    """
    # Imported here rather than at module top so that the pure helpers in this
    # script can be imported without the MySQL driver installed.
    import MySQLdb
    return MySQLdb.connect(
        host=Default_MySQL_Host,
        port=Default_MySQL_Port,
        user=Default_MySQL_User,
        passwd=Default_MySQL_Password,
        connect_timeout=Default_MySQL_Connect_TimeOut,
        charset=Default_MySQL_Charset,
        db=Transfer_Database_Name,
    )


def _execute(cursor, sql_script, sql_param=None):
    """Run one statement on *cursor*, passing parameters only when given."""
    if sql_param is not None:
        cursor.execute(sql_script, sql_param)
    else:
        cursor.execute(sql_script)


def mysql_exec(sql_script, sql_param=None):
    """
    Execute one statement and commit it.

    :param sql_script: SQL text
    :param sql_param: optional parameters for the driver's placeholder substitution
    :return: the row count reported by the cursor
    :raises Exception: any driver error, re-raised after rolling back
    """
    conn = get_mysql_connection()
    try:
        print_info_message("在服务器{0}上执行脚本:{1}".format(
            conn.get_host_info(), sql_script))
        cursor = conn.cursor()
        try:
            _execute(cursor, sql_script, sql_param)
            affect_rows = cursor.rowcount
        finally:
            cursor.close()
        conn.commit()
        return affect_rows
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


def mysql_exec_many(sql_script_list):
    """
    Execute several statements on one connection and commit them together.

    :param sql_script_list: iterable of ``(sql_script, sql_param)`` pairs;
        ``sql_param`` may be None
    :return: list of "影响行数:N" strings, one per statement, in order
    :raises Exception: any driver error, re-raised after rolling back the
        whole list
    """
    conn = get_mysql_connection()
    try:
        exec_result_list = []
        cursor = conn.cursor()
        try:
            for sql_script, sql_param in sql_script_list:
                print_info_message("在服务器{0}上执行脚本:{1}".format(
                    conn.get_host_info(), sql_script))
                _execute(cursor, sql_script, sql_param)
                exec_result_list.append("影响行数:{0}".format(cursor.rowcount))
        finally:
            cursor.close()
        conn.commit()
        return exec_result_list
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


def mysql_query(sql_script, sql_param=None):
    """
    Run a query and return all rows.

    :param sql_script: SQL text
    :param sql_param: optional parameters for the driver's placeholder substitution
    :return: the result of ``cursor.fetchall()``
    """
    conn = get_mysql_connection()
    try:
        print_info_message("在服务器{0}上执行脚本:{1}".format(
            conn.get_host_info(), sql_script))
        cursor = conn.cursor()
        try:
            _execute(cursor, sql_script, sql_param)
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        conn.close()


def get_column_info_list(table_name):
    """
    Return ``[(column_name, column_key), ...]`` for *table_name*, in column order.

    The values are the Field (index 0) and Key (index 3) columns of ``DESC``.
    """
    query_result = mysql_query(sql_script="DESC {0}".format(table_name),
                               sql_param=None)
    return [(row[0], row[3]) for row in query_result]


def get_id_range():
    """
    Inspect the source rows that match Transfer_Condition.

    :return: ``(max_id, min_id, total_count)``, or ``(0, 0, 0)`` when no
        row matches
    """
    sql_script = """
SELECT
MAX(ID) AS MAX_ID,
MIN(ID) AS MIN_ID,
COUNT(1) AS Total_Count
FROM {0}
WHERE {1};""".format(Transfer_Source_Table_Name, Transfer_Condition)
    max_id, min_id, total_count = mysql_query(sql_script=sql_script,
                                              sql_param=None)[0]
    # MAX/MIN are NULL when nothing matches; normalise every such case to zeros.
    if max_id is None or min_id is None:
        return 0, 0, 0
    return max_id, min_id, total_count


def check_column_info_list(source_columns_info_list, target_columns_info_list):
    """
    Decide whether two column lists (as from get_column_info_list) allow a
    transfer.

    :return: None when the tables are compatible, otherwise the reason message
    """
    if len(source_columns_info_list) != len(target_columns_info_list):
        return "源表和目标表的列数不对,不满足迁移条件"
    id_flag = False
    for (source_column_name, source_column_key), \
            (target_column_name, target_column_key) in zip(
                source_columns_info_list, target_columns_info_list):
        if source_column_name != target_column_name:
            return "源表和目标表的列名不匹配,不满足迁移条件"
        if (source_column_name.lower() == 'id'
                and source_column_key.lower() == 'pri'
                and target_column_name.lower() == 'id'
                and target_column_key.lower() == 'pri'):
            id_flag = True
    if not id_flag:
        return "未找到为主键的id列,不满足迁移条件"
    return None


def check_env():
    """
    Check that the source and target tables satisfy the transfer preconditions.

    Prints the reason in red when they do not.

    :return: True if the transfer may proceed
    """
    try:
        reason = check_column_info_list(
            get_column_info_list(Transfer_Source_Table_Name),
            get_column_info_list(Transfer_Target_Table_Name))
    except Exception as ex:
        # str(ex) rather than ex.message, which is Python-2 only.
        print_warning_message("执行出现异常,异常为{0}".format(ex))
        return False
    if reason is not None:
        print_warning_message(reason)
        return False
    return True


def get_batch_ranges(min_id, max_id, batch_size):
    """
    Yield half-open ``(start, end)`` id windows of width *batch_size*,
    starting at *min_id*, that together cover ``[min_id, max_id]``.

    :raises ValueError: if *batch_size* is not positive (it would never finish)
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be positive, got {0}".format(batch_size))
    start = min_id
    while start <= max_id:
        end = start + batch_size
        yield start, end
        start = end


def get_progress(current_max_id, min_id, max_id):
    """
    Compute progress after the batch whose exclusive upper id is *current_max_id*.

    :return: ``(left_rows, percent)``. ``left_rows`` is the remaining id span,
        never negative. ``percent`` is the completed share of
        ``[min_id, max_id]``, capped at 100.
    """
    left_rows = max(max_id - current_max_id, 0)
    span = max_id - min_id
    if span <= 0:
        # A single-id range is complete after its only batch.
        return left_rows, 100.0
    return left_rows, min((current_max_id - min_id) * 100.0 / span, 100.0)


def trans_data(current_min_id, current_max_id):
    """
    Move one batch: copy the matching rows with ``current_min_id <= ID <
    current_max_id`` into the target table, then delete them from the source.
    Both statements are committed together.
    """
    print_info_message("*" * 70)
    copy_data_script = """
INSERT INTO {0}
SELECT * FROM {1}
WHERE ID>={2}
AND ID<{3}
AND {4} ;""".format(Transfer_Target_Table_Name, Transfer_Source_Table_Name,
                    current_min_id, current_max_id, Transfer_Condition)
    # Delete only rows that both satisfy the transfer condition and are now
    # present in the target. Without the condition, an id that already existed
    # in the target could delete a source row that was never copied.
    delete_data_script = """
DELETE FROM {0}
WHERE ID IN (
    SELECT ID
    FROM {1}
    WHERE ID>={2}
    AND ID<{3})
AND ID>={2}
AND ID<{3}
AND {4};""".format(Transfer_Source_Table_Name, Transfer_Target_Table_Name,
                   current_min_id, current_max_id, Transfer_Condition)
    exec_result_list = mysql_exec_many([(copy_data_script, None),
                                        (delete_data_script, None)])
    print_info_message("执行结果:")
    for item in exec_result_list:
        print_info_message(item)


def loop_trans_data():
    """Transfer all matching rows batch by batch, printing progress after each batch."""
    max_id, min_id, total_count = get_id_range()
    # Test the row count rather than min_id == max_id: a single matching row
    # has min_id == max_id but still has to be moved.
    if total_count == 0:
        print_info_message("无数据需要结转")
        return
    for current_min_id, current_max_id in get_batch_ranges(
            min_id, max_id, Transfer_Rows_Per_Batch):
        trans_data(current_min_id, current_max_id)
        left_rows, current_percent = get_progress(current_max_id, min_id, max_id)
        current_percent_str = "%.2f" % current_percent
        info = "当前复制进度{0}/{1},剩余{2},进度为{3}%".format(current_max_id,
                                                    max_id, left_rows,
                                                    current_percent_str)
        print_info_message(info)
        time.sleep(Sleep_Second_Per_Batch)
    print_info_message("*" * 70)
    print_info_message("执行完成")


def main():
    """Entry point: verify the table layout, then run the batched transfer."""
    if not check_env():
        return
    loop_trans_data()


if __name__ == '__main__':
    main()
View Code

 

请根据各自的场景修改数据库连接信息:

技术分享

以及需要迁移的表的信息:

技术分享

 

生成测试数据的mysql脚本:

技术分享
-- Test data for the batch-transfer script: source table tb2001 and an
-- identically structured history table tb2001_his.
CREATE TABLE `tb2001` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `c1` varchar(200) DEFAULT NULL,
  `dt` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

create table tb2001_his like tb2001;

-- Seed one row per row of mysql.user. 'abc' must be a quoted string literal:
-- unquoted, MySQL parses it as a column name and the INSERT fails.
-- dt is a random time within the next 20000 hours (about 2.3 years) from now,
-- so choose Transfer_Condition in the script to match part of that range.
insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM mysql.user;

-- Each of the following self-inserts doubles the number of rows.
insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
View Code

 

最终运行结果如下:

技术分享

显示简单粗暴,有兴趣的可以在此基础上修改!

=================================================================

技术分享

python--同一mysql数据库下批量迁移数据