在日常使用Django
开发中,一般牵涉到多表操作,我们都会使用事务,但是你会在事务里使用select_for_update
或者自己实现乐观锁吗?
事务处理(transaction)对于Web应用开发至关重要, 它可以维护数据库的完整性, 使整个系统更加安全。比如用户A通过网络转账给用户B,数据库里A账户中的钱已经扣掉,而B账户在接收过程中服务器突然发生了宕机,这时数据库里的数据就不完整了。加入事务处理机制后,如果在一连续交易过程中发生任何意外, 程序将回滚,从而保证数据的完整性。
事务的四大特性(ACID)
如果想要说明一个数据库或者一个框架支持事务性操作,则必须要满足下面的四大特性:
原子性(Atomicity):整个事务中的所有操作,要么全部完成,要么全部不完成。事务在执行过程中发生错误,会被回滚到事务开始前的状态。
一致性 (Consistency):事务开始之前和事务结束后,数据库的完整性约束没有被破坏。
隔离性(Isolation):隔离性是指当多个用户并发访问数据库时,比如同时访问一张表,数据库每一个用户开启的事务,不能被其他事务所做的操作干扰,多个并发事务之间,应当相互隔离。
持久性(Durability):事务执行成功后,该事务对数据库的更改是持久保存在数据库中的,不会被回滚。
Django
默认事务行为
Django
是支持事务操作的,它的默认事务行为是自动提交,具体表现形式为:每次数据库操作(比如调用save()
方法)会立即被提交到数据库中。但是如果你希望把连续的SQL
操作包裹在一个事务里,就需要手动开启事务。
全局开启事务
在Web应用中,常用的事务处理方式是将每次请求都包裹在一个事务中。全局开启事务只需要将数据库的配置项ATOMIC_REQUESTS
设置为True
,如下所示:
1 2 3 4 5 6 7 8 9 10 11
| DATABASES = { 'default': { 'ENGINE': 'django.db.backends.mysql', 'NAME': 'db1', 'HOST': 'dbhost', 'PORT': '3306', 'USER': 'dbuser', 'PASSWORD': 'password', 'ATOMIC_REQUESTS': True, }
|
它的工作原理是这样的:每当有请求过来时,Django
会在调用视图方法前开启一个事务。如果完成了请求处理并正确返回了结果,Django
就会提交该事务。否则,Django
会回滚该事务。
如果你全局开启了事务,你仍然可以使用non_atomic_requests
装饰器让某些视图方法不受事务控制,如下所示:
1 2 3 4 5 6 7 8 9 10 11 12
| from django.db import transaction @transaction.non_atomic_requests def my_view(request): do_stuff()
@transaction.non_atomic_requests(using='otherdb') def my_other_view(request): do_stuff_on_the_other_database()
|
虽然全局开启事务很简单,但Django
并不推荐开启全局事务。因为一旦将事务跟 HTTP
请求绑定到一起时,每一个请求都会开启事务,当访问量增长到一定的时候会造成很大的性能损耗。在实际开发过程中,很多GET请求根本不涉及到事务操作,一个更好的方式是局部开启事务按需使用。
局部开启事务
Django
项目中局部开启事务,可以借助于transaction.atomic
方法。使用它我们就可以创建一个具备原子性的代码块,一旦代码块正常运行完毕,所有的修改会被提交到数据库。反之,如果有异常,更改会被回滚。
atomic
经常被当做装饰器来使用,如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| from django.db import transaction
@transaction.atomic def viewfunc(request): do_stuff()
from django.db import transaction from rest_framework.views import APIView
class OrderAPIView(APIView): @transaction.atomic def post(self, request): pass
|
使用了atomic
装饰器,整个视图方法里的代码块都会包裹着一个事务中运行。有时我们希望只对视图方法里一小段代码使用事务,这时可以使用transaction.atomic()
显式地开启事务,如下所示:
1 2 3 4 5 6 7 8 9 10 11
| from django.db import transaction
def viewfunc(request): do_stuff()
with transaction.atomic(): do_more_stuff()
|
Savepoint
回滚
在事务操作中,我们还会经常显式地设置保存点(savepoint
)。一旦发生异常或错误,我们使用savepoint_rollback
方法让程序回滚到指定的保存点。如果没有问题,就使用savepoint_commit
方法提交事务。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| from django.db import transaction
def viewfunc(request): do_stuff()
with transaction.atomic(): sid = transaction.savepoint()
try: do_more_stuff() except Exception as e: transaction.savepoint_rollback(sid) transaction.savepoint_commit(sid)
return HttpResponse("Success")
|
注意:虽然SQLite
支持保存点,但是sqlite3
模块设计中的缺陷使它们很难使用。
事务提交后回调函数
有的时候我们希望当前事务提交后立即执行额外的任务,比如客户下订单后立即邮件通知卖家,这时可以使用Django
提供的on_commit
方法,如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13
| from django.db import transaction
def do_something(): pass
transaction.on_commit(do_something)
transaction.on_commit(lambda: some_celery_task.delay('arg1'))
|
悲观锁与乐观锁
在电商秒杀等高并发场景中,仅仅开启事务还是无法避免数据冲突。比如用户A和用户B获取某一商品的库存并尝试对其修改,A, B查询的商品库存都为5件,结果A下单5件,B也下单5件,这就出现问题了。解决方案就是操作( 查询或修改)某个商品库存信息时对其加锁。
常见的锁有悲观锁和乐观锁,接下来我们来看下在Django
项目中如何通过代码实现:
悲观锁
悲观锁就是在操作数据时,假定此操作会出现数据冲突,在整个数据处理过程中,使数据处于锁定状态。悲观锁的实现,往往依靠数据库提供的锁机制实现的。
概念:
总是假设最坏的情况,每次取数据时都认为其他线程会修改,所以都会加锁(读锁、写锁、行锁等)
当其他线程想要访问数据时,都需要阻塞挂起。可以依靠数据库实现,如行锁、读锁和写锁等,都是在操作之前加锁
保证同一时刻只有一个线程能操作数据,其他线程则会被 block
运用场景:
- 无脏读 上锁数据保证一致, 因此无脏读, 对脏读不允许的环境悲观锁可以胜任
- 无并行 悲观锁对事务成功性可以保证, 但是会对数据加锁导致无法实现数据的并行处理.
- 事务成功率高 上锁保证一次成功, 因此在对数据处理的成功率要求较高的时候更适合悲观锁.
- 开销大 悲观锁的上锁解锁是有开销的, 如果超大的并发量这个开销就不容小视, 因此不适合在高并发环境中使用悲观锁
- 一次性完成 如果乐观锁多次尝试的代价比较大,也建议使用悲观锁, 悲观锁保证一次成功
乐观锁
乐观锁是指操作数据库时想法很乐观,认为这次的操作不会导致冲突,在操作数据时,并不进行任何其他的特殊处理, 而在进行更新时,再去判断是否有冲突了。乐观锁不是数据库提供的锁,需要我们自己去实现。
Django
实现悲观锁
Django
中使用悲观锁锁定一个对象,需要使用select_for_update()
方法。它本质是一个行级锁,能锁定所有匹配的行,直到事务结束。两个应用示例如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| class OrderView(APIView):
@transaction.atomic def post(self, request): sku = GoodsSKU.objects.select_for_update().get(id=10)
return Response("xxx")
from django.db import transaction
with transaction.atomic(): entries = Entry.objects.select_for_update().filter(author=request.user) for entry in entries: ...
|
select_for_update()
这个方法有两个默认参数,nowait=False
和skip_locked=False
nowait
的含义是匹配的记录被锁时不等待,会抛异常。但是 MySQL8.0
以前不支持。
skip_locked
的含义是SELECT
时跳过被锁的记录。
select_for_update()
方法必须应用在事务中,可利用@transaction.atomic()
装饰器包裹视图函数
一般情况下如果其他事务锁定了相关行,那么本次查询将被阻塞,直到锁被释放。如果不想要使查询阻塞的话,使用select_for_update(nowait=True)
。
当你同时使用select_for_update
与select_related
方法时,select_related
指定的相关对象也会被锁定。你可以通过select_for_update(of=(...))
方法指定需要锁定的关联对象,如下所示:
1 2
| entries = Entry.objects.select_related('author', 'category'). select_for_update(of=('self', 'category'))
|
注意:
select_for_update
方法必须与事务(transaction
)同时使用。
MySQL
版本要在8.0.1+ 以上才支持 nowait
和of
选项。
Django
实现乐观锁
乐观锁实现一般使用记录版本号,为数据表增加一个版本标识(version)字段,每次对数据的更新操作成功后都对版本号执行+1操作。每次执行更新操作时都去判断当前版本号是不是该条数据的最新版本号,如果不是说明数据已经同时被修改过了,则丢弃更新,需要重新获取目标对象再进行更新。
Django
项目中实现乐观锁可以借助于django-concurrency
这个第三方库, 它可以给模型增加一个version字段,每次执行save操作时会自动给版本号+1。
1 2 3 4 5 6 7
| from django.db import models from concurrency.fields import IntegerVersionField
class ConcurrentModel( models.Model ): version = IntegerVersionField( ) name = models.CharField(max_length=100)
|
下例中a和b同时获取了pk=1的模型对象信息,并尝试对其name字段进行修改。由于a.save()
方法调用成功以后对象的版本号version已经加1,b再调用b.save()
方法时将会报RecordModifiedError
的错误,这样避免了a,b同时修改同一对象信息造成数据冲突。
1 2 3 4 5 6 7 8 9 10
| a = ConcurrentModel.objects.get(pk=1) a.name = '1'
b = ConcurrentModel.objects.get(pk=1) b.name = '2'
a.save() b.save()
|
Django
实例模拟创建订单场景
事务+悲观锁
查询的时候加锁,事务结束时才会释放锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
| class OrderCommitView(View): """订单创建""" @transaction.atomic def post(self, request): """订单创建""" user = request.user if not user.is_authenticated(): return JsonResponse({"res": 0, "errmsg": "用户未登录"}) addr_id = request.POST.get("addr_id") pay_method = request.POST.get("pay_method") sku_ids = request.POST.get("sku_ids") if not all([addr_id, pay_method, sku_ids]): return JsonResponse({"res": 1, "errmsg": "参数不完整"}) if pay_method not in OrderInfo.PAY_METHODS.keys(): return JsonResponse({"res": 2, "errmsg": "非法的支付方式"}) try: addr = Address.objects.get(id=addr_id) except Address.DoesNotExist: return JsonResponse({"res": 3, "errmsg": "地址非法"}) order_id = datetime.now().strftime("%Y%m%d%H%M%S") + str(user.id) total_count = 0 total_price = 0 transit_price = 10 save_id = transaction.savepoint() try: order = OrderInfo.objects.create(order_id=order_id, user=user, addr=addr, pay_method=pay_method, total_count=total_count, total_price=total_price, transit_price=transit_price) conn = get_redis_connection("default") cart_key = "cart_{}".format(user.id) sku_ids = sku_ids.split(",") for sku_id in sku_ids: try: sku = GoodsSKU.objects.select_for_update().get(id=sku_id) except GoodsSKU.DoesNotExist: transaction.savepoint_rollback(save_id) return JsonResponse({"res": 4, "errmsg": "商品不存在"}) count = conn.hget(cart_key, sku_id) if int(count) > sku.stock: transaction.savepoint_rollback(save_id) return JsonResponse({"res": 6, "errmsg": "商品库存不足"}) OrderGoods.objects.create(order=order, sku=sku, count=count, price=sku.price) sku.stock -= int(count) sku.sales += int(count) sku.save() total_count += int(count) amount = sku.price * int(count) total_price += amount order.total_count = total_count order.total_price = total_price if total_price >= 80: order.transit_price = 0 order.save() except Exception as err: transaction.savepoint_rollback(save_id) return JsonResponse({"res": 7, "errmsg": "下单失败"}) transaction.savepoint_commit(save_id) conn.hdel(cart_key, *sku_ids) return JsonResponse({"res": 5, "message": "创建成功"})
|
事务+乐观锁
修改mysql
事务隔离级别为:可读取提交内容(READ COMMITTED
),查询的时候不加锁,但是在更新的时候做一个判断,如果跟之前库存一样才更新,不一样不更新,但是有的时候库存量还够,只是被修改了,所以为了避免这种情况,我们需要对遍历3次,如果3次都不一样,直接回滚事务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
|
class OrderCommitView(View): """订单创建""" @transaction.atomic def post(self, request): """订单创建""" user = request.user if not user.is_authenticated(): return JsonResponse({"res": 0, "errmsg": "用户未登录"}) addr_id = request.POST.get("addr_id") pay_method = request.POST.get("pay_method") sku_ids = request.POST.get("sku_ids") if not all([addr_id, pay_method, sku_ids]): return JsonResponse({"res": 1, "errmsg": "数据不完整"}) if pay_method not in OrderInfo.PAY_METHODS.keys(): return JsonResponse({"res": 2, "errmsg": "非法的支付方式"}) try: addr = Address.objects.get(id=addr_id) except Address.DoesNotExist: return JsonResponse({"res": 3, "errmsg": "地址非法"}) order_id = datetime.now().strftime("%Y%m%d%H%M%S") + str(user.id) total_count = 0 total_price = 0 transit_price = 10 save_id = transaction.savepoint() try: order = OrderInfo.objects.create(order_id=order_id, user=user, addr=addr, pay_method=pay_method, total_count=total_count, total_price=total_price, transit_price=transit_price) conn = get_redis_connection("default") cart_key = "cart_{}".format(user.id) sku_ids = sku_ids.split(",") for sku_id in sku_ids: for i in range(3): try: sku = GoodsSKU.objects.get(id=sku_id) except GoodsSKU.DoesNotExist: transaction.savepoint_rollback(save_id) return JsonResponse({"res": 4, "errmsg": "商品不存在"}) count = conn.hget(cart_key, sku_id) if int(count) > sku.stock: transaction.savepoint_rollback(save_id) return JsonResponse({"res": 6, "errmsg": "商品库存不足"}) orgin_stock = sku.stock orgin_sales = sku.sales new_stock = orgin_stock - int(count) new_sales = orgin_sales + int(count) res = GoodsSKU.objects.filter(id=sku_id, stock=orgin_stock) \ .update(stock=new_stock, sales=new_sales) if res == 0: if i == 2: transaction.savepoint_rollback(save_id) return JsonResponse({"res": 7, "errmsg": "下单失败2"}) continue OrderGoods.objects.create(order=order, sku=sku, count=count, price=sku.price) total_count += int(count) amount = sku.price * int(count) total_price += amount break order.total_count = total_count order.total_price = total_price if total_price >= 80: order.transit_price = 0 order.save() except Exception as err: transaction.savepoint_rollback(save_id) return JsonResponse({"res": 7, "errmsg": "下单失败"}) transaction.savepoint_commit(save_id) conn.hdel(cart_key, *sku_ids) return JsonResponse({"res": 5, "message": "创建成功"})
|
总结
那么问题来了,什么时候该用悲观锁,什么时候该用乐观锁呢?这主要需要考虑4个因素:
并发量:如果并发量不大且不允许脏读,可以使用悲观锁解决并发问题;但如果系统的并发非常大的话,悲观锁定会带来非常大的性能问题, 建议乐观锁。
响应速度:如果需要非常高的响应速度,建议采用乐观锁方案,成功就执行,不成功就失败,不需要等待其他并发去释放锁。乐观锁并未真正加锁,效率高。
冲突频率:如果冲突频率非常高,建议采用悲观锁,保证成功率。冲突频率大,选择乐观锁会需要多次重试才能成功,代价比较大。
重试代价:如果重试代价大,建议采用悲观锁。悲观锁依赖数据库锁,效率低。更新失败的概率比较低。
即在冲突比较少的时候,使用乐观锁,因为乐观锁省去了加锁释放锁的开销,提高性能;冲突比较多的时候,使用悲观锁,减少遍历次数,乐观锁重复代价比较大的时候使用悲观锁!