首页 > 代码库 > MongoDB 操作手册CRUD 事务 两步提交

MongoDB 操作手册CRUD 事务 两步提交

执行两步提交

概述

这部分提供了多记录更新或者多记录事务,使用两步提交来完成多记录写入的模板。另外,可以扩展此方法来提供rollback-like功能。

背景

MongoDB对于单条记录的操作是原子性的;但是涉及多条记录的操作却不是原子性的。由于记录可能是相当复杂,并且有内嵌记录,单记录原子性操作提供了实际中常用的必要支持。
除了单记录的原子性操作,还有许多情况需要多记录操作事务,当执行一个包含一些列操作的事务时,就有以下要求:
原子性:如果一个操作失败,事务中之前的操作需要回滚到之前的状态
一致性:如果一个重大失误,比如网络故障,硬件故障,中断了事务,数据库必须能够恢复到之前的状态
对于需要多记录操作的事务,可以在应用中实现两步提交的方法,来提供多记录更新支持。使用这种方法保证了一致性,并且万一出现错误,事务的执行状态是可恢复的。然而在这个过程中,记录处于未定的数据和状态。
注意:因为MongoDB只有单记录操作是原子性的,两步提交只能提供语义上的“类事务”功能。对于应用来说,使其能够回到在两步提交中的某个状态的中间数据或者回滚数据。

模板

考虑以下情景:

要将资金从账户A转移到账户B,在关系型数据库中,可以在一个事务中从A中减去资金,同时在B中加上。在MongoDB中,可以模拟两步提交来获得相同结果。

这个例子使用两个集合
1.accounts,用于存储账户信息
2.transactions,用于存储资金转移事务的信息

初始化账户信息

db.accounts.insert(
[
{ _id: "A", balance: 1000, pendingTransactions: [] },
{ _id: "B", balance: 1000, pendingTransactions: [] }
]
);

初始化转账记录

对于每次资金转移操作,将转账信息添加到transactions集合中,插入的记录包含以下信息:
source和destination字段,引用自ccounts集合中的_id字段
value字段,声明转移数值
state字段,表明当前转移状态,值可以是initial,pending, applied, done, canceling, 或者 canceled.
lastModified字段,反应最后修改日期

从A转账100到B,初始化transactions记录:
db.transactions.insert({ _id: 1, source: "A", destination: "B", value: 100, state: "initial", lastModified: new Date() });

使用两步提交进行转账

1.从transactions集合中,找到state为initial的记录。此时transactions集合中只有一条记录,即刚插入的那条。在包含其他记录的集合中,除非你声明了其他查询条件,否则这个查询将返回任何state为initial的记录。
var t = db.transactions.findOne( { state: "initial" } );
在MongoDB的shell中输入t,查看t的内容,类似于:
{ "_id" : 1, "source" : "A", "destination" : "B", "value" : 100, "state" : "initial", "lastModified":??}
2.更新事务状态为pending
设置state为pending,lastModified为当前时间
db.transactions.update(
{ _id: t._id, state: "initial" },
{
$set: { state: "pending" },
$currentDate: { lastModified: true }
}
)

在更新操作中,state:‘initial‘确保没有其他进程已经更新了这条记录。如果nMatched和nModified是0,回到第一步,获取一个新的事务,重新开始这个过程。
3.在两个账户中应用该事务
使用update方法将事务t应用到两个账户中。在更新条件中,包含条件pendingTransactions:{$ne:t._id},以避免重复应用该事务。
db.accounts.update(
{ _id: t.source, pendingTransactions: { $ne: t._id } },
{ $inc: { balance: -t.value }, $push: { pendingTransactions: t._id } }
)
db.accounts.update(
{ _id: t.destination, pendingTransactions: { $ne: t._id } },
{ $inc: { balance: t.value }, $push: { pendingTransactions: t._id } }
)

从A账号减去t.value,给B账户加上t.value,同时给每个账户的pendingTransactions数组添加事务id
4.更新事务状态为applied
db.transactions.update(
{ _id: t._id, state: "pending" },
{
$set: { state: "applied" },
$currentDate: { lastModified: true }
}
)

5.更新账户pendingTransactions数组
db.accounts.update(
{ _id: t.source, pendingTransactions: t._id },
{ $pull: { pendingTransactions: t._id } }
)
db.accounts.update(
{ _id: t.destination, pendingTransactions: t._id },
{ $pull: { pendingTransactions: t._id } }
)

从两个账户中移除已应用的事务。
6.更新事务状态为done
db.transactions.update(
{ _id: t._id, state: "applied" },
{
$set: { state: "done" },
$currentDate: { lastModified: true }
}
)

从失败场景中恢复数据

事务最重要的不是以上这个例子提供的原型,而是当事务没有完全执行成功的时候,从各种失败场景中恢复数据的可能性。
恢复操作
两步提交模型允许应用程序重新执行事务操作序列,以保证数据一致性。在程序启动时,或者定时执行恢复操作,来抓取任何未完成的事务。
恢复到数据一致的状态的时间取决于应用程序多久需要恢复每个事务。
以下恢复操作使用lastModified日期作为pending状态的事务是否需要回滚的标识符。如果pending或者applied状态的事务在最近的30分钟内没有被更新,说明这些事务需要被恢复。可以使用不同的条件来决定是否需要恢复。

Pending状态的事务

恢复事务状态在pending之后,applied之前
例:
获取三十分钟内未成功的事务记录
var dateThreshold = new Date();
dateThreshold.setMinutes(dateThreshold.getMinutes() - 30);
var t = db.transactions.findOne( { state: "pending", lastModified: { $lt: dateThreshold } } );

然后回到“3.在两个账户中应用该事务”这一步

Applied状态的事务

例:
获取三十分钟内未成功的事务记录
var dateThreshold = new Date();
dateThreshold.setMinutes(dateThreshold.getMinutes() - 30);
var t = db.transactions.findOne( { state: "applied", lastModified: { $lt: dateThreshold } } );

然后回到“5.更新账户pendingTransactions数组”这一步

回滚操作

有些情况下,可能需要回滚,或者撤销操作,比如,应用程序需要取消事务,或者其中一个账户不存在或者被冻结。

Applied状态的事务

在“4.更新事务状态为applied”这一步之后,不应该再回滚事务,而是应该完成当前事务,然后创建一个新的事务来把数据修改回来。

Pending状态的事务

在“2.更新事务状态为pending”这一步之后,“4.更新事务状态为applied”这一步之前,可以通过以下步骤回滚事务:
1.更新事务状态为取消中
db.transactions.update(
{ _id: t._id, state: "pending" },
{
$set: { state: "canceling" },
$currentDate: { lastModified: true }
}
)

2.在两个账户中取消操作
如果事务已经应用,需要回退这个事务以取消在两个账户上的操作。在更新的条件中,包含pendingTransactions:t._id,以便在pending transaction已经被应用的时候更新账户。
更新目标账户,减去事务中给其增加的值,cong pendingTransactions数组中移除事务_id
db.accounts.update(
{ _id: t.destination, pendingTransactions: t._id },
{
$inc: { balance: -t.value },
$pull: { pendingTransactions: t._id }
}
)

如果pending transaction还没有被应用到这个账户中,将不会有记录匹配查询条件。
3.更新事务状态为已取消
db.transactions.update(
{ _id: t._id, state: "canceling" },
{
$set: { state: "cancelled" },
$currentDate: { lastModified: true }
}
)

更新事务状态为cancelled来标志事务已取消。

多应用情景

由于事务的存在,多个应用可以同时创建和执行操作,而不会产生数据不一致或者冲突。在之前的例子中,更新或者回滚记录,包含state字段的更新条件防止不同应用重复提交事务
例如,app1和app2同时获取了一个在initial状态的事务。app1在app2开始前提交了整个事务。当app2试图更新事务状态为pending的时候,包含state:‘initial‘的更新条件将不会匹配任何记录,
同时nMatched和nModified将为0.这就表明app2需要回到第一步,重启一个不同的事务过程。
当多个应用运行的时候,关键在有只有一个应用可以及时处理指定的事务。这样的话,即使在有符合更新条件的记录,也可以在事务记录中创建一个标记来标志应用正在处理这个事务。使用findAndModify()方法来修改事务,并且回退。
t = db.transactions.findAndModify(
{
query: { state: "initial", application: { $exists: false } },
update:
{
$set: { state: "pending", application: "App1" },
$currentDate: { lastModified: true }
},
new: true
}
)

修正后的事务操作确保只有标识符匹配的应用可以提交该事务。
如果app1在事务执行中失败,可以使用之前讲的恢复操作,但是应用程序需要在应用事务之前确保“拥有”该事务。
例如,查找并恢复pending状态的job
var dateThreshold = new Date();
dateThreshold.setMinutes(dateThreshold.getMinutes() - 30);
db.transactions.find(
{
application: "App1",
state: "pending",
lastModified: { $lt: dateThreshold }
}
)

在生产环境中使用两步提交

以上的例子是有意写的很简单。例如,它假设一个账户的回滚操作总是可能的,并且账户可以保存负值。
生产环境可能会更加负值,通常来说,账户需要当前账户值,信用,欠款等多种信息。
对于所有事务,要确保使用的是write concern权限等级。

MongoDB 操作手册CRUD 事务 两步提交