文档章节

7.线程通信

Eappo_Geng
 Eappo_Geng
发布于 10/22 20:38
字数 2264
阅读 14
收藏 0

在现实生活中,如果一个人团队正在共同完成任务,那么他们之间应该有通信,以便正确完成任务。 同样的比喻也适用于线程。

在编程中,要减少处理器的理想时间,我们创建了多个线程,并为每个线程分配不同的子任务。

因此,必须有一个通信设施,他们应该互相沟通交流,以同步的方式完成工作。

线程安全通信的Python数据结构

多线程代码出现了将信息从一个线程传递到另一个线程的问题。 标准的通信原语不能解决这个问题。 因此,需要实现我们自己的组合对象,以便在线程之间共享对象以使通信线程安全。

以下是一些数据结构,它们在进行一些更改后提供线程安全通信。

Set

为了以线程安全的方式使用set数据结构,需要扩展set类来实现我们自己的锁定机制。

from threading import Lock
class MySet(set):
    def __init__(self, *args, **kwargs):
        self._lock = Lock()
        super(MySet, self).__init__(*args, **kwargs)

    def add(self, elem):
        self._lock.acquire()
        try:
            super(MySet, self).add(elem)
        finally:
            self._lock.release()

    def delete(self, elem):
        self._lock.acquire()
        try:
            super(MySet, self).remove(elem)
        finally:
            self._lock.release()

在这个类的构造函数中创建一个锁对象。 现在有两个函数 - add()delete()。 这些函数被定义并且是线程安全的。

使用修饰器实现
这是线程安全通信的另一个关键方法是使用装饰器。

from threading import Lock
def lock_decoraotr(method):
    def new_deco_method(self, *args, **kwargs):
        with self._lock:  # Lock()的进入函数(__enter__ 是加锁,退出函数 __exit__ 是解锁)
            return method(self, *args, **kwargs)
    return new_deco_method

class MySet(set):
    def __init__(self, *args, **kwargs):
        self._lock = Lock()
        super(MySet, self).__init__(*args, **kwargs)

    @lock_decoraotr
    def add(self, element):
        return super(MySet, self).add(element)

    @lock_decoraotr
    def remove(self, element):
        return super(MySet, self).remove(element)

Lock()的进入函数(__enter__ 是加锁,退出函数 __exit__ 是解锁)

在上面的例子中,已经定义了一个名为lock_decorator的装饰器方法,该方法从Python方法类继承。 然后在这个类的构造函数中创建一个锁对象。 现在有两个函数 - add()delete()。 这些函数被定义并且是线程安全的。

List

列表数据结构对于临时内存存储而言是线程安全,快速以及简单的结构。在Cpython中,GIL可以防止对它们的并发访问。当我们知道列表是线程安全的,但是数据在哪里呢。

实际上,该列表的数据不受保护。例如,如果另一个线程试图做同样的事情,则L.append(x)不保证能够返回预期的结果。这是因为尽管append()是一个原子操作并且是线程安全的,但另一个线程试图以并发方式修改列表数据,因此可以看到竞争条件对输出的副作用。

为了解决这类问题并安全地修改数据,我们必须实现一个适当的锁定机制,这进一步确保多个线程不会潜在竞争条件。为了实现适当的锁定机制,可以像前面的例子那样扩展这个类。

队列

队列类型

在本节中,我们将获得关于不同类型的队列的信息。 Python提供了三种从queue模块使用的队列选项 -

  • 队列(FIFO,先进先出)
  • 栈(LIFO,后进先出)
  • 优先级

我们将在随后的章节中了解不同的队列。

队列(FIFO,先进先出)
它是Python提供的最常用的队列实现。 在这种先排队的机制中,首先得到服务。 FIFO也被称为正常队列。

from threading import Lock, Thread
import time
import random

def lock_decoraotr(method):
    def new_dec_method(self, *args, **kwargs):
        with self._lock:
            return method(self, *args, **kwargs)
    return new_dec_method

class Queue1():
    '''队列'''
    def __init__(self):
        self._lock = Lock()
        self.queue = []

    @lock_decoraotr
    def list_in(self, elem):
        self.queue.append(elem)

    @lock_decoraotr
    def list_out(self):
        if self.queue:
            return self.queue.pop(0)

    def show(self):
        return self.queue


def get_queue(queue):
    while True:
        time.sleep(2)
        print(queue.list_out(), "出队, 出队后的数据:", queue.show())

def put_queue(queue):
    while True:
        time.sleep(1)
        elem = random.randint(1, 10)
        queue.list_in(elem)

        print(elem, "入队, 入队后的数据:", queue.show())

if __name__ == '__main__':
    '''队列操作'''
    queue = Queue1()
    t1 = Thread(target=put_queue, args=(queue,))
    t2 = Thread(target=get_queue, args=(queue,))
    t1.start()
    t2.start()
10 入队, 入队后的数据: [10]
10 出队, 出队后的数据: []
10 入队, 入队后的数据: [10]
10 入队, 入队后的数据: [10, 10]
10 出队, 出队后的数据: [10]
5 入队, 入队后的数据: [10, 5]
5 入队, 入队后的数据: [10, 5, 5]
10 出队, 出队后的数据: [5, 5]
2 入队, 入队后的数据: [5, 5, 2]
10 入队, 入队后的数据: [5, 5, 2, 10]
5 出队, 出队后的数据: [5, 2, 10]
9 入队, 入队后的数据: [5, 2, 10, 9]
8 入队, 入队后的数据: [5, 2, 10, 9, 8]
5 出队, 出队后的数据: [2, 10, 9, 8]
1 入队, 入队后的数据: [2, 10, 9, 8, 1]
1 入队, 入队后的数据: [2, 10, 9, 8, 1, 1]
2 出队, 出队后的数据: [10, 9, 8, 1, 1]
5 入队, 入队后的数据: [10, 9, 8, 1, 1, 5]
3 入队, 入队后的数据: [10, 9, 8, 1, 1, 5, 3]
...

栈(LIFO,后进先出)

队列使用与FIFO(先进先出)队列完全相反的类比。 在这个队列机制中,最后一个将首先获得服务。 这与实现堆栈数据结构相似。 LIFO队列在实施深度优先搜索时非常有用,如人工智能算法。

from threading import Lock, Thread
import time
import random

def lock_decoraotr(method):
    def new_dec_method(self, *args, **kwargs):
        with self._lock:
            return method(self, *args, **kwargs)
    return new_dec_method

class Queue2():
    '''栈'''
    def __init__(self):
        self._lock = Lock()
        self.stack = []

    @lock_decoraotr
    def list_in(self, elem):
        self.stack.append(elem)

    @lock_decoraotr
    def list_out(self):
        if self.stack:
            return self.stack.pop(-1)

    def show(self):
        return self.stack

def get_queue(queue):
    while True:
        time.sleep(2)
        print(queue.list_out(), "出队, 出队后的数据:", queue.show())

def put_queue(queue):
    while True:
        time.sleep(1)
        elem = random.randint(1, 10)
        queue.list_in(elem)

        print(elem, "入队, 入队后的数据:", queue.show())

if __name__ == '__main__':
    '''栈操作'''
    stack = Queue2()
    t1 = Thread(target=put_queue, args=(stack,))
    t2 = Thread(target=get_queue, args=(stack,))
    t1.start()
    t2.start()
4 入队, 入队后的数据: [4]
4 出队, 出队后的数据: []
7 入队, 入队后的数据: [7]
8 入队, 入队后的数据: [7, 8]
8 出队, 出队后的数据: [7]
7 入队, 入队后的数据: [7, 7]
1 入队, 入队后的数据: [7, 7, 1]
1 出队, 出队后的数据: [7, 7]
8 入队, 入队后的数据: [7, 7, 8]
1 入队, 入队后的数据: [7, 7, 8, 1]
1 出队, 出队后的数据: [7, 7, 8]
6 入队, 入队后的数据: [7, 7, 8, 6]
6 入队, 入队后的数据: [7, 7, 8, 6, 6]
6 出队, 出队后的数据: [7, 7, 8, 6]
6 入队, 入队后的数据: [7, 7, 8, 6, 6]
3 入队, 入队后的数据: [7, 7, 8, 6, 6, 3]
...

优先队列

在FIFO和LIFO队列中,项目顺序与插入顺序有关。 但是,有很多情况下优先级比插入顺序更重要。

让我们考虑一个真实世界的例子。 假设机场的安保人员正在检查不同类别的人员。 VIP人员,航空公司工作人员,海关人员,类别可能会优先检查,而不是像到平民那样根据到达情况进行检查。

需要考虑优先队列的另一个重要方面是如何开发任务调度器。 一种常见的设计是在队列中优先处理最具代理性的任务。 该数据结构可用于根据队列的优先级值从队列中提取项目。

from threading import Lock, Thread
import time
import random

def lock_decoraotr(method):
    def new_dec_method(self, *args, **kwargs):
        with self._lock:
            return method(self, *args, **kwargs)
    return new_dec_method

class Queue3():
    def __init__(self):
        self._lock = Lock()
        self.priority = []

    @lock_decoraotr
    def list_in(self, elem):
        # print(elem)
        self.priority.append(elem)

    @lock_decoraotr
    def list_out(self):
        if self.priority:
            # 找出优先级最高的数输出
            elem = min(self.priority, key=lambda x:x[0])
            self.priority.remove(elem)
            return elem

    def show(self):
        return self.priority

def get_queue(queue):
    while True:
        time.sleep(2)
        print(queue.list_out(), "出队, 出队后的数据:", queue.show())

def put_queue(queue):
    while True:
        time.sleep(1)
        elem = (random.randint(1, 10), random.random())
        queue.list_in(elem)
        print(elem, "入队, 入队后的数据:", queue.show())

if __name__ == '__main__':
    '''优先级操作'''
    p = Queue3()
    t1 = Thread(target=put_queue, args=(p,))
    t2 = Thread(target=get_queue, args=(p,))
    t1.start()
    t2.start()
(7, 0.7661931798693407) 入队, 入队后的数据: [(7, 0.7661931798693407)]
(7, 0.7661931798693407) 出队, 出队后的数据: []
(10, 0.5066977063848174) 入队, 入队后的数据: [(10, 0.5066977063848174)]
(8, 0.706281203111283) 入队, 入队后的数据: [(10, 0.5066977063848174), (8, 0.706281203111283)]
(8, 0.706281203111283) 出队, 出队后的数据: [(10, 0.5066977063848174)]
(3, 0.9535116245244307) 入队, 入队后的数据: [(10, 0.5066977063848174), (3, 0.9535116245244307)]
(5, 0.12634403802640948) 入队, 入队后的数据: [(10, 0.5066977063848174), (3, 0.9535116245244307), (5, 0.12634403802640948)]
(3, 0.9535116245244307) 出队, 出队后的数据: [(10, 0.5066977063848174), (5, 0.12634403802640948)]
(9, 0.9762457150750048) 入队, 入队后的数据: [(10, 0.5066977063848174), (5, 0.12634403802640948), (9, 0.9762457150750048)]
(3, 0.27709827939028453) 入队, 入队后的数据: [(10, 0.5066977063848174), (5, 0.12634403802640948), (9, 0.9762457150750048), (3, 0.27709827939028453)]
...

© 著作权归作者所有

Eappo_Geng
粉丝 7
博文 120
码字总数 135265
作品 0
徐汇
程序员
私信 提问
进程间、线程间通信方式

一、进程间的通信方式 (1)管道( pipe ):管道是一种半双工的通信方式,数据只能单向流动,而且只能在具有亲缘关系的进程间使用。进程的亲缘关系通常是指父子进程关系。 (2)有名管道 (nam...

为了美好的明天
2018/02/07
210
0
Objective-C-GCD

队列 & 线程的执行方式(任务) 任务:block,消息等函数块,他是由线程执行的。 线程执行任务的方式: 同步线程:当前线程a只能执行完事件A才能执行任务B。 异步线程:当前线程a执行A的时候...

BKF_
2016/03/03
286
0
C# 使用多线程如何传递两个以上参数的实现方法(附示例)

某些情况下当我们启动一个线程的时候会向该线程传递参数,有时除了功能上需要之外,我觉得还有就是为了我们能管理好启动的线程组(当然,只开一两个线程什么的也谈不上不好管理了,我说的线程...

余二五
2017/11/15
0
0
并发编程(7):线程之间的通信wait和notify

概念 线程是操作系统中独立的个体,但这些个体如果不经过特殊的处理就不能成为一个整体,线程间的通信就成为整体的必用方式之一。当线程存在通信指挥,系统间的交互性会更强大,在提高CPU利用...

科技探索者
2017/11/15
0
0
多线程-线程控制

join()方法,当某个程序执行路中的调用其他线程的thread1.join()方法,该程序需要等待thread1执行结束再继续运行。 线程让步:yield,仅是将线程从执行状态转为就绪状态,不阻塞线程。在多...

skanda
2016/11/13
10
0

没有更多内容

加载失败,请刷新页面

加载更多

射频 - 灵敏度

灵敏度是一个规格指示器,显示设备接收信号的程度,并在“令人满意的错误率”内对其进行解码。 (“令人满意的错误率”的错误率应该有多低,通常由每个应用自己的规范定义)。灵敏度以功率电...

demyar
28分钟前
6
0
java @Contract 的pom坐标

应用 @Contract(value = "_ -> param1", pure = true) int getNum(int i){ return i;} pom <dependency> <groupId>org.jetbrains</groupId> ......

macker
32分钟前
6
0
git 拉取指定分支

//默认(xxx指的是url)git clone xxx//指定分支(xxx指的是url)git clone -b 分支名 xxx

uug
39分钟前
7
0
阿里云报SQLSTATE[HY000] [2002] php_network_getaddresses: getaddrinfo failed:根本原因

报错内容如下: 系统架构如下: 15台阿里云服务器集群(主服放在华南可用区A区,其他14台分别部署在A/B/C/D/E区都有) + 1台阿里云Reids(A区) + 5台阿里云mysql服务器(1主4从)(A区) 报错请况: 这个...

银装素裹
42分钟前
6
0
Android Handler理解

Handler是什么 Handler是Android 线程间通信工具类。 一般用于子线程向主线程发送消息,将子线程中执行结果通知主线程,从而在主线程中执行UI更新操作。 源码角度理解 Handler负责发送(sen...

二营长的意大利炮手
47分钟前
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部