首页 > 代码库 > csv2mysql

csv2mysql

import os
import re
import sys
import csv
import time
import argparse
import collections
import MySQLdb
import warnings 
# suppress annoying mysql warnings
warnings.filterwarnings(action=ignore, category=MySQLdb.Warning) 



def get_type(s):
    """Find type for this string
    """
    # try integer type
    try:
        v = int(s)
    except ValueError:
        pass
    else:
        if abs(v) > 2147483647:
            return bigint
        else:
            return int
    # try float type
    try:
        float(s)
    except ValueError:
        pass
    else:
        return double

    # check for timestamp
    dt_formats = (
        (%Y-%m-%d %H:%M:%S, datetime),
        (%Y-%m-%d %H:%M:%S.%f, datetime),
        (%Y-%m-%d, date),
        (%H:%M:%S, time),
    )
    for dt_format, dt_type in dt_formats:
        try:
            time.strptime(s, dt_format)
        except ValueError:
            pass
        else:
            return dt_type
   
    # doesn‘t match any other types so assume text
    if len(s) > 255:
        return text
    else:
        return varchar(255)


def most_common(l, default=varchar(255)):
    """Return most common value from list
    """
    # some formats trump others
    if l:
        for dt_type in (text, bigint):
            if dt_type in l:
                return dt_type
        return max(l, key=l.count)
    return default


def get_col_types(input_file, max_rows=1000):
    """Find the type for each CSV column
    """
    csv_types = collections.defaultdict(list)
    reader = csv.reader(open(input_file))
    # test the first few rows for their data types
    for row_i, row in enumerate(reader):
        if row_i == 0:
            header = row
        else:
            for col_i, s in enumerate(row):
                data_type = get_type(s)
                csv_types[header[col_i]].append(data_type)
 
        if row_i == max_rows:
            break

    # take the most common data type for each row
    return [most_common(csv_types[col]) for col in header]


def get_schema(table, header, col_types):
    """Generate the schema for this table from given types and columns
    """
    schema_sql = """CREATE TABLE IF NOT EXISTS %s ( 
        id int NOT NULL AUTO_INCREMENT,""" % table 

    for col_name, col_type in zip(header, col_types):
        schema_sql += \n%s %s, % (col_name, col_type)

    schema_sql += """\nPRIMARY KEY (id)
        ) DEFAULT CHARSET=utf8;"""
    return schema_sql


def get_insert(table, header):
    """Generate the SQL for inserting rows
    """
    field_names = , .join(header)
    field_markers = , .join(%s for col in header)
    return INSERT INTO %s (%s) VALUES (%s); %         (table, field_names, field_markers)


def format_header(row):
    """Format column names to remove illegal characters and duplicates
    """
    safe_col = lambda s: re.sub(\W+, _, s.lower()).strip(_)
    header = []
    counts = collections.defaultdict(int)
    for col in row:
        col = safe_col(col)
        counts[col] += 1
        if counts[col] > 1:
            col = {}{}.format(col, counts[col])
        header.append(col)
    return header


def main(input_file, user, password, host, table, database, max_inserts=10000):
    print "Importing `%s‘ into MySQL database `%s.%s‘" % (input_file, database, table)
    db = MySQLdb.connect(host=host, user=user, passwd=password, charset=utf8)
    cursor = db.cursor()
    # create database and if doesn‘t exist
    cursor.execute(CREATE DATABASE IF NOT EXISTS %s; % database)
    db.select_db(database)

    # define table
    print Analyzing column types ...
    col_types = get_col_types(input_file)
    print col_types

    header = None
    for i, row in enumerate(csv.reader(open(input_file))):
        if header:
            while len(row) < len(header):
                row.append(‘‘) # this row is missing columns so pad blank values
            cursor.execute(insert_sql, row)
            if i % max_inserts == 0:
                db.commit()
                print commit
        else:
            header = format_header(row)
            schema_sql = get_schema(table, header, col_types)
            print schema_sql
            # create table
            cursor.execute(DROP TABLE IF EXISTS %s; % table)
            cursor.execute(schema_sql)
            # create index for more efficient access
            try:
                cursor.execute(CREATE INDEX ids ON %s (id); % table)
            except MySQLdb.OperationalError:
                pass # index already exists

            print Inserting rows ...
            # SQL string for inserting data
            insert_sql = get_insert(table, header)

    # commit rows to database
    print Committing rows to database ...
    db.commit()
    print Done!



if __name__ == __main__:
    parser = argparse.ArgumentParser(description=Automatically insert CSV contents into MySQL)
    parser.add_argument(--table, dest=table, help=Set the name of the table. If not set the CSV filename will be used)
    parser.add_argument(--database, dest=database, default=test, help=Set the name of the database. If not set the test database will be used)
    parser.add_argument(--user, dest=user, default=root, help=The MySQL login username)
    parser.add_argument(--password, dest=password, default=‘‘, help=The MySQL login password)
    parser.add_argument(--host, dest=host, default=localhost, help=The MySQL host)
    parser.add_argument(input_file, help=The input CSV file)
    args = parser.parse_args(sys.argv[1:])
    if not args.table:
        # use input file name for table
        args.table = os.path.splitext(os.path.basename(args.input_file))[0]
    
    main(args.input_file, args.user, args.password, args.host, args.table, args.database)

具体使用例子如下

[root@server1]# python csv2mysql.py --host=172.20.197.61 --user=PdYRxGWNpVRCQfHj --password=RX5a5YsViQcDdywr --database=cf_dc61100a_92a7_43ca_81dd_2a7e3fa0808a --table=performance_history_2 performance_history_2.csv
Importing `performance_history_2.csv‘ into MySQL database `cf_dc61100a_92a7_43ca_81dd_2a7e3fa0808a.performance_history_2‘
Analyzing column types ...
[‘datetime‘, ‘varchar(255)‘, ‘varchar(255)‘, ‘varchar(255)‘, ‘varchar(255)‘, ‘varchar(255)‘, ‘varchar(255)‘, ‘varchar(255)‘]
CREATE TABLE IF NOT EXISTS performance_history_2 (
id int NOT NULL AUTO_INCREMENT,
date_time datetime,
write_bw_mb_s varchar(255),
read_bw_mb_s varchar(255),
write_iops varchar(255),
read_iops varchar(255),
write_latency_usec varchar(255),
read_latency_usec varchar(255),
avg_latency_usec varchar(255),
PRIMARY KEY (id)
) DEFAULT CHARSET=utf8;
Inserting rows ...
commit
commit
commit
commit
commit
commit
commit
commit
commit
commit
commit
Committing rows to database ...
Done!

csv2mysql