首页 > 代码库 > c#连接vertica数据库

c#连接vertica数据库

 

项目前期使用mysql数据库,大约每天200w数据量,十天1500w数据量之后,读取写入都会很慢,而且经常锁表,后来采用vertica数据库,下面分享vertica数据使用方法,以及大批量数据快速写入数据库的方法:

1,c#里如何使用

    下载类库:https://my.vertica.com/download/vertica/client-drivers/

   技术分享

   项目中引用:

   技术分享

代码:

 1                VerticaConnection conn = new VerticaConnection(ConfigurationManager.AppSettings["VerticalTest"]); 2                 { 3                     conn.Open(); 4                     VerticaTransaction txn = conn.BeginTransaction(); 5                     VerticaCommand cmd = new VerticaCommand(); 6                     cmd.Connection = conn; 7                     cmd.Transaction = txn; 8                     string deleteSourceData = http://www.mamicode.com/@"Delete from " + strTableName + " where sampledate = " + time.ToString("yyyyMMdd") + " and city=‘" + city + ""; 9                     cmd.CommandText = deleteSourceData;10                     cmd.CommandTimeout = 90;11                     cmd.ExecuteNonQuery();12                 }

2,大量数据快速批量插入vertica数据库方法

    sqlserver中常见插入方法:

    insert into table1(a,b) valuse (‘Cust1‘, ‘Smith Company‘),(‘Cust2‘, ‘Perform Company‘)

    vertica里不支持这种写法,但是支持如下:

    insert into table1(a,b)  select a,b,c from table2 union select  ‘Cust1‘, ‘Smith Company‘

   这种写法速度并不快,更快速的方法是使用copy,copy可以从txt文件,也可以是内存流:    

   1)从txt: copy 表名 from ‘/indexdata/tmp/conv_ebc_esf_housedimension.txt‘ DELIMITER E‘\t‘ ESCAPE AS ‘\‘ ENCLOSED BY ‘"‘ DIRECT EXCEPTIONS ‘/indexdata/tmp/housedimension.log‘; 
  
    第一个txt,是数据来源,可以把数据写入txt,列之间用tab分隔,行之间是回车,此种写法未经过代码验证。
 
   2) 从内存流 :
       思路就是:取出大量数据,写入MemoryStream内存流,加载进技术分享, copy语句使用语法:
 
       copy {0}{1} from stdin record terminator E‘{2}‘ delimiter E‘{3}‘ enforcelength no commit 
 
       0:表名 ,1:列名,2:列之间分隔符‘\n‘    ,3:行之间分隔符‘\t‘
 
       stdin 就是指输入缓冲区
 
具体代码:
 1         /// <summary> 2         /// 批量插入数据到vertica数据库 3         /// </summary> 4         /// <param name="dt">数据源</param> 5         /// <param name="strTableName">插入的目标表名</param> 6         /// <param name="time">日期(删除数据使用)</param> 7         /// <param name="city">城市(删除数据使用)</param> 8         public static void BulkCopy(DataTable dt, string strTableName, DateTime time,string city) 9         {10             if (dt == null || dt.Rows.Count == 0)11             {12                 return;13             }14             if (dt.Columns.Count == 0)15             {16                 throw new Exception("The length of column cannot be zero.");17             }18             //从datatable中获取列名19             List<string> lstField = new List<string>();20             for (int colIndex = 0; colIndex < dt.Columns.Count; colIndex++)21             {22                 lstField.Add(dt.Columns[colIndex].ColumnName);23             }24             string strFiledList = string.Format("({0})", string.Join(",", lstField.ToArray()));25             //拼写copy语句26             const char RowSplit = \n;27             const char ColSplit = \t;28 29             string strCopyStatement = string.Format("copy {0}{1} from stdin record terminator E‘{2}‘ delimiter E‘{3}‘ enforcelength no commit",30                 strTableName, strFiledList, RowSplit, ColSplit);31             //按照copy语句中的分隔符,分隔数据源32             StringBuilder sbText = new StringBuilder();33             foreach (DataRow dr in dt.Rows)34             {35                 bool bFirstField = true;36                 for (int colIndex = 0; colIndex < dt.Columns.Count; colIndex++)37                 {38                     string strVal = GetDataString(dr, colIndex);39                     if (bFirstField)40                     {41                         sbText.Append(strVal);42                         bFirstField = false;43                     }44                     else45                     {46                         sbText.AppendFormat("{0}{1}", ColSplit, strVal);47                     }48                 }49                 sbText.Append(RowSplit);50             }51             //数据源写入内存流52             string strTemp = sbText.ToString();53             byte[] buff = Encoding.UTF8.GetBytes(strTemp);54             using (MemoryStream ms = new MemoryStream())55             {56                 ms.Write(buff, 0, buff.Length);57                 ms.Flush();58                 ms.Position = 0;59                 //建立vertica数据库连接60                 VerticaConnection conn = new VerticaConnection(ConfigurationManager.AppSettings["VerticalTest"]);61                 {62                     conn.Open();63                     VerticaTransaction txn = conn.BeginTransaction();64                     Vertica.Data.VerticaClient.VerticaCopyStream vcs = new VerticaCopyStream(conn, strCopyStatement);65                     //插入数据前,先删除重复数据66                     VerticaCommand cmd = new VerticaCommand();67                     cmd.Connection = conn;68                     cmd.Transaction = txn;69                     string deleteSourceData = http://www.mamicode.com/@"Delete from " + strTableName + " where sampledate = " + time.ToString("yyyyMMdd") + " and city=‘" + city + "";70                     cmd.CommandText = deleteSourceData;71                     cmd.CommandTimeout = 90;72                     cmd.ExecuteNonQuery();73                     //批量插入数据74                     vcs.Start();75                     vcs.AddStream(ms, false);76                     vcs.Execute();77 78                     long insertedCount = vcs.Finish();79 80                     IList<long> lstRejected = vcs.Rejects;81                     if (lstRejected.Count > 0)82                     {83                         txn.Rollback();84                         conn.Close();85 86                         // Maybe need more detail info to show87                         throw new Exception("Bulk copy failure.");88                     }89                     else90                     {91                         txn.Commit();92                         conn.Close();93                     }94                 }95 96                 ms.Close();97             }98         }

 

 

c#连接vertica数据库