Python 多进程

1. 进程

由于全局解释器锁(GIL)的存在,CPython 中同一时刻只有一个线程能执行 Python 字节码,因此多线程无法加速 CPU 密集型任务。多进程中每个进程拥有独立的解释器和 GIL,可以避免这一劣势,充分发挥多核优势。

2. 创建进程

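创建方式与多线程基本一致,只需把 threading.Thread 换成 multiprocessing.Process。注意:启动进程的代码应放在 if __name__ == "__main__": 之下,否则在使用 spawn 启动方式的平台(如 Windows、macOS)上,子进程导入主模块时会重复创建进程: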

import multiprocessing as mp


def job(a: int, b: int) -> None:
    """Print both arguments, labelled, on one line (e.g. ``a: 1 b: 2``)."""
    fields = ("a:", a, "b:", b)
    print(*fields)


if __name__ == "__main__":
    # Spawn one child process running job(1, 2) and wait for it to finish.
    proc = mp.Process(target=job, args=(1, 2))
    proc.start()
    proc.join()

3. 使用 Queue 进行进程间通信

子进程无法像普通函数那样直接返回值,可以借助 multiprocessing.Queue 把结果传回主进程:

import multiprocessing as mp


def job(q: mp.Queue, index: int) -> None:
    """Put ``sum(i + i**2 + i**3 for i in range(1000)) + index`` on ``q``.

    Args:
        q: queue that receives the single integer result.
        index: offset added to the sum, so results from different
            callers can be told apart.
    """
    total = sum(i + i * i + i**3 for i in range(1000))
    q.put(total + index)


if __name__ == "__main__":
    q = mp.Queue()
    p1 = mp.Process(target=job, args=(q, 1))
    p2 = mp.Process(target=job, args=(q, 2))
    p1.start()
    p2.start()
    # Drain the queue BEFORE joining: a child that has put data on a
    # multiprocessing.Queue may not terminate until that data is flushed to
    # the underlying pipe, so joining first can deadlock on larger payloads.
    res1 = q.get()
    res2 = q.get()
    p1.join()
    p2.join()
    print(res1)
    print(res2)

4. 多线程与多进程对比

性能对比

import multiprocessing as mp
import threading as td
import time


def job(q: mp.Queue, n: int = 10000000) -> None:
    """Put ``sum(i + i**2 + i**3 for i in range(n))`` on ``q``.

    Args:
        q: queue that receives the single integer result.
        n: number of terms; defaults to the 10,000,000 used by the benchmark.
    """
    # An explicit Python-level loop is kept on purpose: this is the CPU-bound
    # workload being benchmarked, so its per-iteration cost is the point.
    res = 0
    for i in range(n):
        res += i + i * i + i**3
    q.put(res)


def multicore():
    """Run ``job`` in two processes sharing one queue and print both results."""
    q = mp.Queue()
    p1 = mp.Process(target=job, args=(q,))
    p2 = mp.Process(target=job, args=(q,))
    p1.start()
    p2.start()
    # Collect results before joining. A child that has put data on a
    # multiprocessing.Queue may not exit until the data is flushed to the
    # pipe, so join() before get() can deadlock.
    res1 = q.get()
    res2 = q.get()
    p1.join()
    p2.join()
    print("multicore:", res1, res2)


def normal(n: int = 10000000):
    """Serial baseline: do the work of two ``job`` calls in one process.

    Prints the combined total of both passes as ``normal: <total>``.

    Args:
        n: number of terms per pass; defaults to the benchmark's 10,000,000
            so the default workload matches ``job``.
    """
    res = 0
    for _ in range(2):  # two passes mirror the two workers in multicore()
        for i in range(n):
            res += i + i * i + i**3
    print("normal:", res)


def multithread():
    """Run ``job`` in two threads sharing one queue and print both results.

    Under CPython's GIL the two CPU-bound threads generally cannot execute
    Python bytecode in parallel, which is what this comparison illustrates.
    """
    q = mp.Queue()
    workers = [td.Thread(target=job, args=(q,)) for _ in range(2)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    print("multithread:", q.get(), q.get())


if __name__ == "__main__":
    # Time each variant in turn. time.perf_counter() is a monotonic,
    # high-resolution clock meant for measuring durations; time.time() is
    # wall-clock time and can jump if the system clock is adjusted.
    for bench in (normal, multicore, multithread):
        st = time.perf_counter()
        bench()
        print("Time:", time.perf_counter() - st)

5. Pool 进程池

import multiprocessing as mp


def job(x: int):
    """Return ``x`` raised to its own power (``x ** x``)."""
    return pow(x, x)


def multicore():
    """Map ``job`` over ``range(1000)`` on a 2-process pool and print the result count."""
    # The with-statement shuts the pool down on exit (Pool.__exit__ calls
    # terminate()), so worker processes are not leaked. map() blocks until
    # every result is ready, so nothing is lost when the pool is torn down.
    with mp.Pool(processes=2) as pool:
        res = pool.map(job, range(1000))
    print(len(res))


if __name__ == "__main__":
    multicore()

apply_async() 每次只提交一个任务(一组参数),并立即返回一个 AsyncResult 对象,调用其 get() 方法即可等待并取得结果:

# The pool must be created before tasks can be submitted to it. Using `with`
# also shuts the workers down once the result has been retrieved. Like the
# other examples, run this under `if __name__ == "__main__":` (or inside a
# function called from there).
with mp.Pool(processes=2) as pool:
    res = pool.apply_async(job, (2,))  # submit a single call: job(2)
    print(res.get())  # block until that one result is ready

6. 共享内存

定义共享内存变量:

# A single shared double ("d") initialised to 1.
val = mp.Value("d", 1)
# A shared array of signed ints ("i") initialised to [1, 2, 3].
array = mp.Array("i", list(range(1, 4)))

变量的类型码(如 'd' 表示双精度浮点数,'i' 表示有符号整数)可参考标准库 array 模块的文档。

7. Lock

使用锁保证同一时刻只有一个进程修改共享变量:

import multiprocessing as mp
import time
from multiprocessing.synchronize import Lock


def job(v, num: int, lock: Lock, delay: float = 0.1):
    """Add ``num`` to ``v.value`` ten times while holding ``lock``, printing each new value.

    Args:
        v: shared object with a mutable ``value`` attribute
            (``mp.Value("i", 0)`` in this section).
        num: amount added on each of the 10 iterations.
        lock: held for the whole loop, so two concurrent callers' updates
            do not interleave.
        delay: seconds slept before each update (default 0.1). This slows
            the loop down so the effect of the lock is observable.
    """
    # `with` releases the lock even if the loop raises. A bare
    # acquire()/release() pair would leave it held forever on error,
    # blocking every other process waiting on the same lock.
    with lock:
        for _ in range(10):
            time.sleep(delay)
            v.value += num
            print(v.value)


def multicore():
    """Start two processes that add 1 and 3 respectively to one shared int under a common lock."""
    lock = mp.Lock()
    val = mp.Value("i", 0)
    procs = [mp.Process(target=job, args=(val, step, lock)) for step in (1, 3)]
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()


if __name__ == "__main__":
    multicore()