Lock对象
原语锁(互斥锁)是一个同步原语,状态是"已锁定"或者"未锁定"之一.两个方法acquire()和release()用于修改锁的状态.如果状态为已锁定,
尝试获取锁将被阻塞,直到锁被释放为止.如果有多个线程等待获取锁,当锁被释放时,只有一个线程能获取它,等待线程获得锁的顺序没有定义.
使用下面的构造函数可以创建新的Lock实例:
Lock()
创建新的Lock对象,初始状态为未锁定
Lock实例lock支持一下方法
lock.acquire([blocking])
获取锁,如果有必要,需要阻塞到锁释放为止.如果提供blocking参数并将它设为False,当无法获取锁时立即返回False,
如果成功获取锁则返回True.
lock.release()
释放一个锁.当锁处于未锁定状态时,或者从与原本调用acquire()方法的线程不同的线程调用此方法,将出现错误
# 未加锁,Windows下运行不会出现混乱,在Linux下运行就会发行结果出现混乱
import threading
import time
num = 0
def show():
global num
time.sleep(3)
num += 1
print(num)
for i in range(10):
t = threading.Thread(target=show)
t.start()
print('thread stop')
Linux下的结果(多运行几遍就会发现结果混乱)
1
2
3
5
7
4
8
6
9
10
# 加锁
import threading
import time
clock = threading.Lock()
num = 0
def show():
clock.acquire()
global num
time.sleep(3)
num += 1
print(num)
clock.release()
for i in range(10):
t = threading.Thread(target=show)
t.start()
print('thread stop')
Rlock对象
可重入锁是一个类似于Lock对象的同步原语,但同一个线程可以多次获取它。这允许拥有锁的线程执行嵌套的acquire()和release()操作。在这种
情况下,只有对外面的release()操作才能将锁重置为未锁定状态。
使用以下的构造函数可以创建一个新的RLock对象:
RLock()
创建新的可重入锁对象。RLock对象rlock支持一下方法。
rlock.acquire([blocking])
获取锁,如果有必要,需要阻塞到锁被释放为止。如果没有线程拥有锁,它将被锁定,而且递归级别被置为1.如果此线程已经拥有锁,锁的递归级别加1,
而且函数立即返回。
rlock.release()
通过减少锁的递归级别来释放它。如果在减值后递归级别为0,锁将被重置为未锁定状态。否则,锁将保持以锁定状态。只能由目前拥有锁的线程来调用此函数。
在使用诸如Lock,RLcok或Semphore之类的锁原语时,必须多加小心。锁的错误管理经常导致死锁或竞争条件。依赖的代码应该保证出现异常时正确地释放锁。
典型的代码如下:
1 try:
2 lock.acquire()
3 # 关键部分
4 statements
5 ...
6 finally:
7 lock.release()
8
9 # 另外,所有种类的锁还支持上下文管理协议(写起来更简洁些)
10
11 with lock:
12 # 关键部分
13 # with语句自动获取锁,并且在控制流离开上下文时自动释放锁
14 # 此外,编写代码时一般应该避免同时获取多个锁例如:
15 with lock_A:
16 # 关键部分
17 statements
18
19 ...
20 with lock_B:
21 # B的关键部分
22
23 ...
这通常很容易导致应用程序神秘死锁。尽管有几种策略可以避免出现这种情况(如分层锁定),但最好在编写代码时就避免这种做法
信号量
信号量是一个基于计数器的同步原语,每一次调用acquire()方法时此计数器减1,每次调用release()方法时此计数器加1.如果计数器为0,acquire()方法将会阻塞,
直到其他线程调用release()方法为止。
Semaphore([value])
创建一个新的信号量。value是计数器的初始值。如果省略此参数,计数器的值将被置为1.
Semaphore实例s支持一下方法
s.acquire([blocking])
获取信号量。如果进入时内部计数器大于0,此方法将把它的值减1,然后立即返回。如果它的值为0,此方法将阻塞,知道另一个线程调用release()方法为止。
blocking参数的行为与Lock和RLock对象中描述的相同
s.release()
通过将内部计数器的值加1来释放一个信号量。如果计数器为0,而且另一个线程正在等待,该线程将被唤醒。如果有多个线程正在等待,只能从它的acquire()
调用返回其中一个。线程释放的顺序并不确定
BoundedSemaphore([value])
创建一个新的信号量。Value是计数器的初始值。如果省略此参数,计数器的值将被置为1.BounderSemaphore的工作方式与Semaphore完全相同,
但release()操作的次数不能超过acquire()操作的次数
信号量与互斥锁之间的微妙差别在于:信号量可以用于发信号。例如,可以从不同的线程调用acquire()和release()方法,以便在生产者和消费者线程之间进行通信。
1 import threading
2 import time
3 produced = threading.Semaphore(3)
4 consumed = threading.Semaphore(2)
5
6
7 def producer():
8 for i in range(10):
9 consumed.acquire()
10 time.sleep(3)
11 print('-----------------', time.ctime())
12 consumed.release()
13
14
15 def consumer():
16 for j in range(10,20):
17 produced.acquire()
18 time.sleep(3)
19 print('=================', time.ctime())
20 consumed.release()
21
22
23 for i in range(20):
24 t1 = threading.Thread(target=producer)
25 t2 = threading.Thread(target=consumer)
26 t1.start()
27 t2.start()
运行结果
1 ----------------- Fri Dec 14 16:11:08 2018
2 ================= Fri Dec 14 16:11:08 2018
3 ----------------- Fri Dec 14 16:11:08 2018
4 ================= Fri Dec 14 16:11:08 2018
5 ================= Fri Dec 14 16:11:08 2018
6 ----------------- Fri Dec 14 16:11:11 2018
7 ----------------- Fri Dec 14 16:11:11 2018
8 ----------------- Fri Dec 14 16:11:11 2018
9 ----------------- Fri Dec 14 16:11:11 2018
10 ----------------- Fri Dec 14 16:11:11 2018
11 ----------------- Fri Dec 14 16:11:14 2018
事件
事件用于在线程之间通信。一个线程发出“事件”信号,一个或多个其他线程等待它,Event实例管理着一个内部标志,可以使用set()方法将它置为True,或者
使用clear()方法将它重置为False。wait()方法将阻塞,直到标志为True
Event()
创建新的Event实例,并将内部标志置为False。Event实例e支持一下方法
e.is_set()
只有当内部标志为True时才返回True。在老式代码中此方法叫isSet()
e.set()
内部标志为True。等待它变为True的所有线程都将被唤醒
e.clear()
将内部标志重置为False
e.wait([timeout])
阻塞直到内部标志为True。如果进入时内部标志为True,此方法将立即返回。否则它将阻塞,直到另一个线程调用set()方法将标志置为True,或者直到出现可选的
超时。timeout是一个浮点数,用于指定以秒为单位的超时期限
1 import threading
2 import time
3
4 event = threading.Event()
5
6
7 def chihuoguo():
8 # 等待事件,进入等待阻塞状态
9 print('%s 进入准备状态' % threading.currentThread().getName())
10 time.sleep(1)
11 event.wait()
12 # 收到事件后进入运行状态
13 print('%s 开始运行' % threading.currentThread().getName())
14
15 # 设置线程组
16 threads = []
17
18 # 创建新线程
19 thread1 = threading.Thread(target=chihuoguo, name='one')
20 thread2 = threading.Thread(target=chihuoguo, name='two')
21
22 # 添加到线程组
23 threads.append(thread1)
24 threads.append(thread2)
25
26 # 开启线程
27 for thread in threads:
28 thread.start()
29
30 time.sleep(0.1)
31 # 发送事件通知
32 print('event start')
33 event.set()
使用event.clear()
1 import threading
2 import time
3
4 event = threading.Event()
5
6
7 def chihuoguo():
8 for i in range(1, 5):
9 # 等待事件,进入等待阻塞状态
10 print('%s %s 进入准备状态' % (threading.currentThread().getName(), i))
11 time.sleep(1)
12 event.wait()
13
14 if i == 3:
15 event.clear()
16 # 收到事件后进入运行状态
17 print('%s %s 开始运行' % (threading.currentThread().getName(), i))
18
19 # 设置线程组
20 threads = []
21
22 # 创建新线程
23 thread1 = threading.Thread(target=chihuoguo, name='one')
24 thread2 = threading.Thread(target=chihuoguo, name='two')
25
26 # 添加到线程组
27 threads.append(thread1)
28 threads.append(thread2)
29
30 # 开启线程
31 for thread in threads:
32 thread.start()
33
34 time.sleep(0.1)
35
36 # event.clear()
37 # 发送事件通知
38 print('event start')
39 event.set()
尽管Event对象可用于给其他线程发信号,但不应该使用它们来实现在生成者/消费者问题中十分典型的通知。例如,应该避免写出下面这样的代码
1 evt = threading.Event()
2
3
4 def producer():
5 while True:
6 # 生产者
7 ...
8 evt.signal()
9
10
11 def consumer():
12 while True:
13 # 一个等待项
14 evt.wait()
15 # 消费项
16 ...
17 # 清除事件并再次等待
18 evt.clear()
这段代码并不可靠,因为在evt.wait()和evt.clear()操作之间,生产者可能产生了一个新项。但是,通过清除事件,在生产者创建一个新项之前,消费者可能看不到
这个新项。最好的情况是,程序将经过一段很短的停滞,对项的处理被莫名其妙推迟了。最坏的情况是,由于事件信号丢失,整个程序将挂起。要解决这类问题最好使用
条件变量