bigtable-backup-tool: Improvements (#895)
* Improvements in bigtable backup tool

Added metrics for monitoring backup jobs.
Added a flag to delete out-of-range backups; without it, more backups than expected would accumulate and alerts would fire for the unexpected ones.
sandeepsukhani authored Aug 20, 2019
1 parent 23f998a commit e65fd2e
Showing 1 changed file with 71 additions and 22 deletions.
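The new metrics follow the standard Prometheus Pushgateway pattern for batch jobs: build a registry, set gauges while the job runs, and push once when it finishes. A minimal sketch of that pattern, assuming a Pushgateway at localhost:9091 (the endpoint and the elided job body are illustrative, not part of the commit):

    # Sketch of the batch-job metric pattern the tool uses.
    # Assumption: a Pushgateway reachable at localhost:9091.
    import time

    from prometheus_client import CollectorRegistry, Gauge, push_to_gateway

    registry = CollectorRegistry()
    last_run = Gauge('bigtable_backup_job_last_run_seconds',
                     'Last time a bigtable backup job ran at.', registry=registry)
    runtime = Gauge('bigtable_backup_job_runtime_seconds',
                    'Runtime of last successfully finished bigtable backup job.', registry=registry)

    start_time = time.time()
    last_run.set_to_current_time()
    # ... run the actual backup work here ...
    runtime.set(int(time.time() - start_time))

    # One push at the end, so the Pushgateway holds the job's final state.
    push_to_gateway('localhost:9091', job='ensure-backups', registry=registry)

The script itself routes pushes through its push_metrics helper and wraps them in a try/except, as the diff below shows.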
tools/bigtable-backup/bigtable-backup.py (93 changes: 71 additions & 22 deletions)
@@ -8,10 +8,11 @@
 from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
 
 registry = CollectorRegistry()
-bigtable_backup_job_last_run_seconds = Gauge('bigtable_backup_job_last_run_seconds', 'Last time a bigtable backup job ran at', registry=registry)
-bigtable_backup_job_last_success_seconds = Gauge('bigtable_backup_job_last_success_seconds', 'Last time a bigtable backup job successfully finished', registry=registry)
-bigtable_backup_job_runtime_seconds = Gauge('bigtable_backup_job_runtime_seconds', 'Runtime of last successfully finished bigtable backup job', registry=registry)
-bigtable_backup_job_backups_created = Gauge('bigtable_backup_job_backups_created', 'Number of backups created during last run', registry=registry)
+bigtable_backup_job_last_run_seconds = Gauge('bigtable_backup_job_last_run_seconds', 'Last time a bigtable backup job ran at.', registry=registry)
+bigtable_backup_job_last_success_seconds = Gauge('bigtable_backup_job_last_success_seconds', 'Last time a bigtable backup job successfully finished.', registry=registry)
+bigtable_backup_job_runtime_seconds = Gauge('bigtable_backup_job_runtime_seconds', 'Runtime of last successfully finished bigtable backup job.', registry=registry)
+bigtable_backup_job_backups_created = Gauge('bigtable_backup_job_backups_created', 'Number of backups created during last run.', registry=registry)
+bigtable_backup_job_last_active_table_backup_time_seconds = Gauge('bigtable_backup_job_last_active_table_backup_time_seconds', 'Last time an active table was backed up at.', registry=registry)
 
 job_backup_active_periodic_table = "backup-active-periodic-table"
 job_ensure_backups = "ensure-backups"
@@ -27,32 +28,18 @@ def backup_active_periodic_table(args):
     table_id = args.bigtable_table_id_prefix + str(int(time.time() / args.periodic_table_duration))
     create_backup(table_id, args)
 
+    bigtable_backup_job_last_active_table_backup_time_seconds.set_to_current_time()
     push_job_finished_metric(args.prom_push_gateway_endpoint, args.namespace, job_backup_active_periodic_table, int(time.time() - start_time))
 
 
 def ensure_backups(args):
     push_job_started_metric(args.prom_push_gateway_endpoint, args.namespace, job_ensure_backups)
     start_time = time.time()
 
-    # ensure-backups job specific metrics
-    bigtable_backup_job_num_tables_backed_up = Gauge('bigtable_backup_job_num_tables_backed_up', 'Number of table backups found during last run', registry=registry)
-    bigtable_backup_job_num_backup_ups = Gauge('bigtable_backup_job_num_backup_ups', 'Sum of number of backups per table found during last run', registry=registry)
-
-    # Read all the existing backups
-    popen = subprocess.Popen(['bigtable-backup', 'list-backups', '-ojson', '--backup-path', args.destination_path],
-                             stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
-    popen.wait()
-
-    # build and push metrics related to existing backups
-    backups = json.loads(popen.stdout.readline())
     if (args.duration == None and args.period_from == None) or (args.duration != None and args.period_from != None):
         raise ValueError("Either of --duration or --periodic-table-duration must be set")
 
-    bigtable_backup_job_num_tables_backed_up.set(len(backups))
-    for __, timestamps in backups.items():
-        bigtable_backup_job_num_backup_ups.inc(len(timestamps))
-
-    push_metrics(args.prom_push_gateway_endpoint, args.namespace, job_ensure_backups)
+    backups = list_backups(args.destination_path)
 
     if args.period_from == None:
         period_from = datetime.utcnow() - timedelta(days=args.duration)
@@ -61,7 +48,7 @@ def ensure_backups(args):
 
     oldest_table_number = int(args.period_from.timestamp() / args.periodic_table_duration)
     newest_table_number = int(args.period_to.timestamp() / args.periodic_table_duration)
-    active_table_number = time.time() / args.periodic_table_duration
+    active_table_number = int(time.time() / args.periodic_table_duration)
 
     print("Checking right backups exist")
     while oldest_table_number <= newest_table_number:
@@ -71,6 +58,10 @@ def ensure_backups(args):
             print("backup for {} not found".format(table_id))
             create_backup(table_id, args)
             bigtable_backup_job_backups_created.inc(1)
+        if table_id == active_table_number:
+            bigtable_backup_job_last_active_table_backup_time_seconds.set_to_current_time()
+
+    num_backups_deleted = 0
 
     print("Checking whether all the backups are created after their period is over and deleting old unwanted backups")
     for table_id, timestamps in backups.items():
@@ -86,12 +77,58 @@ def ensure_backups(args):
         if table_number != active_table_number and len(timestamps) > 1:
             for timestamp in timestamps[:-1]:
                 delete_backup(table_id, str(timestamp), args)
+                num_backups_deleted += 1
 
+    if args.delete_out_of_range_backups:
+        num_backups_deleted += delete_out_of_range_backups(oldest_table_number, newest_table_number, backups, args)
+
+    set_ensure_backups_specific_metrics(args, num_backups_deleted, active_table_number)
     push_job_finished_metric(args.prom_push_gateway_endpoint, args.namespace, job_ensure_backups, int(time.time() - start_time))
 
 def find_last_timestamp_from_table_number(table_number, periodic_secs):
     return ((table_number + 1) * periodic_secs) - 1
 
+def list_backups(backup_path):
+    popen = subprocess.Popen(['bigtable-backup', 'list-backups', '-ojson', '--backup-path', backup_path],
+                             stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+    popen.wait()
+
+    return json.loads(popen.stdout.readline())
+
+def set_ensure_backups_specific_metrics(args, num_backups_deleted, active_table_number):
+    # ensure-backups job specific metrics
+    bigtable_backup_job_tables_backed_up = Gauge('bigtable_backup_job_tables_backed_up', 'Number of active and inactive tables backed up.', ['kind'], registry=registry)
+    bigtable_backup_job_backups = Gauge('bigtable_backup_job_backups', 'Number of backups for all active and inactive tables.', ['kind'], registry=registry)
+    bigtable_backup_job_backups_deleted = Gauge('bigtable_backup_job_backups_deleted', 'Number of backups deleted during last run.', registry=registry)
+    bigtable_backup_job_expected_inactive_table_backups = Gauge('bigtable_backup_job_expected_inactive_table_backups', 'Expected number of backups for inactive tables.', registry=registry)
+
+    duration = args.duration
+    if args.duration == None:
+        duration = (args.period_to - args.period_from).days
+
+    # there should be 1 backup per inactive table
+    periodic_table_duration_in_days = args.periodic_table_duration / 86400
+    bigtable_backup_job_expected_inactive_table_backups.set(int(duration/periodic_table_duration_in_days))
+
+    bigtable_backup_job_backups_deleted.set(num_backups_deleted)
+
+    backups = list_backups(args.destination_path)
+    inactive_table_backups_count = 0
+
+    # setting sum of number of backups per table
+    for table_id, timestamps in backups.items():
+        table_number = int(table_id.rsplit("_", 1)[-1])
+
+        label = 'active'
+        if active_table_number != table_number:
+            label = 'inactive'
+            inactive_table_backups_count += 1
+
+        bigtable_backup_job_backups.labels(label).inc(len(timestamps))
+
+    bigtable_backup_job_tables_backed_up.labels('inactive').set(inactive_table_backups_count)
+    if len(backups) != inactive_table_backups_count:
+        bigtable_backup_job_tables_backed_up.labels('active').set(1)
 
 def valid_date(s):
     try:
@@ -131,7 +168,6 @@ def create_backup(table_id, args):
     else:
         print("Backup created for table {}".format(table_id))
 
-
 def delete_backup(table_id, timestamp, args):
     popen = subprocess.Popen(['bigtable-backup', 'delete-backup', '--bigtable-table-id', table_id,
                               '--backup-path', args.destination_path, "--backup-timestamp", timestamp],
@@ -163,6 +199,17 @@ def push_metrics(endpoint, namespace, job):
     except Exception as e:
         print("failed to push metrics with error {}".format(e))
 
+def delete_out_of_range_backups(oldest_table_number, newest_table_number, backups, args):
+    num_backups_deleted = 0
+    for table_id, timestamps in backups.items():
+        table_number = int(table_id.rsplit("_", 1)[-1])
+        if table_number < oldest_table_number or table_number > newest_table_number:
+            for timestamp in timestamps:
+                delete_backup(table_id, timestamp, args)
+                num_backups_deleted += 1
+
+    return num_backups_deleted
+
 
 def main():
     parser = argparse.ArgumentParser()
@@ -193,6 +240,8 @@ def main():
     ensure_backups_parser.add_argument('--period-from', type=valid_date, help="Backups should exist starting from the date. Must not be set with --duration")
     ensure_backups_parser.add_argument('--period-to', type=valid_date,
                                        default=datetime.utcnow().strftime("%Y-%m-%d"))
+    ensure_backups_parser.add_argument('--delete-out-of-range-backups', help="Delete backups which are out of range of duration for which backups are being ensured",
+                                       default=False)
     ensure_backups_parser.set_defaults(func=ensure_backups)
 
     args = parser.parse_args()
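For context on the bookkeeping above: a table's number is its Unix timestamp integer-divided by the periodic table duration, and a backup is out of range when its table number falls outside [oldest_table_number, newest_table_number]. A small sketch of that arithmetic; the weekly duration and the 14-day lookback are assumed values for illustration:

    import time

    PERIODIC_TABLE_DURATION = 7 * 24 * 3600  # assumed: weekly periodic tables

    def table_number_for(ts, periodic_secs=PERIODIC_TABLE_DURATION):
        # Each table covers one window of periodic_secs seconds; its number
        # is the index of that window since the Unix epoch.
        return int(ts / periodic_secs)

    def find_last_timestamp_from_table_number(table_number, periodic_secs):
        # Mirrors the helper in the diff: last second covered by a table's window.
        return ((table_number + 1) * periodic_secs) - 1

    now = time.time()
    active_table_number = table_number_for(now)
    oldest_table_number = table_number_for(now - 14 * 24 * 3600)  # assumed 14-day duration

    # Backups whose table numbers fall outside [oldest, newest] are the ones
    # --delete-out-of-range-backups removes.
    print(oldest_table_number, active_table_number,
          find_last_timestamp_from_table_number(active_table_number, PERIODIC_TABLE_DURATION))

This is also why the diff expects exactly one backup per inactive table: once a table's window has passed, its contents no longer change, so any older duplicates for that table can be deleted.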
