python实现oauth2.0简易demo

OAuth2.0认证过程是一个三方协作的过程。这三方包括用户、第三方服务器、授权服务器,现在网站大部分为了简化客户操作都采用了第三方登录来实现用户登录。

认证流程

首先我们简单理下整个流程,这里以微信授权为例:

  1. 首先,用户登陆www.diandian100.cn程序。在用户登陆的时候,www.diandian100.cn将用户登陆重定向到微信Authorization Server,并附上www.diandian100.cn在微信申请的client的ID和www.diandian100.cn的回调URI。
  2. 随后,微信Authorization Server要向用户确认是否给www.diandian100.cn授权。方法是让用户向微信Authorization Server提供用户名和密码。
  3. 接着,微信Authorization Server收到用户的授权后,会重定向到www.diandian100.cn的回调URL,并附带微信Authorization code。
  4. 之后,www.diandian100.cn使用微信Authorization code和URI向微信Authorization Server请求token。
  5. 最后,微信Authorization Server验证Authorization code和URI之后,向www.diandian100.cn发送token。

代码实现

本例全部采用flask来模拟www.diandian100.cn和微信Authorization Server,其中微信Authorization Server端口为本地5000, www.diandian100.cn端口为本地5001。

微信Authorization Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : Uyynot
# @Email : 646547989@qq.com
# @Time : 2022/7/28 15:34
# @File : service.py
# @Project : djangoProject
# @Desc :
import base64
import random
import time

from flask import Flask, request, redirect

app = Flask(__name__)

users = {
"magigo": ["123456"]
}

users[client_id] = []
auth_code = {}

oauth_redirect_uri = []

def gen_token(uid):
"""生成access_token"""
token = base64.b64encode(':'.join([str(uid), str(random.random()), str(time.time() + 7200)]).encode()).decode()
users[uid].append(token)
return token

def gen_auth_code(uri):
"""生成code"""
code = random.randint(0,10000)
auth_code[code] = uri
return code

def verify_token(token):
"""验证access_token"""
_token = base64.b64decode(token).decode()
if not users.get(_token.split(':')[0])[-1] == token:
return -1
if float(_token.split(':')[-1]) >= time.time():
return 1
else:
return 0


@app.route('/oauth', methods=['POST', 'GET'])
def oauth():
"""认证"""
if request.args.get('user'):
# 如果用户请求用户认证,认证成功则发放code,并重定向至用户访问的系统回调地址
if users.get(request.args.get('user'))[0] == request.args.get('pw') and oauth_redirect_uri:
uri = oauth_redirect_uri[0] + '?code=%s' % gen_auth_code(oauth_redirect_uri[0])
print("uri", uri)
return redirect(uri)
if request.args.get('code'):
# 如果携带了code证明是要请求access_token,认证code成功则发放access_token
if auth_code.get(int(request.args.get('code'))) == request.args.get('redirect_uri'):
return gen_token(request.args.get('client_id'))
if request.args.get('redirect_uri'):
# 如果携带重定向url则记录并提示让用户登录
oauth_redirect_uri.append(request.args.get('redirect_uri'))
return 'please login'

@app.route('/test', methods=['POST', 'GET'])
def test():
"""用户访问协同携带token测试访问用户数据,有则返回"""
token = request.args.get('token')
if verify_token(token) == 1:
return 'data'
else:
return 'error'

if __name__ == '__main__':
app.run(debug=True)

www.diandian100.cn

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : Uyynot
# @Email : 646547989@qq.com
# @Time : 2022/7/28 18:03
# @File : web.py
# @Project :
# @Desc :
from flask import Flask, request, redirect

app = Flask(__name__)
redirect_uri = 'http://localhost:5001/client/passport'
client_id = '123456'


@app.route('/client/login', methods=['POST', 'GET'])
def client_login():
"""用户登录视图重定向至认证服务器"""
uri = 'http://localhost:5000/oauth?response_type=code&client_id=%s&redirect_uri=%s' % (client_id, redirect_uri)
return redirect(uri)

@app.route('/client/passport', methods=['POST', 'GET'])
def client_passport():
"""默认回调地址根据用户获取的code重定向请求access_token"""
code = request.args.get('code')
uri = 'http://localhost:5000/oauth?grant_type=authorization_code&code=%s&redirect_uri=%s&client_id=%s' % (code, redirect_uri, client_id)
return redirect(uri)

if __name__ == '__main__':
app.run(debug=True, port=5001)

客户端浏览器

实际情况用户根本不用手动发起两次请求,diandian会直接为你重定向好请求连接,你所要做的就是输入账号密码或者扫码即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import requests
# 登录获取认证地址
r = requests.get('http://localhost:5000/client/login')
print(r.text)
print('=======')
print(r.history)
print('=======')
print(r.url)
print('=======')
# 输入用户名密码请求微信认证
uri_login = r.url.split('?')[0] + '?user=magigo&pw=123456'
print('=======')
print("uri_login:", uri_login)
r2 = requests.get(uri_login)

代码实现流程

实际情况下的oauth2流程

本例只是为了演示oauth2的简单流程,实际上根据各平台参数有些许变动包括认证方式也会更严谨,但是从提供的实例看过来再看实际情况下的认证流程,是不是更清晰了?更详细的解释可以参考阮一峰老师的文章http://www.ruanyifeng.com/blog/2014/05/oauth_2_0.html