「python并发编程」 管道与队列实现进程间通信

1.1 队列实现进程间通信

1.1.1 Queue常用类

说明

Queue([maxsize])

创建共享的进程队列,maxsize是队列中允许的最大项,省略则队列无大小限制

1.1.2 Queue常用实例q常用方法

方法

说明

q.cancel_join_thread()

不会在进程退出时自动连接后台线程,可防止join_thread()方法阻塞

q.join_thread()

连接队列的后台线程。此方法用于q.close()方法之后,等待所有队列项被消耗完。调用q.cancel_join_thread()方法可以禁止这种行为

q.close()

关闭队列。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据

q.empty()

如调用此方法时q为空,返回True。此方法返回结果不可靠,在返回和使用结果之间,队列有可能已经加入新的项

q.full()

如调用此方法时q已满,返回Ture。同样此方法返回结果不可靠

q.get([block [, timeout]])

返回q中的一项,如果q为空,此方法将阻塞,直到队列中有项可用为止。block用于控制阻塞行为,默认为True。如果设为False,将引发Queue.Empty异常,timeout为可选超时时间

q.get_nowait()

等同于 q.get(False)方法

q.put(item [, block [, timeout]])

将item放入队列中,如果队列已满,此方法将阻塞至有可用空间为止。block控制阻塞行为,默认为True。如果设置为False,将会引发Queue.Full异常。timeout指定在阻塞模式中等待可用空间的时间长短,超时后将引发Queue.Full异常

q.put_nowait(item)

等同于q.put(item, False)方法

q.qsize()

返回队列中项的数量,结果不可靠

1.1.3 JoinableQueue

说明

JoinableQueue([maxsize])

创建可连接的共享进程队列,类似Queue的一个对象,但它允许项的消费者通知生产者项已被成功处理

1.1.4 JoinableQueue 实例除Queue实例方法处的方法

方法

说明

q.task_done()

消费者使用此方法发出信号,表示q.get()返回项已经被成功处理,如果调用此方法的次数大于从队列中删除的项的数量,将引发ValueError异常

q.join()

生产者使用此方法进行阻塞,直到队列中的所有项均被处理。阻塞将持续到为队列中的每个项均被调用q.task_done()方法为止

1.1.5 JoinableQueue实现进程间通信示例

import multiprocessing


def consumer(input_q): 
    """ 消费者
    """
    while True:
        item = input_q.get()
        # 处理项
        print(f'处理input_q: {item}')
        # 发出信号通知任务完成
        input_q.task_done()


def producer(sequence, output_q):
    """ 生产者
    """
    for item in sequence:
        output_q.put(item)


if __name__ == '__main__':
    q = multiprocessing.JoinableQueue()
    
    # 运行消费者进程
    cons_p = multiprocessing.Process(target=consumer, args=(q,))
    cons_p.daemon = True
    cons_p.start()

    # 运行生成者进程
    sequence = (1,2,3,4)
    producer(sequence, q)

    # 等待所有项被处理完成
    q.join()


1.1.6 JoinableQueue实现多个消费都进程池间通信示例

from msilib import sequence
import multiprocessing


def consumer(input_q): 
    """ 消费者
    """
    while True:
        item = input_q.get()
        # 处理项
        print(f'处理input_q: {item}')
        # 发出信号通知任务完成
        input_q.task_done()


def producer(sequence, output_q):
    """ 生产者
    """
    for item in sequence:
        output_q.put(item)


if __name__ == '__main__':
    q = multiprocessing.JoinableQueue()

    # 多个进程运行消费者模型
    cons_p_1 = multiprocessing.Process(target=consumer, args=(q, ))
    # 进程1处理项
    cons_p_1.daemon = True
    cons_p_1.start()

    cons_p_2 = multiprocessing.Process(target=consumer, args=(q, ))
    # 进程2处理项
    cons_p_2.daemon = True
    cons_p_2.start()

    # 运行生产者
    sequence = (1,2,3,4)
    producer(sequence, q)

    # 等待所有项被处理
    q.join()


1.1.7 哨兵模式实现多进程间通信示例

from ast import arg
from audioop import mul
from msilib import sequence
import multiprocessing
import sqlite3
from tkinter.messagebox import NO


def consumer(input_q):
    while True:
        item = input_q.get()
        # 处理哨兵
        if item is None:
            break
        # 处理项
        print(f'处理input_q: {item}')
    # 关闭
    print('consumer 处理完成!')


def producer(sequence, output_q):
    for item in sequence:
        output_q.put(item)
    

if __name__ == '__main__':
    q = multiprocessing.Queue()

    # 启动消费者进程
    cons_p = multiprocessing.Process(target=consumer, args=(q, ))
    cons_p.start()

    # 生产者
    sequence = (1,2,3,4)
    producer(sequence, q)
    # 在队列上安置哨并
    q.put(None)
    # 等待消费者进程关闭
    cons_p.join()


注意: 如果使用哨兵,一定要在队列上为每个消费都都安置哨兵,才能保证所有消费者进程都能关闭。

1.2 管道实现进程间通信

1.2.1 管道类

说明

Pipe([duplex])

在进程之间创建一条管道,并返回元组(conn1, conn2),其中conn1和conn2表示管道两端的Connection对象。默认情况下,管道是双向的。如果将duplex置为False,conn1只能用于接收,conn2只能用于发送。必须在创建和启动使用管道的Process对象之前调用Pipe()方法

1.2.2 Pipe()返回的Connection对象实例c具有的方法和属性

方法

说明

c.close()

关闭连接。如果c被垃圾回收,将自动调用此方法

c.fileno()

返回连接使用的整数文件描述符

c.pull([timeout])

如果连接上的数据可用,返回True,timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout置为None,操作将无限期地等待数据到达

c.recv()

接收c.send()方法返回的对象。如果连接的另一端已经关闭,再也不存在任何数据,将引发EOFError异常

c.recv_bytes([maxlength])

接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另一端已经关闭,再也不存在任何数据,将引发EOFError异常

c.recv_bytes_into(buffer [, offset])

接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常

c.send(obj)

通过连接发送对象。obj是与序列化兼容的任意对象

c.send_bytes(buffer [, offset [, size]])

通过连接发送字节数据缓冲区。buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收

1.2.3 管道实现生产者消费都模型示例

import multiprocessing


def consumer(pipe):
    output_p, input_p = pipe
    input_p.close()  # 关闭管道的输入端
    while True:
        try:
            item = output_p.recv()
        except EOFError:
            break
        # TODO 可替换为有用的工作
        print(item)
    # 关闭
    print('consumer done.')


def producer(sequence, input_p):
    for item in sequence:
        input_p.send(item)


if __name__ == '__main__':
    # 创建管道
    (output_p, input_p) = multiprocessing.Pipe()

    # 启动消费者进程
    cons_p = multiprocessing.Process(target=consumer, args=((output_p, input_p),))
    cons_p.start()

    # 关闭生产者中的输出管道
    output_p.close()

    # 生产项
    sequence = (1,2,3,4)
    producer(sequence, input_p)

    # 半闭输入管道,表示输入完成
    input_p.close()

    # 等待消费者进程结束
    cons_p.join()


说明:如果生产者或者消费者中都没有使用管道的某个端点,就就将其关闭。如果忘记执行这些步骤,程序可能在消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成EOFError异常。因此,在生产者中关闭管道不会有任何效果,除非消费者中也关闭了相同的管道端点

1.2.4 管道双向通信示例

import multiprocessing
from unittest import result


def adder(pipe):
    server_p, client_p = pipe
    client_p.close()
    while True:
        try:
            x,y = server_p.recv()
        except EOFError:
            break
        result = x + y  # TODO 可用实例业务代替
        server_p.send(result)
    # 关闭
    print('Server done.')


if __name__ == '__main__':
    # 生成管道
    (server_p, client_p) = multiprocessing.Pipe()
    
    # 启动服务器进程
    adder_p = multiprocessing.Process(target=adder, args=((server_p, client_p),))
    adder_p.start()
    
    # 关闭客户端中的服务器管道
    server_p.close()
    
    # 客户端发出请求
    client_p.send((3,4))
    print(client_p.recv())

    client_p.send(('Hello', 'World'))
    print(client_p.recv())

    # 完成,关闭管道
    client_p.close()

    # 等待消费者进程完成
    adder_p.join()
展开阅读全文

页面更新:2024-04-25

标签:队列   管道   进程   生产者   示例   字节   异常   对象   消费者   通信   方法

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top