什么是进程的通信
这里举一个通信机制的例子:我们都很熟悉通信这个词,比如一个人想给他的女友打电话。一旦通话建立,便会形成一个隐式的队列(请注意这个术语)。此时这个人就会通过对话的方式不停的将信息告诉女友,而这个人的女友也是在倾听着。我认为在大多数情况下,情况可能是倒过来的。
这里可以将他们两个人比作是两个进程,"这个人"的进程需要将信息发送给"女友"的进程,就需要一个队列的帮助。由于女友需要时刻接收队列中的信息,因此她可以同时进行其他事情,这意味着两个进程之间的通信主要依赖于队列。
这个队列可以支持发送消息与接收消息,“这个人"负责发送消息,反之"女友” 负责的是接收消息。
既然队列才是重点,那么来看一下队列要如何创建。
队列的创建 - multiprocessing
依然使用 multiprocessing 模块,调用该模块的 Queue 函数来实现队列的创建。
函数名 | 介绍 | 参数 | 返回值 |
---|
Queue | 队列的创建 | mac_count | 队列对象 |
Queue 函数功能介绍:调用 Queue 可以创建队列;它有一个参数 mac_count 代表队列最大可以创建多少信息,如果不传默认是无限长度。实例化一个队列对象之后,需要操作这个队列的对象进行放入与取出数据。
进程之间通信的方法
函数名 | 介绍 | 参数 | 返回值 |
---|
put | 将消息放入队列 | message | 无 |
get | 获取队列消息 | 无 | str |
put 函数功能介绍:将数据传入。它有一个参数 message ,是一个字符串类型。
get 函数功能介绍:用来接收队列中的数据。(其实这里就是一个常用的json场景,有很多的数据传输都是 字符串 的,队列的插入与获取就是使用的字符串,所以 json 就非常适用这个场景。)
接下来就来练习一下 队列的使用 。
进程间的通信 - 队列演示案例
代码示例如下:
# coding:utf-8
import json
import multiprocessing
class Work(object): # 定义一个 Work 类
def __init__(self, queue): # 构造函数传入一个 '队列对象' --> queue
self.queue = queue
def send(self, message): # 定义一个 send(发送) 函数,传入 message
# [这里有个隐藏的bug,就是只判断了传入的是否字符串类型;如果传入的是函数、类、集合等依然会报错]
if not isinstance(message, str): # 判断传入的 message 是否为字符串,若不是,则进行 json 序列化
message = json.dumps(message)
self.queue.put(message) # 利用 queue 的队列实例化对象将 message 发送出去
def receive(self): # 定义一个 receive(接收) 函数,不需传入参数,但是因为接收是一个源源不断的过程,所以需要使用 while 循环
while 1:
result = self.queue.get() # 获取 '队列对象' --> queue 传入的message
# 由于我们接收的 message 可能不是一个字符串,所以要进程异常的捕获
try: # 如果传入的 message 符合 JSON 格式将赋值给 res ;若不符合,则直接使用 result 赋值 res
res = json.loads(result)
except:
res = result
print('接收到的信息为:{}'.format(res))
if __name__ == '__main__':
queue = multiprocessing.Queue()
work = Work(queue)
send = multiprocessing.Process(target=work.send, args=({'message': '这是一条测试的消息'},))
receive = multiprocessing.Process(target=work.receive)
send.start()
receive.start()
使用队列建立进程间通信遇到的异常
但是这里会出现一个 报错,如下图:
报错截图示例如下:

这里的报错提示是 文件没有被发现的意思 。其实这里是我们使用 队列做 put() 和 get()的时候 有一把无形的锁加了上去,就是上图中圈中的 .SemLock 。我们不需要去关心造成这个错误的具体原因,要解决这个问题其实也很简单。
FileNotFoundError: [Errno 2] No such file or directory 异常的解决
需要阻塞进程的只是 send 或 receive 子进程中的一个,只要阻塞其中一个即可,这是理论上的情况。但是我们的 receive子进程是一个 while循环,它会一直执行,所以只需要给 send 子进程加上一个 join 即可。
解决示意图如下:

PS:虽然解决了报错问题,但是程序没有正常退出。
实际上由于我们的 receive 进程是个 while循环,并不知道要处理到什么时候,没有办法立刻终止。所以我们需要在 receive 进程 使用 terminate() 函数终结接收端。
运行结果如下:

批量给 send 函数加入数据
新建一个函数,写入 for循环 模拟批量添加要发送的消息
然后再给这个模拟批量发送数据的函数添加一个线程。
示例代码如下:
# coding:utf-8
import json
import time
import multiprocessing
class Work(object): # 定义一个 Work 类
def __init__(self, queue): # 构造函数传入一个 '队列对象' --> queue
self.queue = queue
def send(self, message): # 定义一个 send(发送) 函数,传入 message
# [这里有个隐藏的bug,就是只判断了传入的是否字符串类型;如果传入的是函数、类、集合等依然会报错]
if not isinstance(message, str): # 判断传入的 message 是否为字符串,若不是,则进行 json 序列化
message = json.dumps(message)
self.queue.put(message) # 利用 queue 的队列实例化对象将 message 发送出去
def send_all(self): # 定义一个 send_all(发送)函数,然后通过for循环模拟批量发送的 message
for i in range(20):
self.queue.put('第 {} 次循环,发送的消息为:{}'.format(i, i))
time.sleep(1)
def receive(self): # 定义一个 receive(接收) 函数,不需传入参数,但是因为接收是一个源源不断的过程,所以需要使用 while 循环
while 1:
result = self.queue.get() # 获取 '队列对象' --> queue 传入的message
# 由于我们接收的 message 可能不是一个字符串,所以要进程异常的捕获
try: # 如果传入的 message 符合 JSON 格式将赋值给 res ;若不符合,则直接使用 result 赋值 res
res = json.loads(result)
except:
res = result
print('接收到的信息为:{}'.format(res))
if __name__ == '__main__':
queue = multiprocessing.Queue()
work = Work(queue)
send = multiprocessing.Process(target=work.send, args=({'message': '这是一条测试的消息'},))
receive = multiprocessing.Process(target=work.receive)
send_all = multiprocessing.Process(target=work.send_all,)
send_all.start() # 这里因为 send 只执行了1次,然后就结束了。而 send_all 却要循环20次,它的执行时间是最长的,信息也是发送的最多的
send.start()
receive.start()
# send.join() # 使用 send 的阻塞会造成 send_all 循环还未结束 ,receive.terminate() 函数接收端就会终结。
send_all.join() # 所以我们只需要阻塞最长使用率的进程就可以了
receive.terminate()
运行结果如下:

从上图中我们可以看到 send 与 send_all 两个进程都可以通过 queue这个实例化的 Queue 对象发送消息,同样的 receive接收函数也会将两个进程传入的 message 打印输出出来。
小节
在这一章节,我们成功运用队列实现了跨进程通信,同时也掌
.........................................................