Skip to content

Commit

Permalink
Added files
Browse files Browse the repository at this point in the history
  • Loading branch information
dabeaz committed Apr 10, 2015
1 parent 6ab35ee commit 7f27427
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 2 deletions.
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
# concurrencylive
Code from Concurrency Live - PyCon 2015
Concurrency Live - PyCon 2015
=============================

This is the live-demo code written during my talk "Python Concurrency
Grom The Ground Up: Live", presented at PyCon 2015, April 10, 2015.

100 changes: 100 additions & 0 deletions aserver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# server.py
# Fib microservice

from socket import *
from fib import fib
from collections import deque
from select import select
from concurrent.futures import ThreadPoolExecutor as Pool
from concurrent.futures import ProcessPoolExecutor as Pool

pool = Pool(4)

tasks = deque()
recv_wait = { } # Mapping sockets -> tasks (generators)
send_wait = { }
future_wait = { }

future_notify, future_event = socketpair()

def future_done(future):
tasks.append(future_wait.pop(future))
future_notify.send(b'x')

def future_monitor():
while True:
yield 'recv', future_event
future_event.recv(100)

tasks.append(future_monitor())

def run():
while any([tasks, recv_wait, send_wait]):
while not tasks:
# No active tasks to run
# wait for I/O
can_recv, can_send, _ = select(recv_wait, send_wait, [])
for s in can_recv:
tasks.append(recv_wait.pop(s))
for s in can_send:
tasks.append(send_wait.pop(s))


task = tasks.popleft()
try:
why, what = next(task) # Run to the yield
if why == 'recv':
# Must go wait somewhere
recv_wait[what] = task
elif why == 'send':
send_wait[what] = task
elif why == 'future':
future_wait[what] = task
what.add_done_callback(future_done)

else:
raise RuntimeError("ARG!")
except StopIteration:
print("task done")

class AsyncSocket(object):
def __init__(self, sock):
self.sock = sock
def recv(self, maxsize):
yield 'recv', self.sock
return self.sock.recv(maxsize)
def send(self, data):
yield 'send', self.sock
return self.sock.send(data)
def accept(self):
yield 'recv', self.sock
client, addr = self.sock.accept()
return AsyncSocket(client), addr
def __getattr__(self, name):
return getattr(self.sock, name)

def fib_server(address):
sock = AsyncSocket(socket(AF_INET, SOCK_STREAM))
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sock.bind(address)
sock.listen(5)
while True:
client, addr = yield from sock.accept() # blocking
print("Connection", addr)
tasks.append(fib_handler(client))

def fib_handler(client):
while True:
req = yield from client.recv(100) # blocking
if not req:
break
n = int(req)
future = pool.submit(fib, n)
yield 'future', future
result = future.result() # Blocks
resp = str(result).encode('ascii') + b'\n'
yield from client.send(resp) # blocking
print("Closed")

tasks.append(fib_server(('',25000)))
run()
6 changes: 6 additions & 0 deletions fib.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
def fib(n):
if n <= 2:
return 1
else:
return fib(n-1) + fib(n-2)

17 changes: 17 additions & 0 deletions perf1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# perf1.py
# Time of a long running request

from socket import *
import time

sock = socket(AF_INET, SOCK_STREAM)
sock.connect(('localhost', 25000))

while True:
start = time.time()
sock.send(b'30')
resp =sock.recv(100)
end = time.time()
print(end-start)


27 changes: 27 additions & 0 deletions perf2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# perf2.py
# requests/sec of fast requests

from socket import *
import time

sock = socket(AF_INET, SOCK_STREAM)
sock.connect(('localhost', 25000))

n = 0

from threading import Thread
def monitor():
global n
while True:
time.sleep(1)
print(n, 'reqs/sec')
n = 0
Thread(target=monitor).start()

while True:
sock.send(b'1')
resp =sock.recv(100)
n += 1



33 changes: 33 additions & 0 deletions server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# server.py
# Fib microservice

from socket import *
from fib import fib
from threading import Thread
from concurrent.futures import ProcessPoolExecutor as Pool

pool = Pool(4)

def fib_server(address):
sock = socket(AF_INET, SOCK_STREAM)
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sock.bind(address)
sock.listen(5)
while True:
client, addr = sock.accept()
print("Connection", addr)
Thread(target=fib_handler, args=(client,), daemon=True).start()

def fib_handler(client):
while True:
req = client.recv(100)
if not req:
break
n = int(req)
future = pool.submit(fib, n)
result = future.result()
resp = str(result).encode('ascii') + b'\n'
client.send(resp)
print("Closed")

fib_server(('',25000))

0 comments on commit 7f27427

Please sign in to comment.