From 705ab8a96283801ad1c9b095ea824ac8129bcbc0 Mon Sep 17 00:00:00 2001
From: Arunima Chaudhuri
Date: Sun, 24 Mar 2024 21:00:18 +0000
Subject: [PATCH 1/9] Implemented state saving and completion check using pickle files

Signed-off-by: Arunima Chaudhuri
---
 DAS/simulator.py | 41 +++++++++++++++++++++++++++++++++++++++++
 study.py         | 43 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 84 insertions(+)

diff --git a/DAS/simulator.py b/DAS/simulator.py
index 7d4b341..a932e97 100644
--- a/DAS/simulator.py
+++ b/DAS/simulator.py
@@ -9,6 +9,9 @@
 from DAS.results import *
 from DAS.observer import *
 from DAS.node import *
+import os
+import pickle
+import uuid
 
 class Simulator:
     """This class implements the main DAS simulator."""
@@ -273,7 +276,9 @@ def run(self):
         trafficStatsVector = []
         malicious_nodes_not_added_count = 0
         steps = 0
+        unique_run_id = str(uuid.uuid4())
         while(True):
+            vectors_data = []
             missingVector.append(missingSamples)
             self.logger.debug("Expected Samples: %d" % expected, extra=self.format)
             self.logger.debug("Missing Samples: %d" % missingSamples, extra=self.format)
@@ -350,7 +355,43 @@ def run(self):
                 self.logger.debug("The entire block is available at step %d, with failure rate %d !" % (steps, self.shape.failureRate), extra=self.format)
                 missingVector.append(missingSamples)
                 break
+
+            for i in range(0, self.shape.numberNodes):
+                validator_data = {
+                    'validator_ID': self.validators[i].ID,
+                    'rowIDs': list(self.validators[i].rowIDs),
+                    'columnIDs': list(self.validators[i].columnIDs),
+                    'amImalicious': self.validators[i].amImalicious,
+                    'amIaddedToQueue': self.validators[i].amIaddedToQueue,
+                    'msgSentCount': self.validators[i].msgSentCount,
+                    'msgRecvCount': self.validators[i].msgRecvCount,
+                    'sampleSentCount': self.validators[i].sampleSentCount,
+                    'sampleRecvCount': self.validators[i].sampleRecvCount,
+                    'restoreRowCount': self.validators[i].restoreRowCount,
+                    'restoreColumnCount': self.validators[i].restoreColumnCount,
+                    'repairedSampleCount': self.validators[i].repairedSampleCount,
+                    'rowNeighbors': list(self.validators[i].rowNeighbors),
+                    'columnNeighbors': list(self.validators[i].columnNeighbors)
+                }
+                vectors_data.append(validator_data)
+
+            vectors_data += (progressVector,missingVector)
+            backup_folder = f"results/{self.execID}/backup"
+            if not os.path.exists(backup_folder):
+                os.makedirs(backup_folder)
+            backup_file = os.path.join(backup_folder, f"simulation_data_{unique_run_id}.pkl")
+            with open(backup_file, 'ab') as f:
+                pickle.dump(vectors_data, f)
             steps += 1
+
+
+        backup_folder = f"results/{self.execID}/backup"
+        if not os.path.exists(backup_folder):
+            os.makedirs(backup_folder)
+        backup_file = os.path.join(backup_folder, f"simulation_data_{unique_run_id}.pkl")
+
+        with open(backup_file, 'ab') as f: # Open in append binary mode
+            pickle.dump("completed", f)
 
         for i in range(0,self.shape.numberNodes):
             if not self.validators[i].amIaddedToQueue :
diff --git a/study.py b/study.py
index 380cf30..6ade619 100644
--- a/study.py
+++ b/study.py
@@ -45,7 +45,50 @@ def runOnce(config, shape, execID):
 
     return result
 
+
+def check_simulation_completion(state_file):
+    backup_dir = os.path.join(os.path.dirname(state_file), "backup")
+    if not os.path.exists(backup_dir):
+        return False
+
+    all_completed = True
+    for filename in sorted(os.listdir(backup_dir), reverse=True): # Iterate in reverse order
+        if not filename.endswith(".pkl"):
+            continue
+        full_path = os.path.join(backup_dir, filename)
+        try:
+            with open(full_path, 'rb') as f:
+                items = []
+                while True:
+                    try:
+                        item = pickle.load(f)
+                        items.append(item) # Load all items
+                    except EOFError: # Reached end of file
+                        break
+                last_item = items[-1] # Access the last item
+                # print(last_item)
+                if last_item != "completed":
+                    all_completed = False
+                    break # No need to continue checking other files
+        except (OSError, pickle.UnpicklingError) as e:
+            print(f"Error loading state from {full_path}: {e}")
+            all_completed = False # Treat errors as incomplete
+            break # No need to continue checking other files
+    return all_completed
+
+
 def study():
+    restart_path = None
+    for arg in sys.argv[1:]:
+        if arg.startswith("--restart="):
+            restart_path = arg[len("--restart="):]
+
+    if restart_path:
+        execID = restart_path.split("/")[1]
+        state_file = f"results/{execID}/backup"
+        print(check_simulation_completion(state_file))
+        sys.exit(0)
+
     if len(sys.argv) < 2:
         print("You need to pass a configuration file in parameter")
         exit(1)
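Note on the mechanism in PATCH 1/9: the backup file is an append-mode pickle stream rather than a single pickled object. Every pickle.dump call appends one independent record, so reading the file back means calling pickle.load in a loop until EOFError. A minimal standalone sketch of that read-back pattern (the function and file names are hypothetical, not part of the patch):

    import pickle

    def read_pickle_log(path):
        """Read every record from an append-mode pickle file."""
        records = []
        with open(path, 'rb') as f:
            while True:
                try:
                    records.append(pickle.load(f))  # one record per dump() call
                except EOFError:  # no complete records left
                    break
        return records

    # A run counts as finished when the sentinel is the last record:
    # records = read_pickle_log("results/<execID>/backup/simulation_data_<run_id>.pkl")
    # done = bool(records) and records[-1] == "completed"

This is exactly what check_simulation_completion does per file: the "completed" string dumped after the main loop acts as the end-of-run sentinel.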
From 04090fd89b16c24b3b6c89e70ea27185ac7c12a2 Mon Sep 17 00:00:00 2001
From: Arunima Chaudhuri
Date: Tue, 26 Mar 2024 13:06:11 +0000
Subject: [PATCH 2/9] identify incomplete simulations

Signed-off-by: Arunima Chaudhuri
---
 study.py | 41 +++++++++++++++++++++++++++++++++++++----
 1 file changed, 37 insertions(+), 4 deletions(-)

diff --git a/study.py b/study.py
index 6ade619..7013d14 100644
--- a/study.py
+++ b/study.py
@@ -52,6 +52,7 @@ def check_simulation_completion(state_file):
         return False
 
     all_completed = True
+    incomplete_files = []
     for filename in sorted(os.listdir(backup_dir), reverse=True): # Iterate in reverse order
         if not filename.endswith(".pkl"):
             continue
@@ -69,12 +70,12 @@ def check_simulation_completion(state_file):
                 last_item = items[-1] # Access the last item
                 # print(last_item)
                 if last_item != "completed":
                     all_completed = False
-                    break # No need to continue checking other files
+                    incomplete_files.append(full_path)
         except (OSError, pickle.UnpicklingError) as e:
             print(f"Error loading state from {full_path}: {e}")
             all_completed = False # Treat errors as incomplete
             break # No need to continue checking other files
-    return all_completed
+    return all_completed, incomplete_files
 
 
 def study():
@@ -86,8 +87,40 @@ def study():
     if restart_path:
         execID = restart_path.split("/")[1]
         state_file = f"results/{execID}/backup"
-        print(check_simulation_completion(state_file))
-        sys.exit(0)
+        all_completed, incomplete_files = check_simulation_completion(state_file)
+        if all_completed:
+            print("Simulation is already completed.")
+            sys.exit(0) # Exit gracefully if already completed
+        else:
+            print(incomplete_files)
+            # Load the state (if available)
+            # all_results = []
+            # for incomplete_file in incomplete_files:
+            #     latest_state = None
+            #     try:
+            #         with open(incomplete_file, 'rb') as f:
+            #             items = []
+            #             while True:
+            #                 try:
+            #                     item = pickle.load(f)
+            #                     items.append(item)
+            #                 except EOFError:
+            #                     break
+            #             latest_state = items[-1] # Assuming state is the last item
+            #     except (OSError, pickle.UnpicklingError) as e:
+            #         print(f"Error loading state from {incomplete_file}: {e}")
+            #     if latest_state:
+            #         try:
+            #             # Assuming configuration file is 'smallConf.py'
+            #             config = importlib.import_module("smallConf")
+            #             results = Parallel(config.numJobs)(delayed(runOnce)(config, shape, execID, latest_state) for shape in config.nextShape())
+            #             all_results.extend(results) # Collect results from all restarts
+            #         except ModuleNotFoundError as e:
+            #             print(f"Error importing configuration file 'smallConf.py': {e}")
+            #     else:
+            #         print(f"No state found for restart from {incomplete_file}. Skipping.")
+
+        sys.exit(0)
 
     if len(sys.argv) < 2:
         print("You need to pass a configuration file in parameter")
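The commented-out block in PATCH 2/9 sketches the intended resume path: pull the most recent pickled record out of each incomplete backup and feed it back into a run. It also assumes a runOnce(config, shape, execID, latest_state) signature that this series never adds. The read-back step it gestures at distills to roughly the following (helper name hypothetical):

    import pickle

    def latest_record(path):
        """Return the most recent record of an append-mode pickle file, or None."""
        last = None
        with open(path, 'rb') as f:
            while True:
                try:
                    last = pickle.load(f)  # keep only the newest record
                except EOFError:
                    break
        return last

For a file written by PATCH 1/9, this would be the last per-step vectors_data snapshot of an interrupted run.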
Skipping.") + + sys.exit(0) if len(sys.argv) < 2: print("You need to pass a configuration file in parameter") From ec0f9cc7818d74963e98061f544ef18f9026592a Mon Sep 17 00:00:00 2001 From: Arunima Chaudhuri Date: Tue, 26 Mar 2024 13:08:38 +0000 Subject: [PATCH 3/9] Clear code Signed-off-by: Arunima Chaudhuri --- study.py | 27 --------------------------- 1 file changed, 27 deletions(-) diff --git a/study.py b/study.py index 7013d14..e3044c8 100644 --- a/study.py +++ b/study.py @@ -93,33 +93,6 @@ def study(): sys.exit(0) # Exit gracefully if already completed else: print(incomplete_files) - # Load the state (if available) - # all_results = [] - # for incomplete_file in incomplete_files: - # latest_state = None - # try: - # with open(incomplete_file, 'rb') as f: - # items = [] - # while True: - # try: - # item = pickle.load(f) - # items.append(item) - # except EOFError: - # break - # latest_state = items[-1] # Assuming state is the last item - # except (OSError, pickle.UnpicklingError) as e: - # print(f"Error loading state from {incomplete_file}: {e}") - # if latest_state: - # try: - # # Assuming configuration file is 'smallConf.py' - # config = importlib.import_module("smallConf") - # results = Parallel(config.numJobs)(delayed(runOnce)(config, shape, execID, latest_state) for shape in config.nextShape()) - # all_results.extend(results) # Collect results from all restarts - # except ModuleNotFoundError as e: - # print(f"Error importing configuration file 'smallConf.py': {e}") - # else: - # print(f"No state found for restart from {incomplete_file}. Skipping.") - sys.exit(0) if len(sys.argv) < 2: From c4db8e408e4499f473208d7b0ee94b4a0eb0aa11 Mon Sep 17 00:00:00 2001 From: Arunima Chaudhuri Date: Wed, 27 Mar 2024 11:22:56 +0000 Subject: [PATCH 4/9] store config shape in pickle files Signed-off-by: Arunima Chaudhuri --- DAS/simulator.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/DAS/simulator.py b/DAS/simulator.py index a932e97..8a1867f 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -277,6 +277,13 @@ def run(self): malicious_nodes_not_added_count = 0 steps = 0 unique_run_id = str(uuid.uuid4()) + backup_folder = f"results/{self.execID}/backup" + if not os.path.exists(backup_folder): + os.makedirs(backup_folder) + backup_file = os.path.join(backup_folder, f"simulation_data_{unique_run_id}.pkl") + + with open(backup_file, 'ab') as f: + pickle.dump(self.shape.__dict__, f) while(True): vectors_data = [] missingVector.append(missingSamples) @@ -374,7 +381,7 @@ def run(self): 'columnNeighbors': list(self.validators[i].columnNeighbors) } vectors_data.append(validator_data) - + # Alse store for initNetwork vectors_data += (progressVector,missingVector) backup_folder = f"results/{self.execID}/backup" if not os.path.exists(backup_folder): From d591c1724fc5f1aca22480ffa13e105158af23ba Mon Sep 17 00:00:00 2001 From: Arunima Chaudhuri Date: Wed, 27 Mar 2024 20:38:47 +0000 Subject: [PATCH 5/9] Implement stop and resume functionality Signed-off-by: Arunima Chaudhuri --- DAS/simulator.py | 38 ++------------------------- study.py | 68 +++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 61 insertions(+), 45 deletions(-) diff --git a/DAS/simulator.py b/DAS/simulator.py index 8a1867f..947dba7 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -281,11 +281,10 @@ def run(self): if not os.path.exists(backup_folder): os.makedirs(backup_folder) backup_file = os.path.join(backup_folder, f"simulation_data_{unique_run_id}.pkl") - with open(backup_file, 'ab') as f: 
From d591c1724fc5f1aca22480ffa13e105158af23ba Mon Sep 17 00:00:00 2001
From: Arunima Chaudhuri
Date: Wed, 27 Mar 2024 20:38:47 +0000
Subject: [PATCH 5/9] Implement stop and resume functionality

Signed-off-by: Arunima Chaudhuri
---
 DAS/simulator.py | 38 ++------------------------
 study.py         | 68 +++++++++++++++++++++++++++++++++++++++++-------
 2 files changed, 61 insertions(+), 45 deletions(-)

diff --git a/DAS/simulator.py b/DAS/simulator.py
index 8a1867f..947dba7 100644
--- a/DAS/simulator.py
+++ b/DAS/simulator.py
@@ -281,11 +281,10 @@ def run(self):
         if not os.path.exists(backup_folder):
             os.makedirs(backup_folder)
         backup_file = os.path.join(backup_folder, f"simulation_data_{unique_run_id}.pkl")
-
         with open(backup_file, 'ab') as f:
             pickle.dump(self.shape.__dict__, f)
+
         while(True):
-            vectors_data = []
             missingVector.append(missingSamples)
             self.logger.debug("Expected Samples: %d" % expected, extra=self.format)
             self.logger.debug("Missing Samples: %d" % missingSamples, extra=self.format)
@@ -362,42 +361,9 @@ def run(self):
                 self.logger.debug("The entire block is available at step %d, with failure rate %d !" % (steps, self.shape.failureRate), extra=self.format)
                 missingVector.append(missingSamples)
                 break
-
-            for i in range(0, self.shape.numberNodes):
-                validator_data = {
-                    'validator_ID': self.validators[i].ID,
-                    'rowIDs': list(self.validators[i].rowIDs),
-                    'columnIDs': list(self.validators[i].columnIDs),
-                    'amImalicious': self.validators[i].amImalicious,
-                    'amIaddedToQueue': self.validators[i].amIaddedToQueue,
-                    'msgSentCount': self.validators[i].msgSentCount,
-                    'msgRecvCount': self.validators[i].msgRecvCount,
-                    'sampleSentCount': self.validators[i].sampleSentCount,
-                    'sampleRecvCount': self.validators[i].sampleRecvCount,
-                    'restoreRowCount': self.validators[i].restoreRowCount,
-                    'restoreColumnCount': self.validators[i].restoreColumnCount,
-                    'repairedSampleCount': self.validators[i].repairedSampleCount,
-                    'rowNeighbors': list(self.validators[i].rowNeighbors),
-                    'columnNeighbors': list(self.validators[i].columnNeighbors)
-                }
-                vectors_data.append(validator_data)
-            # Also store for initNetwork
-            vectors_data += (progressVector,missingVector)
-            backup_folder = f"results/{self.execID}/backup"
-            if not os.path.exists(backup_folder):
-                os.makedirs(backup_folder)
-            backup_file = os.path.join(backup_folder, f"simulation_data_{unique_run_id}.pkl")
-            with open(backup_file, 'ab') as f:
-                pickle.dump(vectors_data, f)
             steps += 1
-
-
-        backup_folder = f"results/{self.execID}/backup"
-        if not os.path.exists(backup_folder):
-            os.makedirs(backup_folder)
-        backup_file = os.path.join(backup_folder, f"simulation_data_{unique_run_id}.pkl")
-
-        with open(backup_file, 'ab') as f: # Open in append binary mode
+        with open(backup_file, 'ab') as f:
             pickle.dump("completed", f)
 
         for i in range(0,self.shape.numberNodes):
             if not self.validators[i].amIaddedToQueue :
diff --git a/study.py b/study.py
index e3044c8..597f9be 100644
--- a/study.py
+++ b/study.py
@@ -53,7 +53,9 @@ def check_simulation_completion(state_file):
 
     all_completed = True
     incomplete_files = []
-    for filename in sorted(os.listdir(backup_dir), reverse=True): # Iterate in reverse order
+    completed_files = []
+    completed_shapes = []
+    for filename in sorted(os.listdir(backup_dir), reverse=True):
         if not filename.endswith(".pkl"):
             continue
         full_path = os.path.join(backup_dir, filename)
@@ -63,19 +65,65 @@ def check_simulation_completion(state_file):
                 while True:
                     try:
                         item = pickle.load(f)
-                        items.append(item) # Load all items
-                    except EOFError: # Reached end of file
+                        items.append(item)
+                    except EOFError:
                         break
                 last_item = items[-1] # Access the last item
                 # print(last_item)
                 if last_item != "completed":
                     all_completed = False
                     incomplete_files.append(full_path)
+                else:
+                    completed_files.append(full_path)
+                    completed_shapes.append(items[0])
         except (OSError, pickle.UnpicklingError) as e:
             print(f"Error loading state from {full_path}: {e}")
-            all_completed = False # Treat errors as incomplete
-            break # No need to continue checking other files
-    return all_completed, incomplete_files
+            all_completed = False
+            break
+    return all_completed, incomplete_files, completed_files, completed_shapes
+
+
+def start_simulation(execID, completed_files, completed_shapes, incomplete_files):
+    config = importlib.import_module("smallConf")
+    logger = initLogger(config)
+    format = {"entity": "Study"}
+
+    results = []
+    if not os.path.exists("results"):
+        os.makedirs("results")
+    dir = "results/"+execID
+    if not os.path.exists(dir):
+        os.makedirs(dir)
+    if config.saveGit:
+        with open(dir+"/git.diff", 'w') as f:
+            subprocess.run(["git", "diff"], stdout=f)
+        with open(dir+"/git.describe", 'w') as f:
+            subprocess.run(["git", "describe", "--always"], stdout=f)
+        subprocess.run(["cp", sys.argv[1], dir+"/"])
+
+    logger.info("Starting simulations:", extra=format)
+    start = time.time()
+    for shape in config.nextShape():
+        comparison_dict = shape.__dict__.copy()
+        ignore_keys = ['randomSeed']
+        for key in ignore_keys:
+            del comparison_dict[key]
+
+        if any(all(comparison_dict[key] == completed_shape[key] for key in comparison_dict.keys() if key not in ignore_keys) for completed_shape in completed_shapes):
+            print(f"Skipping simulation for shape: {shape.__dict__} (already completed)")
+        else:
+            results.append(delayed(runOnce)(config, shape, execID))
+
+    results = Parallel(config.numJobs)(results)
+    end = time.time()
+    logger.info("A total of %d simulations ran in %d seconds" % (len(results), end-start), extra=format)
+
+    if config.visualization:
+        vis = Visualizer(execID, config)
+        vis.plotHeatmaps()
+
+        visual = Visualizor(execID, config, results)
+        visual.plotHeatmaps("nn", "fr")
 
 
 def study():
     if restart_path:
         execID = restart_path.split("/")[1]
         state_file = f"results/{execID}/backup"
-        all_completed, incomplete_files = check_simulation_completion(state_file)
+        all_completed, incomplete_files, completed_files, completed_shapes = check_simulation_completion(state_file)
+        print(completed_shapes)
         if all_completed:
             print("Simulation is already completed.")
-            sys.exit(0) # Exit gracefully if already completed
+            sys.exit(0)
         else:
-            print(incomplete_files)
+            print("Restarting simulations.")
+            start_simulation(execID, completed_files, completed_shapes, incomplete_files)
         sys.exit(0)
 
     if len(sys.argv) < 2:
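The skip test in start_simulation packs the comparison into a single any(all(...)) expression. Unpacked, the intent is: a shape is already completed if some recorded shape dict matches it on every field except randomSeed, so a rerun with a fresh seed still counts as done. A readable sketch of the same test (the helper is hypothetical; the patch keeps the logic inline):

    def is_completed(shape, completed_shapes, ignore_keys=('randomSeed',)):
        candidate = {k: v for k, v in shape.__dict__.items() if k not in ignore_keys}
        return any(
            all(completed.get(k) == v for k, v in candidate.items())
            for completed in completed_shapes
        )

Using .get() also sidesteps the KeyError the inline version would raise if a recorded dict lacked one of the current fields.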
From 2aa0074163500684b5d61a914eaf3e5408b1da35 Mon Sep 17 00:00:00 2001
From: Arunima Chaudhuri
Date: Thu, 28 Mar 2024 08:59:39 +0000
Subject: [PATCH 6/9] Removed debug prints

Signed-off-by: Arunima Chaudhuri
---
 study.py | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/study.py b/study.py
index 597f9be..1a955d8 100644
--- a/study.py
+++ b/study.py
@@ -68,8 +68,7 @@ def check_simulation_completion(state_file):
                         items.append(item)
                     except EOFError:
                         break
-                last_item = items[-1] # Access the last item
-                # print(last_item)
+                last_item = items[-1]
                 if last_item != "completed":
                     all_completed = False
                     incomplete_files.append(full_path)
@@ -110,7 +109,7 @@ def start_simulation(execID, completed_files, completed_shapes, incomplete_files
            del comparison_dict[key]
 
        if any(all(comparison_dict[key] == completed_shape[key] for key in comparison_dict.keys() if key not in ignore_keys) for completed_shape in completed_shapes):
-            print(f"Skipping simulation for shape: {shape.__dict__} (already completed)")
+            logger.info("Skipping simulation for shape (already completed): %s" % (str(shape.__dict__)), extra=format)
        else:
            results.append(delayed(runOnce)(config, shape, execID))
 
@@ -136,7 +135,6 @@ def study():
         execID = restart_path.split("/")[1]
         state_file = f"results/{execID}/backup"
         all_completed, incomplete_files, completed_files, completed_shapes = check_simulation_completion(state_file)
-        print(completed_shapes)
         if all_completed:
             print("Simulation is already completed.")
             sys.exit(0)
From 04004ed1fb520592687258dc6240d2f642f1deff Mon Sep 17 00:00:00 2001
From: Arunima Chaudhuri
Date: Fri, 29 Mar 2024 21:49:57 +0000
Subject: [PATCH 7/9] make the changes on study level

Signed-off-by: Arunima Chaudhuri
---
 DAS/simulator.py | 12 ------------
 study.py         | 15 ++++++++++++++-
 2 files changed, 14 insertions(+), 13 deletions(-)

diff --git a/DAS/simulator.py b/DAS/simulator.py
index 947dba7..3657b03 100644
--- a/DAS/simulator.py
+++ b/DAS/simulator.py
@@ -9,9 +9,6 @@
 from DAS.results import *
 from DAS.observer import *
 from DAS.node import *
-import os
-import pickle
-import uuid
 
 class Simulator:
     """This class implements the main DAS simulator."""
@@ -276,13 +273,6 @@ def run(self):
         trafficStatsVector = []
         malicious_nodes_not_added_count = 0
         steps = 0
-        unique_run_id = str(uuid.uuid4())
-        backup_folder = f"results/{self.execID}/backup"
-        if not os.path.exists(backup_folder):
-            os.makedirs(backup_folder)
-        backup_file = os.path.join(backup_folder, f"simulation_data_{unique_run_id}.pkl")
-        with open(backup_file, 'ab') as f:
-            pickle.dump(self.shape.__dict__, f)
 
         while(True):
             missingVector.append(missingSamples)
@@ -363,8 +353,6 @@ def run(self):
                 break
             steps += 1
 
-        with open(backup_file, 'ab') as f:
-            pickle.dump("completed", f)
 
         for i in range(0,self.shape.numberNodes):
             if not self.validators[i].amIaddedToQueue :
diff --git a/study.py b/study.py
index 1a955d8..bc71add 100644
--- a/study.py
+++ b/study.py
@@ -5,6 +5,9 @@
 import subprocess
 from joblib import Parallel, delayed
 from DAS import *
+import os
+import pickle
+import uuid
 
 # Parallel execution:
 # The code currently uses 'joblib' to execute on multiple cores. For other options such as 'ray', see
@@ -29,6 +32,14 @@ def runOnce(config, shape, execID):
     shape.setSeed(config.randomSeed+"-"+str(shape))
     random.seed(shape.randomSeed)
 
+    unique_run_id = str(uuid.uuid4())
+    backup_folder = f"results/{execID}/backup"
+    if not os.path.exists(backup_folder):
+        os.makedirs(backup_folder)
+    backup_file = os.path.join(backup_folder, f"simulation_data_{unique_run_id}.pkl")
+    with open(backup_file, 'ab') as f:
+        pickle.dump(shape.__dict__, f)
+
     sim = Simulator(shape, config, execID)
     sim.initLogger()
     sim.initValidators()
@@ -43,6 +54,9 @@ def runOnce(config, shape, execID):
     visual = Visualizor(execID, config, [result])
     visual.plotAll()
 
+    with open(backup_file, 'ab') as f:
+        pickle.dump("completed", f)
+
     return result
 
 
@@ -98,7 +112,6 @@ def start_simulation(execID, completed_files, completed_shapes, incomplete_files
            subprocess.run(["git", "diff"], stdout=f)
        with open(dir+"/git.describe", 'w') as f:
            subprocess.run(["git", "describe", "--always"], stdout=f)
-       subprocess.run(["cp", sys.argv[1], dir+"/"])
 
    logger.info("Starting simulations:", extra=format)
    start = time.time()

From 4ad181c91d3cc977bb82073d99f359269483a56c Mon Sep 17 00:00:00 2001
From: Arunima Chaudhuri
Date: Mon, 13 May 2024 16:14:14 +0000
Subject: [PATCH 8/9] use shape for naming pickle files for restarting simulations

Signed-off-by: Arunima Chaudhuri
---
 study.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/study.py b/study.py
index bc71add..6776e90 100644
--- a/study.py
+++ b/study.py
@@ -32,11 +32,10 @@ def runOnce(config, shape, execID):
     shape.setSeed(config.randomSeed+"-"+str(shape))
     random.seed(shape.randomSeed)
 
-    unique_run_id = str(uuid.uuid4())
     backup_folder = f"results/{execID}/backup"
     if not os.path.exists(backup_folder):
         os.makedirs(backup_folder)
-    backup_file = os.path.join(backup_folder, f"simulation_data_{unique_run_id}.pkl")
+    backup_file = os.path.join(backup_folder, f"simulation_data_{shape}.pkl")
     with open(backup_file, 'ab') as f:
         pickle.dump(shape.__dict__, f)
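With PATCH 8/9 the backup file name is derived from str(shape) instead of a fresh UUID, which makes a shape's backup addressable on restart rather than discoverable only by scanning the directory. A sketch of the path derivation (helper hypothetical; it assumes str(shape) is stable across runs and filesystem-safe, which the series relies on but does not enforce):

    import os

    def backup_path(execID, shape):
        return os.path.join(f"results/{execID}/backup", f"simulation_data_{shape}.pkl")

Because the file is opened in append mode, a rerun of the same shape keeps extending one pickle stream per parameter combination.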
From 1935fafe19cca4a3e520edb0fb625dce17cdd9a2 Mon Sep 17 00:00:00 2001
From: Arunima Chaudhuri
Date: Tue, 14 May 2024 06:45:54 +0000
Subject: [PATCH 9/9] Fixed simulation completion check

Signed-off-by: Arunima Chaudhuri
---
 study.py | 18 +++++++++++++++++-
 1 file changed, 17 insertions(+), 1 deletion(-)

diff --git a/study.py b/study.py
index 6776e90..7ad33b5 100644
--- a/study.py
+++ b/study.py
@@ -147,7 +147,23 @@ def study():
         execID = restart_path.split("/")[1]
         state_file = f"results/{execID}/backup"
         all_completed, incomplete_files, completed_files, completed_shapes = check_simulation_completion(state_file)
-        if all_completed:
+
+        current_shapes = []
+        config = importlib.import_module("smallConf")
+
+        completed_shapes_without_seed = completed_shapes
+        for shape in config.nextShape():
+            shape_dict = copy.deepcopy(shape.__dict__)
+            del shape_dict['randomSeed']
+            current_shapes.append(shape_dict)
+        for shape in completed_shapes_without_seed:
+            if 'randomSeed' in shape:
+                del shape['randomSeed']
+
+        completed_set = {frozenset(shape.items()) for shape in completed_shapes_without_seed}
+        current_set = {frozenset(shape.items()) for shape in current_shapes}
+
+        if all_completed and completed_set == current_set:
             print("Simulation is already completed.")
             sys.exit(0)
         else:
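PATCH 9/9 tightens the "already completed" test from "every backup file ends with the sentinel" to "and the set of completed shapes equals the set of shapes the current config would generate". Two details worth noting: completed_shapes_without_seed is a reference to completed_shapes rather than a copy, so the del mutates the original list's dicts in place; and frozenset(shape.items()) requires every shape value to be hashable. The comparison distills to the following (helper hypothetical):

    def sweeps_match(completed_shapes, current_shapes, ignore=('randomSeed',)):
        def key(d):
            return frozenset((k, v) for k, v in d.items() if k not in ignore)
        return {key(d) for d in completed_shapes} == {key(d) for d in current_shapes}

Reducing each dict to a frozenset of items makes the check order-insensitive and ignores how many backup files recorded the same shape.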