Python 多进程
小于 1 分钟
Python 多进程
1. 进程
多进程可以绕开全局解释器锁(GIL)对 Python 多线程的限制,充分发挥多核 CPU 的优势。
2. 创建进程
创建进程的方式与创建线程基本一致:
import multiprocessing as mp
def job(a: int, b: int) -> None:
    """Print both arguments.

    Used as the ``target`` of a child process in this example.

    Args:
        a: First value to print.
        b: Second value to print.
    """
    print("a:", a, "b:", b)
if __name__ == "__main__":
    # The guard matters for multiprocessing: with the "spawn" start method the
    # child re-imports this module, and unguarded code would run again there.
    p1 = mp.Process(target=job, args=(1, 2))
    p1.start()
    p1.join()  # block until the child process has finished
3. 使用 Queue 进行进程间通信
进程的目标函数无法直接返回值,可以使用 multiprocessing.Queue 将计算结果传回主进程:
import multiprocessing as mp
def job(q: mp.Queue, index: int) -> None:
    """Compute a fixed sum, add ``index`` to it and put the result on ``q``.

    The sum is ``i + i**2 + i**3`` for ``i`` in ``range(1000)``. The result is
    delivered through the queue instead of being returned.

    Args:
        q: Queue that receives the single result.
        index: Offset added to the sum, so results from different workers
            can be told apart.
    """
    res = 0
    for i in range(1000):
        res += i + i * i + i**3
    q.put(res + index)
if __name__ == "__main__":
    q = mp.Queue()
    p1 = mp.Process(target=job, args=(q, 1))
    p2 = mp.Process(target=job, args=(q, 2))
    p1.start()
    p2.start()
    # Read the results BEFORE joining: a process that has put items on a
    # multiprocessing.Queue does not exit until its buffered data has been
    # flushed to the pipe, so joining first can deadlock on large payloads.
    res1 = q.get()
    res2 = q.get()
    p1.join()
    p2.join()
    # Results are printed in arrival order, which may differ from start order.
    print(res1)
    print(res2)
4. 多线程与多进程对比
性能对比
import multiprocessing as mp
import threading as td
import time
def job(q: mp.Queue, iterations: int = 10000000) -> None:
    """Run the CPU-bound benchmark loop and put its result on ``q``.

    Sums ``i + i**2 + i**3`` for ``i`` in ``range(iterations)``.

    Args:
        q: Queue that receives the single result.
        iterations: Number of loop iterations. The default keeps the
            original benchmark workload.
    """
    res = 0
    for i in range(iterations):
        res += i + i * i + i**3
    q.put(res)
def multicore():
    """Run ``job`` in two child processes and print both results."""
    q = mp.Queue()
    p1 = mp.Process(target=job, args=(q,))
    p2 = mp.Process(target=job, args=(q,))
    p1.start()
    p2.start()
    # Drain the queue before joining: a child that has queued data does not
    # terminate until that data is flushed, so join-then-get can deadlock.
    res1 = q.get()
    res2 = q.get()
    p1.join()
    p2.join()
    print("multicore:", res1, res2)
def normal(iterations: int = 10000000) -> None:
    """Run the benchmark loop twice in the current process and print the total.

    This is the single-process baseline. It does the same amount of work as
    two calls to the worker loop, accumulated into one result.

    Args:
        iterations: Iterations per pass. The default keeps the original
            benchmark workload.
    """
    res = 0
    for _ in range(2):
        for i in range(iterations):
            res += i + i * i + i**3
    print("normal:", res)
def multithread():
    """Run ``job`` in two threads of the current process and print both results.

    This is the threaded counterpart of the two-process benchmark. On a
    CPython build with the GIL the two threads do not execute Python
    bytecode in parallel.
    """
    q = mp.Queue()
    t1 = td.Thread(target=job, args=(q,))
    t2 = td.Thread(target=job, args=(q,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    res1 = q.get()
    res2 = q.get()
    print("multithread:", res1, res2)
if __name__ == "__main__":
    # Time each variant in turn. perf_counter() is a monotonic,
    # high-resolution clock intended for measuring short durations, whereas
    # time.time() can jump when the system clock is adjusted.
    for bench in (normal, multicore, multithread):
        st = time.perf_counter()
        bench()
        et = time.perf_counter()
        print("Time:", et - st)
5. Pool 进程池
import multiprocessing as mp
def job(x: int) -> int:
    """Return ``x`` raised to the power ``x``."""
    return x**x
def multicore():
    """Map ``job`` over ``range(1000)`` on a two-worker pool and print the result count."""
    # The context manager terminates the pool on exit, so the worker
    # processes are not left running. map() blocks until every result is
    # available, so nothing is lost when the pool is torn down.
    with mp.Pool(processes=2) as pool:
        res = pool.map(job, range(1000))
    print(len(res))
if __name__ == "__main__":
    # Only start the pool when run as a script, not when a worker process
    # re-imports this module.
    multicore()
apply_async() 函数一次只提交一个任务(一组参数),并通过返回对象的 get() 方法取回结果:
# The pool must exist before a task can be submitted to it.
pool = mp.Pool(processes=2)
# apply_async() submits a single call, job(2), and returns immediately with
# an AsyncResult.
res = pool.apply_async(job, (2,))
# get() blocks until the worker has produced the result.
print(res.get())
6. 共享内存
定义共享内存变量:
# Shared array of C signed ints (type code "i"), initialised from the sequence.
array = mp.Array("i", [1, 2, 3])
# Shared C double (type code "d") with initial value 1.
val = mp.Value("d", 1)
类型码的取值(例如 'd' 表示双精度浮点数,'i' 表示有符号整数)可以参考标准库 array 模块的文档。
7. Lock 锁
使用锁:
import multiprocessing as mp
import time
from multiprocessing.synchronize import Lock
def job(v, num: int, lock: Lock) -> None:
    """Add ``num`` to ``v.value`` ten times while holding ``lock``.

    The lock is held for the whole loop, so the ten increments and prints of
    one caller are not interleaved with those of another caller using the
    same lock.

    Args:
        v: Object with a mutable ``value`` attribute (a shared
            ``multiprocessing.Value`` in this example).
        num: Amount added on each iteration.
        lock: Lock guarding access to ``v``.
    """
    # "with" releases the lock even if the body raises. A bare
    # acquire()/release() pair would leave it held and block other callers.
    with lock:
        for _ in range(10):
            time.sleep(0.1)
            v.value += num
            print(v.value)
def multicore():
    """Start two processes that update one shared integer under a common lock."""
    lock = mp.Lock()
    val = mp.Value("i", 0)  # shared C signed int, initially 0
    # Both children receive the same Value and the same Lock; they add 1 and
    # 3 respectively.
    p1 = mp.Process(target=job, args=(val, 1, lock))
    p2 = mp.Process(target=job, args=(val, 3, lock))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
if __name__ == "__main__":
    # Only start the child processes when run as a script, not when a child
    # re-imports this module.
    multicore()