1. Python 多线程使用
多线程功能位于 Python 标准库的 threading 模块中 (通过 threading.Thread 类创建线程),
基础使用方式如下:
import random
import threading
import time
from queue import Empty, Queue
from typing import List


def actions(data: Queue, max_delay: float = 3) -> List[int]:
    """Drain ``data`` and return the square of every item this worker took.

    Several threads may run this function against the same queue; each item
    is handed to exactly one of them.

    Args:
        data: Queue of numbers to process.
        max_delay: Upper bound, in seconds, of the random sleep performed
            before each item is squared (stands in for a slow operation).

    Returns:
        The squares of the items taken by this call, in the order taken.
    """
    squares: List[int] = []
    while True:
        # ``empty()`` followed by a blocking ``get()`` can hang forever when
        # another thread takes the last item in between, so poll without
        # blocking and stop on ``Empty`` instead.
        try:
            x = data.get_nowait()
        except Empty:
            break
        time.sleep(random.uniform(0, max_delay))
        squares.append(x ** 2)
    return squares


if __name__ == "__main__":
    input_data = Queue()
    for i in range(1000):
        input_data.put(i)

    threads_list = [
        threading.Thread(target=actions, args=(input_data,))
        for _ in range(5)
    ]
    # Start every worker before joining any of them; joining right after
    # each start would run the workers one after another.
    for thread in threads_list:
        thread.start()
    for thread in threads_list:
        thread.join()
注意事项:
适用任务: I/O 密集型任务, 即等待时间间隔较长的任务
线程安全: 多线程模式下, 各线程共享所在进程的内存,
因此可能出现多个线程同时访问或修改同一变量的情况,
应尽量使用队列 Queue 在线程之间传递数据, 以保证线程安全
2. Python 多线程封装
2.1 封装的基本思路
使用 producer → consumer 流水线
使用 Queue 保证线程安全
import datetime
import logging
import threading
from queue import Empty, Queue
from typing import Any, Callable

logger = logging.getLogger(__name__)


class MultiThread:
    """Two-stage producer/consumer pipeline executed on worker threads.

    Producer threads take items from ``producer_queue``, pass each one to
    ``producer_func`` and put every truthy return value on
    ``consumer_queue``. Consumer threads take items from ``consumer_queue``,
    pass each one to ``consumer_func`` and put every truthy return value on
    ``result_queue``. Falsy return values (``None``, ``0``, ``()`` ...) are
    dropped at both stages, so a callback can return an empty value to skip
    an item.
    """

    def __init__(
        self,
        producer_queue: Queue,
        producer_func: Callable[[Any], Any],
        consumer_func: Callable[[Any], Any],
        max_producer_threads: int = 5,
        max_consumer_threads: int = 5,
        queue_timeout: float = 3,
        early_run_consumer: bool = False,
        print_qsize_step: int = 100,
    ) -> None:
        """Store the pipeline configuration; no thread is started here.

        Args:
            producer_queue: Pre-filled queue of input items.
            producer_func: Called with each input item.
            consumer_func: Called with each truthy producer result.
            max_producer_threads: Number of producer threads ``run`` starts.
            max_consumer_threads: Number of consumer threads ``run`` starts.
            queue_timeout: Seconds a consumer waits for an item before
                re-checking whether the pipeline has finished.
            early_run_consumer: When true, consumers start while producers
                are still running; otherwise they start after every producer
                has finished.
            print_qsize_step: Print the size of ``result_queue`` each time
                it reaches a multiple of this value; values <= 1 disable
                the printing.
        """
        self.producer_queue = producer_queue
        self.consumer_queue = Queue()
        self.result_queue = Queue()
        self.producer_func = producer_func
        self.consumer_func = consumer_func
        # Set once a producer has found ``producer_queue`` empty.
        self.producer_queue_empty = False
        # Set once a consumer has found the whole pipeline drained.
        self.consumer_queue_empty = False
        self.queue_timeout = queue_timeout
        self.max_producer_threads = max_producer_threads
        self.max_consumer_threads = max_consumer_threads
        self.producer_threads_list = []
        self.consumer_threads_list = []
        self.early_run_consumer = early_run_consumer
        self.print_qsize_step = print_qsize_step

    def _producers_finished(self) -> bool:
        """Return True when no producer can put further items."""
        if not self.producer_queue_empty:
            return False
        # The flag is raised by the first producer that sees an empty input
        # queue, but the others may still be inside ``producer_func``; wait
        # until every producer thread has actually ended.
        return not any(t.is_alive() for t in self.producer_threads_list)

    def _report_progress(self) -> None:
        """Print the result count when it reaches a multiple of the step."""
        if self.print_qsize_step <= 1:
            return
        size = self.result_queue.qsize()
        # Best effort: other consumers may have added results in between, so
        # some multiples can be skipped.
        if size % self.print_qsize_step == 0:
            print(datetime.datetime.now(), "\t" * 2, size)

    def producer(self) -> None:
        """Producer worker: drain ``producer_queue`` through ``producer_func``."""
        while True:
            try:
                item = self.producer_queue.get_nowait()
            except Empty:
                self.producer_queue_empty = True
                break
            try:
                result = self.producer_func(item)
            except Exception:
                # Best effort, like the rest of the pipeline: a failing item
                # is logged and skipped instead of killing the worker.
                logger.exception("producer_func failed for item %r", item)
                continue
            if result:
                self.consumer_queue.put(result)

    def consumer(self) -> None:
        """Consumer worker: drain ``consumer_queue`` through ``consumer_func``."""
        while True:
            # Check the producers first: once they are finished nothing more
            # will be put, so an empty queue then really means "done".
            if self._producers_finished() and self.consumer_queue.empty():
                self.consumer_queue_empty = True
                break
            try:
                item = self.consumer_queue.get(timeout=self.queue_timeout)
            except Empty:
                continue
            try:
                result = self.consumer_func(item)
            except Exception:
                logger.exception("consumer_func failed for item %r", item)
                continue
            if result:
                self.result_queue.put(result)
                self._report_progress()

    def run(self) -> None:
        """Run the pipeline and block until every worker has finished."""
        self.producer_queue_empty = False
        self.consumer_queue_empty = False

        producers = [
            threading.Thread(target=self.producer)
            for _ in range(self.max_producer_threads)
        ]
        self.producer_threads_list.extend(producers)
        # Start all producers before joining any of them so they really run
        # concurrently.
        for thread in producers:
            thread.start()
        if not self.early_run_consumer:
            for thread in producers:
                thread.join()

        consumers = [
            threading.Thread(target=self.consumer)
            for _ in range(self.max_consumer_threads)
        ]
        self.consumer_threads_list.extend(consumers)
        for thread in consumers:
            thread.start()

        for thread in self.consumer_threads_list:
            thread.join()
        # Consumers only stop after the producers have ended, so this join
        # returns immediately; it just leaves no thread un-joined.
        for thread in producers:
            thread.join()
2.2 使用举例
需要定义 producer_func 和 consumer_func, 并且推荐使用闭包的形式
(两个函数返回空值, 例如 (), 时该条结果会被丢弃)
处理完成的结果保存于 MultiThread.result_queue 中
代码如下:
# coding: utf-8
# NOTE: MultiThread is the class defined in section 2.1 above.
from queue import Queue
from typing import Tuple
def producer():
    """Build the producer callback.

    Returns:
        A function mapping a number to ``(num, new_num)``, or to an empty
        tuple when the number is zero.
    """

    def wrapper(num: int) -> Tuple[int, ...]:
        """Map ``num`` to ``(num, num ** 2)`` if negative, ``(num, num * 5 + 1)``
        if positive, and ``()`` if zero."""
        if num < 0:
            return (num, num ** 2)
        if num > 0:
            return (num, num * 5 + 1)
        return ()

    return wrapper
def consumer():
    """Build the consumer callback.

    Returns:
        A function mapping ``(org_num, new_num)`` to
        ``(org_num, org_num - new_num)``, or to an empty tuple when the
        item cannot be processed.
    """

    def wrapper(item: Tuple[int, ...]) -> Tuple[int, ...]:
        """Return ``(org_num, org_num - new_num)``; ``()`` for a malformed item."""
        try:
            org_num, new_num = item
            return (org_num, org_num - new_num)
        except (TypeError, ValueError):
            # Wrong length, a non-iterable item, or operands that cannot be
            # subtracted: return the empty value.
            return ()

    return wrapper
if __name__ == "__main__":
    # Build the producer and consumer callbacks.
    producer_func = producer()
    consumer_func = consumer()

    # Fill the input queue.
    producer_queue = Queue()
    for i in range(10000):
        producer_queue.put(i)

    # Configure the pipeline (MultiThread is the class from section 2.1).
    multi_thread = MultiThread(
        producer_queue=producer_queue,
        producer_func=producer_func,
        consumer_func=consumer_func,
        max_producer_threads=10,
        max_consumer_threads=20,
        queue_timeout=3,
        early_run_consumer=False,
    )

    # Run the pipeline; this blocks until all workers have finished.
    multi_thread.run()

    # Inspect the number of results once the run is over.
    print(multi_thread.result_queue.qsize())