首页 > 代码库 > SqlDataAdapter 更新插入 与 InsertBulkCopy
SqlDataAdapter 更新插入 与 InsertBulkCopy
近日做项目,涉及多个数据库多个表的关联更新,因数据量巨大,逐条更新也很费时。于是就想用SqlDataAdapter 一次提交一批数据过去。以下是自己经历的坑:
1. table Merge 部分
DataTable dtCBBill = DbHelper.ExecuteDataAdapter(SqlHelper.cbBill, pars, strConnOldCBBill); DataTable dtMember = DbHelper.ExecuteDataAdapter(SqlHelper.accounts_m, null, strConnMember); DataTable dtUser = DbHelper.ExecuteDataAdapter(SqlHelper.accounts_u2, null, strConnWeb); //筛选出有账单的用户 DataTable dtM = (from c in dtMember.AsEnumerable() join r in dtCBBill.AsEnumerable() on c["f_accounts"] equals r["f_accounts"] select c).CopyToDataTable(); DataTable dtU = (from c in dtUser.AsEnumerable() join r in dtCBBill.AsEnumerable() on c["f_accounts"] equals r["f_accounts"] select c).CopyToDataTable(); //设置主键加快合并速度 SetPrimaryKey(dtCBBill); SetPrimaryKey(dtM); SetPrimaryKey(dtU); MergeTable(dtM, dtCBBill); MergeTable(dtU, dtCBBill);
。。。。dtContribution
Merge提示<target>.f_CBMoney 和 <source>.f_CBMoney 的属性冲突: DataType 属性不匹配。几经调试,发现,源dtCBBill的一个字段是decimal,而对应合并的目的表dtContribution相应字段是int 类型。修改sql语句,cast(某字段 as int) 解决此类异常。网上有说
在调用dt1.Merge(dt2)的时候,由于两个serverid字段类型不一致,一个int32,一个int64,导致无法Merge。用importRow的方式就可以合并了:
private DataTable MergeTable(DataTable dest, DataTable source) { DataRow[] sourceRows = source.Select(); for (int i = 0; i < sourceRows.Length; i++) { dest.ImportRow(sourceRows[i]); } return dest; }
这种方法需要改动代码多,就没有尝试。
2. InsertBulkCopy 部分
public static int InsertBulkCopy(string connectionString, DataTable dt,string destTable) { using (SqlConnection Connection = new SqlConnection(connectionString)) { Connection.Open(); using (SqlTransaction transaction = Connection.BeginTransaction()) { using (SqlBulkCopy sqlbulkcopy = new SqlBulkCopy((SqlConnection)Connection, SqlBulkCopyOptions.KeepIdentity, transaction)) { sqlbulkcopy.DestinationTableName = destTable; for (int i = 0; i < dt.Columns.Count; i++) { sqlbulkcopy.ColumnMappings.Add(dt.Columns[i].ColumnName, dt.Columns[i].ColumnName); } sqlbulkcopy.BatchSize = 10000; try { sqlbulkcopy.WriteToServer(dt); transaction.Commit(); return 1; } catch(Exception ex) { transaction.Rollback(); return 0; } } } } }
必须要保证 source table 与 destination table 字段完全一致,包括字段的大小写。因为大小写不一样,如 f_employee 与 f_Employee,直接会导致插入失败。
3. 最重要的 SqlDataAdapter
SqlDataAdapter adapter = new SqlDataAdapter(); SqlConnection conn = new SqlConnection(strConnCash); adapter.SelectCommand = new SqlCommand(SqlHelper.selCmdText, conn); SqlParameter parameter1 = adapter.SelectCommand.Parameters.Add("@f_date", SqlDbType.DateTime); parameter1.Value = date; DataTable dtContri = new DataTable(); adapter.Fill(dtContri); adapter.UpdateBatchSize = 1000; adapter.UpdateCommand = new SqlCommand(SqlHelper.updCmdText, conn); adapter.UpdateCommand.Parameters.Add("@f_CBMoney", SqlDbType.Int, 4, "f_CBMoney"); adapter.UpdateCommand.Parameters.Add("@f_CBMresult", SqlDbType.Int, 4, "f_CBMresult"); adapter.UpdateCommand.Parameters.Add("@f_CBNum", SqlDbType.Int, 4, "f_CBNum"); SqlParameter[] paramters = new SqlParameter[] { new SqlParameter("@f_date",SqlDbType.DateTime,8,"f_date"), new SqlParameter("@f_accounts",SqlDbType.VarChar,20,"f_accounts") }; paramters[0].SourceVersion = DataRowVersion.Original; paramters[1].SourceVersion = DataRowVersion.Original; adapter.UpdateCommand.Parameters.AddRange(paramters); adapter.UpdateCommand.UpdatedRowSource = UpdateRowSource.None; adapter.InsertCommand = new SqlCommand(SqlHelper.insCmdText, conn); adapter.InsertCommand.Parameters.Add("@f_accounts", SqlDbType.VarChar, 20, "f_accounts"); adapter.InsertCommand.Parameters.Add("@f_date", SqlDbType.DateTime, 4, "f_date"); adapter.InsertCommand.Parameters.Add("@f_CBMoney", SqlDbType.Int, 4, "f_CBMoney"); adapter.InsertCommand.Parameters.Add("@f_CBMresult", SqlDbType.Int, 4, "f_CBMresult"); adapter.InsertCommand.Parameters.Add("@f_CBNum", SqlDbType.Int, 4, "f_CBNum"); adapter.InsertCommand.Parameters.Add("@f_StupeSurplus", SqlDbType.Int, 4, "f_StupeSurplus"); adapter.InsertCommand.Parameters.Add("@f_status", SqlDbType.TinyInt, 1, "f_status"); adapter.InsertCommand.UpdatedRowSource = UpdateRowSource.None; //将ds数据同步到dtContri表 SetPrimaryKey(dtContri); MergeTableManual( dtCBBill,ref dtContri); //执行更新插入 int r = adapter.Update(dtContri); log.Info(site + " CB update&insert: " + r);
/// <summary> /// 设置主键 /// </summary> /// <param name="dt"></param> private void SetPrimaryKey(DataTable dt) { dt.PrimaryKey = new DataColumn[] { dt.Columns["f_accounts"] }; } /// <summary> /// 合并表数据 /// </summary> /// <param name="source"></param> /// <param name="target"></param> private void MergeTable(DataTable source, DataTable target, bool isAddSchema = true) { target.Merge(source, false, isAddSchema ? MissingSchemaAction.Add : MissingSchemaAction.Ignore); } /// <summary> /// 手动合并表数据 /// </summary> /// <param name="source"></param> /// <param name="target"></param> private void MergeTableManual(DataTable source,ref DataTable target) { foreach (DataRow item in source.Rows) { var row = target.Rows.Find(item["f_accounts"]); if (row != null) { row["f_CBMoney"] = item["f_CBMoney"]; row["f_CBMresult"] = item["f_CBMresult"]; row["f_CBNum"] = item["f_CBNum"]; } else { var newRow = target.NewRow(); newRow["f_accounts"] = item["f_accounts"]; newRow["f_date"] = item["f_date"]; newRow["f_CBMoney"] = item["f_CBMoney"]; newRow["f_CBMresult"] = item["f_CBMresult"]; newRow["f_CBNum"] = item["f_CBNum"]; newRow["f_StupeSurplus"] = item["f_StupeSurplus"]; newRow["f_status"] = item["f_status"]; target.Rows.Add(newRow); } } }
这其中坑最多。按照 https://docs.microsoft.com/en-us/dotnet/framework/data/adonet/updating-data-sources-with-dataadapters 这篇最有价值的文章,发现日志记录更新数目总是为0。 然后自己又做个demo,一点一点拆开研究,dataTable.SaveChanges 只是把源表最近的更新同步到离线表中,不可用。但离线表,取其中的一DataRow,改变下数据,甚至原数据再更新都能Update成功,最后通过打印RowState属性值,得到,用dataTable1.Merge(dataTable2,...)方式合并来的数据,RowState 为unchanged。遂手动Merge。运行,OK。终于搞定了。
SqlDataAdapter 更新插入 与 InsertBulkCopy