线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from concurrent.futures import ThreadPoolExecutor

def main(url=None):
    """Placeholder task used by the pool and thread examples.

    Args:
        url: Item handed to the task (for example by ``map``). Defaults to
            ``None`` so the function can also serve as a
            ``threading.Thread`` target that is started without arguments.

    Returns:
        None. The body is intentionally empty.
    """

# Run main() over every item of `links` on up to 10 worker threads.
# NOTE(review): `links` is not defined in this snippet; it is assumed to be
# an iterable provided by the caller.
# The with-block shuts the executor down once the results are collected.
with ThreadPoolExecutor(max_workers=10) as Pool:
    # list() drains the lazy iterator returned by map(); retrieving a result
    # is also what re-raises an exception raised inside a worker.
    list(Pool.map(main, links))

# 传入多个参数
def add(x, y):
    """Return ``x + y``."""
    return x + y

# Argument pairs; each tuple holds the (x, y) for one call to add().
nums = [(1, 2), (3, 4), (5, 6)]

with ThreadPoolExecutor(max_workers=3) as executor:
    # zip(*nums) transposes the pairs into (1, 3, 5) and (2, 4, 6); map()
    # consumes both in lockstep: add(1, 2), add(3, 4), add(5, 6).
    results = executor.map(add, *zip(*nums))

# 传参为空
import threading
# Start 10 threads. No args/kwargs are passed to Thread, so each thread
# calls main() without any arguments.
for _ in range(10):
    threading.Thread(target=main).start()

进程池

1
2
3
4
5
6
7
8
9
10
11
12
13
import multiprocessing as mp

if __name__ == "__main__":
    # Create the pool only under the main guard: with the spawn start method
    # each child imports this module, and unguarded code would run again.
    # NOTE(review): `url_lst` is not defined in this snippet; it is assumed
    # to be an iterable provided by the caller.
    # The with-block releases the worker processes once map() has returned.
    with mp.Pool(processes=10) as pool:
        pool.map(main, url_lst)

# 传入多个参数
def func(arg1, arg2):
    """Print ``arg1`` and ``arg2`` on one line, separated by a space."""
    print(arg1, arg2)

# Keyword arguments forwarded to func() in the child process.
kwargs = {'arg1': 'hello', 'arg2': 'world'}

if __name__ == "__main__":
    # This file imports the module as ``mp``; the bare name
    # ``multiprocessing`` used previously is not bound by that import.
    # The main guard keeps a spawned child from starting another process
    # when it imports this module.
    p = mp.Process(target=func, kwargs=kwargs)
    p.start()
    # p.join() would block here until the child process has finished.

多线程

适用于IO密集型任务

基本使用和实现效果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import threading as td

import sys
import time

def threadJobOne(delay=3):
    """Print the current thread, then simulate ``delay`` seconds of work.

    Args:
        delay: Seconds to sleep between the "start" and "end" messages.
            Defaults to 3.
    """
    print(td.current_thread())
    # Name of the currently executing function, read from its frame object.
    # sys._getframe() is not guaranteed to exist in every Python
    # implementation.
    func_name = sys._getframe().f_code.co_name
    print("start ", func_name)
    time.sleep(delay)
    print("end ", func_name)


def threadJobTwo(delay=2):
    """Print the current thread, then simulate ``delay`` seconds of work.

    Args:
        delay: Seconds to sleep between the "start" and "end" messages.
            Defaults to 2.
    """
    print(td.current_thread())
    # Name of the currently executing function, read from its frame object.
    func_name = sys._getframe().f_code.co_name
    print("start ", func_name)
    time.sleep(delay)
    print("end ", func_name)


def threadJobThree(delay=1):
    """Print the current thread, then simulate ``delay`` seconds of work.

    Args:
        delay: Seconds to sleep between the "start" and "end" messages.
            Defaults to 1.
    """
    print(td.current_thread())
    # Name of the currently executing function, read from its frame object.
    func_name = sys._getframe().f_code.co_name
    print("start ", func_name)
    time.sleep(delay)
    print("end ", func_name)


if __name__ == "__main__":
    # One named thread per job; the names show up in the printed Thread reprs.
    t1 = td.Thread(target=threadJobOne, name="T1")
    t2 = td.Thread(target=threadJobTwo, name="T2")
    t3 = td.Thread(target=threadJobThree, name="T3")
    t1.start()
    t2.start()
    t3.start()
    # Number of Thread objects currently alive.
    print("已激活线程数:", td.active_count())
    # List of all Thread objects currently alive (objects, not just names).
    print(td.enumerate())

'''
输出:
<Thread(T1, started 17812)>
start threadJobOne
<Thread(T2, started 14676)>
start threadJobTwo
<Thread(T3, started 31576)>
start threadJobThree
已激活线程数:4
[<_MainThread(MainThread, started 18304)>, <Thread(T1, started 17812)>, <Thread(T2, started 14676)>, <Thread(T3, started 31576)>]

end threadJobThree
end threadJobTwo
end threadJobOne
'''

threadJobThree 所在的线程是最后启动的,但因为耗时最短,所以最先完成。

锁和传参

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import threading as td

import time


# A Lock serialises access to state shared between threads, e.g. globals.
lock = td.Lock()
# `with` acquires the lock on entry and always releases it on exit, even if
# the guarded code raises; it replaces a manual acquire()/release() pair.
with lock:
    pass  # critical section: work on the shared state goes here

def do_something(thread_id, delay=1):
    """Print start/end messages for ``thread_id`` around a simulated task.

    Args:
        thread_id: Value printed after the "start thread" / "end thread"
            messages.
        delay: Seconds to sleep between the two messages. Defaults to 1.
    """
    print(td.current_thread())
    print("start thread", thread_id)
    time.sleep(delay)
    print("end thread", thread_id)


if __name__ == "__main__":
    for i in range(5):
        # (i,) is a one-element tuple of positional arguments for the target.
        t = td.Thread(target=do_something, args=(i,))
        t.start()
        # t.join() would wait for this thread to finish before the loop
        # continues, so calling it here would run the threads one at a time.

多进程

适用于计算密集型任务

基本使用

1
2
3
if __name__ == "__main__":
    # Start the child only under the main guard: with the spawn start method
    # the child imports this module, and unguarded code would start another
    # process.
    # NOTE(review): `Job` and `a` are not defined in this snippet; they are
    # assumed to be the target callable and its single argument.
    p = mp.Process(target=Job, args=(a,))
    p.start()
    # Wait for the child process to finish.
    p.join()

锁、变量共享

1
2
3
4
5
6
7
8
# Lock that can be shared between processes.
lock = mp.Lock()
# `with` acquires the lock on entry and always releases it on exit; it
# replaces a manual acquire()/release() pair.
with lock:
    pass  # critical section: work on the shared state goes here

# Queue for passing values between processes.
queue = mp.Queue()
# Value shared between processes. mp.Value() requires a type code (or ctypes
# type) as its first argument; 'i' (signed int) with an initial value of 0
# is used here as an example.
value = mp.Value('i', 0)

多线程多进程对比(计算密集)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import multiprocessing as mp
from concurrent.futures import ThreadPoolExecutor


def calculate_job(ranNum):
    """Return ``i + i**2 + i**3`` for every ``i`` in ``range(ranNum)``.

    Args:
        ranNum: Exclusive upper bound of the range to evaluate.

    Returns:
        A list with one entry per ``i``; empty when ``ranNum`` is 0 or
        negative.
    """
    return [i + i**2 + i**3 for i in range(ranNum)]


if __name__ == "__main__":
    # Compare the time needed for 1000 calls of calculate_job(10000) using a
    # plain loop, a process pool and a thread pool. The "exited with ..."
    # notes are the timings recorded in the original write-up.

    # Sequential baseline.
    # exited with code=0 in 12.048 seconds
    for _ in range(1000):
        calculate_job(10000)

    # Process pool with 10 workers; the with-block releases the workers once
    # map() has returned.
    # exited with code=0 in 10.4 seconds
    num_lst = [10000] * 1000
    with mp.Pool(processes=10) as pool:
        pool.map(calculate_job, num_lst)

    # Thread pool with 10 workers; the with-block shuts the executor down.
    # exited with code=0 in 12.391 seconds
    num_lst = [10000] * 1000
    with ThreadPoolExecutor(max_workers=10) as Pool:
        # Drain the lazy result iterator so every task's result is retrieved.
        for _ in Pool.map(calculate_job, num_lst):
            pass