python中如何使用rpc

在这篇文章开始之前,问下自己,知道什么是RPC吗?什么场景下要用RPC呢?平时常用的http和这个RPC有什么区别呢?

RPC简介

什么是RPC

远程过程调用(英语:Remote Procedure Call,缩写为 RPC,也叫远程程序调用)是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。如果涉及的软件采用面向对象编程,那么远程过程调用亦可称作远程调用远程方法调用

从以上解释和图示所看有些同学就会问了,这不就是普通的函数之间相互调用吗,只是两个函数处于不同的计算机,换句话说这就是api接口嘛,是的,它就是接口,只是他不是普通的http接口,接着我们向下继续。

背景和用途

在单台计算机中,我们可以通过程序调用来传递控制和数据;或者说通过程序调用,我们可以将多个程序组成一个整体来实现某个功能。

如果将这种调用机制推广到多台彼此间可以进行网络通讯的计算机,由多台计算机中的多个程序组成一个整体来实现某个功能,这也是可以的。调用的一方(发起远程过程调用,然后调用这方的环境挂起,参数通过网络传递给被调用方,被调用的一方执行程序,当程序执行完成后,产生的结果再通过网络回传给调用的一方,调用的一方恢复继续执行。这样一种原型思想,就是我们所说的RPC远程过程调用。

RPC这种思想最早可以追溯到1976年,RPC的发展到今天已经40年有余了。

如今的计算机应用中,单机性能上很难承受住产品的压力,需要不断扩充多台机器来提升整体的性能。同时为了充分利用这些集群里的计算机,需要对其从架构上进行划分,以提供不同的服务,服务间相互调用完成整个产品的功能。RPC就能帮助我们解决这些服务间的信息传递和调用。

到了这一步,更让我们确信的是,这TMD不还是api接口嘛。是吗?是吗?

RPC概念

关于RPC的概念,我们可以从广义和狭义来分别进行理解。

广义

我们可以将所有通过网络来进行通讯调用的实现统称为RPC。

按照这样来理解的话,那我们发现HTTP其实也算是一种RPC实现。

上面的哥们儿,你没错,api接口就是一种rpc的实现方式。

狭义

区别于HTTP的实现方式,在传输的数据格式上和传输的控制上独立实现。比如在机器间通讯传输的数据不采用HTTP协议的方式(分为起始行、header、body三部份),而是使用自定义格式的二进制方式。

我们更多时候谈到的RPC都是指代这种狭义上的理解。

优缺点

相比于传统HTTP的实现而言:

优点

  • 效率高
  • 发起RPC调用的一方,在编写代码时可忽略RPC的具体实现,如同编写本地函数调用一样

缺点

  • 通用性不如HTTP好 因为传输的数据不是HTTP协议格式,所以调用双方需要专门实现的通信库,对于不同的编程开发语言,都要有相关实现。而HTTP作为一个标准协议,大部分的语言都已有相关的实现,通用性更好。

HTTP更多的面向用户与产品服务器的通讯。

RPC更多的面向产品内部服务器间的通讯。 thrift

RPC结构

RPC的设计思想是力图使远程调用中的通讯细节对于使用者透明,调用双方无需关心网络通讯的具体实现。因而实现RPC要进行一定的封装。

RPC原理上是按如下结构流程进行实现的。

流程:

  1. 调用者(Caller, 也叫客户端、Client)以本地调用的方式发起调用;
  2. Client stub(客户端存根,可理解为辅助助手)收到调用后,负责将被调用的方法名、参数等打包编码成特定格式的能进行网络传输的消息体;
  3. Client stub将消息体通过网络发送给对端(服务端)
  4. Server stub(服务端存根,同样可理解为辅助助手)收到通过网络接收到消息后按照相应格式进行拆包解码,获取方法名和参数;
  5. Server stub根据方法名和参数进行本地调用;
  6. 被调用者(Callee,也叫Server)本地调用执行后将结果返回给server stub;
  7. Server stub将返回值打包编码成消息,并通过网络发送给对端(客户端);
  8. Client stub收到消息后,进行拆包解码,返回给Client;
  9. Client得到本次RPC调用的最终结果。

gRPC

gRPC是谷歌开源的一个 RPC 框架,面向移动和 HTTP/2 设计。

介绍

  • gRPC是由Google公司开源的高性能RPC框架。

  • gRPC支持多语言

    gRPC原生使用C、Java、Go进行了三种实现,而C语言实现的版本进行封装后又支持C++、C#、Node、ObjC、 Python、Ruby、PHP等开发语言

  • gRPC支持多平台

    支持的平台包括:Linux、Android、iOS、MacOS、Windows

  • gRPC的消息协议使用Google自家开源的Protocol Buffers协议机制(proto3) 序列化

  • gRPC的传输使用HTTP/2标准,支持双向流和连接多路复用,性能比http1.1好了很多。

  • 内容交换格式采用ProtoBuf(Google Protocol Buffers),开源已久,提供了一种灵活、高效、自动序列化结构数据的机制,作用与XML,Json类似,但使用二进制,(反)序列化速度快,压缩效率高。

和很多RPC系统一样,服务端负责实现定义好的接口并处理客户端的请求,客户端根据接口描述直接调用需要的服务。客户端和服务端可以分别使用gPRC支持的不同语言实现。

ProtoBuf 具有强大的IDL(interface description language,接口描述语言)和相关工具集(主要是protoc)。用户写好.proto描述文件后,protoc可以将其编译成众多语言的接口代码。

架构

C语言实现的gRPC支持多语言,其架构如下

使用方法

  1. 使用Protocol Buffers(proto3)的IDL接口定义语言定义接口服务,编写在文本文件(以.proto为后缀名)中。
  2. 使用protobuf编译器生成服务器和客户端使用的stub代码
  3. 编写补充服务器和客户端逻辑代码

Protocol Buffers

Protocol Buffers 是一种与语言无关,平台无关的可扩展机制,用于序列化结构化数据。使用Protocol Buffers 可以一次定义结构化的数据,然后可以使用特殊生成的源代码轻松地在各种数据流中使用各种语言编写和读取结构化数据。

现在有许多框架等在使用Protocol Buffers。gRPC也是基于Protocol Buffers。 Protocol Buffers 目前有2和3两个版本号。

在gRPC中推荐使用proto3版本。

文档结构

Protocol Buffers版本

Protocol Buffers文档的第一行非注释行,为版本申明,不填写的话默认为版本2。

1
2
3
syntax = "proto3";
或者
syntax = "proto2";

Package包

Protocol Buffers 可以声明package,来防止命名冲突。 Packages是可选的。

1
2
package foo.bar;
message Open { ... }

使用的时候,也要加上命名空间,

1
2
3
4
5
message Foo {
...
foo.bar.Open open = 1;
...
}

注意:对于Python而言,package会被忽略处理,因为Python中的包是以文件目录来定义的。

导入

Protocol Buffers 中可以导入其它文件消息等,与Python的import类似。

1
import “myproject/other_protos.proto”;

定义各种消息和服务

消息messge是用来定义数据的,服务service是用来gRPC的方法的。

注释

Protocol Buffers 提供以下两种注释方式。

1
2
3
4
5
6
7
8
9
10
// 单行注释
//
//
//
/*
多行注释



*/

数据类型

基本数据类型

.proto说明Python
doublefloat
floatfloat
int32使用变长编码,对负数编码效率低, 如果你的变量可能是负数,可以使用sint32int
int64使用变长编码,对负数编码效率低,如果你的变量可能是负数,可以使用sint64int/long
uint32使用变长编码int/long
uint64使用变长编码int/long
sint32使用变长编码,带符号的int类型,对负数编码比int32高效int
sint64使用变长编码,带符号的int类型,对负数编码比int64高效int/long
fixed324字节编码, 如果变量经常大于2^{28} 的话,会比uint32高效int
fixed648字节编码, 如果变量经常大于2^{56} 的话,会比uint64高效int/long
sfixed324字节编码int
sfixed648字节编码int/long
boolbool
string必须包含utf-8编码或者7-bit ASCII textstr
bytes任意的字节序列str

枚举

在 Proto Buffers 中,我们可以定义枚举和枚举类型,

1
2
3
4
5
6
7
8
9
10
enum Corpus {
UNIVERSAL = 0;
WEB = 1;
IMAGES = 2;
LOCAL = 3;
NEWS = 4;
PRODUCTS = 5;
VIDEO = 6;
}
Corpus corpus = 4;

枚举定义在一个消息内部或消息外部都是可以的,如果枚举是 定义在 message 内部,而其他 message 又想使用,那么可以通过 MessageType.EnumType 的方式引用。

定义枚举的时候,我们要保证第一个枚举值必须是0,枚举值不能重复,除非使用 option allow_alias = true 选项来开启别名。

1
2
3
4
5
6
enum EnumAllowingAlias {
option allow_alias = true;
UNKNOWN = 0;
STARTED = 1;
RUNNING = 1;
}

枚举值的范围是32-bit integer,但因为枚举值使用变长编码,所以不推荐使用负数作为枚举值,因为这会带来效率问题。

消息类型

Protocol Buffers使用message定义消息数据。在Protocol Buffers中使用的数据都是通过message消息数据封装基本类型数据或其他消息数据,对应Python中的类。

1
2
3
4
5
message SearchRequest {
string query = 1;
int32 page_number = 2;
int32 result_per_page = 3;
}

字段编号

消息定义中的每个字段都有唯一的编号。这些字段编号用于以消息二进制格式标识字段,并且在使用消息类型后不应更改。 请注意,1到15范围内的字段编号需要一个字节进行编码,包括字段编号和字段类型16到2047范围内的字段编号占用两个字节。因此,您应该为非常频繁出现的消息元素保留数字1到15。请记住为将来可能添加的常用元素留出一些空间。

最小的标识号可以从1开始,最大到2^29 - 1,或 536,870,911。不可以使用其中的[19000-19999]的标识号, Protobuf协议实现中对这些进行了预留。如果非要在.proto文件中使用这些预留标识号,编译时就会报警。同样你也不能使用早期保留的标识号。

指定字段规则

消息字段可以是以下之一:

  • singular:格式良好的消息可以包含该字段中的零个或一个(但不超过一个)。

  • repeated:此字段可以在格式良好的消息中重复任意次数(包括零)。将保留重复值的顺序。对应Python的列表。

    1
    2
    3
    4
    5
    message Result {
    string url = 1;
    string title = 2;
    repeated string snippets = 3;
    }

添加更多消息类型

可以在单个.proto文件中定义多个消息类型。

1
2
3
4
5
6
7
8
9
message SearchRequest {
string query = 1;
int32 page_number = 2;
int32 result_per_page = 3;
}

message SearchResponse {
...
}

保留字段

保留变量不被使用

如果通过完全删除字段或将其注释来更新消息类型,则未来用户可以在对类型进行自己的更新时重用字段编号。如果以后加载相同的旧版本,这可能会导致严重问题,包括数据损坏,隐私错误等。确保不会发生这种情况的一种方法是指定已删除字段的字段编号(或名称)reserved。如果将来的任何用户尝试使用这些字段标识符,protobuf编译器将会报错。

1
2
3
4
message Foo {
reserved 2, 15, 9 to 11;
reserved "foo", "bar";
}

默认值

解析消息时,如果编码消息不包含特定的单数元素,则解析对象中的相应字段将设置为该字段的默认值。这些默认值是特定于类型的:

  • 对于字符串,默认值为空字符串。
  • 对于字节,默认值为空字节。
  • 对于bools,默认值为false。
  • 对于数字类型,默认值为零。
  • 对于枚举,默认值是第一个定义的枚举值,该值必须为0。
  • 对于消息字段,未设置该字段。它的确切值取决于语言。
  • 重复字段的默认值为空(通常是相应语言的空列表)。

嵌套类型

你可以在其他消息类型中定义、使用消息类型,在下面的例子中,Result消息就定义在SearchResponse消息内,如:

1
2
3
4
5
6
7
8
message SearchResponse {
message Result {
string url = 1;
string title = 2;
repeated string snippets = 3;
}
repeated Result results = 1;
}

如果要在其父消息类型之外重用此消息类型,使用

1
SearchResponse.Result

map映射

如果要在数据定义中创建关联映射,Protocol Buffers提供了一种方便的语法:

1
map< key_type, value_type> map_field = N ;

其中key_type可以是任何整数或字符串类型。请注意,枚举不是有效的key_type。value_type可以是除map映射类型外的任何类型。

例如,如果要创建项目映射,其中每条Project消息都与字符串键相关联,则可以像下面这样定义它:

1
map<string, Project> projects = 3 ;
  • map的字段可以是repeated。
  • 序列化后的顺序和map迭代器的顺序是不确定的,所以你不要期望以固定顺序处理map
  • 当为.proto文件产生生成文本格式的时候,map会按照key 的顺序排序,数值化的key会按照数值排序。
  • 从序列化中解析或者融合时,如果有重复的key则后一个key不会被使用,当从文本格式中解析map时,如果存在重复的key,则解析可能会失败。
  • 如果为映射字段提供键但没有值,则字段序列化时的行为取决于语言。在Python中,使用类型的默认值。

oneof

如果你的消息中有很多可选字段, 并且同时至多一个字段会被设置, 你可以加强这个行为,使用oneof特性节省内存。

为了在.proto定义oneof字段, 你需要在名字前面加上oneof关键字, 比如下面例子的test_oneof:

1
2
3
4
5
6
message SampleMessage {
oneof test_oneof {
string name = 4;
SubMessage sub_message = 9;
}
}

然后你可以增加oneof字段到 oneof 定义中. 你可以增加任意类型的字段, 但是不能使用repeated 关键字。

定义服务

Protocol Buffers使用service定义RPC服务。

1
2
3
4
5
6
7
8
9
10
11
message HelloRequest {
string greeting = 1;
}

message HelloResponse {
string reply = 1;
}

service HelloService {
rpc SayHello (HelloRequest) returns (HelloResponse) {}
}

注意:一个service中可定义多个方法。

实例

功能要求

根据请求的用户,推荐不同的文章(给不同的用户推荐不同的文章)

接口原型

接口名称:user_recommend

调用参数(请求参数):

1
2
3
4
UserRequest:
user_id # 用户id
article_num # 推荐的文章数量
time_stamp # 推荐的时间戳

返回的数据:

1
2
3
4
5
6
7
ArticleResponse:
expousre #曝光埋点数据
time_stamp #推荐的时间戳
recommends: #推荐结果(列表)
article_id #文章id
views #点击量
share #分享量

定义Protobuf

使用protobuf定义的接口文件通常以proto作为文件后缀名,文件名:reco.proto

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 使用proto3版本
syntax = "proto3";

// 请求参数:用户id、文章数量、推荐时间戳
message UserRequest {
int32 user_id = 1;
int32 article_num = 2;
int64 time_stamp = 3;
}

// 文章属性:点击量、分享数
message ArticleInfo {
string views = 1;
string share = 2;
}

// 文章:文章id、文章属性
message Article {
int32 article_id = 1;
ArticleInfo info = 2;
}

// 响应数据:埋点数据、推荐时间戳、推荐的文章列表
message ArticleResponse {
string exposure = 1;
int64 time_stamp = 2;
repeated Article recommends = 3;
}

// 定义服务:请求方法为user_recommend(),请求参数为UserRequest;返回响应数据为ArticleResponse
service UserRecommend {
rpc user_recommend(UserRequest) returns(ArticleResponse) {}
}

代码生成

安装protobuf编译器和grpc库

1
pip install grpcio-tools

编译生成代码

1
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. reco.proto
  • -I表示搜索proto文件中被导入文件的目录
  • --python_out表示保存生成Python文件的目录,生成的文件中包含接口定义中的数据类型
  • --grpc_python_out表示保存生成Python文件的目录,生成的文件中包含接口定义中的服务类型

在reco.proto所在目录下执行上述命令,会自动生成如下两个rpc调用辅助代码模块:

  • reco_pb2.py 保存根据接口定义文件中的数据类型生成的python类
  • reco_pb2_grpc.py 保存根据接口定义文件中的服务方法类型生成的python调用RPC方法

生成的代码预览

reco_pb2.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: model.proto
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)

_sym_db = _symbol_database.Default()




DESCRIPTOR = _descriptor.FileDescriptor(
name='model.proto',
package='',
syntax='proto3',
serialized_options=None,
create_key=_descriptor._internal_create_key,
serialized_pb=b'\n\x0bmodel.proto\"G\n\x0bUserRequest\x12\x0f\n\x07user_id\x18\x01 \x01(\x05\x12\x13\n\x0b\x61rticle_num\x18\x02 \x01(\x05\x12\x12\n\ntime_stamp\x18\x03 \x01(\x03\"+\n\x0b\x41rticleInfo\x12\r\n\x05views\x18\x01 \x01(\t\x12\r\n\x05share\x18\x02 \x01(\t\"9\n\x07\x41rticle\x12\x12\n\narticle_id\x18\x01 \x01(\x05\x12\x1a\n\x04info\x18\x02 \x01(\x0b\x32\x0c.ArticleInfo\"U\n\x0f\x41rticleResponse\x12\x10\n\x08\x65xposure\x18\x01 \x01(\t\x12\x12\n\ntime_stamp\x18\x02 \x01(\x03\x12\x1c\n\nrecommends\x18\x03 \x03(\x0b\x32\x08.Article2C\n\rUserRecommend\x12\x32\n\x0euser_recommend\x12\x0c.UserRequest\x1a\x10.ArticleResponse\"\x00\x62\x06proto3'
)




_USERREQUEST = _descriptor.Descriptor(
name='UserRequest',
full_name='UserRequest',
filename=None,
file=DESCRIPTOR,
containing_type=None,
create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='user_id', full_name='UserRequest.user_id', index=0,
number=1, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='article_num', full_name='UserRequest.article_num', index=1,
number=2, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='time_stamp', full_name='UserRequest.time_stamp', index=2,
number=3, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=15,
serialized_end=86,
)


_ARTICLEINFO = _descriptor.Descriptor(
name='ArticleInfo',
full_name='ArticleInfo',
filename=None,
file=DESCRIPTOR,
containing_type=None,
create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='views', full_name='ArticleInfo.views', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='share', full_name='ArticleInfo.share', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=88,
serialized_end=131,
)


_ARTICLE = _descriptor.Descriptor(
name='Article',
full_name='Article',
filename=None,
file=DESCRIPTOR,
containing_type=None,
create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='article_id', full_name='Article.article_id', index=0,
number=1, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='info', full_name='Article.info', index=1,
number=2, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=133,
serialized_end=190,
)


_ARTICLERESPONSE = _descriptor.Descriptor(
name='ArticleResponse',
full_name='ArticleResponse',
filename=None,
file=DESCRIPTOR,
containing_type=None,
create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='exposure', full_name='ArticleResponse.exposure', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='time_stamp', full_name='ArticleResponse.time_stamp', index=1,
number=2, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='recommends', full_name='ArticleResponse.recommends', index=2,
number=3, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=192,
serialized_end=277,
)

_ARTICLE.fields_by_name['info'].message_type = _ARTICLEINFO
_ARTICLERESPONSE.fields_by_name['recommends'].message_type = _ARTICLE
DESCRIPTOR.message_types_by_name['UserRequest'] = _USERREQUEST
DESCRIPTOR.message_types_by_name['ArticleInfo'] = _ARTICLEINFO
DESCRIPTOR.message_types_by_name['Article'] = _ARTICLE
DESCRIPTOR.message_types_by_name['ArticleResponse'] = _ARTICLERESPONSE
_sym_db.RegisterFileDescriptor(DESCRIPTOR)

UserRequest = _reflection.GeneratedProtocolMessageType('UserRequest', (_message.Message,), {
'DESCRIPTOR' : _USERREQUEST,
'__module__' : 'model_pb2'
# @@protoc_insertion_point(class_scope:UserRequest)
})
_sym_db.RegisterMessage(UserRequest)

ArticleInfo = _reflection.GeneratedProtocolMessageType('ArticleInfo', (_message.Message,), {
'DESCRIPTOR' : _ARTICLEINFO,
'__module__' : 'model_pb2'
# @@protoc_insertion_point(class_scope:ArticleInfo)
})
_sym_db.RegisterMessage(ArticleInfo)

Article = _reflection.GeneratedProtocolMessageType('Article', (_message.Message,), {
'DESCRIPTOR' : _ARTICLE,
'__module__' : 'model_pb2'
# @@protoc_insertion_point(class_scope:Article)
})
_sym_db.RegisterMessage(Article)

ArticleResponse = _reflection.GeneratedProtocolMessageType('ArticleResponse', (_message.Message,), {
'DESCRIPTOR' : _ARTICLERESPONSE,
'__module__' : 'model_pb2'
# @@protoc_insertion_point(class_scope:ArticleResponse)
})
_sym_db.RegisterMessage(ArticleResponse)



_USERRECOMMEND = _descriptor.ServiceDescriptor(
name='UserRecommend',
full_name='UserRecommend',
file=DESCRIPTOR,
index=0,
serialized_options=None,
create_key=_descriptor._internal_create_key,
serialized_start=279,
serialized_end=346,
methods=[
_descriptor.MethodDescriptor(
name='user_recommend',
full_name='UserRecommend.user_recommend',
index=0,
containing_service=None,
input_type=_USERREQUEST,
output_type=_ARTICLERESPONSE,
serialized_options=None,
create_key=_descriptor._internal_create_key,
),
])
_sym_db.RegisterServiceDescriptor(_USERRECOMMEND)

DESCRIPTOR.services_by_name['UserRecommend'] = _USERRECOMMEND

# @@protoc_insertion_point(module_scope)

eco_pb2_grpc.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

import model_pb2 as model__pb2


class UserRecommendStub(object):
"""定义服务:请求方法为user_recommend(),请求参数为UserRequest;返回响应数据为ArticleResponse
"""

def __init__(self, channel):
"""Constructor.

Args:
channel: A grpc.Channel.
"""
self.user_recommend = channel.unary_unary(
'/UserRecommend/user_recommend',
request_serializer=model__pb2.UserRequest.SerializeToString,
response_deserializer=model__pb2.ArticleResponse.FromString,
)


class UserRecommendServicer(object):
"""定义服务:请求方法为user_recommend(),请求参数为UserRequest;返回响应数据为ArticleResponse
"""

def user_recommend(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')


def add_UserRecommendServicer_to_server(servicer, server):
rpc_method_handlers = {
'user_recommend': grpc.unary_unary_rpc_method_handler(
servicer.user_recommend,
request_deserializer=model__pb2.UserRequest.FromString,
response_serializer=model__pb2.ArticleResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'UserRecommend', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))


# This class is part of an EXPERIMENTAL API.
class UserRecommend(object):
"""定义服务:请求方法为user_recommend(),请求参数为UserRequest;返回响应数据为ArticleResponse
"""

@staticmethod
def user_recommend(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/UserRecommend/user_recommend',
model__pb2.UserRequest.SerializeToString,
model__pb2.ArticleResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

服务端代码

为了方便看到效果,我们编写补全服务端代码:server.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/6/28 10:42
# @Author : 托小尼
# @Email : 646547989@qq.com
# @URI : https://www.diandian100.cn
# @File : server.py
import time
from concurrent.futures.thread import ThreadPoolExecutor

import grpc

from reco_pb2 import ArticleResponse, Article
from reco_pb2_grpc import UserRecommendServicer, add_UserRecommendServicer_to_server


class RecommendService(UserRecommendServicer):
"""定义服务接口:继承rpc接口定义中服务对应的python类"""

def user_recommend(self, request, context):
"""重写接口定义的同名方法,被调用时应该执行的逻辑"""
# 获取请求参数
user_id = request.user_id
article_num = request.article_num
time_stamp = request.time_stamp

# 定义要返回的响应数据
response = ArticleResponse()
response.exposure = "自定义的埋点信息"
# 推荐的时间戳
response.time_stamp = round(time.time() * 1000)
# 推荐的文章列表
articles = []
# 根据请求的文章数量,构造文章属性
for i in range(article_nums):
article = Article()
# 文章id+3
article.article_id = i + 3
article.info.views = '访问次数:%s' % (i + 100)
article.info.share = "分享次数:%s" % (i + 10)
# 每个推荐的文章添加至推荐列表中
articles.append(article)
# 推荐的文章添加到响应数据的recommends中
response.recommends.extend(articles)
# 返回响应结果
return response


def run():
"""rpc服务端启动"""
# 创建rpc服务器
server = grpc.server(ThreadPoolExecutor(max_workers=8))

# 向服务器中添加被调用的服务方法
add_UserRecommendServicer_to_server(RecommendService(), server)
# 绑定服务器ip和端口
server.add_insecure_port('127.0.0.1:8889')

# 启动rpc服务
server.start()

# 阻塞rpc服务,让其一直运行
while True:
time.sleep(10)
print('rpc服务端运行中……')


if __name__ == '__main__':
run()

客户端代码

client.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/6/28 11:08
# @Author : 托小尼
# @Email : 646547989@qq.com
# @URI : https://www.diandian100.cn
# @File : client.py
import time

from grpc import insecure_channel

from reco_pb2_grpc import UserRecommendStub
from reco_pb2 import UserRequest


def feed_articles(stub):
# 构建rpc服务端请求参数
user_request = UserRequest()
# 用户id为2
user_request.user_id = 2
# 请求推荐文章数量为4
user_request.article_num = 4
# 请求推荐文章时间戳
user_request.time_stamp = round(time.time()*1000)

# 请求rpc服务器推荐文章方法
ret = stub.user_recommend(user_request)
# 打印服务器返回的推荐文章时间戳
print(ret.time_stamp)
# 循环服务器返回的推荐文章id和文章点击量
for i in ret.recommends:
print(i.article_id)
print(i.info.views)
# 如果你用的是django等,可以在此处根据文章id获取到文章的具体参数,然后返回给前端。

def run():
"""rpc客户端调用的方法"""

# 使用with语句连接rpc服务器
with insecure_channel('127.0.0.1:8889') as server:
# 创建调用rpc远端服务的辅助对象stub
stub = UserRecommendStub(server)
# 通过stub进行rpc调用
feed_articles(stub)

if __name__ == '__main__':
run()

测试运行

先运行服务端server.py,可以看到服务端正常运行中。

1
2
3
4
rpc服务端运行中……
rpc服务端运行中……
rpc服务端运行中……

运行客户端

1
2
3
4
5
6
7
8
9
10
11
1624936785348
3
访问次数:100
4
访问次数:101
5
访问次数:102
6
访问次数:103

进程已结束,退出代码为 0

可以看到客户端正常获取到了服务端返回的时间戳和推荐文章id和点击量。正常开发中,我们可以再feed_articles获得推荐文章id时再依次从缓存或数据库中获取到文章的真实数据,返回给前端即可。