SqlDataAdapter Update/Insert and InsertBulkCopy

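Recently I worked on a project that involved correlated updates across multiple tables in multiple databases. Because the data volume was huge, updating row by row was very slow, so I decided to use SqlDataAdapter to submit the data in batches. Below are the pitfalls I ran into: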

1. The table Merge part

 

 DataTable dtCBBill = DbHelper.ExecuteDataAdapter(SqlHelper.cbBill, pars, strConnOldCBBill);
 DataTable dtMember = DbHelper.ExecuteDataAdapter(SqlHelper.accounts_m, null, strConnMember);
 DataTable dtUser = DbHelper.ExecuteDataAdapter(SqlHelper.accounts_u2, null, strConnWeb);
 // Select the users that have bills
 DataTable dtM = (from c in dtMember.AsEnumerable()
                   join r in dtCBBill.AsEnumerable() on c["f_accounts"] equals r["f_accounts"]
                   select c).CopyToDataTable();
 DataTable dtU = (from c in dtUser.AsEnumerable()
                   join r in dtCBBill.AsEnumerable() on c["f_accounts"] equals r["f_accounts"]
                   select c).CopyToDataTable();
 // Set the primary key to speed up merging
 SetPrimaryKey(dtCBBill);
 SetPrimaryKey(dtM);
 SetPrimaryKey(dtU);
 MergeTable(dtM, dtCBBill);
 MergeTable(dtU, dtCBBill);
... dtContribution

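Merge failed with the message "<target>.f_CBMoney and <source>.f_CBMoney have conflicting properties: DataType property mismatch." After some debugging I found that one column in the source table dtCBBill was decimal, while the corresponding column in the merge target dtContribution was int. Changing the SQL statement to use cast(<column> as int) resolved this kind of exception. A post I found online says:

When calling dt1.Merge(dt2), the two serverid columns had different types (one Int32, one Int64), so the Merge failed. Using ImportRow instead makes the merge work: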

private DataTable MergeTable(DataTable dest, DataTable source)
{
      DataRow[] sourceRows = source.Select();
      for (int i = 0; i < sourceRows.Length; i++)
      {
          dest.ImportRow(sourceRows[i]);
       }
       return dest;
 }

This approach would have required more code changes, so I did not try it.
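
If changing the SQL is not an option, the column type can also be aligned on the client before merging. The following is only a sketch under assumptions, not the method used in this project: the helper name WithColumnType and the sample data are hypothetical. It relies on the fact that a column's DataType can be changed only while the table has no rows, so it clones the schema first and then imports the rows.

```csharp
using System;
using System.Data;

static class ColumnTypeDemo
{
    // Hypothetical helper: returns a copy of `source` in which the column
    // `columnName` has the type `newType`.
    public static DataTable WithColumnType(DataTable source, string columnName, Type newType)
    {
        DataTable result = source.Clone();              // schema only, no rows
        result.Columns[columnName].DataType = newType;  // allowed because the table is empty
        foreach (DataRow row in source.Rows)
        {
            // ImportRow copies the values column by column and keeps the RowState;
            // this sketch assumes every value can be converted to the new type.
            result.ImportRow(row);
        }
        return result;
    }

    static void Main()
    {
        var bills = new DataTable("bills");
        bills.Columns.Add("f_accounts", typeof(string));
        bills.Columns.Add("f_CBMoney", typeof(decimal));
        bills.Rows.Add("alice", 120m);

        DataTable converted = WithColumnType(bills, "f_CBMoney", typeof(int));
        Console.WriteLine(converted.Columns["f_CBMoney"].DataType); // System.Int32
        Console.WriteLine(converted.Rows.Count);                    // 1
    }
}
```

After the conversion both tables expose f_CBMoney as int, so the DataType check in Merge should no longer be triggered for that column.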

2. The InsertBulkCopy part

public static int InsertBulkCopy(string connectionString, DataTable dt, string destTable)
{
    using (SqlConnection Connection = new SqlConnection(connectionString))
    {
        Connection.Open();
        using (SqlTransaction transaction = Connection.BeginTransaction())
        {
            using (SqlBulkCopy sqlbulkcopy = new SqlBulkCopy(Connection, SqlBulkCopyOptions.KeepIdentity, transaction))
            {
                sqlbulkcopy.DestinationTableName = destTable;
                // Map every source column to the destination column of the same name.
                for (int i = 0; i < dt.Columns.Count; i++)
                {
                    sqlbulkcopy.ColumnMappings.Add(dt.Columns[i].ColumnName, dt.Columns[i].ColumnName);
                }
                sqlbulkcopy.BatchSize = 10000;
                try
                {
                    sqlbulkcopy.WriteToServer(dt);
                    transaction.Commit();
                    return 1;
                }
                catch (Exception)
                {
                    // Any failure rolls back the whole copy; the caller only sees 0.
                    transaction.Rollback();
                    return 0;
                }
            }
        }
    }
}

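The column names of the source table and the destination table must match exactly, including letter case. A difference in case alone, such as f_employee versus f_Employee, makes the insert fail outright.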
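
One way to guard against the case problem is to resolve each source column against the real destination column names before adding the mappings. The sketch below is an assumption-laden illustration, not part of the original project: BuildMappings is a hypothetical helper, and the list of destination column names is assumed to have been obtained elsewhere (for example by querying the destination table's schema).

```csharp
using System;
using System.Collections.Generic;
using System.Data;

static class BulkCopyMappingDemo
{
    // Hypothetical helper: pairs each source column with the destination column
    // whose name matches when case is ignored. Throws if there is no counterpart.
    public static List<KeyValuePair<string, string>> BuildMappings(
        DataTable source, IEnumerable<string> destinationColumns)
    {
        var lookup = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
        foreach (string name in destinationColumns)
        {
            lookup[name] = name;
        }

        var mappings = new List<KeyValuePair<string, string>>();
        foreach (DataColumn column in source.Columns)
        {
            string destName;
            if (!lookup.TryGetValue(column.ColumnName, out destName))
            {
                throw new InvalidOperationException(
                    "No destination column for " + column.ColumnName);
            }
            mappings.Add(new KeyValuePair<string, string>(column.ColumnName, destName));
        }
        return mappings;
    }

    static void Main()
    {
        var dt = new DataTable();
        dt.Columns.Add("f_employee", typeof(string));
        string[] destinationColumns = { "f_Employee" }; // assumed destination schema

        foreach (var m in BuildMappings(dt, destinationColumns))
        {
            Console.WriteLine(m.Key + " -> " + m.Value); // f_employee -> f_Employee
        }
    }
}
```

Each resulting pair could then be passed to sqlbulkcopy.ColumnMappings.Add(m.Key, m.Value) in place of mapping a column name to itself.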

3. The most important part: SqlDataAdapter

SqlDataAdapter adapter = new SqlDataAdapter();
SqlConnection conn = new SqlConnection(strConnCash);
adapter.SelectCommand = new SqlCommand(SqlHelper.selCmdText, conn);
SqlParameter parameter1 = adapter.SelectCommand.Parameters.Add("@f_date", SqlDbType.DateTime);
parameter1.Value = date;

DataTable dtContri = new DataTable();
adapter.Fill(dtContri);

adapter.UpdateBatchSize = 1000;
adapter.UpdateCommand = new SqlCommand(SqlHelper.updCmdText, conn);
adapter.UpdateCommand.Parameters.Add("@f_CBMoney", SqlDbType.Int, 4, "f_CBMoney");
adapter.UpdateCommand.Parameters.Add("@f_CBMresult", SqlDbType.Int, 4, "f_CBMresult");
adapter.UpdateCommand.Parameters.Add("@f_CBNum", SqlDbType.Int, 4, "f_CBNum");
SqlParameter[] paramters = new SqlParameter[]
{
       new SqlParameter("@f_date",SqlDbType.DateTime,8,"f_date"),
       new SqlParameter("@f_accounts",SqlDbType.VarChar,20,"f_accounts")
};
paramters[0].SourceVersion = DataRowVersion.Original;
paramters[1].SourceVersion = DataRowVersion.Original;
adapter.UpdateCommand.Parameters.AddRange(paramters);
adapter.UpdateCommand.UpdatedRowSource = UpdateRowSource.None;

adapter.InsertCommand = new SqlCommand(SqlHelper.insCmdText, conn);
adapter.InsertCommand.Parameters.Add("@f_accounts", SqlDbType.VarChar, 20, "f_accounts");
adapter.InsertCommand.Parameters.Add("@f_date", SqlDbType.DateTime, 8, "f_date");
adapter.InsertCommand.Parameters.Add("@f_CBMoney", SqlDbType.Int, 4, "f_CBMoney");
adapter.InsertCommand.Parameters.Add("@f_CBMresult", SqlDbType.Int, 4, "f_CBMresult");
adapter.InsertCommand.Parameters.Add("@f_CBNum", SqlDbType.Int, 4, "f_CBNum");
adapter.InsertCommand.Parameters.Add("@f_StupeSurplus", SqlDbType.Int, 4, "f_StupeSurplus");
adapter.InsertCommand.Parameters.Add("@f_status", SqlDbType.TinyInt, 1, "f_status");
adapter.InsertCommand.UpdatedRowSource = UpdateRowSource.None;

// Sync the source data (dtCBBill) into the dtContri table
SetPrimaryKey(dtContri);
MergeTableManual(dtCBBill, ref dtContri);

// Run the update and insert
int r = adapter.Update(dtContri);
log.Info(site + " CB update&insert: " + r);

/// <summary>
/// Sets the primary key
/// </summary>
/// <param name="dt"></param>
private void SetPrimaryKey(DataTable dt)
{
   dt.PrimaryKey = new DataColumn[] { dt.Columns["f_accounts"] };
}

/// <summary>
/// Merges table data
/// </summary>
/// <param name="source"></param>
/// <param name="target"></param>
private void MergeTable(DataTable source, DataTable target, bool isAddSchema = true)
{
    target.Merge(source, false, isAddSchema ? MissingSchemaAction.Add : MissingSchemaAction.Ignore);
}

/// <summary>
/// Merges table data manually
/// </summary>
/// <param name="source"></param>
/// <param name="target"></param>
private void MergeTableManual(DataTable source,ref DataTable target)
{
   foreach (DataRow item in source.Rows)
   {
      var row = target.Rows.Find(item["f_accounts"]);            
      if (row != null)
      {
         row["f_CBMoney"] = item["f_CBMoney"];
         row["f_CBMresult"] = item["f_CBMresult"];
         row["f_CBNum"] = item["f_CBNum"];
       }
       else
       {
          var newRow = target.NewRow();
          newRow["f_accounts"] = item["f_accounts"];
          newRow["f_date"] = item["f_date"];
          newRow["f_CBMoney"] = item["f_CBMoney"];
          newRow["f_CBMresult"] = item["f_CBMresult"];
          newRow["f_CBNum"] = item["f_CBNum"];
          newRow["f_StupeSurplus"] = item["f_StupeSurplus"];
          newRow["f_status"] = item["f_status"];
          target.Rows.Add(newRow);
        }
    } 
}

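This part had the most pitfalls. I followed https://docs.microsoft.com/en-us/dotnet/framework/data/adonet/updating-data-sources-with-dataadapters , the most useful article I found, but the update count written to the log was always 0. So I built a demo and took it apart piece by piece. dataTable.SaveChanges only syncs the latest changes of the source table into the offline table, so it was of no use. However, when I took a single DataRow from the offline table and changed its data, or even assigned the original value again, Update succeeded. Finally, by printing the RowState property, I found that rows merged in with dataTable1.Merge(dataTable2, ...) have a RowState of Unchanged. Since adapter.Update only sends rows whose RowState is Added, Modified, or Deleted, those rows were skipped. So I merged manually instead, ran it, and it was OK. Finally solved.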
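
The RowState difference can be reproduced without a database. The following is a minimal in-memory sketch, assuming that calling AcceptChanges leaves rows in the same Unchanged state as rows loaded by adapter.Fill; the class name, the MakeTable helper, and the sample values are hypothetical.

```csharp
using System;
using System.Data;

static class RowStateDemo
{
    // Hypothetical helper: builds a one-row table keyed on f_accounts whose row
    // is Unchanged, as it would be right after adapter.Fill.
    public static DataTable MakeTable(string account, int money)
    {
        var dt = new DataTable();
        dt.Columns.Add("f_accounts", typeof(string));
        dt.Columns.Add("f_CBMoney", typeof(int));
        dt.PrimaryKey = new[] { dt.Columns["f_accounts"] };
        dt.Rows.Add(account, money);
        dt.AcceptChanges();
        return dt;
    }

    static void Main()
    {
        // Case 1: DataTable.Merge. The article above reports Unchanged here,
        // which is why adapter.Update had nothing to send.
        DataTable target = MakeTable("alice", 100);
        DataTable source = MakeTable("alice", 250);
        target.Merge(source, false, MissingSchemaAction.Ignore);
        Console.WriteLine("after Merge: " + target.Rows[0].RowState);

        // Case 2: manual merge, as in MergeTableManual.
        DataTable target2 = MakeTable("alice", 100);
        DataRow existing = target2.Rows.Find("alice");
        existing["f_CBMoney"] = 250;
        Console.WriteLine(existing.RowState); // Modified

        DataRow added = target2.NewRow();
        added["f_accounts"] = "bob";
        added["f_CBMoney"] = 10;
        target2.Rows.Add(added);
        Console.WriteLine(added.RowState);    // Added
    }
}
```

Rows in the Modified state are sent through UpdateCommand and rows in the Added state through InsertCommand, which matches the two commands configured on the adapter above.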