diff --git a/source/adios2/helper/adiosNetwork.cpp b/source/adios2/helper/adiosNetwork.cpp index a29abd462b..309260fcdf 100644 --- a/source/adios2/helper/adiosNetwork.cpp +++ b/source/adios2/helper/adiosNetwork.cpp @@ -79,7 +79,7 @@ std::string GetFQDN() noexcept { for (p = info; p != NULL; p = p->ai_next) { - printf("hostname: %s\n", p->ai_canonname); + // printf("hostname: %s\n", p->ai_canonname); if (strchr(p->ai_canonname, '.') != NULL) { strncpy(hostname, p->ai_canonname, sizeof(hostname)); diff --git a/source/utils/adios_campaign_manager/adios2_campaign_manager.py b/source/utils/adios_campaign_manager/adios2_campaign_manager.py index c752d95aac..171af0b4cc 100755 --- a/source/utils/adios_campaign_manager/adios2_campaign_manager.py +++ b/source/utils/adios_campaign_manager/adios2_campaign_manager.py @@ -6,61 +6,91 @@ import zlib from datetime import datetime from os import chdir, getcwd, remove, stat -from os.path import exists, isdir +from os.path import exists, isdir, expanduser from re import sub from socket import getfqdn -from time import time +from time import time_ns # from adios2.adios2_campaign_manager import * ADIOS_ACA_VERSION = "1.0" + +def ReadConfig(): + path = expanduser("~/.config/adios2/campaign.cfg") + try: + with open(path) as f: + lines = f.readlines() + for line in lines: + lst = line.split() + if lst[0] == "campaignstorepath": + adios_campaign_store = expanduser(lst[1]) + except FileNotFoundError: + adios_campaign_store = None + return adios_campaign_store + + def SetupArgs(): parser = argparse.ArgumentParser() parser.add_argument( - "command", help="Command: create/update/delete", - choices=['create', 'update', 'delete', 'info']) - parser.add_argument("--verbose", "-v", - help="More verbosity", action="count") - parser.add_argument("--project", "-p", - help="Project name", - required=True) - parser.add_argument("--app", "-a", - help="Application name", - required=True) - parser.add_argument("--shot", "-s", - help="Shot name", - required=True) - parser.add_argument("--campaign_store", "-c", - help="Path to local campaign store", - required=True) - parser.add_argument("--hostname", "-n", - help="Host name unique for hosts in a campaign", - required=False) + "command", + help="Command: create/update/delete/info/list", + choices=["create", "update", "delete", "info", "list"], + ) + parser.add_argument( + "campaign", help="Campaign name or path, with .aca or without", default=None, nargs="?" + ) + parser.add_argument("--verbose", "-v", help="More verbosity", action="count", default=0) + parser.add_argument( + "--campaign_store", "-s", help="Path to local campaign store", default=None + ) + parser.add_argument( + "--hostname", "-n", help="Host name unique for hosts in a campaign", required=False + ) args = parser.parse_args() # default values args.update = False - args.CampaignFileName = args.campaign_store + "/" + \ - args.project + "_" + args.app + "_" + args.shot + ".aca" + if args.campaign_store is None: + args.campaign_store = ReadConfig() + + if args.campaign_store is not None: + while args.campaign_store[-1] == "/": + args.campaign_store = args.campaign_store[:-1] + + args.CampaignFileName = args.campaign + if args.campaign is not None: + if not args.campaign.endswith(".aca"): + args.CampaignFileName += ".aca" + if (not exists(args.CampaignFileName) and + not args.CampaignFileName.startswith("/") and + args.campaign_store is not None): + args.CampaignFileName = args.campaign_store + "/" + args.CampaignFileName + args.LocalCampaignDir = "adios-campaign/" - # print("Verbosity = {0}".format(args.verbose)) - print(f"Campaign File Name = {args.CampaignFileName}") + if args.verbose > 0: + print(f"# Verbosity = {args.verbose}") + print(f"# Command = {args.command}") + print(f"# Campaign File Name = {args.CampaignFileName}") + print(f"# Campaign Store = {args.campaign_store}") return args def CheckCampaignStore(args): - if not isdir(args.campaign_store): - print("ERROR: Campaign directory " + args.campaign_store + - " does not exist", flush=True) + if args.campaign_store is not None and not isdir(args.campaign_store): + print("ERROR: Campaign directory " + args.campaign_store + " does not exist", flush=True) exit(1) def CheckLocalCampaignDir(args): if not isdir(args.LocalCampaignDir): - print("ERROR: Shot campaign data '" + args.LocalCampaignDir + - "' does not exist. Run this command where the code was executed.", flush=True) + print( + "ERROR: Shot campaign data '" + + args.LocalCampaignDir + + "' does not exist. Run this command where the code was executed.", + flush=True, + ) exit(1) @@ -102,7 +132,7 @@ def decompressBuffer(buf: bytearray): def AddFileToArchive(args: dict, filename: str, cur: sqlite3.Cursor, dsID: int): compressed = 1 try: - f = open(filename, 'rb') + f = open(filename, "rb") compressed_data, len_orig, len_compressed = compressFile(f) except IOError: @@ -110,38 +140,58 @@ def AddFileToArchive(args: dict, filename: str, cur: sqlite3.Cursor, dsID: int): return statres = stat(filename) - ct = statres.st_ctime - - cur.execute('insert into bpfile values (?, ?, ?, ?, ?, ?, ?)', - (dsID, filename, compressed, len_orig, len_compressed, ct, compressed_data)) - con.commit() - - # test - # if (filename == "dataAll.bp/md.0"): - # data = decompressBuffer(compressed_data) - # of = open("dataAll.bp-md.0", "wb") - # of.write(data) - # of.close() + ct = statres.st_ctime_ns - -def AddDatasetToArchive(args: dict, dataset: str, cur: sqlite3.Cursor, hostID: int, dirID: int): - if (IsADIOSDataset(dataset)): - print(f"Add dataset {dataset} to archive") - statres = stat(dataset) - ct = statres.st_ctime - curDS = cur.execute('insert into bpdataset values (?, ?, ?, ?)', - (hostID, dirID, dataset, ct)) - dsID = curDS.lastrowid - cwd = getcwd() - chdir(dataset) - mdFileList = glob.glob('*md.*') - profileList = glob.glob('profiling.json') - files = mdFileList + profileList - for f in files: - AddFileToArchive(args, f, cur, dsID) - chdir(cwd) + cur.execute( + "insert into bpfile " + "(bpdatasetid, name, compression, lenorig, lencompressed, ctime, data) " + "values (?, ?, ?, ?, ?, ?, ?) " + "on conflict (bpdatasetid, name) do update " + "set compression = ?, lenorig = ?, lencompressed = ?, ctime = ?, data = ?", + ( + dsID, + filename, + compressed, + len_orig, + len_compressed, + ct, + compressed_data, + compressed, + len_orig, + len_compressed, + ct, + compressed_data, + ), + ) + + +def AddDatasetToArchive(hostID: int, dirID: int, dataset: str, cur: sqlite3.Cursor) -> int: + statres = stat(dataset) + ct = statres.st_ctime_ns + select_cmd = ( + "select rowid from bpdataset " + f"where hostid = {hostID} and dirid = {dirID} and name = '{dataset}'" + ) + res = cur.execute(select_cmd) + row = res.fetchone() + if row is not None: + rowID = row[0] + print( + f"Found dataset {dataset} in database on host {hostID} " + f"in dir {dirID}, rowid = {rowID}" + ) else: - print(f"WARNING: Dataset {dataset} is not an ADIOS dataset. Skip") + print(f"Add dataset {dataset} to archive") + curDS = cur.execute( + "insert into bpdataset (hostid, dirid, name, ctime) values (?, ?, ?, ?)", + (hostID, dirID, dataset, ct), + ) + rowID = curDS.lastrowid + # print( + # f"Inserted bpdataset {dataset} in database on host {hostID}" + # f" in dir {dirID}, rowid = {rowID}" + # ) + return rowID def ProcessDBFile(args: dict, jsonlist: list, cur: sqlite3.Cursor, hostID: int, dirID: int): @@ -149,8 +199,20 @@ def ProcessDBFile(args: dict, jsonlist: list, cur: sqlite3.Cursor, hostID: int, # print(f"Process entry {entry}:") if isinstance(entry, dict): if "name" in entry: - AddDatasetToArchive( - args, entry['name'], cur, hostID, dirID) + dsID = 0 + dataset = entry["name"] + if IsADIOSDataset(dataset): + dsID = AddDatasetToArchive(hostID, dirID, dataset, cur) + cwd = getcwd() + chdir(dataset) + mdFileList = glob.glob("*md.*") + profileList = glob.glob("profiling.json") + files = mdFileList + profileList + for f in files: + AddFileToArchive(args, f, cur, dsID) + chdir(cwd) + else: + print(f"WARNING: Dataset {dataset} is not an ADIOS dataset. Skip") else: print(f"WARNING: your object is not a dictionary, skip : {entry}") @@ -158,50 +220,60 @@ def ProcessDBFile(args: dict, jsonlist: list, cur: sqlite3.Cursor, hostID: int, def GetHostName(): host = getfqdn() if host.startswith("login"): - host = sub('^login[0-9]*\\.', '', host) + host = sub("^login[0-9]*\\.", "", host) if host.startswith("batch"): - host = sub('^batch[0-9]*\\.', '', host) - shorthost = host.split('.')[0] + host = sub("^batch[0-9]*\\.", "", host) + shorthost = host.split(".")[0] return host, shorthost def AddHostName(longHostName, shortHostName): - res = cur.execute( - 'select rowid from host where hostname = "' + shortHostName + '"') + res = cur.execute('select rowid from host where hostname = "' + shortHostName + '"') row = res.fetchone() if row is not None: hostID = row[0] print(f"Found host {shortHostName} in database, rowid = {hostID}") else: - curHost = cur.execute('insert into host values (?, ?)', - (shortHostName, longHostName)) + curHost = cur.execute("insert into host values (?, ?)", (shortHostName, longHostName)) hostID = curHost.lastrowid print(f"Inserted host {shortHostName} into database, rowid = {hostID}") return hostID -def Info(args: dict, cur: sqlite3.Cursor): - res = cur.execute('select id, name, version, ctime from info') - info = res.fetchone() - t = datetime.fromtimestamp(float(info[3])) - print(f"{info[1]}, version {info[2]}, created on {t}") +def MergeDBFiles(dbfiles: list): + # read db files here + result = list() + for f1 in dbfiles: + try: + con = sqlite3.connect(f1) + except sqlite3.Error as e: + print(e) - res = cur.execute('select rowid, hostname, longhostname from host') - hosts = res.fetchall() - for host in hosts: - print(f"hostname = {host[1]} longhostname = {host[2]}") - res2 = cur.execute( - 'select rowid, name from directory where hostid = "' + str(host[0]) + '"') - dirs = res2.fetchall() - for dir in dirs: - print(f" dir = {dir[1]}") - res3 = cur.execute( - 'select rowid, name, ctime from bpdataset where hostid = "' + str(host[0]) + - '" and dirid = "' + str(dir[0]) + '"') - bpdatasets = res3.fetchall() - for bpdataset in bpdatasets: - t = datetime.fromtimestamp(float(bpdataset[2])) - print(f" dataset = {bpdataset[1]} created on {t}") + cur = con.cursor() + try: + cur.execute("select * from bpfiles") + except sqlite3.Error as e: + print(e) + record = cur.fetchall() + for item in record: + result.append({"name": item[0]}) + cur.close() + return result + + +def AddDirectory(hostID: int, path: str) -> int: + res = cur.execute( + "select rowid from directory where hostid = " + str(hostID) + ' and name = "' + path + '"' + ) + row = res.fetchone() + if row is not None: + dirID = row[0] + print(f"Found directory {path} with hostID {hostID} in database, rowid = {dirID}") + else: + curDirectory = cur.execute("insert into directory values (?, ?)", (hostID, path)) + dirID = curDirectory.lastrowid + print(f"Inserted directory {path} into database, rowid = {dirID}") + return dirID def Update(args: dict, cur: sqlite3.Cursor): @@ -212,13 +284,8 @@ def Update(args: dict, cur: sqlite3.Cursor): hostID = AddHostName(longHostName, shortHostName) rootdir = getcwd() - # curHost = cur.execute('insert into host values (?, ?)', - # (shortHostName, longHostName)) - # hostID = curHost.lastrowid + dirID = AddDirectory(hostID, rootdir) - curDir = cur.execute('insert or replace into directory values (?, ?)', - (hostID, rootdir)) - dirID = curDir.lastrowid con.commit() db_list = MergeDBFiles(dbFileList) @@ -230,66 +297,118 @@ def Update(args: dict, cur: sqlite3.Cursor): def Create(args: dict, cur: sqlite3.Cursor): - epoch = int(time()) + epoch = time_ns() + cur.execute("create table info(id TEXT, name TEXT, version TEXT, ctime INT)") + cur.execute( + "insert into info values (?, ?, ?, ?)", + ("ACA", "ADIOS Campaign Archive", ADIOS_ACA_VERSION, epoch), + ) + cur.execute("create table host" + "(hostname TEXT PRIMARY KEY, longhostname TEXT)") + cur.execute("create table directory" + "(hostid INT, name TEXT, PRIMARY KEY (hostid, name))") cur.execute( - "create table info(id TEXT, name TEXT, version TEXT, ctime INT)") - cur.execute('insert into info values (?, ?, ?, ?)', - ("ACA", "ADIOS Campaign Archive", ADIOS_ACA_VERSION, epoch)) - cur.execute("create table host" + - "(hostname TEXT PRIMARY KEY, longhostname TEXT)") - cur.execute("create table directory" + - "(hostid INT, name TEXT, PRIMARY KEY (hostid, name))") - cur.execute("create table bpdataset" + - "(hostid INT, dirid INT, name TEXT, ctime INT" + - ", PRIMARY KEY (hostid, dirid, name))") - cur.execute("create table bpfile" + - "(bpdatasetid INT, name TEXT, compression INT, lenorig INT" + - ", lencompressed INT, ctime INT, data BLOB" + - ", PRIMARY KEY (bpdatasetid, name))") + "create table bpdataset" + + "(hostid INT, dirid INT, name TEXT, ctime INT" + + ", PRIMARY KEY (hostid, dirid, name))" + ) + cur.execute( + "create table bpfile" + + "(bpdatasetid INT, name TEXT, compression INT, lenorig INT" + + ", lencompressed INT, ctime INT, data BLOB" + + ", PRIMARY KEY (bpdatasetid, name))" + ) Update(args, cur) -def MergeDBFiles(dbfiles: list): - # read db files here - result = list() - for f1 in dbfiles: - try: - con = sqlite3.connect(f1) - except sqlite3.Error as e: - print(e) +def timestamp_to_datetime(timestamp: int) -> datetime: + digits = len(str(int(timestamp))) + t = float(timestamp) + if digits > 18: + t = t / 1000000000 + elif digits > 15: + t = t / 1000000 + elif digits > 12: + t = t / 1000 + return datetime.fromtimestamp(t) - cur = con.cursor() - try: - cur.execute("select * from bpfiles") - except sqlite3.Error as e: - print(e) - record = cur.fetchall() - for item in record: - result.append({"name": item[0]}) - cur.close() - return result +def Info(args: dict, cur: sqlite3.Cursor): + res = cur.execute("select id, name, version, ctime from info") + info = res.fetchone() + t = timestamp_to_datetime(info[3]) + print(f"{info[1]}, version {info[2]}, created on {t}") + + res = cur.execute("select rowid, hostname, longhostname from host") + hosts = res.fetchall() + for host in hosts: + print(f"hostname = {host[1]} longhostname = {host[2]}") + res2 = cur.execute( + 'select rowid, name from directory where hostid = "' + str(host[0]) + '"' + ) + dirs = res2.fetchall() + for dir in dirs: + print(f" dir = {dir[1]}") + res3 = cur.execute( + 'select rowid, name, ctime from bpdataset where hostid = "' + + str(host[0]) + + '" and dirid = "' + + str(dir[0]) + + '"' + ) + bpdatasets = res3.fetchall() + for bpdataset in bpdatasets: + t = timestamp_to_datetime(bpdataset[2]) + print(f" dataset = {bpdataset[1]} created on {t}") -if __name__ == "__main__": +def List(): + path = args.campaign + if path is None: + if args.campaign_store is None: + print("ERROR: Set --campaign_store for this command") + return 1 + path = args.campaign_store + else: + while path[-1] == "/": + path = path[:-1] + + # List the local campaign store + acaList = glob.glob(path + "/**/*.aca", recursive=True) + if len(acaList) == 0: + print("There are no campaign archives in " + path) + return 2 + else: + startCharPos = len(path) + 1 + for f in acaList: + print(f[startCharPos:]) + return 0 + + +def Delete(): + if exists(args.CampaignFileName): + print(f"Delete archive {args.CampaignFileName}") + remove(args.CampaignFileName) + return 0 + else: + print(f"ERROR: archive {args.CampaignFileName} does not exist") + return 1 + + +if __name__ == "__main__": args = SetupArgs() CheckCampaignStore(args) - if (args.command == "delete"): - if exists(args.CampaignFileName): - print(f"Delete archive {args.CampaignFileName}") - remove(args.CampaignFileName) - exit(0) - else: - print(f"ERROR: archive {args.CampaignFileName} does not exist") - exit(1) + if args.command == "list": + exit(List()) + + if args.command == "delete": + exit(Delete()) - if (args.command == "create"): + if args.command == "create": print("Create archive") if exists(args.CampaignFileName): print(f"ERROR: archive {args.CampaignFileName} already exist") exit(1) - elif (args.command == "update" or args.command == 'info'): + elif args.command == "update" or args.command == "info": print(f"{args.command} archive") if not exists(args.CampaignFileName): print(f"ERROR: archive {args.CampaignFileName} does not exist") @@ -298,19 +417,19 @@ def MergeDBFiles(dbfiles: list): con = sqlite3.connect(args.CampaignFileName) cur = con.cursor() - if (args.command == "info"): + if args.command == "info": Info(args, cur) else: CheckLocalCampaignDir(args) # List the local campaign directory - dbFileList = glob.glob(args.LocalCampaignDir + '/*.db') + dbFileList = glob.glob(args.LocalCampaignDir + "/*.db") if len(dbFileList) == 0: print("There are no campaign data files in " + args.LocalCampaignDir) exit(2) - if (args.command == "create"): + if args.command == "create": Create(args, cur) - elif (args.command == "update"): + elif args.command == "update": Update(args, cur) cur.close()