Skip to content

Commit

Permalink
Upload fix (#769)
Browse files Browse the repository at this point in the history
* upload improvements:
- limit pages and bookmarks to 10000
- add settings to limit bookmarks and pages separately
- include page and bookmark creation in progress bar, last 80-90% for page indexing, and 90-100% for bookmark creation.
- optimize: use zscan_iter() for iterating over pages, add polyfill for fakeredis to still use zrange
- fix tests
- bump version to 4.8.4

previously, page/bookmark creation was taking a long time but not included in progress update
should fix #768, likely webrecorder/webrecorder-player#87, webrecorder/webrecorder-player#78, webrecorder/webrecorder-player#86
  • Loading branch information
ikreymer authored Nov 9, 2019
1 parent 245a297 commit fd448c1
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 16 deletions.
2 changes: 1 addition & 1 deletion webrecorder/test/test_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ def test_upload_3_x_warc(self):
assert res.json['coll_title'] == 'Temporary Collection'
assert res.json['filename'] == 'example2.warc.gz'
assert res.json['files'] == 1
assert res.json['total_size'] == 5192
assert res.json['total_size'] == 6490
assert res.json['done'] == False

def assert_finished():
Expand Down
2 changes: 1 addition & 1 deletion webrecorder/webrecorder/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
__version__ = '4.8.3'
__version__ = '4.8.4'

2 changes: 0 additions & 2 deletions webrecorder/webrecorder/config/standalone_player.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ init_import_coll: 'collection'
cdxj_key_templ: 'c:{coll}:cdxj'
coll_cdxj_ttl: -1

max_detect_pages: 0

upload_coll:
id: 'collection'
title: 'Web Archive Collection'
Expand Down
3 changes: 2 additions & 1 deletion webrecorder/webrecorder/config/wr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ skip_key_secs: 330
open_rec_ttl: 5400
max_warc_size: 500000000

max_detect_pages: 0
max_detect_pages: 10000
max_auto_bookmarks: 10000

assets_path: ./webrecorder/config/assets.yaml

Expand Down
38 changes: 28 additions & 10 deletions webrecorder/webrecorder/models/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ def __init__(self, redis, config, wam_loader=None):
self.detect_list_info = config['page_detect_list']

self.max_detect_pages = config['max_detect_pages']
self.max_auto_bookmarks = config['max_auto_bookmarks']

def handle_upload(self, stream, upload_id, upload_key, infos, filename,
user, force_coll_name, total_size):
Expand Down Expand Up @@ -195,7 +196,7 @@ def _init_upload_status(self, user, total_size, num_files, filename=None, expire

with redis_pipeline(self.redis) as pi:
pi.hset(upload_key, 'size', 0)
pi.hset(upload_key, 'total_size', total_size * 2)
pi.hset(upload_key, 'total_size', int(total_size * 2.5))
pi.hset(upload_key, 'total_files', num_files)
pi.hset(upload_key, 'files', num_files)

Expand Down Expand Up @@ -240,8 +241,7 @@ def run_upload(self, upload_key, filename, stream, user, rec_infos, total_size,
else:
logger.debug('SKIP upload for zero-length recording')


self.process_pages(info, page_id_map)
self.process_pages(info, page_id_map, upload_key, total_size)

diff = info['offset'] - last_end
last_end = info['offset'] + info['length']
Expand Down Expand Up @@ -280,13 +280,13 @@ def run_upload(self, upload_key, filename, stream, user, rec_infos, total_size,
first_coll.sync_coll_index(exists=False, do_async=False)
first_coll.set_external_remove_on_expire()

def process_pages(self, info, page_id_map):
def process_pages(self, info, page_id_map, upload_key, total_size):
pages = info.get('pages')

# detect pages if none
detected = False
if pages is None:
pages = self.detect_pages(info['coll'], info['rec'])
pages = self.detect_pages(info['coll'], info['rec'], upload_key, total_size)
detected = True

# if no pages, nothing more to do
Expand All @@ -303,9 +303,16 @@ def process_pages(self, info, page_id_map):
if detected:
blist = info['collection'].create_bookmark_list(self.detect_list_info)

# if set, further limit number of automatic bookmarks
if self.max_auto_bookmarks:
pages = pages[:self.max_auto_bookmarks]

incr = int((total_size * 0.25) / len(pages))

for page in pages:
page['page_id'] = page['id']
bookmark = blist.create_bookmark(page, incr_stats=False)
self.redis.hincrby(upload_key, 'size', incr)

def har2warc(self, filename, stream):
"""Convert HTTP Archive format file to WARC archive.
Expand Down Expand Up @@ -437,7 +444,7 @@ def import_lists(self, collection, page_id_map):
bookmark_data['page_id'] = page_id_map.get(page_id)
bookmark = blist.create_bookmark(bookmark_data, incr_stats=False)

def detect_pages(self, coll, rec):
def detect_pages(self, coll, rec, upload_key, total_size):
"""Find pages in recording.
:param str coll: collection ID
Expand All @@ -449,17 +456,28 @@ def detect_pages(self, coll, rec):
key = self.cdxj_key.format(coll=coll, rec=rec)

pages = []
count = 0

total_cdx = self.redis.zcard(key)

#for member, score in self.redis.zscan_iter(key):
for member in self.redis.zrange(key, 0, -1):
incr = int((total_size * 0.25) / total_cdx)
count = 0

for member, score in self.redis.zscan_iter(key, match='*', count=100):
cdxj = CDXObject(member.encode('utf-8'))

if ((not self.max_detect_pages or len(pages) < self.max_detect_pages)
and self.is_page(cdxj)):
count += 1
self.redis.hincrby(upload_key, 'size', incr)

if self.is_page(cdxj):
pages.append(dict(url=cdxj['url'],
title=cdxj['url'],
timestamp=cdxj['timestamp']))

if self.max_detect_pages and len(pages) > self.max_detect_pages:
self.redis.hincrby(upload_key, 'size', incr * (total_cdx - count))
break

return pages

def is_page(self, cdxj):
Expand Down
11 changes: 10 additions & 1 deletion webrecorder/webrecorder/standalone/webrecorder_player.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def close(self):
super(WebrecPlayerRunner, self).close()

def _patch_redis(self):
redis.StrictRedis = fakeredis.FakeStrictRedis
redis.StrictRedis = FakeStrictRedis

if not self.cache_dir:
return
Expand Down Expand Up @@ -233,6 +233,15 @@ def add_args(cls, parser):
help='Writable directory to cache state (including CDXJ index) to avoid reindexing on load')


# ============================================================================
class FakeStrictRedis(fakeredis.FakeStrictRedis):
# not supported by this version of fakeredis, so just emulate with zrange
def zscan_iter(self, name, match=None, count=None):
data = self.zrange(name, 0, -1)
for item in data:
yield item, 0


# ============================================================================
webrecorder_player = WebrecPlayerRunner.main

Expand Down

0 comments on commit fd448c1

Please sign in to comment.