使用爬虫定时发送哔哩哔哩弹幕和发送私信

爬虫是python现在很流行的一个应用方向,今天通过我们实际生活中有可能用到的做一个小实例,不同以往的get页面数据,我们今天通过post想服务器定时发送一下数据。

今天主要是使用到两个库requests和scheduler。如果你想爬取网站其他数据,可以参考之前的文章初识Scrapy爬虫框架

requests

其实这才是我们今天的主角,由于它太出名了,简单两句话也说不了,直接放文档链接大家自己看吧https://requests.readthedocs.io/zh_CN/latest/user/quickstart.html

scheduler

APScheduler全程为Advanced Python Scheduler,是一款轻量级的Python任务调度框架。它允许你像Cron那样安排定期执行的任务,并且支持Python函数或任意可调用的对象。

调度器(scheduler)

调度器(scheduler)是其他的组成部分。你通常在应用只有一个调度器,应用的开发者通常不会直接处理作业存储、调度器和触发器,相反,调度器提供了处理这些的合适的接口。配置作业存储和执行器可以在调度器中完成,例如添加、修改和移除作业。 

对于不同的不场景,可以选择的调度器:

  • BlockingScheduler : 当调度器是你应用中唯一要运行的东西时。
  • BackgroundScheduler : 当你没有运行任何其他框架并希望调度器在你应用的后台执行时使用(常用)。
  • AsyncIOScheduler : 当你的程序使用了asyncio(一个异步框架)的时候使用。
  • GeventScheduler : 当你的程序使用了gevent(高性能的Python并发框架)的时候使用。
  • TornadoScheduler : 当你的程序基于Tornado(一个web框架)的时候使用。
  • TwistedScheduler : 当你的程序使用了Twisted(一个异步框架)的时候使用
  • QtScheduler : 如果你的应用是一个Qt应用的时候可以使用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# BackgroundScheduler: 调度器在后台线程中运行,不会阻塞当前线程。


from apscheduler.schedulers.background import BackgroundScheduler
import time

scheduler = BackgroundScheduler()

def job1():
print "%s: 执行任务" % time.asctime()

scheduler.add_job(job1, 'interval', seconds=3)
scheduler.start()

while True:
pass

2、作业存储(job store)

作业存储(job store)主要用来存储被调度的作业,默认的作业存储是简单地把作业保存在内存中。

jobstore提供对scheduler中job的增删改查接口,根据存储方式的不同,分以下几种:

  • MemoryJobStore:没有序列化,jobs就存在内存里,增删改查也都是在内存中操作
  • SQLAlchemyJobStore:所有sqlalchemy支持的数据库都可以做为backend,增删改查操作转化为对应backend的sql语句
  • MongoDBJobStore:用mongodb作backend
  • RedisJobStore: 用redis作backend
  • RethinkDBJobStore: 用rethinkdb 作backend
  • ZooKeeperJobStore:用ZooKeeper做backend

3、执行器(executor)

执行器(executor)主要处理任务的运行,主要是把定时任务中的可调用对象(function)提交给一个一个线程或者进程来进行。当任务完成时,执行器将会通知调度器。

最常用的 执行器(executor) 有两种:

  • ProcessPoolExecutor(进程池)
  • ThreadPoolExecutor(线程池,max:10)

4、触发器(triggers)

当调度一个任务时,需要为它设置一个触发器(triggers)。触发器决定在什么日期/时间、用什么样的形式来执行执行这个定时任务。

APScheduler 有三种内建的 trigger:

  • date: 指定某个确定的时间点,job仅执行一次。
  • interval: 指定时间间隔(fixed intervals)周期性执行。
  • cron: 使用cron风格表达式周期性执行,用于(在指定时间内)定期运行job的场景。使用同linux下crontab的方式。
4.1、date 定时调度(作业只会执行一次)

参数如下:

  • run_date (datetime|str) – 作业的运行日期或时间
  • timezone (datetime.tzinfo|str) – 指定时区
1
2
3
4
5
6
<!--# 2016-12-12运行一次job_function-->
sched.add_job(job_function, 'date', run_date=date(2016, 12, 12), args=['text'])

<!--# 2016-12-12 12:00:00运行一次job_function-->
<!--args=[]中是传给job_function的参数-->
sched.add_job(job_function, 'date', run_date=datetime(2016, 12, 12, 12, 0, 0), args=['text'])
4.2、interval: 每隔一段时间执行一次

weeks=0 | days=0 | hours=0 | minutes=0 | seconds=0, start_date=None, end_date=None, timezone=None

  • weeks (int) – 间隔几周
  • days (int) – 间隔几天
  • hours (int) – 间隔几小时
  • minutes (int) – 间隔几分钟
  • seconds (int) – 间隔多少秒
  • start_date (datetime|str) – 开始日期
  • end_date (datetime|str) – 结束日期
  • timezone (datetime.tzinfo|str) – 时区
1
2
3
4
5
6
7
8
9
10
11
12
<!--每隔2小时执行一次-->
scheduler.add_job(my_job, 'interval', hours=2)


<!--设置时间范围,在设置的时间范围内每隔2小时执行一次-->
scheduler.add_job(my_job, 'interval', hours=2, start_date='2017-9-8 21:30:00', end_date='2018-06-15 21:30:00)


<!--使用装饰器的方式添加定时任务-->
@scheduler.scheduled_job('interval', id='my_job_id', hours=2)
def my_job():
print("Hello World")
4.3、cron: 使用同linux下crontab的方式

(year=None, month=None, day=None, week=None, day_of_week=None, hour=None, minute=None, second=None, start_date=None, end_date=None, timezone=None)

除了week和 day_of_week,它们的默认值是 *

  • 例如 day=1, minute=20 ,这就等于
1
year='*', month='*', day=1, week='*', day_of_week='*', hour='*', minute=20, second=0

工作将在每个月的第一天以每小时20分钟的时间执行

表达式参数类型描述
*所有通配符。例: minutes=* 即每分钟触发
*/a所有可被a整除的通配符。
a-b所有范围a-b触发
a-b/c所有范围a-b,且可被c整除时触发
xth y第几个星期几触发。x为第几个,y为星期几
last x一个月中,最后个星期几触发
last一个月最后一天触发
x,y,z所有组合表达式,可以组合确定值或上方的表达式
  • year (int|str) – 年,4位数字
  • month (int|str) – 月 (范围1-12)
  • day (int|str) – 日 (范围1-31)
  • week (int|str) – 周 (范围1-53)
  • day_of_week (int|str) – 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun)
  • hour (int|str) – 时 (范围0-23)
  • minute (int|str) – 分 (范围0-59)
  • second (int|str) – 秒 (范围0-59)
  • start_date (datetime|str) – 最早开始日期(包含)
  • end_date (datetime|str) – 最晚结束时间(包含)
  • timezone (datetime.tzinfo|str) – 指定时区
1
2
3
4
5
6
7
sched.add_job(my_job, 'cron', hour=3, minute=30)
sched.add_job(my_job, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2017-10-30')

@sched.scheduled_job('cron', id='my_job_id', day='last sun')
def some_decorated_task():
print("I am printed at 00:00:00 on the last Sunday of every month!")

二、 How:APSched 怎么用?

安装
  • pip 安装
1
pip install apscheduler
  • 源码安装(https://pypi.python.org/pypi/APScheduler/)
1
python setup.py install
快速上手
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# first.py

from apscheduler.schedulers.blocking import BlockingScheduler
import time

# 实例化一个调度器
scheduler = BlockingScheduler()

def job1():
print "%s: 执行任务" % time.asctime()

# 添加任务并设置触发方式为3s一次
scheduler.add_job(job1, 'interval', seconds=3)

# 开始运行调度器
scheduler.start()
  • 执行输出:
1
2
3
4
5
> python first.py

Fri Sep 8 20:41:55 2017: 执行任务
Fri Sep 8 20:41:58 2017: 执行任务
...
任务操作
1、添加任务

方法一:调用add_job()方法

调用add_job()方法返回一个apscheduler.job.Job 的实例,可以用来改变或者移除 job

1
job = scheduler.add_job(myfunc, 'interval', minutes=2)

方法二:使用装饰器scheduled_job()

适用于应用运行期间不会改变的 job

1
2
3
4
5
6
7
8
from apscheduler.schedulers.blocking import BlockingScheduler
sched = BlockingScheduler()
# 装饰器
@sched.scheduled_job('interval', id='my_job_id', seconds=5)
def job_function():
print("Hello World")
# 开始
sched.start()
2、删除任务
1
2
3
4
5
6
7
8
 <!--方法一:通过作业ID或别名调用remove_job()删除作业-->
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')


<!-- 方法二:通过add_job()返回的job实例调用remove()方法删除作业-->
job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()
3、暂停&继续任务

可以通过Job实例或调度程序本身轻松暂停和恢复作业。 当作业暂停时,下一个运行时间将被清除,直到作业恢复,不会再计算运行时间。 要暂停作业,请使用以下任一方法:

1
2
3
4
5
6
7
8
9
10
11
12
<!--根据任务实例-->
job = scheduler.add_job(myfunc, 'interval', minutes=2)
<!--暂停-->
job.pause()
<!--继续-->
job.resume()


<!--根据任务id暂停-->
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.pause_job('my_job_id')
scheduler.resume_job('my_job_id')
4、修改任务属性
1
2
3
4
5
6
<!--修饰:-->
job.modify(max_instances=6, name='Alternate name')


<!--重设:-->
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
5、获得job列表
  • 使用get_jobs()来获取所有的job实例。
  • 使用print_jobs()来输出所有格式化的作业列表。
  • 使用get_job(任务ID)获取指定任务的作业列表。
1
2
<!--获取所有的job实例-->
apscheduler.get_jobs()
6、开始&关闭任务
1
2
scheduler.start() #开启
scheduler.shotdown(wait=True|False) #关闭 False 无论任务是否执行,强制关闭

三、一些定时任务脚本

1、定时任务运行脚本每日凌晨00:30:30执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
from app.untils.log_builder import sys_logging

scheduler = BlockingScheduler() # 后台运行

# 设置为每日凌晨00:30:30时执行一次调度程序
@scheduler.scheduled_job("cron", day_of_week='*', hour='1', minute='30', second='30')
def rebate():
print "schedule execute"
sys_logging.debug("statistic scheduler execute success" + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))


if __name__ == '__main__':
try:
scheduler.start()
sys_logging.debug("statistic scheduler start success")
except (KeyboardInterrupt, SystemExit):
scheduler.shutdown()
sys_logging.debug("statistic scheduler start-up fail")
2、每天晚上0点 - 早上8点期间,每5秒执行一次任务。
1
2
# 每天晚上0点 - 早上8点期间,每5秒执行一次任务。
scheduler.add_job(my_job, 'cron',day_of_week='*',hour = '0-8',second = '*/5')
3、在0、10、20、30、40、50分时执行任务。
1
2
<!--# 可以被10整除的时间点执行任务,这个任务就表示在0、10、20、30、40、50分时都会执行任务-->
scheduler.add_job(my_job, 'cron',day_of_week='*',minute = '*/10')
4、直到2020-05-30,每周从周一到周五的早上5:30都执行一次定时任务
1
2
3
4
5
6
<!--直到2020-05-30 00:00:00,每周星期从星期一到星期五的早上5:30都执行一次定时任务-->
sched.add_job(my_job(),'cron', day_of_week='mon-fri', hour=5, minute=30,end_date='2020-05-30')


<!--# 截止到2016-12-30 00:00:00,每周一到周五早上五点半运行job_function-->
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2016-12-31')
5、在6,7,8,11,12月的第3个周五的1,2,3点执行定时任务
1
2
3
<!--job_function将会在6,7,8,11,12月的第3个周五的1,2,3点运行-->

sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
6、每5秒执行该程序一次
1
2
<!--#表示每5秒执行该程序一次,相当于interval 间隔调度中seconds = 5-->
sched.add_job(my_job, 'cron',second = '*/5')

定时发弹幕

我们这里主要使用到两个第三方库requests和Scheduler这两个库我这里因为其大名和上面也讲过,下面代码中我就不再作解释。

我们这里理一下正常发弹幕的流程:打开直播页面》登录(发送弹幕登录)》输入弹幕内容》点击发送

将以上流程转化为代码运行流程:得到弹幕提交的链接》使用带登录的cookie携带弹幕等数据(大部分为post)进行发送。

以上看似流程减少了,因为大部分东西已包含在了代码中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import random

import requests
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.blocking import BlockingScheduler


def send_barrage():
"""发送弹幕"""
# 弹幕发送接口
url = "https://api.live.bilibili.com/msg/send"
# 请求头
headers = {
"Accept": "application/json, text/javascript, */*; q=0.01",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
"Connection": "keep-alive",
"Content-Length": "178",
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
"Cookie": "你的cookie",
"Host": "api.live.bilibili.com",
"Origin": "https://live.bilibili.com",
"Referer": "https://live.bilibili.com/22606139?session_id=44851e883b2f84e_E8A17D19-ABF3-4805-9B35-CA6540BD2AC3&visit_id=9ld4n04lw4jk",
"TE": "Trailers",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.13; rv:84.0) Gecko/20100101 Firefox/84.0",
}
# 请求体
data = {
"color": "16777215",
"fontsize": "25",
"mode": "1",
"msg": random_words(),
"rnd": "1608879402",
"roomid": "22606139",
"bubble": "0",
"csrf_token": "11debb38a4ed044cf23427161aa07048",
"csrf": "11debb38a4ed044cf23427161aa07048",
}
# 发送post请求
res = requests.post(url=url, headers=headers, data=data)
# 打印下提交后返回的数据
print(res.text)
return res.text

def random_words():
"""准备随机发送弹幕的内容,当然可以在这里调用你的数据库里的"""
words = ["大神收下我的膝盖", "特意过来膜拜你的", "为大神打call", "这也太太太厉害了吧", "好喜欢么么哒", "请大佬喝茶"]
# 随机返回一条数组数据
return random.choice(words)

# 定时执行
def auto_start():
# 实例化
scheduler = BlockingScheduler()
# 每隔8秒钟循环执行send_barrage方法
scheduler.add_job(send_barrage, "interval", seconds=8)
# 开始执行任务
scheduler.start()


if __name__ == '__main__':
auto_start()

关于以上请求头、请求接口、请求体格式或内容从哪儿获取

请求接口获取

如图打开一个直播页面,调出开发者工具进入到网络(其他浏览器是network),此时在页面中发送一条弹幕即可看到刚才的请求,选中该请求复制请求网址及弹幕的发送接口,从图中我们也可以看到接口给我们返回的数据,code=0是成功的意思。

获取请求头

还是刚才的开发者工具,我们下方调整到消息头,可以看到cookie其实就保存了我们当前的登录信息,我们将整个请求头给拷贝出来,格式化为python字典,跟代码中对应一下。

获取请求体

再调整到请求,将表单数据复制出来,格式化代码中的data格式一样

运行

以上我们要准备的东西都已经齐了,运行上方的代码,同时观察直播页面,可以看到每隔8秒钟直播间弹幕里会出现你随机发送的内容,是不是很简单呢。

使用requests发送b站私信

其实看了以上代码,发私信我觉得对大家来说也很简单了,我这里给大家放一个生成curl的代码工具让你的操作更简单,它支持python、php、go,node等等,偷懒的必备工具,以下我们就拿私信做个尝试吧。

获取curl命令

打开私信页面,同样尝试给某人发送一条消息,抓取该请求,右击该请求赋值为curl命令。

打开生成代码工具

打开curl代码生成工具,语言选择python,将复制的curl命令拷贝到curl command,即可在python requests内获取到 代码。

运行

将改代码复制出来拷贝到你的ide直接运行,再查看私信页面,数据是不是被发送出去了呢。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import requests

cookies = {
'$Cookie: _uuid': 'Ceeeeeeeeeeewrwrwer',
'buvid3': '6werwerwerwerwerwer',
'UM_distinctid': '1748f6b487d44d-0werwerwerwerb487e5cb',
'sid': '51o22sll46',
'DedeUserID': '3789335165',
'DedeUserID__ckMd5': 'a87ca25d00ee7740',
'SESSDATA': '399werwerwer6364*b1',
'bili_jct': '11debb3werwerweraa07048',
'CURRENT_FNVAL': '80',
'blackside_state': '1',
'rpdid': '|(J|J|m)kYm|0J\'uY||uJlR|R',
'PVID': '9',
'LIVE_BUVID': 'AUTO1616033517634145',
'_dfcaptcha': '939e596fa169awerwerwerwer37b5b907d7',
'bsource': 'search_baidu',
'bp_video_offset_37895165': '472307werwer57485447',
}

headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.13; rv:84.0) Gecko/20100101 Firefox/84.0',
'Accept': 'application/json, text/plain, */*',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
'Content-Type': 'application/x-www-form-urlencoded',
'Origin': 'https://message.bilibili.com',
'Connection': 'keep-alive',
'Referer': 'https://message.bilibili.com/',
'TE': 'Trailers',
}

data = {
'msg[sender_uid]': '234242',
'msg[receiver_id]': '72342233',
'msg[receiver_type]': '1',
'msg[msg_type]': '1',
'msg[msg_status]': '0',
'msg[content]': '{"content":"学习爬虫"}',
'msg[timestamp]': '1608884548',
'msg[new_face_version]': '0',
'msg[dev_id]': '65C3E936-eeee-4DD8-A65C-262BB8A1A582',
'from_firework': '0',
'build': '0',
'mobi_app': 'web',
'csrf_token': '11debb38a4ed04wrwerew4cff0f57161aa07048',
'csrf': '11debb38a4ed04wrwerew4cff0f57161aa07048'
}

response = requests.post('https://api.vc.bilibili.com/web_im/v1/web_im/send_msg', headers=headers, cookies=cookies,
data=data)
print(response.text)

看下刚才复制的代码是不是就是一个很简单的post请求呢,根据定时发弹幕的例子,你是不是也能写出定时发私信,批量群发私信的功能了呢。