Skip to content

Commit

Permalink
Merge pull request #5 from boegel/backup_modules
Browse files Browse the repository at this point in the history
reimplement find_backup_name_candidate to use timestamp rather than increasing number
  • Loading branch information
damianam authored Aug 17, 2017
2 parents 03bad2a + 04dc98f commit 9f4d813
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 44 deletions.
12 changes: 8 additions & 4 deletions easybuild/tools/filetools.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
:author: Davide Vanzo (ACCRE, Vanderbilt University)
:author: Damian Alvarez (Forschungszentrum Juelich GmbH)
"""
import datetime
import fileinput
import glob
import hashlib
Expand Down Expand Up @@ -1166,12 +1167,15 @@ def rmtree2(path, n=3):

def find_backup_name_candidate(src_file):
"""Returns a non-existing file to be used as destination for backup files"""
dst_file = src_file

i = 0
# e.g. 20170817234510 on Aug 17th 2017 at 23:45:10
timestamp = datetime.datetime.now()
dst_file = '%s_%s' % (src_file, timestamp.strftime('%Y%m%d%H%M%S'))
while os.path.exists(dst_file):
dst_file = '%s_%d' % (src_file, i)
i += 1
_log.debug("Backup of %s at %s already found at %s, trying again in a second...", src_file, timestamp)
time.sleep(1)
timestamp = datetime.datetime.now()
dst_file = '%s_%s' % (src_file, timestamp.strftime('%Y%m%d%H%M%S'))

return dst_file

Expand Down
109 changes: 69 additions & 40 deletions test/framework/filetools.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
@author: Stijn De Weirdt (Ghent University)
@author: Ward Poelmans (Ghent University)
"""
import datetime
import os
import re
import shutil
Expand Down Expand Up @@ -500,71 +501,99 @@ def test_back_up_file(self):
txt = 'foobar'
ft.write_file(fp, txt)

test_files = ['test.txt']
self.assertEqual(sorted(os.listdir(os.path.dirname(fp))), test_files)
known_files = ['test.txt']
self.assertEqual(sorted(os.listdir(os.path.dirname(fp))), known_files)

# Test simple file backup
ft.back_up_file(fp)
test_files.append('test.txt_0')
self.assertEqual(sorted(os.listdir(os.path.dirname(fp))), test_files)
self.assertEqual(ft.read_file(os.path.join(os.path.dirname(fp), 'test.txt_0')), txt)
test_files = os.listdir(os.path.dirname(fp))
self.assertEqual(len(test_files), 2)
new_file = [x for x in test_files if x not in known_files][0]
self.assertTrue(new_file.startswith('test.txt_'))
first_normal_backup = os.path.join(os.path.dirname(fp), new_file)
known_files = os.listdir(os.path.dirname(fp))
self.assertEqual(ft.read_file(os.path.join(os.path.dirname(fp), new_file)), txt)
self.assertEqual(ft.read_file(fp), txt)

# Test hidden simple file backup
ft.back_up_file(fp, hidden=True)
test_files.insert(0, '.test.txt')
self.assertEqual(sorted(os.listdir(os.path.dirname(fp))), test_files)
self.assertEqual(ft.read_file(os.path.join(os.path.dirname(fp), '.test.txt')), txt)
test_files = os.listdir(os.path.dirname(fp))
self.assertEqual(len(test_files), 3)
new_file = [x for x in test_files if x not in known_files][0]
self.assertTrue(new_file.startswith('.test.txt_'))
first_hidden_backup = os.path.join(os.path.dirname(fp), new_file)
known_files = os.listdir(os.path.dirname(fp))
self.assertEqual(ft.read_file(os.path.join(os.path.dirname(fp), new_file)), txt)
self.assertEqual(ft.read_file(fp), txt)

# Test simple file backup with extension
ft.back_up_file(fp, backup_extension='bck')
test_files.insert(2, 'test.txt.bck')
self.assertEqual(sorted(os.listdir(os.path.dirname(fp))), test_files)
self.assertEqual(ft.read_file(os.path.join(os.path.dirname(fp), 'test.txt.bck')), txt)
test_files = os.listdir(os.path.dirname(fp))
self.assertEqual(len(test_files), 4)
new_file = [x for x in test_files if x not in known_files][0]
self.assertTrue(new_file.startswith('test.txt.bck_'))
first_bck_backup = os.path.join(os.path.dirname(fp), new_file)
known_files = os.listdir(os.path.dirname(fp))
self.assertEqual(ft.read_file(os.path.join(os.path.dirname(fp), new_file)), txt)
self.assertEqual(ft.read_file(fp), txt)

# Test hidden simple file backup with extension
ft.back_up_file(fp, backup_extension='bck', hidden=True)
test_files.insert(1, '.test.txt.bck')
self.assertEqual(sorted(os.listdir(os.path.dirname(fp))), test_files)
self.assertEqual(ft.read_file(os.path.join(os.path.dirname(fp), '.test.txt.bck')), txt)
test_files = os.listdir(os.path.dirname(fp))
self.assertEqual(len(test_files), 5)
new_file = [x for x in test_files if x not in known_files][0]
self.assertTrue(new_file.startswith('.test.txt.bck_'))
first_hidden_bck_backup = os.path.join(os.path.dirname(fp), new_file)
known_files = os.listdir(os.path.dirname(fp))
self.assertEqual(ft.read_file(os.path.join(os.path.dirname(fp), new_file)), txt)
self.assertEqual(ft.read_file(fp), txt)

new_txt = 'barfoo'
ft.write_file(fp, new_txt)
self.assertEqual(sorted(os.listdir(os.path.dirname(fp))), test_files)
self.assertEqual(len(os.listdir(os.path.dirname(fp))), 5)

# Test file backup with existing backup
ft.back_up_file(fp)
test_files.append('test.txt_1')
self.assertEqual(sorted(os.listdir(os.path.dirname(fp))), test_files)
self.assertEqual(ft.read_file(os.path.join(os.path.dirname(fp), 'test.txt_0')), txt)
self.assertEqual(ft.read_file(os.path.join(os.path.dirname(fp), 'test.txt_1')), new_txt)
test_files = os.listdir(os.path.dirname(fp))
self.assertEqual(len(test_files), 6)
new_file = [x for x in test_files if x not in known_files][0]
self.assertTrue(new_file.startswith('test.txt_'))
known_files = os.listdir(os.path.dirname(fp))
self.assertTrue(ft.read_file(first_normal_backup), txt)
self.assertEqual(ft.read_file(os.path.join(os.path.dirname(fp), new_file)), new_txt)
self.assertEqual(ft.read_file(fp), new_txt)

# Test hidden file backup with existing backup
ft.back_up_file(fp, hidden=True)
test_files.insert(2, '.test.txt_0')
self.assertEqual(sorted(os.listdir(os.path.dirname(fp))), test_files)
self.assertEqual(ft.read_file(os.path.join(os.path.dirname(fp), '.test.txt')), txt)
self.assertEqual(ft.read_file(os.path.join(os.path.dirname(fp), '.test.txt_0')), new_txt)
test_files = os.listdir(os.path.dirname(fp))
self.assertEqual(len(test_files), 7)
new_file = [x for x in test_files if x not in known_files][0]
self.assertTrue(new_file.startswith('.test.txt_'))
known_files = os.listdir(os.path.dirname(fp))
self.assertTrue(ft.read_file(first_hidden_backup), txt)
self.assertEqual(ft.read_file(os.path.join(os.path.dirname(fp), new_file)), new_txt)
self.assertEqual(ft.read_file(fp), new_txt)

# Test file backup with extension and existing backup
ft.back_up_file(fp, backup_extension='bck')
test_files.insert(5, 'test.txt.bck_0')
self.assertEqual(sorted(os.listdir(os.path.dirname(fp))), test_files)
self.assertEqual(ft.read_file(fp+'.bck_0'), new_txt)
self.assertEqual(ft.read_file(fp+'.bck'), txt)
test_files = os.listdir(os.path.dirname(fp))
self.assertEqual(len(test_files), 8)
new_file = [x for x in test_files if x not in known_files][0]
self.assertTrue(new_file.startswith('test.txt.bck_'))
known_files = os.listdir(os.path.dirname(fp))
self.assertTrue(ft.read_file(first_bck_backup), txt)
self.assertEqual(ft.read_file(os.path.join(os.path.dirname(fp), new_file)), new_txt)
self.assertEqual(ft.read_file(fp), new_txt)

# Test hidden file backup with extension and existing backup
ft.back_up_file(fp, backup_extension='bck', hidden=True)
test_files.insert(2, '.test.txt.bck_0')
self.assertEqual(sorted(os.listdir(os.path.dirname(fp))), test_files)
self.assertEqual(ft.read_file(os.path.join(os.path.dirname(fp), '.test.txt.bck')), txt)
self.assertEqual(ft.read_file(os.path.join(os.path.dirname(fp), '.test.txt.bck_0')), new_txt)
test_files = os.listdir(os.path.dirname(fp))
self.assertEqual(len(test_files), 9)
new_file = [x for x in test_files if x not in known_files][0]
self.assertTrue(new_file.startswith('.test.txt.bck_'))
known_files = os.listdir(os.path.dirname(fp))
self.assertTrue(ft.read_file(first_hidden_bck_backup), txt)
self.assertEqual(ft.read_file(os.path.join(os.path.dirname(fp), new_file)), new_txt)
self.assertEqual(ft.read_file(fp), new_txt)

def test_move_logs(self):
Expand Down Expand Up @@ -1360,22 +1389,22 @@ def test_find_backup_name_candidate(self):
test_file = os.path.join(self.test_prefix, 'test.txt')
ft.write_file(test_file, 'foo')

res = ft.find_backup_name_candidate(test_file)
self.assertTrue(os.path.samefile(os.path.dirname(res), self.test_prefix))
self.assertEqual(os.path.basename(res), 'test.txt_0')
# timestamp should be exactly 14 digits (year, month, day, hours, minutes, seconds)
regex = re.compile('^test\.txt_[0-9]{14}$')

ft.write_file(os.path.join(self.test_prefix, 'test.txt_0'), '')
res = ft.find_backup_name_candidate(test_file)
self.assertTrue(os.path.samefile(os.path.dirname(res), self.test_prefix))
self.assertEqual(os.path.basename(res), 'test.txt_1')
fn = os.path.basename(res)
self.assertTrue(regex.match(fn), "'%s' matches pattern '%s'" % (fn, regex.pattern))

# generate backup files until test.txt_122
for idx in range(1, 123):
ft.write_file(os.path.join(self.test_prefix, 'test.txt_%d' % idx), '')
# create expected next backup location to (try and) see if it's handled well
timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
ft.write_file(os.path.join(self.test_prefix, 'test.txt_%s' % timestamp), '')

res = ft.find_backup_name_candidate(test_file)
self.assertTrue(os.path.samefile(os.path.dirname(res), self.test_prefix))
self.assertEqual(os.path.basename(res), 'test.txt_123')
fn = os.path.basename(res)
self.assertTrue(regex.match(fn), "'%s' matches pattern '%s'" % (fn, regex.pattern))


def suite():
Expand Down

0 comments on commit 9f4d813

Please sign in to comment.