文档章节

Python 学习笔记 - 线程(线程锁,信标,事件和条件)

乐搏学院
 乐搏学院
发布于 2017/04/10 10:43
字数 1876
阅读 7
收藏 0

前面学习了线程基本的概念和创建线程的两种方法,现在看看多线程如何处理竞争条件(racing condition)的问题,当多个线程同时执行的时候,怎么进行控制。

 

比如说,下面的例子中 我使用了第二种创建的方式,自定义一个类,继承Thread类,然后自定义run()来执行我的方法。在这个run方法里面,每次都对全局变量加1

 

在主线程里面,他调用一个自己定义的函数,在这个函数里面创建了5000个线程;每个线程都加入一个列表,然后对每个对象都使用join,这是确保主线程等着直到所有子线程完成。最后输出结果

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

import time

import threading

some_var = 0

class IncrementThread(threading.Thread):

    def run(self):

        #we want to read a global variable

        #and then increment it

        global some_var

        read_value = some_var

        print ("some_var in %s is %d" % (self.name, read_value))

        some_var = read_value + 1

        print ("some_var in %s after increment is %d" % (self.name, some_var))

def use_increment_thread():

    threads = []

    start=time.time()

    for in range(5000):

        = IncrementThread()

        threads.append(t)

        t.start()

    for in threads:

        t.join()

    print("Total time %s"%(time.time()-start))

    print ("After 5000 modifications, some_var should have become 5000")

    print ("After 5000 modifications, some_var is %d" % (some_var,))

use_increment_thread()

 

------------------

 

Total time 1.7780036926269531

After 5000 modifications, some_var should have become 5000

After 5000 modifications, some_var is 4987

 

可以看见结果并不是5000,这是为啥呢? 如果查看过程,会发现有些线程刚刚获取了一个值,还未来得及处理,执行的权力就转交给了另外一个线程,这样就导致计数错误。为了确保每一个线程都成功的执行了他应该执行的代码,我们可以加一把锁。

1

2

3

4

5

6

some_var in Thread-1524 is 1523

some_var in Thread-1524 after increment is 1524

some_var in Thread-1525 is 1524

some_var in Thread-1526 is 1524

some_var in Thread-1526 after increment is 1525

some_var in Thread-1527 is 1525

 

下面是修订过的代码,通过使用Lock()函数,我们在执行代码前acquire(),之后release(),在当前线程完成这段代码之前,其他的线程不可以执行相同的操作。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

some_var = 0

lock=threading.Lock()

class IncrementThread(threading.Thread):

    def run(self):

        #we want to read a global variable

        #and then increment it

        global some_var

        lock.acquire()

        read_value = some_var

        print ("some_var in %s is %d" % (self.name, read_value))

        some_var = read_value + 1

        print ("some_var in %s after increment is %d" % (self.name, some_var))

        lock.release()

def use_increment_thread():

    threads = []

    start=time.time()

    for in range(5000):

        = IncrementThread()

        threads.append(t)

        t.start()

    for in threads:

        t.join()

    print("Total time %s"%(time.time()-start))

    print ("After 5000 modifications, some_var should have become 5000")

    print ("After 5000 modifications, some_var is %d" % (some_var,))

use_increment_thread()

---------------

Total time 1.6369926929473877

After 5000 modifications, some_var should have become 5000

After 5000 modifications, some_var is 5000

 

线程锁,除了上面的Lock()之外,还有一些常用的,比如

Rlock(),允许多重嵌套锁,而Lock()只能锁一次;

还有一个常见的是BoundedSemaphore(信标),可以指定一次锁几个

 

例如,我可以指定一次放行5个,30个线程分6次出来

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

-*- coding:utf-8 -*-

import threading

import time

NUM = 10

def func(i,l):

    global NUM

    # 上锁

    l.acquire() # 30,5 25m5,20

    NUM -= 1

    time.sleep(2)

    print(NUM,i)

    # 开锁

    l.release()

# lock = threading.Lock()

# lock = threading.RLock()

lock = threading.BoundedSemaphore(5)

for in range(30):

    = threading.Thread(target=func,args=(i,lock,))

    t.start()

 

还有一种放行的方式叫做Event(),他是统一的放行或者堵塞。

工作方式是通过一个flag的值,set()设置为True,clear()设置为False。如果flag为False,wait()则会堵塞。初始化的时候flag默认为False,即堵塞

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

#!/usr/bin/env python

# -*- coding:utf-8 -*-

# Author:Alex Li

import threading

def func(i,e):

    print(i)

    e.wait() # 检测是什么等,如果是红灯,停;绿灯,行

    print(i+100)

event = threading.Event() #初始化,flag设置为False(红灯)

for in range(10):

    = threading.Thread(target=func, args=(i,event,))

    t.start()

#========

# event.clear() # 设置成红灯,可以不写,因为初始化已经实现了

inp = input('>>>')

if inp == "1":

    event.set() # 设置成绿灯

-----------------

 

0

1

2

3

4

5

6

7

8

9

>>>1

100

104

103

105

107

109

102

106

101

108

 

最后我们来看看condition(条件),我们可以灵活的设置一次放行1个或者多个线程。这些线程都hang住,直到收到notify(通知)才放行

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

import threading

class t1(threading.Thread):

    def __init__(self,i,con):

        self.i=i

        self.con=con

        super(t1,self).__init__()

    def run(self):

        print(self.i)

        self.con.acquire()

        self.con.wait()

        print(self.i+100)

        self.con.release()

c=threading.Condition()

def test(con):

    for in range(10):

        t=t1(i,con)

        t.start()

    while True:

        inp=input('>>>')

        if inp=='q':

            break

        con.acquire()

        con.notify(int(inp))

        con.release()

test(c)

--------------

0

1

2

3

4

5

6

7

8

9

>>>2

>>>100

101

3

>>>102

103

104

4

>>>105

107

108

106

可以看见上面的代码里面,在wait()和notify()的前后都上了锁,这个锁是初始化的时候自动创建的。如果我们把他去掉,他会直接抛出异常

1

2

3

4

5

6

7

8

Traceback (most recent call last):

  File "C:/Users/yli/Documents/Tencent Files/38144205/FileRecv/FileRecv/day11/s6.py", line 56in <module>

    test(c)

  File "C:/Users/yli/Documents/Tencent Files/38144205/FileRecv/FileRecv/day11/s6.py", line 53in test

    con.notify(int(inp))

  File "C:\Program Files\Python3\lib\threading.py", line 343in notify

    raise RuntimeError("cannot notify on un-acquired lock")

RuntimeError: cannot notify on un-acquired lock

 

看看源码,他的确是强调只能对上锁的线程进行操作

1

2

3

4

5

6

7

8

9

10

11

    def notify(self, n=1):

        """Wake up one or more threads waiting on this condition, if any.

        If the calling thread has not acquired the lock when this method is

        called, a RuntimeError is raised.

        This method wakes up at most n of the threads waiting for the condition

        variable; it is a no-op if no threads are waiting.

        """

        if not self._is_owned():

            raise RuntimeError("cannot notify on un-acquired lock")

        all_waiters = self._waiters

        waiters_to_notify = _deque(_islice(all_waiters, n))

 

conditon还有一种写法是wait_for,他后面参数需要传入一个函数的名字,然后他会内部调用这个函数,如果返回值为真,那么就继续,否则就等着

 

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

import threading

def condition():

    ret = False

    = input('>>>')

    if == 'true':

        ret = True

    else:

        ret = False

    return ret

def func(i,con):

    print(i)

    con.acquire()

    con.wait_for(condition)

    print(i+100)

    con.release()

= threading.Condition()

for in range(10):

    = threading.Thread(target=func, args=(i,c,))

    t.start()

---------------

"C:\Program Files\Python3\python.exe" "C:/Users/yli/Documents/Tencent Files/38144205/FileRecv/FileRecv/day11/s7.py"

0

>>>1

2

3

4

5

6

7

8

9

true

100

>>>true

101

>>>ksdf

>>>true

103

>>>1

>>>1

>>>

>>>

 

当我们学完conditon之后,如果回头看前面event()的源码,会发现他本质就是调用的condition,当他放行的时候,他直接放行了所有的线程;因此Event的效果是要么全部停,要么全部开通

1

2

3

4

5

6

7

8

9

10

class Event:

    """Class implementing event objects.

    Events manage a flag that can be set to true with the set() method and reset

    to false with the clear() method. The wait() method blocks until the flag is

    true.  The flag is initially false.

    """

    # After Tim Peters' event class (without is_posted())

    def __init__(self):

        self._cond = Condition(Lock())

        self._flag = False

1

2

3

4

5

6

7

8

   def set(self):

        """Set the internal flag to true.

        All threads waiting for it to become true are awakened. Threads

        that call wait() once the flag is true will not block at all.

        """

        with self._cond:

            self._flag = True

            self._cond.notify_all()

 

登录乐搏学院官网http://www.learnbo.com/

或关注我们的官方微博微信,还有更多惊喜哦~

© 著作权归作者所有

乐搏学院
粉丝 9
博文 526
码字总数 707467
作品 0
丰台
程序员
私信 提问
Python:进程(threading)

这里是自己写下关于 Python 跟进程相关的 threading 模块的一点笔记,跟有些跟 Linux 调用挺像的,有共通之处。 Thread https://docs.python.org/3/library/threading.html?highlight=thread...

ypingcn
2018/01/31
0
0
Python学习记录-多进程和多线程

Python学习记录-多进程和多线程 [TOC] 1. 进程和线程 进程 狭义定义:进程是正在运行的程序的实例(an instance of a computer program that is being executed)。 广义定义:进程是一个具有...

ygqygq2
2018/04/27
0
0
Python3基础之学习笔记(九)-线程-进程-协程

文章目录 1. 线程与进程 2. 协程 1. 线程与进程 进程:是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。 线程:是操作系统能够...

GoldenKitten
01/15
0
0
使用 Python 进行线程编程

对于 Python 来说,并不缺少并发选项,其标准库中包括了对线程、进程和异步 I/O 的支持。在许多情况下,通过创建诸如异步、线程和子进程之类的高层模块,Python 简化了各种并发方法的使用。除...

丰圣谋
2013/08/22
0
0
Python:使用threading模块实现多线程(转)

Python:使用threading模块实现多线程(转) 分类: python 标签: thread 评论: 暂无评论 阅读:5,420 views 综述 Python这门解释性语言也有专门的线程模型,Python虚拟机使用GIL(Global In...

威武不能笑
2014/12/09
0
0

没有更多内容

加载失败,请刷新页面

加载更多

group by分组后获得每组中时间最大的那条记录

用途: GROUP BY 语句用于 对一个或多个列对结果集进行分组。 例子: 原表: 现在,我们希望根据USER_ID 字段进行分组,那么,可使用 GROUP BY 语句。 我们使用下列 SQL 语句: SELECT ID,US...

豆花饭烧土豆
51分钟前
2
0
android6.0源码分析之Camera API2.0下的Preview(预览)流程分析

本文将基于android6.0的源码,对Camera API2.0下Camera的preview的流程进行分析。在文章android6.0源码分析之Camera API2.0下的初始化流程分析中,已经对Camera2内置应用的Open即初始化流程进...

天王盖地虎626
今天
4
0
java 序列化和反序列化

1. 概述 序列恢复为Java对象的过程。 对象的序列化主要有两 首先我们介绍下序列化和反序列化的概念: 序列化:把Java对象转换为字节序列的过程。 反序列化:把字节序列恢复为Java对象的过程。...

edison_kwok
今天
2
0
分布式数据一致性

狼王黄师傅
今天
2
0
经验

相信每位开发者在自己开发的过程中,都会反思一些问题,比如怎样提高编程能力、如何保持心态不砍产品经理、996 之后怎样恢复精力……最近开发者 Tomasz Łakomy 将他 7 年的开发生涯中学习到...

WinkJie
今天
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部