MultiThread

1. Python 多线程使用

Python 的多线程由标准库 threading 模块中的 Thread 类提供, 基础使用方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#coding: utf-8
import time
import random
import threading
from queue import Queue
from typing import Int

def actions(data: Queue, results: "Queue | None" = None, max_delay: float = 3):
    """Drain ``data``, squaring every item after a simulated I/O wait.

    Intended as a ``threading.Thread`` target: several workers can share the
    same input queue and each one keeps pulling items until none are left.

    Args:
        data: Queue of numbers to process; it is consumed by this call.
        results: Optional queue that receives each squared value. A thread's
            return value is discarded by ``threading.Thread``, so pass a
            queue here when the results are needed.
        max_delay: Upper bound, in seconds, of the random pause taken before
            each item is squared.

    Returns:
        The last square computed by this worker, or ``None`` if the worker
        found the queue already empty.
    """
    # Function-scope import: the surrounding file only imports ``Queue``.
    from queue import Empty

    last_square = None
    while True:
        # ``get_nowait`` replaces the ``empty()`` + blocking ``get()`` pair:
        # another worker may take the last item between those two calls,
        # which would leave this thread blocked forever.
        try:
            x = data.get_nowait()
        except Empty:
            break
        # Sleep for a random interval to simulate waiting on a network reply.
        time.sleep(random.uniform(0, max_delay))
        last_square = x**2
        if results is not None:
            results.put(last_square)
    return last_square

if __name__ == "__main__":
    # Fill the shared input queue with the work items.
    input_data = Queue()
    for i in range(1000):
        input_data.put(i)
    # Create 5 worker threads and keep references to them.
    threads_list = [
        threading.Thread(target=actions, args=(input_data,))
        for _ in range(5)
    ]
    # Start every worker first so that they run concurrently.
    for thread in threads_list:
        thread.start()
    # Only then block until all workers are done. Joining inside the start
    # loop would make each thread finish before the next one is started.
    for thread in threads_list:
        thread.join()

注意事项:

  • 适用任务: I/O 密集型任务, 即等待时间间隔较长的任务
  • 线程安全: 多线程模式下, 共享主进程内存, 因此存在同时访问或修改主进程变量情况, 尽量使用队列Queue来保证线程安全

2. Python 多线程封装

2.1 封装的基本思路

  • 使用producer consumer 流水线
  • 使用Queue保证线程安全

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#coding: utf-8
import datetime
import threading
from queue import Queue
from typing import Callable, Any

class MultiThread:
    """Two-stage producer/consumer pipeline built on threads and queues.

    Items flow ``producer_queue -> producer_func -> consumer_queue ->
    consumer_func -> result_queue``. A stage result that is falsy (``None``,
    ``0``, ``()``, ...) is dropped, and an item whose processing function
    raises is skipped so one bad item does not stop the pipeline.

    Args:
        producer_queue: Pre-filled queue of input items.
        producer_func: Callable applied to every input item.
        consumer_func: Callable applied to every producer result.
        max_producer_threads: Number of producer threads started by ``run``.
        max_consumer_threads: Number of consumer threads started by ``run``.
        queue_timeout: Seconds a consumer waits for an item before
            re-checking whether the pipeline is finished.
        early_run_consumer: If true, consumers start while producers are
            still running; otherwise they start after all producers finish.
        print_qsize_step: When greater than 1, a progress line is printed
            each time the result count reaches a multiple of this value.
    """

    def __init__(self,
                 producer_queue: Queue,
                 producer_func: Callable,
                 consumer_func: Callable,
                 max_producer_threads: int = 5,
                 max_consumer_threads: int = 5,
                 queue_timeout: int = 3,
                 early_run_consumer: bool = False,
                 print_qsize_step: int = 100):
        self.producer_queue = producer_queue
        self.consumer_queue = Queue()
        self.result_queue = Queue()
        # Processing callables for the two stages (typically closures).
        self.producer_func = producer_func
        self.consumer_func = consumer_func
        # Flags raised once a stage has observed its input queue empty.
        self.producer_queue_empty = False
        self.consumer_queue_empty = False
        # Queue wait time, in seconds.
        self.queue_timeout = queue_timeout
        # Maximum number of threads per stage.
        self.max_producer_threads = max_producer_threads
        self.max_consumer_threads = max_consumer_threads
        # Thread bookkeeping.
        self.producer_threads_list = []
        self.consumer_threads_list = []
        self.early_run_consumer = early_run_consumer
        self.print_qsize_step = print_qsize_step

    def _producers_finished(self) -> bool:
        """Return True once no producer can put further items.

        The empty flag alone is not enough: a producer that already took the
        last input item may still be computing its result, so every producer
        thread started by ``run`` must also have exited.
        """
        if not self.producer_queue_empty:
            return False
        return not any(t.is_alive() for t in self.producer_threads_list)

    def producer(self):
        """Producer worker: move processed inputs into ``consumer_queue``."""
        # Function-scope import: the surrounding file only imports ``Queue``.
        from queue import Empty

        while True:
            try:
                producer_out = self.producer_queue.get_nowait()
            except Empty:
                # The input queue is exhausted.
                self.producer_queue_empty = True
                break
            try:
                producer_result = self.producer_func(producer_out)
                if producer_result:
                    self.consumer_queue.put(producer_result)
            except Exception:
                # Best effort: skip the failing item and keep going.
                continue

    def consumer(self):
        """Consumer worker: move processed items into ``result_queue``."""
        # Function-scope import: the surrounding file only imports ``Queue``.
        from queue import Empty

        while True:
            # Order matters: confirm the producers are finished *before*
            # looking at the queue, otherwise an item put in between the two
            # checks would be left behind.
            if self._producers_finished() and self.consumer_queue.empty():
                self.consumer_queue_empty = True
                break
            try:
                consumer_out = self.consumer_queue.get(
                    timeout=self.queue_timeout)
            except Empty:
                # Nothing arrived in time; re-check the exit condition.
                continue
            try:
                consumer_result = self.consumer_func(consumer_out)
                if not consumer_result:
                    continue
            except Exception:
                # Best effort: skip the failing item and keep going.
                continue
            self.result_queue.put(consumer_result)
            # Report progress only when a new result was stored, so an idle
            # loop does not print the same count over and over.
            size = self.result_queue.qsize()
            if self.print_qsize_step > 1 and size % self.print_qsize_step == 0:
                print(datetime.datetime.now(), "\t"*2, size)

    def run(self):
        """Run the whole pipeline and block until every result is stored."""
        producers = [threading.Thread(target=self.producer)
                     for _ in range(self.max_producer_threads)]
        self.producer_threads_list.extend(producers)
        # Start all producers before joining any of them; joining inside the
        # start loop would run the producers one after another.
        for thread in producers:
            thread.start()
        # Unless consumers may start early, wait for the producers first.
        if not self.early_run_consumer:
            for thread in producers:
                thread.join()

        consumers = [threading.Thread(target=self.consumer)
                     for _ in range(self.max_consumer_threads)]
        self.consumer_threads_list.extend(consumers)
        for thread in consumers:
            thread.start()

        # No-op when the producers were already joined above.
        for thread in producers:
            thread.join()
        for thread in self.consumer_threads_list:
            thread.join()

2.2 使用举例

  • 需要定义 producer_func 和 consumer_func, 并且推荐使用闭包的形式
  • 处理完成的结果保存于MultiThread.result_queue

代码如下:

#coding: utf-8
import Queue
from typing import Int, Tuple
def producer():
    """Build the producer-stage function for the pipeline.

    Returns:
        A closure mapping a number to ``(num, transformed)``:
        ``num ** 2`` for negative input, ``num * 5 + 1`` for positive input,
        and an empty tuple for zero (a falsy value, so the item is dropped
        by the pipeline).
    """
    # ``typing`` has no ``Int``; the builtin ``int`` is the correct
    # annotation. The ellipsis form also covers the empty-tuple result.
    def wrapper(num: int) -> Tuple[int, ...]:
        if num < 0:
            return (num, num**2)
        elif num > 0:
            return (num, num*5 + 1)
        else:
            return ()
    return wrapper

def consumer():
    """Build the consumer-stage function for the pipeline.

    Returns:
        A closure mapping ``(org_num, new_num)`` to
        ``(org_num, org_num - new_num)``. Input that cannot be unpacked into
        exactly two values, or whose values cannot be subtracted, yields an
        empty tuple.
    """
    # ``typing`` has no ``Int``; the builtin ``int`` is the correct
    # annotation. The ellipsis form also covers the empty-tuple result.
    def wrapper(item: Tuple[int, int]) -> Tuple[int, ...]:
        try:
            org_num, new_num = item
            return (org_num, org_num - new_num)
        except (TypeError, ValueError):
            # TypeError: not iterable / unsupported operands;
            # ValueError: wrong number of elements to unpack.
            return ()
    return wrapper
if __name__ == "__main__":
    # Seed the producer queue with the integers 0..9999.
    producer_queue = Queue()
    for value in range(10000):
        producer_queue.put(value)

    # Wire the two stage closures into the threaded pipeline.
    pipeline_options = dict(
        producer_queue=producer_queue,
        producer_func=producer(),
        consumer_func=consumer(),
        max_producer_threads=10,
        max_consumer_threads=20,
        queue_timeout=3,
        early_run_consumer=False,
    )
    pipeline = MultiThread(**pipeline_options)

    # Run to completion, then report how many results were collected.
    pipeline.run()
    print(pipeline.result_queue.qsize())

MultiThread
https://hoshinory.com/20240412/ProgrammingLanguage/Python/MultiThreading/MultiThread/
Author
hoshinory
Posted on
April 12, 2024
Licensed under