''' 打印结果: MainThread <_MainThread(MainThread, started 140735268892672)> [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>] 主线程/主进程 Thread-1 '''
# Code example: start a thread, then join() it before continuing.
from threading import Thread
import time


def sayhi(name):
    """Sleep two seconds, then print a greeting for *name*."""
    time.sleep(2)
    print('%s say hello' % name)


if __name__ == '__main__':
    t = Thread(target=sayhi, args=('egon',))
    t.start()
    t.join()  # block the main thread until t has finished
    print('主线程')
    print(t.is_alive())  # False — t already terminated because of join()
    '''
    egon say hello
    主线程
    False
    '''
# Race demo: 100 threads decrement a shared global WITHOUT a lock.
from threading import Thread
import os, time


def work():
    """Read-sleep-write on global ``n`` — a deliberate lost-update race."""
    global n
    temp = n        # every thread reads before any thread has written back...
    time.sleep(0.1)
    n = temp - 1    # ...so they all write the same value minus one


if __name__ == '__main__':
    n = 100
    threads = []
    for _ in range(100):
        t = Thread(target=work)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    print(n)  # likely 99, not 0 — the concurrent read/write loses updates
# Multiple threads competing for a shared resource: guard the critical
# section with a Lock.
import threading

R = threading.Lock()
R.acquire()
'''
对公共数据的操作
'''
R.release()

from threading import Thread, Lock
import os, time


def work():
    """Atomically decrement global ``n``; the lock serializes read+write."""
    global n
    with lock:  # context manager guarantees release on any exit path
        temp = n
        time.sleep(0.1)
        n = temp - 1


if __name__ == '__main__':
    lock = Lock()
    n = 100
    threads = []
    for _ in range(100):
        t = Thread(target=work)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    # Always 0: the locked section runs serially — execution speed is
    # traded for data safety.
    print(n)
# Synchronization lock, part 1 — NO lock: fully concurrent, fast, but the
# shared data is NOT safe.
from threading import current_thread, Thread, Lock
import os, time


def task():
    """Unlocked read-sleep-write on global ``n`` (racy on purpose)."""
    global n
    print('%s is running' % current_thread().name)
    temp = n
    time.sleep(0.5)
    n = temp - 1


if __name__ == '__main__':
    n = 100
    lock = Lock()  # created but unused — this is the "no lock" variant
    threads = []
    start_time = time.time()
    for _ in range(100):
        t = Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    stop_time = time.time()  # was missing in the original paste
    print('主:%s n:%s' % (stop_time - start_time, n))
'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:0.5216062068939209 n:99
'''
# Part 2 — WITH lock: the unlocked part runs concurrently, the locked part
# serially. Slower, but the shared data stays safe.
from threading import current_thread, Thread, Lock
import os, time


def task():
    # Unlocked code: runs concurrently in every thread.
    time.sleep(3)
    print('%s start to run' % current_thread().name)
    global n
    # Locked code: runs serially, one thread at a time.
    with lock:
        temp = n
        time.sleep(0.5)
        n = temp - 1


if __name__ == '__main__':
    n = 100
    lock = Lock()
    threads = []
    start_time = time.time()
    for _ in range(100):
        t = Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    stop_time = time.time()
    print('主:%s n:%s' % (stop_time - start_time, n))
'''
Thread-1 start to run
Thread-2 start to run
......
Thread-100 start to run
主:53.294203758239746 n:0
'''
# Q: since the lock serializes execution anyway, why not just call join()
#    right after start() instead of locking?
# A: start()+join() does serialize all 100 tasks and n does end at 0, BUT it
#    serializes the WHOLE task body, while a lock only serializes the
#    critical section that mutates the shared data. Both are safe; the lock
#    is clearly the more efficient choice.
from threading import current_thread, Thread, Lock
import os, time


def task():
    time.sleep(3)
    print('%s start to run' % current_thread().name)
    global n
    temp = n
    time.sleep(0.5)
    n = temp - 1


if __name__ == '__main__':
    n = 100
    lock = Lock()
    start_time = time.time()
    for _ in range(100):
        t = Thread(target=task)
        t.start()
        t.join()  # joining immediately serializes every task completely
    stop_time = time.time()
    print('主:%s n:%s' % (stop_time - start_time, n))
'''
Thread-1 start to run
Thread-2 start to run
......
Thread-100 start to run
主:350.6937336921692 n:0  # how terribly long this takes
'''
优先级队列 Constructor for a priority queue. maxsize is an integer that sets the upper bound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.
The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]). A typical pattern for entries is a tuple in the form: (priority_number, data).
exception queue.Empty Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty.
exception queue.Full Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full.
Queue.qsize() Queue.empty() #return True if empty Queue.full() # return True if full Queue.put(item, block=True, timeout=None) Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).
Queue.put_nowait(item) Equivalent to put(item, False).
Queue.get(block=True, timeout=None) Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).
Queue.get_nowait() Equivalent to get(False).
Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.
Queue.task_done() Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
Raises a ValueError if called more times than there were items placed in the queue.
#1 介绍 concurrent.futures模块提供了高度封装的异步调用接口 ThreadPoolExecutor:线程池,提供异步调用 ProcessPoolExecutor: 进程池,提供异步调用 Both implement the same interface, which is defined by the abstract Executor class.
# cancel() 取消某个任务 #介绍 The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.
class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None) An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised.
# Usage
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import os, time, random


def task(n):
    """Report the worker pid, sleep 1-3 s, then return n squared."""
    print('%s is running' % os.getpid())
    time.sleep(random.randint(1, 3))
    return n ** 2


if __name__ == '__main__':
    executor = ProcessPoolExecutor(max_workers=3)
    futures = []
    for i in range(11):
        futures.append(executor.submit(task, i))
    executor.shutdown(True)  # wait until every submitted task has finished
    print('+++>')
    for future in futures:
        print(future.result())
ProcessPoolExecutor #介绍 ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously. class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='') An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.
Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor.
New in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging.
#用法 与ProcessPoolExecutor相同
# ThreadPoolExecutor — usage is identical to ProcessPoolExecutor.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import os, time, random


def task(n):
    """Report the worker pid, sleep 1-3 s, then return n squared."""
    print('%s is running' % os.getpid())
    time.sleep(random.randint(1, 3))
    return n ** 2


if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=3)
    # for i in range(11):
    #     future = executor.submit(task, i)
    executor.map(task, range(1, 12))  # map() replaces the for+submit loop
# Usage of map
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os


def get_page(url):
    """GET *url*; on HTTP 200 return {'url': url, 'text': body}, else None."""
    print('<进程%s> get %s' % (os.getpid(), url))
    response = requests.get(url)
    if response.status_code == 200:
        return {'url': url, 'text': response.text}