爬虫是python现在很流行的一个应用方向,今天通过我们实际生活中有可能用到的做一个小实例,不同以往的get页面数据,我们今天通过post想服务器定时发送一下数据。
今天主要是使用到两个库requests和scheduler。如果你想爬取网站其他数据,可以参考之前的文章初识Scrapy爬虫框架
requests 其实这才是我们今天的主角,由于它太出名了,简单两句话也说不了,直接放文档链接大家自己看吧https://requests.readthedocs.io/zh_CN/latest/user/quickstart.html
scheduler APScheduler全程为Advanced Python Scheduler,是一款轻量级的Python任务调度框架。它允许你像Cron那样安排定期执行的任务,并且支持Python函数或任意可调用的对象。
调度器(scheduler) 调度器(scheduler)是其他的组成部分。你通常在应用只有一个调度器,应用的开发者通常不会直接处理作业存储、调度器和触发器,相反,调度器提供了处理这些的合适的接口。配置作业存储和执行器可以在调度器中完成,例如添加、修改和移除作业。
对于不同的不场景,可以选择的调度器:
BlockingScheduler : 当调度器是你应用中唯一要运行的东西时。 BackgroundScheduler : 当你没有运行任何其他框架并希望调度器在你应用的后台执行时使用(常用)。 AsyncIOScheduler : 当你的程序使用了asyncio(一个异步框架)的时候使用。 GeventScheduler : 当你的程序使用了gevent(高性能的Python并发框架)的时候使用。 TornadoScheduler : 当你的程序基于Tornado(一个web框架)的时候使用。 TwistedScheduler : 当你的程序使用了Twisted(一个异步框架)的时候使用 QtScheduler : 如果你的应用是一个Qt应用的时候可以使用。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from apscheduler.schedulers.background import BackgroundSchedulerimport timescheduler = BackgroundScheduler() def job1 (): print "%s: 执行任务" % time.asctime() scheduler.add_job(job1, 'interval' , seconds=3 ) scheduler.start() while True : pass
2、作业存储(job store) 作业存储(job store)主要用来存储被调度的作业,默认的作业存储是简单地把作业保存在内存中。
jobstore提供对scheduler中job的增删改查接口,根据存储方式的不同,分以下几种:
MemoryJobStore:没有序列化,jobs就存在内存里,增删改查也都是在内存中操作 SQLAlchemyJobStore:所有sqlalchemy支持的数据库都可以做为backend,增删改查操作转化为对应backend的sql语句 MongoDBJobStore:用mongodb作backend RedisJobStore: 用redis作backend RethinkDBJobStore: 用rethinkdb 作backend ZooKeeperJobStore:用ZooKeeper做backend 3、执行器(executor) 执行器(executor)主要处理任务的运行,主要是把定时任务中的可调用对象(function)提交给一个一个线程或者进程来进行。当任务完成时,执行器将会通知调度器。
最常用的 执行器(executor) 有两种:
ProcessPoolExecutor(进程池) ThreadPoolExecutor(线程池,max:10) 4、触发器(triggers) 当调度一个任务时,需要为它设置一个触发器(triggers)。触发器决定在什么日期/时间、用什么样的形式来执行执行这个定时任务。
APScheduler 有三种内建的 trigger:
date: 指定某个确定的时间点,job仅执行一次。 interval: 指定时间间隔(fixed intervals)周期性执行。 cron: 使用cron风格表达式周期性执行,用于(在指定时间内)定期运行job的场景。使用同linux下crontab的方式。 4.1、date 定时调度(作业只会执行一次) 参数如下:
run_date (datetime|str) – 作业的运行日期或时间 timezone (datetime.tzinfo|str) – 指定时区 1 2 3 4 5 6 <!-- sched.add_job(job_function, 'date' , run_date=date(2016 , 12 , 12 ), args=['text' ]) <!-- <!--args=[]中是传给job_function的参数--> sched.add_job(job_function, 'date' , run_date=datetime(2016 , 12 , 12 , 12 , 0 , 0 ), args=['text' ])
4.2、interval: 每隔一段时间执行一次 weeks=0 | days=0 | hours=0 | minutes=0 | seconds=0, start_date=None, end_date=None, timezone=None
weeks (int) – 间隔几周 days (int) – 间隔几天 hours (int) – 间隔几小时 minutes (int) – 间隔几分钟 seconds (int) – 间隔多少秒 start_date (datetime|str) – 开始日期 end_date (datetime|str) – 结束日期 timezone (datetime.tzinfo|str) – 时区 1 2 3 4 5 6 7 8 9 10 11 12 <!--每隔2 小时执行一次--> scheduler.add_job(my_job, 'interval' , hours=2 ) <!--设置时间范围,在设置的时间范围内每隔2 小时执行一次--> scheduler.add_job(my_job, 'interval' , hours=2 , start_date='2017-9-8 21:30:00' , end_date='2018-06-15 21:30:00) <!--使用装饰器的方式添加定时任务--> @scheduler.scheduled_job(' interval', id=' my_job_id', hours=2) def my_job(): print("Hello World")
4.3、cron: 使用同linux下crontab的方式 (year=None, month=None, day=None, week=None, day_of_week=None, hour=None, minute=None, second=None, start_date=None, end_date=None, timezone=None)
除了week和 day_of_week,它们的默认值是 *
例如 day=1, minute=20 ,这就等于 1 year='*' , month='*' , day=1 , week='*' , day_of_week='*' , hour='*' , minute=20 , second=0
工作将在每个月的第一天以每小时20分钟的时间执行
表达式 参数类型 描述 * 所有 通配符。例: minutes=* 即每分钟触发 */a 所有 可被a整除的通配符。 a-b 所有 范围a-b触发 a-b/c 所有 范围a-b,且可被c整除时触发 xth y 日 第几个星期几触发。x为第几个,y为星期几 last x 日 一个月中,最后个星期几触发 last 日 一个月最后一天触发 x,y,z 所有 组合表达式,可以组合确定值或上方的表达式
year (int|str) – 年,4位数字 month (int|str) – 月 (范围1-12) day (int|str) – 日 (范围1-31) week (int|str) – 周 (范围1-53) day_of_week (int|str) – 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun) hour (int|str) – 时 (范围0-23) minute (int|str) – 分 (范围0-59) second (int|str) – 秒 (范围0-59) start_date (datetime|str) – 最早开始日期(包含) end_date (datetime|str) – 最晚结束时间(包含) timezone (datetime.tzinfo|str) – 指定时区 1 2 3 4 5 6 7 sched.add_job(my_job, 'cron' , hour=3 , minute=30 ) sched.add_job(my_job, 'cron' , day_of_week='mon-fri' , hour=5 , minute=30 , end_date='2017-10-30' ) @sched.scheduled_job('cron' , id ='my_job_id' , day='last sun' ) def some_decorated_task (): print ("I am printed at 00:00:00 on the last Sunday of every month!" )
二、 How:APSched 怎么用? 安装 源码安装(https://pypi.python.org/pypi/APScheduler/) 快速上手 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from apscheduler.schedulers.blocking import BlockingSchedulerimport timescheduler = BlockingScheduler() def job1 (): print "%s: 执行任务" % time.asctime() scheduler.add_job(job1, 'interval' , seconds=3 ) scheduler.start()
1 2 3 4 5 > python first.py Fri Sep 8 20 :41 :55 2017 : 执行任务 Fri Sep 8 20 :41 :58 2017 : 执行任务 ...
任务操作 1、添加任务 方法一:调用add_job()方法
调用add_job()方法返回一个apscheduler.job.Job 的实例,可以用来改变或者移除 job
1 job = scheduler.add_job(myfunc, 'interval' , minutes=2 )
方法二:使用装饰器scheduled_job()
适用于应用运行期间不会改变的 job
1 2 3 4 5 6 7 8 from apscheduler.schedulers.blocking import BlockingSchedulersched = BlockingScheduler() @sched.scheduled_job('interval' , id ='my_job_id' , seconds=5 ) def job_function (): print ("Hello World" ) sched.start()
2、删除任务 1 2 3 4 5 6 7 8 <!--方法一:通过作业ID或别名调用remove_job()删除作业--> scheduler.add_job(myfunc, 'interval' , minutes=2 , id ='my_job_id' ) scheduler.remove_job('my_job_id' ) <!-- 方法二:通过add_job()返回的job实例调用remove()方法删除作业--> job = scheduler.add_job(myfunc, 'interval' , minutes=2 ) job.remove()
3、暂停&继续任务 可以通过Job实例或调度程序本身轻松暂停和恢复作业。 当作业暂停时,下一个运行时间将被清除,直到作业恢复,不会再计算运行时间。 要暂停作业,请使用以下任一方法:
1 2 3 4 5 6 7 8 9 10 11 12 <!--根据任务实例--> job = scheduler.add_job(myfunc, 'interval' , minutes=2 ) <!--暂停--> job.pause() <!--继续--> job.resume() <!--根据任务id 暂停--> scheduler.add_job(myfunc, 'interval' , minutes=2 , id ='my_job_id' ) scheduler.pause_job('my_job_id' ) scheduler.resume_job('my_job_id' )
4、修改任务属性 1 2 3 4 5 6 <!--修饰:--> job.modify(max_instances=6 , name='Alternate name' ) <!--重设:--> scheduler.reschedule_job('my_job_id' , trigger='cron' , minute='*/5' )
5、获得job列表 使用get_jobs()来获取所有的job实例。 使用print_jobs()来输出所有格式化的作业列表。 使用get_job(任务ID)获取指定任务的作业列表。 1 2 <!--获取所有的job实例--> apscheduler.get_jobs()
6、开始&关闭任务 1 2 scheduler.start() scheduler.shotdown(wait=True |False )
三、一些定时任务脚本 1、定时任务运行脚本每日凌晨00:30:30执行 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import datetimefrom apscheduler.schedulers.blocking import BlockingSchedulerfrom app.untils.log_builder import sys_loggingscheduler = BlockingScheduler() @scheduler.scheduled_job("cron" , day_of_week='*' , hour='1' , minute='30' , second='30' ) def rebate (): print "schedule execute" sys_logging.debug("statistic scheduler execute success" + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S" )) if __name__ == '__main__' : try : scheduler.start() sys_logging.debug("statistic scheduler start success" ) except (KeyboardInterrupt, SystemExit): scheduler.shutdown() sys_logging.debug("statistic scheduler start-up fail" )
2、每天晚上0点 - 早上8点期间,每5秒执行一次任务。 1 2 scheduler.add_job(my_job, 'cron' ,day_of_week='*' ,hour = '0-8' ,second = '*/5' )
3、在0、10、20、30、40、50分时执行任务。 1 2 <!-- scheduler.add_job(my_job, 'cron' ,day_of_week='*' ,minute = '*/10' )
4、直到2020-05-30,每周从周一到周五的早上5:30都执行一次定时任务 1 2 3 4 5 6 <!--直到2020 -05-30 00 :00 :00 ,每周星期从星期一到星期五的早上5 :30 都执行一次定时任务--> sched.add_job(my_job(),'cron' , day_of_week='mon-fri' , hour=5 , minute=30 ,end_date='2020-05-30' ) <!-- sched.add_job(job_function, 'cron' , day_of_week='mon-fri' , hour=5 , minute=30 , end_date='2016-12-31' )
5、在6,7,8,11,12月的第3个周五的1,2,3点执行定时任务 1 2 3 <!--job_function将会在6 ,7 ,8 ,11 ,12 月的第3 个周五的1 ,2 ,3 点运行--> sched.add_job(job_function, 'cron' , month='6-8,11-12' , day='3rd fri' , hour='0-3' )
6、每5秒执行该程序一次 1 2 <!-- sched.add_job(my_job, 'cron' ,second = '*/5' )
定时发弹幕 我们这里主要使用到两个第三方库requests和Scheduler这两个库我这里因为其大名和上面也讲过,下面代码中我就不再作解释。
我们这里理一下正常发弹幕的流程:打开直播页面》登录(发送弹幕登录)》输入弹幕内容》点击发送
将以上流程转化为代码运行流程:得到弹幕提交的链接》使用带登录的cookie携带弹幕等数据(大部分为post)进行发送。
以上看似流程减少了,因为大部分东西已包含在了代码中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 import randomimport requestsfrom apscheduler.schedulers.background import BackgroundSchedulerfrom apscheduler.schedulers.blocking import BlockingSchedulerdef send_barrage (): """发送弹幕""" url = "https://api.live.bilibili.com/msg/send" headers = { "Accept" : "application/json, text/javascript, */*; q=0.01" , "Accept-Encoding" : "gzip, deflate, br" , "Accept-Language" : "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2" , "Connection" : "keep-alive" , "Content-Length" : "178" , "Content-Type" : "application/x-www-form-urlencoded; charset=UTF-8" , "Cookie" : "你的cookie" , "Host" : "api.live.bilibili.com" , "Origin" : "https://live.bilibili.com" , "Referer" : "https://live.bilibili.com/22606139?session_id=44851e883b2f84e_E8A17D19-ABF3-4805-9B35-CA6540BD2AC3&visit_id=9ld4n04lw4jk" , "TE" : "Trailers" , "User-Agent" : "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.13; rv:84.0) Gecko/20100101 Firefox/84.0" , } data = { "color" : "16777215" , "fontsize" : "25" , "mode" : "1" , "msg" : random_words(), "rnd" : "1608879402" , "roomid" : "22606139" , "bubble" : "0" , "csrf_token" : "11debb38a4ed044cf23427161aa07048" , "csrf" : "11debb38a4ed044cf23427161aa07048" , } res = requests.post(url=url, headers=headers, data=data) print (res.text) return res.text def random_words (): """准备随机发送弹幕的内容,当然可以在这里调用你的数据库里的""" words = ["大神收下我的膝盖" , "特意过来膜拜你的" , "为大神打call" , "这也太太太厉害了吧" , "好喜欢么么哒" , "请大佬喝茶" ] return random.choice(words) def auto_start (): scheduler = BlockingScheduler() scheduler.add_job(send_barrage, "interval" , seconds=8 ) scheduler.start() if __name__ == '__main__' : auto_start()
关于以上请求头、请求接口、请求体格式或内容从哪儿获取
请求接口获取 如图打开一个直播页面,调出开发者工具进入到网络(其他浏览器是network),此时在页面中发送一条弹幕即可看到刚才的请求,选中该请求复制请求网址及弹幕的发送接口,从图中我们也可以看到接口给我们返回的数据,code=0是成功的意思。
获取请求头 还是刚才的开发者工具,我们下方调整到消息头,可以看到cookie其实就保存了我们当前的登录信息,我们将整个请求头给拷贝出来,格式化为python字典,跟代码中对应一下。
获取请求体 再调整到请求,将表单数据复制出来,格式化代码中的data格式一样
运行 以上我们要准备的东西都已经齐了,运行上方的代码,同时观察直播页面,可以看到每隔8秒钟直播间弹幕里会出现你随机发送的内容,是不是很简单呢。
使用requests发送b站私信 其实看了以上代码,发私信我觉得对大家来说也很简单了,我这里给大家放一个生成curl的代码工具让你的操作更简单,它支持python、php、go,node等等,偷懒的必备工具,以下我们就拿私信做个尝试吧。
获取curl命令 打开私信页面,同样尝试给某人发送一条消息,抓取该请求,右击该请求赋值为curl命令。
打开生成代码工具 打开curl代码生成工具 ,语言选择python,将复制的curl命令拷贝到curl command,即可在python requests内获取到 代码。
运行 将改代码复制出来拷贝到你的ide直接运行,再查看私信页面,数据是不是被发送出去了呢。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 import requests cookies = { '$Cookie: _uuid' : 'Ceeeeeeeeeeewrwrwer' , 'buvid3' : '6werwerwerwerwerwer' , 'UM_distinctid' : '1748f6b487d44d-0werwerwerwerb487e5cb' , 'sid' : '51o22sll46' , 'DedeUserID' : '3789335165' , 'DedeUserID__ckMd5' : 'a87ca25d00ee7740' , 'SESSDATA' : '399werwerwer6364*b1' , 'bili_jct' : '11debb3werwerweraa07048' , 'CURRENT_FNVAL' : '80' , 'blackside_state' : '1' , 'rpdid' : '|(J|J|m)kYm|0J\'uY||uJlR|R' , 'PVID' : '9' , 'LIVE_BUVID' : 'AUTO1616033517634145' , '_dfcaptcha' : '939e596fa169awerwerwerwer37b5b907d7' , 'bsource' : 'search_baidu' , 'bp_video_offset_37895165' : '472307werwer57485447' , } headers = { 'User-Agent' : 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.13; rv:84.0) Gecko/20100101 Firefox/84.0' , 'Accept' : 'application/json, text/plain, */*' , 'Accept-Language' : 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2' , 'Content-Type' : 'application/x-www-form-urlencoded' , 'Origin' : 'https://message.bilibili.com' , 'Connection' : 'keep-alive' , 'Referer' : 'https://message.bilibili.com/' , 'TE' : 'Trailers' , } data = { 'msg[sender_uid]' : '234242' , 'msg[receiver_id]' : '72342233' , 'msg[receiver_type]' : '1' , 'msg[msg_type]' : '1' , 'msg[msg_status]' : '0' , 'msg[content]' : '{"content":"学习爬虫"}' , 'msg[timestamp]' : '1608884548' , 'msg[new_face_version]' : '0' , 'msg[dev_id]' : '65C3E936-eeee-4DD8-A65C-262BB8A1A582' , 'from_firework' : '0' , 'build' : '0' , 'mobi_app' : 'web' , 'csrf_token' : '11debb38a4ed04wrwerew4cff0f57161aa07048' , 'csrf' : '11debb38a4ed04wrwerew4cff0f57161aa07048' } response = requests.post('https://api.vc.bilibili.com/web_im/v1/web_im/send_msg' , headers=headers, cookies=cookies, data=data) print (response.text)
看下刚才复制的代码是不是就是一个很简单的post请求呢,根据定时发弹幕的例子,你是不是也能写出定时发私信,批量群发私信的功能了呢。