# test_fetch.py -- from a fork of shawwn/CLIP (92 lines, 82 loc, 3.29 KB)
# https://stackoverflow.com/questions/48483348/how-to-limit-concurrency-with-python-asyncio
from random import randint
import asyncio
import tqdm
import contextlib
from clip import utils
import httpx
import asyncio
import traceback
from io import BytesIO
import PIL.Image
# Shared limit for data_to_url_async: that coroutine holds the semaphore for
# its whole body, so at most 64 of its invocations run at the same time.
sem = asyncio.BoundedSemaphore(64)
async def data_to_url_async(path):
    """Upload the file at *path* to the Gather staging ``uploadImage`` endpoint.

    The bytes returned by ``filebytes(path)`` are converted to a request
    payload with ``data_to_upload_image_args`` and POSTed with a JSON content
    type.  On HTTP 200 the response text is treated as the resulting URL and
    printed via ``json_response``; every failure is passed to
    ``report_error``.  The whole body runs while holding the module-level
    ``sem`` semaphore.

    NOTE(review): ``filebytes``, ``data_to_upload_image_args``,
    ``json_response`` and ``report_error`` are not defined or imported in the
    visible file -- confirm where they are supposed to come from.

    Args:
        path: Value handed to ``filebytes``; presumably a filesystem path.

    Returns:
        None.  Results and errors are reported through ``json_response`` /
        ``report_error`` only.
    """
    async with sem:
        data = filebytes(path)
        orig = data
        data = data_to_upload_image_args(data)
        # Bound up front so the error handler can tell "no response at all"
        # (connection failure) apart from "got a response with a bad status".
        response = None
        async with httpx.AsyncClient() as client:
            try:
                response = await client.post(
                    'https://staging.gather.town/api/uploadImage',
                    headers={'Content-Type': 'application/json'},
                    data=data,
                    timeout=httpx.Timeout(timeout=None),
                )
                if response.status_code == 200:
                    url = response.text
                    # flush=True replaces a separate sys.stdout.flush(); the
                    # file never imports sys.
                    print(json_response(True, url, data=orig, path=path),
                          flush=True)
                else:
                    response.raise_for_status()
            except Exception as caught:
                code = response.status_code if response is not None else None
                report_error(caught, data=orig, path=path, code=code)
async def process(callback, url, pbar, fake=False):
    """Fetch *url* with a GET request and report the outcome to *callback*.

    Args:
        callback: Coroutine function awaited as ``callback(err, response)``.
            ``err`` is ``None`` for an HTTP 200 response and the caught
            exception otherwise.  ``response`` is the ``httpx.Response``, or
            ``None`` when the request failed before any response arrived.
            An exception raised by the success-path callback is itself
            reported through a second ``callback(err, response)`` call.
        url: The URL to download.
        pbar: Object whose ``write(str)`` method is used for the log lines
            emitted in fake mode.
        fake: When true, no request is made; the coroutine sleeps for one or
            two seconds instead and *callback* is not invoked.

    Returns:
        None.
    """
    if fake:
        wait_time = randint(1, 2)
        pbar.write('downloading {} will take {} second(s)'.format(url, wait_time))
        await asyncio.sleep(wait_time)  # I/O, context will switch to main function
        pbar.write('downloaded {}'.format(url))
        return

    # Bound before the try so the handler never touches an unassigned name
    # when client.get() itself raises (DNS failure, refused connection, ...).
    response = None
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(url, timeout=httpx.Timeout(timeout=None))
            if response.status_code == 200:
                await callback(None, response)
            else:
                # raise_for_status() is synchronous; it must not be awaited.
                response.raise_for_status()
                # Statuses httpx does not raise for (e.g. 204) are still not
                # the 200 expected here, so report them as failures as well.
                raise RuntimeError(
                    'unexpected HTTP status {}'.format(response.status_code))
        except Exception as caught:
            await callback(caught, response)
async def main(loop, url, concurrency=100):
    """Download every line produced by ``stream(url)`` with bounded concurrency.

    Each line yielded by the ``utils.LineStream`` instance is treated as a URL
    and handed to ``process`` as a task on *loop*.  Progress (in-flight,
    finished, failed, megabytes received and the last path segment of the
    current line) is shown through ``stream.pbar``.

    NOTE(review): presumably ``stream(url)`` yields one URL per line of the
    text file at *url* -- confirm against ``clip.utils.LineStream``.

    Args:
        loop: Event loop used to schedule the download tasks.
        url: Passed to the line stream to obtain the list of download URLs.
        concurrency: Maximum number of download tasks kept in flight.

    Returns:
        None.
    """
    with utils.LineStream() as stream:
        received_bytes = 0
        received_count = 0
        failed_count = 0
        dltasks = set()

        async def callback(err, response):
            """Update the counters and log the result of one download."""
            nonlocal received_bytes, received_count, failed_count
            if err is not None:
                if response is None:
                    # The request failed before any response existed, so
                    # there is no URL/response pair to show; log the error.
                    stream.pbar.write('{!r}'.format(err))
                else:
                    stream.pbar.write(
                        '{!r}: {!r}'.format(str(response.url), response))
                failed_count += 1
                return
            image_bytes = response.content
            received_count += 1
            received_bytes += len(image_bytes)
            # Opening the image raises for payloads PIL cannot identify; the
            # context manager releases the image once it has been logged.
            with PIL.Image.open(BytesIO(image_bytes)) as image:
                stream.pbar.write(
                    'Received {size} bytes: {url!r} {image!r}'.format(
                        size=len(image_bytes),
                        url=str(response.url),
                        image=image))

        for line in stream(url):
            stream.pbar.set_description(
                '%d in-flight / %d finished (%d failed) / %.2f MB [%s]' % (
                    len(dltasks),
                    received_count,
                    failed_count,
                    received_bytes / (1024 * 1024),
                    line.rsplit('/', 1)[-1]))
            # asyncio.wait() rejects an empty set, hence the truthiness guard
            # (only reachable when concurrency <= 0).
            if dltasks and len(dltasks) >= concurrency:
                # Wait for some download to finish before adding a new one
                _done, dltasks = await asyncio.wait(
                    dltasks, return_when=asyncio.FIRST_COMPLETED)
            dltasks.add(
                loop.create_task(process(callback, line, pbar=stream.pbar)))

        # Wait for the remaining downloads to finish; skipped when nothing is
        # pending because asyncio.wait() raises ValueError on an empty set.
        if dltasks:
            await asyncio.wait(dltasks)
if __name__ == '__main__':
    # Script entry point: download every URL listed in the text file below.
    url = 'https://battle.shawwn.com/danbooru2019-s.txt'
    # Create the loop explicitly: calling asyncio.get_event_loop() with no
    # running loop is deprecated and fails on newer Python versions.  main()
    # needs the loop object itself, so asyncio.run() is not a drop-in here.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(main(loop, url))
    finally:
        # Always release the loop's resources, even if main() raised.
        loop.close()