Django之事务操作、悲观锁和乐观锁

在日常使用Django开发中,一般牵涉到多表操作,我们都会使用事务,但是你会在事务里使用select_for_update或者自己实现乐观锁吗?

事务处理(transaction)对于Web应用开发至关重要, 它可以维护数据库的完整性, 使整个系统更加安全。比如用户A通过网络转账给用户B,数据库里A账户中的钱已经扣掉,而B账户在接收过程中服务器突然发生了宕机,这时数据库里的数据就不完整了。加入事务处理机制后,如果在一连续交易过程中发生任何意外, 程序将回滚,从而保证数据的完整性。

事务的四大特性(ACID)

如果想要说明一个数据库或者一个框架支持事务性操作,则必须要满足下面的四大特性:

原子性Atomicity):整个事务中的所有操作,要么全部完成,要么全部不完成。事务在执行过程中发生错误,会被回滚到事务开始前的状态。

一致性Consistency):事务开始之前和事务结束后,数据库的完整性约束没有被破坏。

隔离性Isolation):隔离性是指当多个用户并发访问数据库时,比如同时访问一张表,数据库每一个用户开启的事务,不能被其他事务所做的操作干扰,多个并发事务之间,应当相互隔离。

持久性Durability):事务执行成功后,该事务对数据库的更改是持久保存在数据库中的,不会被回滚。

Django默认事务行为

Django是支持事务操作的,它的默认事务行为是自动提交,具体表现形式为:每次数据库操作(比如调用save()方法)会立即被提交到数据库中。但是如果你希望把连续的SQL操作包裹在一个事务里,就需要手动开启事务。

全局开启事务

在Web应用中,常用的事务处理方式是将每次请求都包裹在一个事务中。全局开启事务只需要将数据库的配置项ATOMIC_REQUESTS设置为True,如下所示:

1
2
3
4
5
6
7
8
9
10
11
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.mysql',
'NAME': 'db1',
'HOST': 'dbhost',
'PORT': '3306',
'USER': 'dbuser',
'PASSWORD': 'password',
#全局开启事务,绑定的是http请求响应整个过程
'ATOMIC_REQUESTS': True,
}

它的工作原理是这样的:每当有请求过来时,Django会在调用视图方法前开启一个事务。如果完成了请求处理并正确返回了结果,Django就会提交该事务。否则,Django会回滚该事务。

如果你全局开启了事务,你仍然可以使用non_atomic_requests装饰器让某些视图方法不受事务控制,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
from django.db import transaction


@transaction.non_atomic_requests
def my_view(request):
do_stuff()


# 如有多个数据库,让使用otherdb的视图不受事务控制
@transaction.non_atomic_requests(using='otherdb')
def my_other_view(request):
do_stuff_on_the_other_database()

虽然全局开启事务很简单,但Django并不推荐开启全局事务。因为一旦将事务跟 HTTP 请求绑定到一起时,每一个请求都会开启事务,当访问量增长到一定的时候会造成很大的性能损耗。在实际开发过程中,很多GET请求根本不涉及到事务操作,一个更好的方式是局部开启事务按需使用。

局部开启事务

Django项目中局部开启事务,可以借助于transaction.atomic方法。使用它我们就可以创建一个具备原子性的代码块,一旦代码块正常运行完毕,所有的修改会被提交到数据库。反之,如果有异常,更改会被回滚。

atomic经常被当做装饰器来使用,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 案例一:函数视图
from django.db import transaction

@transaction.atomic
def viewfunc(request):
# This code executes inside a transaction.
do_stuff()


# 案例二:基于类的视图
from django.db import transaction
from rest_framework.views import APIView


class OrderAPIView(APIView):
# 开启事务,当方法执行完以后,自动提交事务
@transaction.atomic
def post(self, request):
pass

使用了atomic装饰器,整个视图方法里的代码块都会包裹着一个事务中运行。有时我们希望只对视图方法里一小段代码使用事务,这时可以使用transaction.atomic()显式地开启事务,如下所示:

1
2
3
4
5
6
7
8
9
10
11
from django.db import transaction


def viewfunc(request):
# 默认自动提交
do_stuff()

# 显式地开启事务
with transaction.atomic():
# 下面这段代码在事务中执行
do_more_stuff()

Savepoint回滚

在事务操作中,我们还会经常显式地设置保存点(savepoint)。一旦发生异常或错误,我们使用savepoint_rollback方法让程序回滚到指定的保存点。如果没有问题,就使用savepoint_commit方法提交事务。示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from django.db import transaction


def viewfunc(request):
# 默认自动提交
do_stuff()


# 显式地开启事务
with transaction.atomic():
# 创建事务保存点
sid = transaction.savepoint()


try:
do_more_stuff()
except Exception as e:
# 如发生异常,回滚到指定地方。
transaction.savepoint_rollback(sid)
# 如果没有异常,显式地提交一次事务
transaction.savepoint_commit(sid)


return HttpResponse("Success")

注意:虽然SQLite支持保存点,但是sqlite3 模块设计中的缺陷使它们很难使用。

事务提交后回调函数

有的时候我们希望当前事务提交后立即执行额外的任务,比如客户下订单后立即邮件通知卖家,这时可以使用Django提供的on_commit方法,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
# 例1
from django.db import transaction


def do_something():
pass # send a mail, invalidate a cache, fire off a Celery task, etc.


transaction.on_commit(do_something)


# 例2:调用celery异步任务
transaction.on_commit(lambda: some_celery_task.delay('arg1'))

悲观锁与乐观锁

在电商秒杀等高并发场景中,仅仅开启事务还是无法避免数据冲突。比如用户A和用户B获取某一商品的库存并尝试对其修改,A, B查询的商品库存都为5件,结果A下单5件,B也下单5件,这就出现问题了。解决方案就是操作( 查询或修改)某个商品库存信息时对其加锁。

常见的锁有悲观锁和乐观锁,接下来我们来看下在Django项目中如何通过代码实现:

悲观锁

悲观锁就是在操作数据时,假定此操作会出现数据冲突,在整个数据处理过程中,使数据处于锁定状态。悲观锁的实现,往往依靠数据库提供的锁机制实现的。

概念:

总是假设最坏的情况,每次取数据时都认为其他线程会修改,所以都会加锁(读锁、写锁、行锁等)

当其他线程想要访问数据时,都需要阻塞挂起。可以依靠数据库实现,如行锁、读锁和写锁等,都是在操作之前加锁

保证同一时刻只有一个线程能操作数据,其他线程则会被 block

运用场景:

  • 无脏读  上锁数据保证一致, 因此无脏读, 对脏读不允许的环境悲观锁可以胜任
  • 无并行  悲观锁对事务成功性可以保证, 但是会对数据加锁导致无法实现数据的并行处理.
  • 事务成功率高  上锁保证一次成功, 因此在对数据处理的成功率要求较高的时候更适合悲观锁.
  • 开销大  悲观锁的上锁解锁是有开销的, 如果超大的并发量这个开销就不容小视, 因此不适合在高并发环境中使用悲观锁
  • 一次性完成  如果乐观锁多次尝试的代价比较大,也建议使用悲观锁, 悲观锁保证一次成功

乐观锁

乐观锁是指操作数据库时想法很乐观,认为这次的操作不会导致冲突,在操作数据时,并不进行任何其他的特殊处理, 而在进行更新时,再去判断是否有冲突了。乐观锁不是数据库提供的锁,需要我们自己去实现。

Django实现悲观锁

Django中使用悲观锁锁定一个对象,需要使用select_for_update()方法。它本质是一个行级锁,能锁定所有匹配的行,直到事务结束。两个应用示例如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 案例1:类视图,锁定id=10的SKU对象
class OrderView(APIView):


@transaction.atomic
def post(self, request):
# select_for_update表示锁,只有获取到锁才会执行查询,否则阻塞等待。
sku = GoodsSKU.objects.select_for_update().get(id=10)


# 等事务提交后,会自动释放锁。
return Response("xxx")


# 案例2:函数视图,锁定所有符合条件的文章对象列表。
from django.db import transaction


with transaction.atomic():
entries = Entry.objects.select_for_update().filter(author=request.user)
for entry in entries:
...

select_for_update()这个方法有两个默认参数,nowait=Falseskip_locked=False

nowait的含义是匹配的记录被锁时不等待,会抛异常。但是 MySQL8.0 以前不支持。

skip_locked的含义是SELECT时跳过被锁的记录。

select_for_update()方法必须应用在事务中,可利用@transaction.atomic()装饰器包裹视图函数

一般情况下如果其他事务锁定了相关行,那么本次查询将被阻塞,直到锁被释放。如果不想要使查询阻塞的话,使用select_for_update(nowait=True)

当你同时使用select_for_updateselect_related方法时,select_related指定的相关对象也会被锁定。你可以通过select_for_update(of=(...))方法指定需要锁定的关联对象,如下所示:

1
2
# 只会锁定entry(self)和category,不会锁定作者author
entries = Entry.objects.select_related('author', 'category'). select_for_update(of=('self', 'category'))

注意:

select_for_update方法必须与事务(transaction)同时使用。

MySQL版本要在8.0.1+ 以上才支持 nowaitof选项。

Django实现乐观锁

乐观锁实现一般使用记录版本号,为数据表增加一个版本标识(version)字段,每次对数据的更新操作成功后都对版本号执行+1操作。每次执行更新操作时都去判断当前版本号是不是该条数据的最新版本号,如果不是说明数据已经同时被修改过了,则丢弃更新,需要重新获取目标对象再进行更新。

Django项目中实现乐观锁可以借助于django-concurrency这个第三方库, 它可以给模型增加一个version字段,每次执行save操作时会自动给版本号+1。

1
2
3
4
5
6
7
from django.db import models
from concurrency.fields import IntegerVersionField


class ConcurrentModel( models.Model ):
version = IntegerVersionField( )
name = models.CharField(max_length=100)

下例中a和b同时获取了pk=1的模型对象信息,并尝试对其name字段进行修改。由于a.save()方法调用成功以后对象的版本号version已经加1,b再调用b.save()方法时将会报RecordModifiedError的错误,这样避免了a,b同时修改同一对象信息造成数据冲突。

1
2
3
4
5
6
7
8
9
10
a = ConcurrentModel.objects.get(pk=1)
a.name = '1'


b = ConcurrentModel.objects.get(pk=1)
b.name = '2'


a.save()
b.save()

Django实例模拟创建订单场景

事务+悲观锁

查询的时候加锁,事务结束时才会释放锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# 前端传递的参数:地址id:addr_id,支付方式:pay_method,购买的商品id字符串:sku_ids
class OrderCommitView(View):
"""订单创建"""
@transaction.atomic # 订单创建,数据库操作使用事务
def post(self, request):
"""订单创建"""
# 判断用户是否登录
user = request.user
if not user.is_authenticated():
# 用户未登录
return JsonResponse({"res": 0, "errmsg": "用户未登录"})

# 接收数据
addr_id = request.POST.get("addr_id")
pay_method = request.POST.get("pay_method")
sku_ids = request.POST.get("sku_ids")

# 校验数据
if not all([addr_id, pay_method, sku_ids]):
return JsonResponse({"res": 1, "errmsg": "参数不完整"})

# 校验支付方式
if pay_method not in OrderInfo.PAY_METHODS.keys():
return JsonResponse({"res": 2, "errmsg": "非法的支付方式"})

# 校验地址
try:
addr = Address.objects.get(id=addr_id)
except Address.DoesNotExist:
return JsonResponse({"res": 3, "errmsg": "地址非法"})

# todo: 创建订单核心业务

# 组织参数
# 订单id:20201005143016+用户id
order_id = datetime.now().strftime("%Y%m%d%H%M%S") + str(user.id)

# 总数目和总金额
total_count = 0
total_price = 0

# 运费
transit_price = 10

# 设置事务保存点
save_id = transaction.savepoint()

try:
# todo: 向df_order_info表中添加一条记录
order = OrderInfo.objects.create(order_id=order_id,
user=user,
addr=addr,
pay_method=pay_method,
total_count=total_count,
total_price=total_price,
transit_price=transit_price)

# todo: 用户订单中有几个商品,就需要向df_order_goods中加几条记录
conn = get_redis_connection("default")
cart_key = "cart_{}".format(user.id)

sku_ids = sku_ids.split(",")
for sku_id in sku_ids:
# 获取商品的信息
try:
# 此处使用悲观锁:select * from df_goods_sku where id=sku_id for update;
# 当事务结束后,锁才会释放
sku = GoodsSKU.objects.select_for_update().get(id=sku_id)
except GoodsSKU.DoesNotExist:
# 商品不存在
transaction.savepoint_rollback(save_id) # 回滚到事务保存点
return JsonResponse({"res": 4, "errmsg": "商品不存在"})

# 测试悲观锁的代码
# print("user:{},stock:{}".format(user.id, sku.stock))
# import time
# time.sleep(10)

# 从redis中获取用户所要购买的商品的数量
count = conn.hget(cart_key, sku_id)

# todo: 判断商品的库存
if int(count) > sku.stock:
transaction.savepoint_rollback(save_id) # 回滚到事务保存点
return JsonResponse({"res": 6, "errmsg": "商品库存不足"})

# todo: 向 df_order_goods表中插入一条记录
OrderGoods.objects.create(order=order,
sku=sku,
count=count,
price=sku.price)

# todo: 更新商品的库存和销量
sku.stock -= int(count)
sku.sales += int(count)
sku.save()

# todo: 累加计算订单商品的总数量和总价格
total_count += int(count)
amount = sku.price * int(count)
total_price += amount

# todo: 更新订单信息表中的商品总数量和总价格
order.total_count = total_count
order.total_price = total_price
# todo: 根据订单总价格更新运费
if total_price >= 80:
order.transit_price = 0
order.save()

except Exception as err:
transaction.savepoint_rollback(save_id) # 回滚到事务保存点
return JsonResponse({"res": 7, "errmsg": "下单失败"})

# 提交事务
transaction.savepoint_commit(save_id)

# todo: 清除用户购物车中对应的记录
conn.hdel(cart_key, *sku_ids)

# 返回Json数据
return JsonResponse({"res": 5, "message": "创建成功"})

事务+乐观锁

修改mysql事务隔离级别为:可读取提交内容(READ COMMITTED),查询的时候不加锁,但是在更新的时候做一个判断,如果跟之前库存一样才更新,不一样不更新,但是有的时候库存量还够,只是被修改了,所以为了避免这种情况,我们需要对遍历3次,如果3次都不一样,直接回滚事务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# 前端传递的参数:地址id:addr_id,支付方式:pay_method,购买的商品id字符串:sku_ids
# 事务+乐观锁的使用
class OrderCommitView(View):
"""订单创建"""

@transaction.atomic # 订单创建,数据库操作使用事务
def post(self, request):
"""订单创建"""
# 判断用户是否登录
user = request.user
if not user.is_authenticated():
# 用户未登录
return JsonResponse({"res": 0, "errmsg": "用户未登录"})

# 接收数据
addr_id = request.POST.get("addr_id")
pay_method = request.POST.get("pay_method")
sku_ids = request.POST.get("sku_ids")

# 校验数据
if not all([addr_id, pay_method, sku_ids]):
return JsonResponse({"res": 1, "errmsg": "数据不完整"})

# 校验支付方式
if pay_method not in OrderInfo.PAY_METHODS.keys():
return JsonResponse({"res": 2, "errmsg": "非法的支付方式"})

# 校验地址
try:
addr = Address.objects.get(id=addr_id)
except Address.DoesNotExist:
return JsonResponse({"res": 3, "errmsg": "地址非法"})

# todo: 创建订单核心业务

# 组织参数
# 订单id:20201005143016+用户id
order_id = datetime.now().strftime("%Y%m%d%H%M%S") + str(user.id)

# 总数目和总金额
total_count = 0
total_price = 0

# 运费
transit_price = 10

# 设置事务保存点
save_id = transaction.savepoint()

try:
# todo: 向df_order_info表中添加一条记录
order = OrderInfo.objects.create(order_id=order_id,
user=user,
addr=addr,
pay_method=pay_method,
total_count=total_count,
total_price=total_price,
transit_price=transit_price)

# todo: 用户订单中有几个商品,就需要向df_order_goods中加几条记录
conn = get_redis_connection("default")
cart_key = "cart_{}".format(user.id)

sku_ids = sku_ids.split(",")
for sku_id in sku_ids:
for i in range(3):
# 获取商品的信息
try:
sku = GoodsSKU.objects.get(id=sku_id)
except GoodsSKU.DoesNotExist:
# 商品不存在
transaction.savepoint_rollback(save_id) # 回滚到事务保存点
return JsonResponse({"res": 4, "errmsg": "商品不存在"})

# 从redis中获取用户所要购买的商品的数量
count = conn.hget(cart_key, sku_id)

# todo: 判断商品的库存
if int(count) > sku.stock:
transaction.savepoint_rollback(save_id) # 回滚到事务保存点
return JsonResponse({"res": 6, "errmsg": "商品库存不足"})

# todo: 更新商品的库存和销量
orgin_stock = sku.stock
orgin_sales = sku.sales
new_stock = orgin_stock - int(count)
new_sales = orgin_sales + int(count)

# 测试乐观锁的代码
# print("user:{},times:{},stock:{}".format(user.id, i, sku.stock))
# import time
# time.sleep(10)

# update df_goods_sku set stock=new_stock, sales=new_sales
# where id = sku_id and stock = orgin_stock;
# filter返回的查询集可以使用update方法,返回的是受影响的行数
res = GoodsSKU.objects.filter(id=sku_id, stock=orgin_stock) \
.update(stock=new_stock, sales=new_sales)
if res == 0:
if i == 2:
# 尝试的第三次
transaction.savepoint_rollback(save_id) # 回滚事务
return JsonResponse({"res": 7, "errmsg": "下单失败2"})

continue

# todo: 向 df_order_goods表中插入一条记录
OrderGoods.objects.create(order=order,
sku=sku,
count=count,
price=sku.price)

# todo: 累加计算订单商品的总数量和总价格
total_count += int(count)
amount = sku.price * int(count)
total_price += amount

# 一次就更新成功,跳出循环
break

# todo: 更新订单信息表中的商品总数量和总价格
order.total_count = total_count
order.total_price = total_price
# todo: 根据订单总价格更新运费
if total_price >= 80:
order.transit_price = 0
order.save()

except Exception as err:
transaction.savepoint_rollback(save_id) # 回滚到事务保存点
return JsonResponse({"res": 7, "errmsg": "下单失败"})

# 提交事务
transaction.savepoint_commit(save_id)

# todo: 清除用户购物车中对应的记录
conn.hdel(cart_key, *sku_ids)

# 返回Json数据
return JsonResponse({"res": 5, "message": "创建成功"})

总结

那么问题来了,什么时候该用悲观锁,什么时候该用乐观锁呢?这主要需要考虑4个因素:

  • 并发量:如果并发量不大且不允许脏读,可以使用悲观锁解决并发问题;但如果系统的并发非常大的话,悲观锁定会带来非常大的性能问题, 建议乐观锁。

  • 响应速度:如果需要非常高的响应速度,建议采用乐观锁方案,成功就执行,不成功就失败,不需要等待其他并发去释放锁。乐观锁并未真正加锁,效率高。

  • 冲突频率:如果冲突频率非常高,建议采用悲观锁,保证成功率。冲突频率大,选择乐观锁会需要多次重试才能成功,代价比较大。

  • 重试代价:如果重试代价大,建议采用悲观锁。悲观锁依赖数据库锁,效率低。更新失败的概率比较低。

即在冲突比较少的时候,使用乐观锁,因为乐观锁省去了加锁释放锁的开销,提高性能;冲突比较多的时候,使用悲观锁,减少遍历次数,乐观锁重复代价比较大的时候使用悲观锁!