Skip to content

Commit

Permalink
Merge pull request openucx#32 from TomAugspurger/event-notif+collection
Browse files Browse the repository at this point in the history
Event notif+collection
  • Loading branch information
TomAugspurger authored Feb 18, 2019
2 parents c7ba7af + 0f4f83f commit d64ac17
Show file tree
Hide file tree
Showing 9 changed files with 398 additions and 193 deletions.
17 changes: 9 additions & 8 deletions benchmarks/lat-bw.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def run_iters(ep, first_buffer_region, second_buffer_region, msg_log, send_first
start = time.time()
issue_lat = 0
progress_lat = 0

if args.use_obj:
msg_len = sys.getsizeof(str(list(range(msg_len))))

Expand Down Expand Up @@ -185,7 +185,7 @@ async def run_iters_async(ep, first_buffer_region, second_buffer_region, msg_log
warmup_iters = int((0.1 * max_iters))
if args.use_obj:
msg_len = sys.getsizeof(str(list(range(msg_len))))

for j in range(warmup_iters):
first_req = await first_op(first_msg, msg_len)
second_req = await second_op(second_msg, msg_len)
Expand All @@ -200,7 +200,7 @@ async def run_iters_async(ep, first_buffer_region, second_buffer_region, msg_log
print("{}\t\t{:.2f}\t\t{:.2f}".format(msg_len, get_avg_us(lat),
((msg_len/(lat/2)) / 1000000)))

def talk_to_client(ep):
def talk_to_client(ep, listener):

global args
global cb_not_done
Expand All @@ -212,7 +212,7 @@ def talk_to_client(ep):

ucp.destroy_ep(ep)
cb_not_done = False
ucp.stop_listener()
ucp.stop_listener(listener)

def talk_to_server(ip, port):

Expand All @@ -227,7 +227,7 @@ def talk_to_server(ip, port):

ucp.destroy_ep(ep)

async def talk_to_client_async(ep):
async def talk_to_client_async(ep, listener):

global args
send_first = True
Expand All @@ -237,7 +237,7 @@ async def talk_to_client_async(ep):
free_mem(send_buffer_region, recv_buffer_region, args)

ucp.destroy_ep(ep)
ucp.stop_listener()
ucp.stop_listener(listener)

async def talk_to_server_async(ip, port):

Expand Down Expand Up @@ -280,7 +280,7 @@ async def talk_to_server_async(ip, port):
if server:
if args.intra_node:
ucp.set_cuda_dev(1)
ucp.start_listener(talk_to_client, is_coroutine = False)
ucp.start_listener(talk_to_client, is_coroutine=False)
while cb_not_done:
ucp.progress()
else:
Expand All @@ -290,7 +290,8 @@ async def talk_to_server_async(ip, port):
if server:
if args.intra_node:
ucp.set_cuda_dev(1)
coro = ucp.start_listener(talk_to_client_async, is_coroutine = True)
listener = ucp.start_listener(talk_to_client_async, is_coroutine=True)
coro = listener.coroutine
else:
coro = talk_to_server_async(init_str.encode(), int(args.port))

Expand Down
Loading

0 comments on commit d64ac17

Please sign in to comment.