进程间通信的几种方式

参考维基百科的分类,进程间通信主要有以下几种:

  • 文件(File)
  • 信号(Signal)
  • 套接字(Socket)
  • Unix 域套接字(Unix domain socket)
  • 消息队列(Message queue)
  • 匿名管道(Anonymous pipe)
  • 命名管道(Named pipe)
  • 共享内存(Shared memory)
  • 消息传递(Message passing)
  • 内存映射文件(Memory-mapped file)

看下使用 Python 怎么实现。

环境:Arch Linux + Python3.8

文件

符合直觉,数据存在硬盘上,可基于文本或二进制。

对于 pa.py

SHARED_FILE = '/tmp/shared_file'


def send():
    with open(SHARED_FILE, 'w', encoding='utf-8') as f:
        f.write('some data...')

对于 pb.py

def receive():
    with open(SHARED_FILE, encoding='utf-8') as f:
        print(f.read())

为了避免竞争,数据流向是单向的,可使用两个或以上的文件进行双向通信。

信号

通信偏向底层,且一般不用于传输数据。

Python 中有 os.kill(pid, sig) 以及 signal 模块。 os.kill 发送 sig 给进程 pid,一般会意外终止进程 pid

import os
import signal


os.kill(48523, signal.SIGKILL)

signal 模块的例子可以看 signal — Set handlers for asynchronous events 中给出的,简洁明了。

套接字

使用 socket 模块,C/S 形式的通信。

对于 pa.py

# Echo server program
import socket


HOST = '127.0.0.1'
PORT = 50007


def send():
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((HOST, PORT))  # 绑定地址与端口
        s.listen(1)  # 监听的连接数
        conn, addr = s.accept()
        with conn:
            print('Connected by', addr)
            while True:
                data = conn.recv(1024)
                if not data:
                    break
                conn.sendall(data)

对于 pb.py

# Echo client program
import socket


HOST = '127.0.0.1'
PORT = 50007


def receive():
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((HOST, PORT))
        s.sendall(b'Hello, world')
        data = s.recv(1024)
    print('Received', repr(data))

例子来自于官方文档中的 socket — Low-level networking interface,稍作修改。

在 Python3.8 中,新增了 socket.create_server 函数,合并了 bind 这一步。 对应的 socket.create_connection 函数用于创建连接。

Unix 域套接字

Unix 域套接字不经过网络,直接在内核中通信。

使用方法与 socket.AF_INETsocket.AF_INET6 相似, 只需把对应的 socket.AF_INETsocket.AF_INET6 改为 socket.AF_UNIX, 绑定或连接的地址由 (HOST, PORT) 变为本地文件,如 '/tmp/unix_socket'

消息队列

Python 标准库中,multiprocessing.Queue 只能用于有关系的进程中, 而借助第三方库的 ipcqueuesysv_ipc,可在独立的进程中通信。

ipcqueue 支持 POSIXSystem V 形式的消息队列。

看下 POSIX 标准的消息队列。

pa.py

from ipcqueue import posixmq


def send():
    q = posixmq.Queue('/foo')

    q.put([1, 'A'])
    q.put([2, 'B'], priority=2)  # priority 参数设置优先级
    q.put([3, 'C'], priority=0)

pb.py

from ipcqueue import posixmq


def receive():
    q = posixmq.Queue('/foo')

    print(q.get())
    print(q.get())
    print(q.get())

    q.close()
    q.unlink()

System V 消息队列稍有不同。

pa.py

from ipcqueue import sysvmq


def send():
    q = sysvmq.Queue(1)

    q.put([1, 'A'])
    q.put([2, 'B'], msg_type=2)
    q.put([3, 'C'], msg_type=2)
    q.put([4, 'D'], msg_type=1)

pb.py

from ipcqueue import sysvmq


def receive():
    q = sysvmq.Queue(1)

    print(q.get(msg_type=2))  # msg_type 参数设置消息类型
    print(q.get())
    print(q.get())
    print(q.get())

    q.close()

以上代码修改自 ipcqueue documentation

两者用法类似,但有一些不同,主要有:

  1. POSIX message queue 的标识符为字符,且必须以 / 开头; 而 System V message queue 标识符为整数,且不能为负数。 为 0 时,表示私有队列。
  2. POSIX message queue 可以设置优先线,读取消息按 FIFO 方式来读; System V message queue 无优先级概念,但可以设置消息类型, 并按消息类型以 FIFO 方式读取,而 POSIX message queue 不区分消息类型。

匿名管道

匿名管道用于父子进程间打开读写通道进行通信, 在大多数 Shell 中可以使用 | 来创建。

import os
import sys


def main():
    r, w = os.pipe()

    if os.fork():
        # Parent process receive data
        os.close(w)
        r = os.fdopen(r)
        print('Parent reading', r.read())
        r.close()
        sys.exit(0)
    else:
        # Child process write data
        os.close(r)
        w = os.fdopen(w, 'w')
        w.write('Hello, world')
        w.close()
        sys.exit(0)

要双向通信,可打开两条管道。multiprocessing.Pipe 可直接支持双工管道。

命名管道

不同于匿名管道,命名管道利用了文件系统,可用于两个无关的进程中进行通信。

pa.py

import os
import errno


FIFO = '/tmp/named_pipe'


def send():
    try:
        os.mkfifo(FIFO)
    except OSError as oe:
        if oe.errno != errno.EEXIST:
            raise oe

    with open(FIFO, 'w') as f:
        f.write('Hello, world')

pb.py

import os


FIFO = '/tmp/named_pipe'


def receive():
    try:
        os.mkfifo(FIFO)
    except OSError:
        pass

    with open(FIFO) as f:
        while True:
            data = f.read()
            if len(data):
                print(data)
            else:
                break

两者均是阻塞直到另一方打开,但可以使用 os.open 进行非阻塞通信。

代码修改自 Python read named PIPE,且仅适用于 Unix 平台。 Windows 系统需要借助 win32pipewin32file, 具体可以看 Named Pipes Communication between Python Server and Python Client on Window

共享内存

共享内存是可以被不同进程同时访问的内存空间,可跨不同进程通信。

Python3.8 中新增了 multiprocessing.shared_memory 模块以支持共享内存通信, 使用的是 System V 风格。之前提到的 sysv_ipc 也可以用于共享内存通信,同样是 System V 风格。

官方文档中使用 Numpy 演示 multiprocessing.shared_memory。 查看源代码,发现对象销毁会调用 self.close 关闭共享内存的访问, 所以必须保证两个进程活着才行。

sysv_ipc 除了消息队列和共享内存,还可使用信号量进行通信。

消息传递

消息传递是一项在计算机上激活动作(即运行程序)的技术。 与传统按名字调用程序的方式不同, 消息传递使用对象模型(object model)区别常规功能与特定实现。 被激活的程序发送一条消息,并依靠对象选择并执行对应的代码。 使用中间层的理由基本分为两类:封装和分发。

以上文字翻译自 Message passing,可能不太准确。

常见的消息传递系统有 RPC、RMI、MPI。

以 MPI 为例子。Python 中实现包括 pyMPI、mpi4py、pypar、MYMPI 和 ScientificPython 的子模块 MPI。这里使用 mpi4py 模块。

import numpy
from mpi4py import MPI


comm = MPI.COMM_WORLD
rank = comm.Get_rank()

rand_num = numpy.zeros(1)

if rank == 1:
    rand_num = numpy.random.random_sample(1)
    print("Process", rank, "drew the number", rand_num[0])
    comm.Send(rand_num, dest=0)

if rank == 0:
    print("Process", rank, "before receiving has the number", rand_num[0])
    comm.Recv(rand_num, source=1)
    print("Process", rank, "received the number", rand_num[0])

将代码保存为 mpi.py,执行命令 mpiexec -n 2 python mpi.py-n 选项指定进程数。同时启动一组进程, 每个进程都有一个唯一的编号(在这是 rank), 根据不同的编号,程序执行不同的代码。 不同进程可以通过 SendRecv 进行通信。

代码参考自 Python MPI: Message Passing

内存映射文件

与共享内存类似,但将内存映射为文件。直接看看怎么用。

这里直接使用标准库中的 mmap 模块。

pa.py

import mmap
import os


path = '/tmp/mmapfile'


def send():
    fd = os.open(path, os.O_CREAT | os.O_TRUNC | os.O_RDWR)
    data = b'Hello, world'
    os.write(fd, b'\x00' * len(data))

    mm = mmap.mmap(fd, 0)
    mm[:] = data

pb.py

import mmap
import os


path = '/tmp/mmapfile'


def receive():
    fd = os.open(path, os.O_RDONLY)
    mm = mmap.mmap(fd, 0, prot=mmap.PROT_READ)
    print(mm[:])

文件描述符为 -1 时,表示映射匿名内存。

Windows 有一点不同,还可直接使用 mmap.mmap(0, length, tagname)进行通信。

更多参考