抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

线程同步与多进程

概述

线程同步,线程间协同,通过某种技术,让一个线程访问某些数据时,其他线程不能访问这些数据,直到该线程完成对数据的操作;

Event

Event事件,是线程间通信机制中最简单的实现,使用一个内部的标记flag,能过flagTrue或者False的变化来进行操作

名称 含义
set() 标记设置为True。
clear() 标记设置为False。
is_set() 标记是否为True。
import threading
e = threading.Event()
print(e.is_set()) # False
print(e.set()) # 给e设置一个标志位, 为True
print(e.is_set()) # True
import threading

import logging
import time 

FOMATE = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(level=logging.INFO, format=FOMATE)

# 1. 一个老板,雇佣了10个员工,工人生成10个杯子;
# 2. 工人生成中,老板盯着他,工人生产完,老板夸他;


# flag = False
event = threading.Event() # 事件对象,替代了flag

def boss():
    logging.info("I'm boss, I'm watching you ~~~")
    event.wait() # 阻塞等待,直到flag为True
    logging.info("Good job")



def worker(count=10):   
    global flag
    logging.info("I'm working for U.")
    cups = []
    while True:
        time.sleep(1)
        cups.append("1")
        if len(cups) >= count:
            event.set() # 设置flag为True
            break
    logging.info("Finished ,cpus={}".format(len(cups)))

b1  = threading.Thread(target=boss, name='boss1')
b2 = threading.Thread(target=boss, name='boss2')
w = threading.Thread(target=worker, name='worker')
b1.start()
b2.start()
w.start()
  • 延迟执行的线程
import threading    
e = threading.Event()

print(e.is_set())
print(e.wai(3))
print(e.is_set())

threading.Timer(3, lambda : e.set()).start()
print('-' * 30 )
print(e.wait(10))
print(e.is_set())
import threading

import logging
import time 

FOMATE = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(level=logging.INFO, format=FOMATE)

# 1. 一个老板,雇佣了10个员工,工人生成10个杯子;
# 2. 工人生成中,老板盯着他,工人生产完,老板夸他;


# flag = False
event = threading.Event() # 事件对象,替代了flag

def boss(e):
    logging.info("I'm boss, I'm watching you ~~~")
    e.wait() # 阻塞等待,直到flag为True
    logging.info("Good job")



def worker(e,count=10):   
    global flag
    logging.info("I'm working for U.")
    cups = []
    while not e.wait(0.5):
        # time.sleep(1)
        cups.append("1")
        if len(cups) >= count:
            event.set() # 设置flag为True
            #  
    logging.info("Finished ,cpus={}".format(len(cups)))

b1  = threading.Thread(target=boss, name='boss1',args=(event,)) # event 用于一对多的通知
b2 = threading.Thread(target=boss, name='boss2',args=(event,))
w = threading.Thread(target=worker, name='worker',args=(event,))
b1.start()
b2.start()
w.start()

Lock

  • Lock类是mutex互斥锁
  • 一旦一个线程获得锁,其它试图获取锁的线程将被阻塞,只到拥有锁的线程释放锁
  • 凡是存在共享资源争抢的地方都可以使用锁,从而保证只有一个使用者可以完全使用这个资源。
名称 含义
acquire(blocking=True, timeout=-1) 默认阻塞,阻塞可以设置超时时间。非阻塞时,timeout 禁止设置。成功获取锁,返回True,否则返回False。
release() 释放锁。可以从任何线程调用释放。已上锁的锁会被重置为unlocked。未上锁的锁上调用,抛 RuntimeError 异常。
import threading

lock = threading.Lock()
print(lock.locked())
print(lock.acquire())

print(lock.locked())
print(lock.acquire(timeout=3))
print('='*30)
print(lock.acquire(False)) # 非阻塞的时候不能指定timeout
print('='*30)
import logging

import threading

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

lock = threading.Lock()

def worker():
    logging.info('I am working ~~~')
    lock.acquire()
    logging.info('end working ~~~')

for i in range(5):
    threading.Thread(target=worker,daemon=False, name='worker-{}'.format(i)).start()

while True:
    cmd = input('>>>')
    if cmd == 'r':
        lock.release()
        print('release one times')
    elif cmd == 'q':
        break
    else:
        print(threading.enumerate())
        print(lock.locked())
# 以上说明多个线程共享一个锁,一个线程获取锁后,其他线程就不能获取锁,只能等待锁释放后,才能获取锁,这是跨线程操作的
  • 锁简单的示例
import logging

import threading
import time 

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

# 10个工人共同生产100个杯子

lock = threading.Lock() # 锁对象
cpus = []

def worker(count=100):
    logging.info('working ~~~')
    while True:
        lock.acquire()
        if len(cpus) >= count:
            lock.release() # 第一处释放锁
            break
        time.sleep(0.01)
        # lock.release() # 第二处释放锁
        cpus.append('1')
        lock.release() # 第三处释放锁, 哪一处是对的? 
    logging.info('finished ~~~ cpus={}'.format(len(cpus)))

for i in range(10):
    threading.Thread(target=worker, name='worker-{}'.format(i)).start() # 10个线程,共同生产100个杯子
  • 锁分析

位置2分析:

  • 假设某一个瞬间,有一个工作线程A获取了锁,len(cups)正好有999个,然后就释放了锁,可以继续执行下面的语句,生产一个杯子,这地方不阻塞,但是正好杯子也没有生产完。锁释放后,其他线程就可以获得锁,线程B获得了锁,发现len(cups)也是999个,然后释放锁,然后也可以去生产一个杯子。锁释放后,其他的线程也可能获得锁。就说A和B线程都认为是999个,都会生产一个杯子,那么实际上最后一定会超出1000个。
  • 假设某个瞬间一个线程获得锁,然后发现杯子到了1000个,没有释放锁就直接break了,由于其他线程还在阻塞等待锁释放,这就成了死锁了。

位置3分析:

  • 获得锁的线程发现是999,有资格生产杯子,生产一个,释放锁,看似很完美
  • 问题在于,获取锁的线程发现杯子有1000个,直接break,没释放锁离开了,死锁了

位置1分析:

  • 如果线程获得锁,发现是1000,break前释放锁,没问题
  • 问题在于,A线程获得锁后,发现小于1000,继续执行,其他线程获得锁全部阻塞。A线程再次执行循环后,自己也阻塞了。死锁了。

锁用完了、一定要释放,即上下文干的事儿、进去要干啥、离开后一定要干啥~~

锁的应用场景

锁适用于访问和修改同一个共享资源的时候,即读写同一个资源的时候。

如果全部都是读取同一个共享资源需要锁吗?

  • 不需要。因为这时可以认为共享资源是不可变的,每一次读取它都是一桦的值,所以不用加锁。

使用锁的注意事项:

  • 少用锁,必要时用锁。使用了锁,多线程访问被锁的资源时,就成了串行,要么排队执行,要么争抢执行。举例,高速公路上车并行跑,可是到了省界只开放了一个收费口,过了这个口,车辆依然可以在多车道上一起跑。过收费口的时候,如果排队一辆辆过,加不加锁一样效率相当,但是一旦出现争抢,就必须加锁一辆辆过。注意,不管加不加锁,只要是一辆辆过,效率就下降了。
  • 加锁时间越短越好,不需要就立即释放锁。
  • 一定要避免死锁。

不使用锁,有了效率,但是结果是错的。
使用了锁,效率低下,但是结果是对的。

GIL全局解释器锁

CPython 在解释器进程级别有一把锁,叫做GIL,即全局解释器锁。

GIL 保证CPython进程中,只有一个线程执行字节码。甚至是在多核CPU的情况下,也只允许同时只能有一个CPU核心上运行该进程的一个线程。

在CPython中:

  • IO密集型任务,当某个线程阻塞时,GIL会释放,就会调度其他就绪线程。
  • CPU密集型任务,当前线程可能会连续的获得GIL,导致其它线程几乎无法使用CPU。
  • 在CPython中由于有GIL存在,IO密集型任务使用多线程较为合算;CPU密集型任务使用多进程,要绕开GIL。

新版CPython正在努力优化GIL的问题,但不是移除。

如果在意多线程的效率问题,请绕行,选择其它语言如Erlang、Go等。

Python中绝大多数内置数据结构的读、写操作都是原子操作。由于GIL的存在,Python的内置数据类型在多线程编程的时候就变成了安全的了,但是实际上它们本身不是线程安全类型。

保留GIL的原因:
GvR坚持的简单哲学,对于初学者门槛低,不需要高深的系统知识也能安全、简单的使用Python。而且移除GIL,会降低CPython单线程的执行效率。

import threading
import datetime
import logging
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'

def calc():
    s = 0 
    for i in range(100000000):
        s += 1
    logging.info(s)

start = datetime.datetime.now()

t1 = threading.Thread(target=calc)
t2 = threading.Thread(target=calc)
t3 = threading.Thread(target=calc)

t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()

delta = (datetime.datetime.now() - start).total_seconds() # 假并行,GIL锁
print(delta) #执行时间7.418743s

Queue的线程安全

标准库 queue 模块提供了 FIFO(先进先出)队列、LIFO(后进先出)队列以及优先队列等数据结构。Queue 类是线程安全的,适用于同一进程内多线程之间安全地交换数据。它内部使用了 Lock 和 Condition 来确保线程安全。

需要特别注意在多线程中使用 Queue 类时,例如以下代码:

import queue

q = queue.Queue(8)

if q.qsize() == 7:
    q.put()  # 上下两句可能被打断
if q.qsize() == 1:
    q.get()  # 未必会成功

如果不加锁,就无法确保获得准确的队列大小。这是因为在读取队列大小后,其他线程可能会修改队列,导致大小不再准确。因此,正确的做法是在进行 getput 操作时加锁,以确保操作的原子性和线程安全。

即使 Queue 类的 size 方法加了锁,也不能保证立即执行 getput 操作就一定成功。因为在执行这些操作时,可能会受到其他线程的干扰,导致操作无法立即完成。因此,在实际应用中,需要注意处理可能出现的竞态条件和线程安全问题。

多进程

由于 Python 的 GIL(全局解释器锁)存在,多线程未必是 CPU 密集型程序的好选择。多线程虽然可以实现并发执行,但由于 GIL 的限制,无法充分利用多核 CPU。

相比之下,多进程可以在完全独立的进程环境中运行程序,可以更充分地利用多处理器。但是进程之间的隔离带来了数据不共享的问题,而且进程的创建和销毁开销较大。

Python 的 multiprocessing 模块提供了多进程编程的支持,其中的 Process 类遵循了 Thread 类的 API,减少了学习难度。

由于 Python 的 GIL(全局解释器锁)存在,多线程未必是 CPU 密集型程序的好选择。多线程虽然可以实现并发执行,但由于 GIL 的限制,无法充分利用多核 CPU。

相比之下,多进程可以在完全独立的进程环境中运行程序,可以更充分地利用多处理器。但是进程之间的隔离带来了数据不共享的问题,而且进程的创建和销毁开销较大。

Python 的 `multiprocessing` 模块提供了多进程编程的支持,其中的 `Process` 类遵循了 `Thread` 类的 API,减少了学习难度。

对于上面这个程序,在同一主机(授课主机)上运行时长的对比如下:

  • 使用单线程和多线程运行时长约为 4 分钟多。
  • 使用多进程时长约为 1 分半。

可以观察到,多进程可以实现真正的并行,因为多个进程都在同时使用 CPU,而且进程库几乎没有什么学习难度。

需要注意的是,多进程的代码一定要放在 if __name__ == "__main__": 下面执行,以免在子进程中再次启动新的进程。

名称 说明
pid 进程id。
exitcode 进程的退出状态码。
terminate() 终止指定的进程。

进程间同步

Python 在进程间同步提供了和线程同步一样的类,使用的方法和效果也类似。不过,进程间同步的代价要高于线程间,因为涉及到进程的创建、销毁以及数据传输等开销,而且系统底层实现也不同。但是 Python 屏蔽了这些细节,让用户可以简单地使用多进程。

multiprocessing 模块还提供了共享内存和服务器进程来实现进程间数据的共享,同时也提供了用于进程间通信的 Queue 队列和 Pipe 管道等通信方式。

需要注意的是,多进程是启动多个解释器进程,因此进程间通信必须进行序列化和反序列化操作。此外,由于每个进程都有自己的 GIL,因此如果每个进程中没有实现多线程,全局解释器锁(GIL)在多进程中就失去了作用。

多进程、多线程的选择

  1. CPU密集型任务:
    在 CPython 中由于全局解释器锁(GIL)的存在,多线程时会存在锁的竞争,而且多核优势无法发挥,因此选择 Python 多进程效率更高。

  2. IO密集型任务:
    在 Python 中适合使用多线程,可以减少多进程间 IO 的序列化开销。在 IO 等待的时候,可以切换到其他线程继续执行,从而提高效率。

应用场景

  • 请求/应答模型:
    这是 WEB 应用中常见的处理模型。主进程启动多个 worker 工作进程,通常数量和 CPU 核心数相同,以发挥多核优势。
    在 worker 工作进程中,经常需要处理网络 IO 和磁盘 IO,可以启动多个线程来提高并发处理能力。
    Worker 进程处理用户的请求时,往往需要等待数据,并在处理完请求后通过网络 IO 返回响应。这种工作模式类似于 Nginx。

concurrent.futures 模块

concurrent.futures 模块是在 Python 3.2 版本引入的,它提供了一个高级的异步可执行的便利接口,用于异步并行任务编程。

提供的执行器

  1. ThreadPoolExecutor:
    提供了异步调用的线程池的 Executor。它可以用于在单个主线程中并发执行多个任务,适用于 IO 密集型任务,可以避免由于 GIL 的存在导致的并发问题。

  2. ProcessPoolExecutor:
    提供了异步调用的进程池的 Executor。它可以用于在多个进程中并发执行任务,每个任务在独立的进程中执行,适用于 CPU 密集型任务,可以充分利用多核处理器的优势。

ThreadPoolExecutor对象

方法 含义
ThreadPoolExecutor(max_workers=1) 池中至多创建 max_workers 个线程的池来同时异步执行,返回 Executor 实例。支持上下文,进入时返回自己,退出时调用shutdown(wait=True)
shutdown(wait=True) 清理池,wait 表示是否等待到任务线程完成。
submit(fn, *args, **kwargs) 提交执行的函数及其参数,如有空闲开启daemon线程,返回 Future 类的实例。
  • Future
方法 含义
done() 如果调用被成功的取消或者执行完成,返回True。
cancelled() 如果调用被成功的取消,返回True。
running() 如果正在运行且不能被取消,返回True。
cancel() 尝试取消调用。如果已经执行且不能取消返回False,否则返回True。
result(timeout=None) 取返回的结果,timeout为None,一直等待返回;timeout设置到期,抛出concurrent.futures.TimeoutError异常。
exception(timeout=None) 取返回的异常,timeout为None,一直等待返回;timeout设置到期,抛出concurrent.futures.TimeoutError异常。
import threading
import datetime
import logging
import multiprocessing
import time 
from concurrent.futures import ThreadPoolExecutor,wait

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)

def calc(base):
    s = base
    for i in range(100000000):
        s += 1  
    logging.info(s)
    return s

# executor = ThreadPoolExecutor(max_workers=3) # 线程池, 3个线程, 预先开好三个泳道, 有任务来了,就直接跳进泳道,即资源有限,复用泳道
# future = executor.submit(calc, 100)


start = datetime.datetime.now()
executor = ThreadPoolExecutor(max_workers=3)
fs = []

for  i in range(6):
    future = executor.submit(calc, i*100)
    fs.append(future)
    # future.done() # 判断任务是否完成
    # future.result() # 如果正在运行,会阻塞等待,直到任务完成
wait(fs) # 等待fs中的所有任务完成
delta = (datetime.datetime.now() - start).total_seconds()
print(delta) # 14.902915
# while True:
#     time.sleep(1)
#     print(threading.enumerate())

总结

该库统一了线程池、进程池调用,简化了编程;
是Python简单的思想哲学的体现;
缺点:无法设置线程名称。

评论