python提供的并发工具相对比较底层,达不到模型的层次,所谓的并发模型是对
并发和并行常常被混淆,他们虽然有交集但其实指的是完全不同的两个事儿.
-
并发程序含有多个逻辑上的独立执行块,它们可以独立地并行执行,也可以串行执行
-
并行程序解决问题的速度往往比串行程序快得多,因为其可以同时执行整个任务的多个部分.并行程序可能有多个独立执行块,也可能仅有一个
我们还可以从另一种角度来看待并发和并行之间的差异:
- 并发是问题域中的概念——程序需要被设计成能够处理多个同时(或者几乎同时)发生的事件
- 而并行则是方法域中的概念——通过将问题中的多个部分并行执行,从而加速解决问题.
引用Rob Pike的经典描述:
并发是同一时间应对(dealing with)多件事情的能力;并行是同一时间动手做(doing)多件事情的能力.
本文基于协程讲解并发模型.
import asyncio
from asyncio import Queue
class ActorExit(Exception):
pass
class Actor:
def __init__(self):# ,loop=None):
self.inbox = Queue()
self.running = False
def send_nowait(self, msg):
'''
Send a message to the actor
'''
self.inbox.put_nowait(msg)
async def send(self, msg):
'''
Send a message to the actor
'''
await self.inbox.put(msg)
async def close(self):
await self.send(ActorExit)
def close_nowait(self):
self.send_nowait(ActorExit)
async def handle_timeout(self):
pass
async def receive(self, message):
"""
Define in your subclass.
"""
raise NotImplemented()
async def run(self):
self.running = True
while self.running:
try:
message = await self.inbox.get()
except asyncio.TimeoutError:
await self.handle_timeout()
else:
if message is ActorExit:
print("actor closed")
return
else:
await self.receive(message)
class Pinger(Actor):
async def receive(self, message):
print(message)
await pong.send('ping')
await asyncio.sleep(0.5)
class Ponger(Actor):
async def receive(self, message):
print(message)
await ping.send('pong')
await asyncio.sleep(0.5)
async def sleep10():
await asyncio.sleep(3)
await ping.close()
await pong.close()
ping = Pinger()#(loop=loop)
pong = Ponger()#(loop=loop)
ping.send_nowait('start')
for coro in asyncio.as_completed((ping.run(), pong.run(),sleep10())):
earliest_result = await coro
start
ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
actor closed
actor closed
要实现发布/订阅
的消息通信模式,你通常要引入一个单独的"交换机"或"网关"对象作为所有消息的中介.也就是说不直接将消息从一个任务发送到另一个,而是将其发送给一个频道,然后由频道将它发送给一个或多个被关联任务.下面是一个非常简单的频道实现例子:
from collections import defaultdict
from contextlib import contextmanager
class Exchange:
def __init__(self):
self._subscribers = set()
def attach(self, task):
self._subscribers.add(task)
def detach(self, task):
self._subscribers.remove(task)
@contextmanager
def subscribe(self, *tasks):
for task in tasks:
self.attach(task)
try:
yield self
finally:
for task in tasks:
self.detach(task)
async def send(self, msg):
for subscriber in self._subscribers:
await subscriber.send(msg)
_exchanges = defaultdict(Exchange)
def get_exchange(name):
return _exchanges[name]
一个频道就是一个普通对象,负责维护一个活跃的订阅者集合并为绑定,解绑和发送消息提供相应的方法. 每个交换机通过一个名称定位,get_exchange()
通过给定一个名称返回相应的Exchange
实例.
class DisplayMessages:
def __init__(self):
self.count = 0
async def send(self, msg):
self.count += 1
print('msg[{}]: {!r}'.format(self.count, msg))
# Return the Exchange instance associated with a given name
def get_exchange(name):
return _exchanges[name]
# Example of using the subscribe() method
async def main():
exc = get_exchange('name')
task_a, task_b = DisplayMessages(),DisplayMessages()
with exc.subscribe(task_a, task_b) as ex:
await ex.send('msg1')
await ex.send('msg2')
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(main())
loop.close()
msg[1]: 'msg1'
msg[1]: 'msg1'
msg[2]: 'msg2'
msg[2]: 'msg2'
from itertools import count
c = count()
next(c)
0
next(c)
1