From 3b2d958792a7e5e3c18f5da531789f148a172c17 Mon Sep 17 00:00:00 2001 From: Andrew Theurer Date: Fri, 8 Nov 2024 08:34:33 -0500 Subject: [PATCH 1/3] support cdmv8 - with backward compat with cdmv7 --- rickshaw-index | 224 ++++++++++++++++++++++-------- rickshaw-post-process-bench | 29 +++- schema/bench-params.json | 3 + schema/run.json | 12 ++ schema/sample-persistent-ids.json | 15 +- 5 files changed, 213 insertions(+), 70 deletions(-) diff --git a/rickshaw-index b/rickshaw-index index 32bffd6d..bafb10ad 100755 --- a/rickshaw-index +++ b/rickshaw-index @@ -49,7 +49,7 @@ $toolbox::logging::debug = 0; my @pids; my $index_tools = 1; -my @suported_cdm_vers = ('v6dev', 'v7dev'); +my @suported_cdm_vers = ('v7dev', 'v8dev'); my %result; my $base_run_dir; my %cdm = ( 'ver' => '' ); @@ -62,7 +62,13 @@ my @queued_docs; my @queued_ndjson; my @queued_terms; my %num_docs_submitted = ('run' => 0, 'iteration' => 0, 'param' => 0, 'tag' => 0, 'sample' => 0, 'period' => 0); -my $max_jobs = 200; +my $max_jobs = 20; +my $update_run_json = 0; +my $run_id_field; +my $iter_id_field; +my $samp_id_field; +my $period_id_field; +my $metric_id_field; sub usage { print "\nusage:\n\n"; @@ -70,6 +76,61 @@ sub usage { print "--max-jobs (maximum number of background jobs for indexing)\n"; } +sub add_persistent_uuid { + my $doc_ref = shift; # A reference to the object, like run, param, etc + my $doc_type = shift; # Valid types: run, tag, iteration, param, sample, period, metric_desc + my $update_var_ref = shift; # A reference to tracker var that indicates that we need to write data file + my $old_field_name = "id"; + my $new_field_name = $doc_type . "-uuid"; + + if ($cdm{'ver'} eq 'v7dev') { + # V7 uses the older field name, 'id' + if (exists $$doc_ref{$old_field_name}) { + # Already exists, nothing to do + } else { + $$doc_ref{$old_field_name} = Data::UUID->new->create_str(); + $$update_var_ref++; + debug_log(sprintf "Adding $old_field_name %s\n", $$doc_ref{$old_field_name}); + } + # Nothing else to do for v7 + return; + } + + # Newer than cdmv7: + + if (exists $$doc_ref{$new_field_name}) { + # Already exists, nothing to do + return; + } + + if (not exists $$doc_ref{$old_field_name}) { + # Did not find older style id, so simply create newer style id + $$doc_ref{$new_field_name} = Data::UUID->new->create_str(); + $$update_var_ref++; + debug_log(sprintf "Adding $new_field_name %s\n", $$doc_ref{$new_field_name}); + return; + } + + # Older style id does exist and newer style does not exist + + # The param doc actually used the id as the client/server id, + # so if this is detected, don't assume it is an older 'uuid' + if ($doc_type eq "param" and $$doc_ref{$old_field_name} =~ /^[\d]+$/) { + # Leave 'id' alone and create a new uuid + $$doc_ref{$new_field_name} = Data::UUID->new->create_str(); + debug_log(sprintf "Adding $new_field_name %s\n", $$doc_ref{$new_field_name}); + $$update_var_ref++; + return; + } + + # Older style id needs to move to newer style id + $$doc_ref{$new_field_name} = $$doc_ref{$old_field_name}; + debug_log(sprintf "Switching from %s to %s\n", $old_field_name, $new_field_name); + delete $$doc_ref{$old_field_name}; + $$update_var_ref++; + return; +} + sub http_request { my $method = shift; my $host = shift; @@ -118,8 +179,11 @@ sub create_es_doc { my $sample_idx = shift; my $period_idx = shift; + + + my %es_doc = ( 'cdm' => \%cdm ); - for my $field (qw(id harness host email name source begin end benchmark)) { + for my $field ($run_id_field, qw(harness host email name source begin end benchmark)) { if (exists $result{$field} and defined $result{$field}) { $es_doc{'run'}{$field} = $result{$field}; } elsif ($field =~ /benchmark|source/) { @@ -163,10 +227,10 @@ sub create_es_doc { my $param_idx = $sample_idx; if (exists $result{'iterations'}[$iter_idx]{'params'}[$param_idx]) { if (ref($result{'iterations'}[$iter_idx]{'params'}[$param_idx]) eq ref({})) { - for my $g (qw(status primary-metric primary-period id num path)) { + for my $g ($iter_id_field, qw(status primary-metric primary-period num path)) { $es_doc{'iteration'}{$g} = $result{'iterations'}[$iter_idx]{$g}; } - for my $field (qw(arg val)) { + for my $field (qw(arg val)) { ##TODO: add role and [cs]id if present if (exists $result{'iterations'}[$iter_idx]{'params'}[$param_idx]{$field} and defined $result{'iterations'}[$iter_idx]{'params'}[$param_idx]{$field}) { $es_doc{'param'}{$field} = $result{'iterations'}[$iter_idx]{'params'}[$param_idx]{$field}; } else { @@ -196,17 +260,17 @@ sub create_es_doc { } else { # All other doc types if (defined $iter_idx) { if (exists $result{'iterations'}[$iter_idx]) { - foreach my $iter_field (qw(status primary-metric primary-period id num path)) { + foreach my $iter_field ($iter_id_field, qw(status primary-metric primary-period num path)) { $es_doc{'iteration'}{$iter_field} = $result{'iterations'}[$iter_idx]{$iter_field}; } if (defined $sample_idx) { if (exists $result{'iterations'}[$iter_idx]{'samples'}[$sample_idx]) { - foreach my $sample_field (qw(status id num path)) { + foreach my $sample_field ($samp_id_field, qw(status num path)) { $es_doc{'sample'}{$sample_field} = $result{'iterations'}[$iter_idx]{'samples'}[$sample_idx]{$sample_field}; } if (defined $period_idx) { if (exists $result{'iterations'}[$iter_idx]{'samples'}[$sample_idx]) { - for my $period_field (qw(id name begin end)) { + for my $period_field ($period_id_field, qw(name begin end)) { $es_doc{'period'}{$period_field} = $result{'iterations'}[$iter_idx]{'samples'}[$sample_idx]{'periods'}[$period_idx]{$period_field}; } } else { @@ -339,17 +403,29 @@ sub write_queued_es_docs { sub wait_for_metric_descs { my @terms = @_; my $attempts = 1; - my $max_attempts = 20; + my $max_attempts = 8; my $submitted_metric_descs = scalar @terms; my $found_metric_descs = 0; + while ($found_metric_descs < $submitted_metric_descs) { if ($attempts > $max_attempts) { print "ERROR: could not ensure all OpenSearch metric_desc docs are indexed, exiting\n"; exit 1; } - sleep 2; - my $resp_ref= http_request("POST", "localhost:9200", "/cdm" . $cdm{'ver'} . "-metric_desc/_count/", - '{"query":{"terms":{"metric_desc.id": ' . $coder->encode(\@terms) . '}}}'); + sleep 5; + my $request_body; + my $request_path = "/cdm" . $cdm{'ver'} . "-metric_desc/_count/"; + if ($cdm{'ver'} eq 'v7dev') { + $request_body = '{"query":{"terms":{"metric_desc.id": ' . $coder->encode(\@terms) . '}}}'; + } else { # later than cdmv7 + $request_body = '{"query":{"terms":{"metric_desc.metric_desc-uuid": ' . $coder->encode(\@terms) . '}}}'; + } + #printf "submitting request_path:\n%s\n", $request_path; + #printf "submitting request_body:\n%s\n", $request_body; + + my $resp_ref = http_request("POST", "localhost:9200", $request_path, $request_body); + #print "\nrequest completed\nresponse:\n"; + #print Dumper $resp_ref; $found_metric_descs = $$resp_ref{'count'}; if ($found_metric_descs > $submitted_metric_descs) { printf "Something went wrong, the number of metrics found (%d) in OpenSearch is greater than the number submitted (%d)\n", $found_metric_descs, $submitted_metric_descs; @@ -462,11 +538,12 @@ sub index_metrics { debug_log(sprintf "Making sure %s has persistent IDs\n", $metr_json_file); my $update_metric_file = 0; for my $this_metr ( @$metr_ref ) { - if (! exists $$this_metr{'id'}) { - $$this_metr{'id'} = Data::UUID->new->create_str(); - debug_log(sprintf "Adding persistent metric ID %s\n", $$this_metr{'id'}); - $update_metric_file++; - } + #if (! exists $$this_metr{'id'}) { + #$$this_metr{'id'} = Data::UUID->new->create_str(); + #debug_log(sprintf "Adding persistent metric ID %s\n", $$this_metr{'id'}); + #$update_metric_file++; + #} + add_persistent_uuid($this_metr, "metric_desc", \$update_metric_file); } if ($update_metric_file > 0) { debug_log(sprintf "Added %d persistent IDs to %s\n", $update_metric_file, $metr_json_file); @@ -483,7 +560,9 @@ sub index_metrics { my %source; for my $this_metr ( @$metr_ref ) { my $idx = $$this_metr{'idx'}; - $uuid{$idx} = $$this_metr{'id'}; + #print "metric:\n"; + #print Dumper $this_metr; + $uuid{$idx} = $$this_metr{$metric_id_field}; my %metr_desc_doc = %$base_doc_ref; if (defined $$this_metr{'desc'} and defined $$this_metr{'desc'}{'class'} and defined $$this_metr{'desc'}{'source'} and defined $$this_metr{'desc'}{'type'}) { @@ -494,7 +573,7 @@ sub index_metrics { } $type{$idx} = $$this_metr{'desc'}{'type'}; $source{$idx} = $$this_metr{'desc'}{'source'}; - $metr_desc_doc{'metric_desc'}{'id'} = $uuid{$idx}; + $metr_desc_doc{'metric_desc'}{$metric_id_field} = $uuid{$idx}; if ( exists $$this_metr{'names'} ) { $metr_desc_doc{'metric_desc'}{'names'} = $$this_metr{'names'}; } @@ -510,12 +589,17 @@ sub index_metrics { my @names_list = sort(keys(%{ $metr_desc_doc{'metric_desc'}{'names'} })); $metr_desc_doc{'metric_desc'}{'names-list'} = \@names_list; + #print "metric_desc_doc:\n"; + #print Dumper \%metr_desc_doc; my $metr_desc_doc_json = $coder->encode(\%metr_desc_doc); #printf "metric_desc_doc:\n %s\n", $metr_desc_doc_json; # We do not use index_es_doc() here because that requires getting all info from the %result, # and %result (rickshaw-run.json) by design does not include any metric data, as it would be # way too large. + #print "Going to index this metric_desc doc: " . $metr_desc_doc_json . "\n"; my $resp_ref = http_request("POST", "localhost:9200", "/cdm" . $cdm{'ver'} . "-metric_desc/_doc/", $metr_desc_doc_json); + #print "response:\n"; + #print Dumper $resp_ref; $num_metric_docs_submitted++; } my $count = 0; @@ -529,19 +613,23 @@ sub index_metrics { my $end = $3; my $value = $4; my %data = ( 'begin' => $begin, 'end' => $end, 'value' => $value, 'duration' => $end - $begin + 1 ); - my %desc = ( 'id' => $uuid{$1} ); + #my %desc = ( 'metric_desc-uuid' => $uuid{$idx} ); + my %desc = ( $metric_id_field => $uuid{$idx} ); $metr_data_doc{'metric_desc'} = \%desc; $metr_data_doc{'metric_data'} = \%data; + #print Dumper \%metr_data_doc; my $metr_data_doc_json = $coder->encode(\%metr_data_doc); $ndjson .= sprintf "%s\n", '{ "index": {} }'; $ndjson .= sprintf "%s\n", $metr_data_doc_json; $count++; - if ($count >= 1000) { + if ($count >= 200) { if ($index_or_queue eq "index") { # OpenSearch docs type metric_data do not contain other sections run, iteration, sample, period, metric_desc, # as this would take up sunstantially more space for potentially millions of documents. + #print "going to index this ndjson:\n" . $ndjson; http_ndjson_request("POST", "localhost:9200", "/cdm" . $cdm{'ver'} . "-metric_data/_bulk", $ndjson); } else { + print "going to *queue* this ndjson:\n" . $ndjson; push(@queued_ndjson, $ndjson); } $ndjson = ""; @@ -597,7 +685,7 @@ sub index_metrics { sub indexed_doc_count { my $doc_type = shift; my $resp_ref= http_request("POST", "localhost:9200", "/cdm" . $cdm{'ver'} . "-" . $doc_type . "/_count/", - '{"query":{"bool":{"filter":[{"term":{"run.id":"' . $result{'id'} . '"}}]}}}'); + '{"query":{"bool":{"filter":[{"term":{"run.' . $run_id_field . '": "' . $result{$run_id_field} . '"}}]}}}'); debug_log(sprintf "response" . Dumper $resp_ref); return $$resp_ref{'count'}; } @@ -697,16 +785,16 @@ $sample_persistent_ids_schema_file = $rickshaw_project_dir . "/schema/sample-per # All OpenSearch document creation starts with the rickshaw-result.json which is a product of running # rickshaw-run, rickshaw-postprocess-bench, and rickshaw-postprocess-tools -my $result_file = $run_dir . "/rickshaw-run.json"; +my $run_file = $run_dir . "/rickshaw-run.json"; -my $fixup_status = rickshaw_run_schema_fixup($result_file, $result_schema_file); +my $fixup_status = rickshaw_run_schema_fixup($run_file, $result_schema_file); if ($fixup_status != 0) { exit $fixup_status; } # start processing rickshaw-run.json "for real" -debug_log(sprintf "Opening %s for normal processing\n", $result_file); -($file_rc, my $result_ref) = get_json_file($result_file, $result_schema_file); +debug_log(sprintf "Opening %s for normal processing\n", $run_file); +($file_rc, my $result_ref) = get_json_file($run_file, $result_schema_file); if ($file_rc > 0 or ! defined $result_ref) { print "Could not open the rickshaw-run file\n"; exit 1; @@ -720,29 +808,30 @@ if (defined $result_ref) { # add persistent IDs to the result data if it doesn't already exist if (exists $result{'iterations'}) { - my $update_result_json = 0; - debug_log(sprintf "Making sure %s has persistent IDs\n", $result_file); + debug_log(sprintf "Making sure %s has persistent IDs\n", $run_file); for my $iteration (@{ $result{'iterations'} }) { - if (! exists $$iteration{'id'}) { - $$iteration{'id'} = Data::UUID->new->create_str(); - debug_log(sprintf "Adding persistent iteration ID %s\n", $$iteration{'id'}); - $update_result_json++; - } + add_persistent_uuid($iteration, "iteration", \$update_run_json); + #if (! exists $$iteration{'id'}) { + #$$iteration{'id'} = Data::UUID->new->create_str(); + #debug_log(sprintf "Adding persistent iteration ID %s\n", $$iteration{'id'}); + #$update_run_json++; + #} for my $parameter (@{ $$iteration{'params'} }) { - if (! exists $$parameter{'id'}) { - $$parameter{'id'} = Data::UUID->new->create_str(); - debug_log(sprintf "Adding persistent parameter ID %s\n", $$parameter{'id'}); - $update_result_json++; - } + add_persistent_uuid($parameter, "param", \$update_run_json); + #if (! exists $$parameter{'id'}) { + #$$parameter{'id'} = Data::UUID->new->create_str(); + #debug_log(sprintf "Adding persistent parameter ID %s\n", $$parameter{'id'}); + #$update_run_json++; + #} } } - if ($update_result_json > 0) { - debug_log(sprintf "Added %d persistent IDs to %s\n", $update_result_json, $result_file); - debug_log(sprintf "Overwriting %s after persistent ID update\n", $result_file); - my $update_rc = put_json_file($result_file, \%result, $result_schema_file); + if ($update_run_json > 0) { + debug_log(sprintf "Added %d persistent IDs to %s\n", $update_run_json, $run_file); + debug_log(sprintf "Overwriting %s after persistent ID update\n", $run_file); + my $update_rc = put_json_file($run_file, \%result, $result_schema_file); if ($update_rc > 0) { print "Could not add persistent IDs to rickshaw-run file\n"; exit 1; @@ -807,16 +896,35 @@ if (not grep(/^$cdm{'ver'}$/, @suported_cdm_vers)) { printf "Either use an older version of CDM or (ideally) find a newer version of rickshaw which supports %s\n", $cdm{'ver'}; exit 1; } -printf "Exporting from %s to OpenSearch documents and POSTing to localhost:9200\n", $result_file; + +if ($cdm{'ver'} eq 'v7dev') { + $run_id_field = 'id'; + $iter_id_field = 'id'; + $samp_id_field = 'id'; + $period_id_field = 'id'; + $metric_id_field = 'id'; +} else { + $run_id_field = 'run-uuid'; + $iter_id_field = 'iteration-uuid'; + $samp_id_field = 'sample-uuid'; + $period_id_field = 'period-uuid'; + $metric_id_field = 'metric_desc-uuid'; +} + +printf "Exporting from %s to OpenSearch documents and POSTing to localhost:9200\n", $run_file; if (exists $result{'run-id'} and defined $result{'run-id'}) { # Convert to 'id', which matches OpenSearch docs $result{'id'} = $result{'run-id'}; delete $result{'run-id'}; } -if (not exists $result{'id'} or not defined $result{'id'}) { - $result{'id'} = Data::UUID->new->create_str(); -} + +add_persistent_uuid(\%result, "run", \$update_run_json); +#if (not exists $result{'id'} or not defined $result{'id'}) { + #$result{'id'} = Data::UUID->new->create_str(); +#} + +printf "run-uuid: %s\n", $result{$run_id_field}; my $host = `hostname`; chomp $host; @@ -849,12 +957,11 @@ if (-e $tool_dir and $index_tools == 1) { if (opendir(TOOLDIR, $tool_dir)) { my @tool_files = grep(/metric-data-\S+\.json/, readdir(TOOLDIR)); for my $tool_file (@tool_files) { - $tool_file =~ s/(metric-data-\S+)\.json.*/$1/; printf "Working on tool_file: %s\n", $tool_file; - my %job_args = ( 'tool-dir' => $tool_dir, + my %job_args = ( 'tool-dir' => $tool_dir, 'tool-file' => $tool_file, 'collector' => $collector, 'num' => $num, @@ -881,7 +988,7 @@ foreach my $job_args (@jobs) { #$tool_dir . "/" . $tool_file, $collector, $num, $base_metric_doc_ref); exit 0; } - if ($num_jobs > $max_jobs) { + if ($num_jobs >= $max_jobs) { printf "Waiting for %d indexing jobs to complete\n", $max_jobs; while (1) { my $wait_return = wait(); @@ -974,13 +1081,14 @@ if (exists $result{'iterations'}) { $$this_sample{'num'} = $sample_num; $$this_sample{'status'} = $samp_status; + # Keeping these persistent id files (and only these files) consistent for cdmv7 and v8 if (exists $$samp_persist_ids_ref{'samples'}{'id'}) { - $$this_sample{'id'} = $$samp_persist_ids_ref{'samples'}{'id'}; - debug_log(sprintf "Found existing persistent ID %s for sample %d\n", $$this_sample{'id'}, $$this_sample{'num'}); + $$this_sample{$samp_id_field} = $$samp_persist_ids_ref{'samples'}{'id'}; + debug_log(sprintf "Found existing persistent ID %s for sample %d\n", $$this_sample{$samp_id_field}, $$this_sample{'num'}); } else { $$samp_persist_ids_ref{'samples'}{'id'} = Data::UUID->new->create_str(); - $$this_sample{'id'} = $$samp_persist_ids_ref{'samples'}{'id'}; - debug_log(sprintf "Creating new persistent ID %s for sample %d\n", $$this_sample{'id'}, $$this_sample{'num'}); + $$this_sample{$samp_id_field} = $$samp_persist_ids_ref{'samples'}{'id'}; + debug_log(sprintf "Creating new persistent ID %s for sample %d\n", $$this_sample{$samp_id_field}, $$this_sample{'num'}); $update_samp_persist_ids_file++; } @@ -1052,11 +1160,11 @@ if (exists $result{'iterations'}) { last; } } - if (! defined $period{'id'}) { - my %period_id = ( 'name' => $period{'name'}, 'id' => Data::UUID->new->create_str() ); - debug_log(sprintf "Creating persistent ID %s for period %s\n", $period_id{'id'}, $period_id{'name'}); + if (! defined $period{'period-uuid'}) { + my %period_id = ( 'name' => $period{'name'}, 'period-uuid' => Data::UUID->new->create_str() ); + debug_log(sprintf "Creating persistent ID %s for period %s\n", $period_id{'period-uuid'}, $period_id{'name'}); push @{ $$samp_persist_ids_ref{'periods'} }, \%period_id; - $period{'id'} = $period_id{'id'}; + $period{'period-uuid'} = $period_id{'period-uuid'}; $update_samp_persist_ids_file++; } @@ -1237,7 +1345,7 @@ if (exists $result{'tags'}) { if (scalar @pids > 0) { printf "Waiting for %d indexing jobs to complete\n", scalar @pids; while (1) { - my $wait_return = wait(); + my $wait_return = wait(); if ($wait_return < 0) { last; } diff --git a/rickshaw-post-process-bench b/rickshaw-post-process-bench index 47821091..6d0b0f09 100755 --- a/rickshaw-post-process-bench +++ b/rickshaw-post-process-bench @@ -52,12 +52,11 @@ my %run; # A multi-dimensional, nested hash, schema TBD my $base_run_dir; my $run_file; # 'rickshaw-run.json' containing all configuration data # (generated by 'rickshaw-run' once a run is complete) -my $result_file; # 'rickshaw-result.json' containing all configuration and result data - # (generated by this script) my $file_rc; my %ids_to_benchmark; my %benchmark_to_ids; my %bench_dirs; +my $update_run_json = 0; # Track if we update %run and therefore need to write rickshaw-run.json sub usage { print "\nusage:\n\n"; @@ -80,8 +79,19 @@ sub dump_params { my $bench = $$param{'benchmark'}; my $id; if (exists $$param{'id'}) { - $id = $$param{'id'}; + if ($$param{'id'} =~ /^[0-9]+$/) { + # If this looks like a client/server id, keep it, as that's + # what 'id' was intended for in benchmark params + $id = $$param{'id'}; + } elsif ($$param{'id'} =~ /^[A-Z0-9]{,8}-[A-Z0-9]{,4}-[A-Z0-9]{,4}-[A-Z0-9]{,4}-[A-Z0-9]{,12}$/) { + # if this looks like "DDA62846-9603-11EF-A8EA-9E3E768A00E9", + # then this is incorrect and must be fixed. + $$param{'param-uuid'} = $$param{'id'}; + delete $$param{'id'}; + $update_run_json++; + } } + # fallback to client role when role is undefined in json my $role = $$param{'role'} // $default_role; @@ -114,7 +124,6 @@ my $rickshaw_project_dir; my $bench_schema_file = $rickshaw_project_dir . "/schema/benchmark.json"; my $bench_metric_schema_file = $rickshaw_project_dir . "/schema/bench-metric.json"; my $run_schema_file = $rickshaw_project_dir . "/schema/run.json"; -my $result_schema_file = $rickshaw_project_dir . "/schema/result.json"; my %bench_configs; # Process the cmdline params @@ -274,7 +283,7 @@ for (my $i = 1; $i <= scalar @{ $run{'iterations'} }; $i++) { printf "Working on " . $this_samp_dir . "\n"; if (opendir(my $samp_dh, $run_dir . "/" . $this_samp_dir)) { my @cs_names = grep(/^(client|server)$/, readdir($samp_dh)); - for my $cs_name (@cs_names) { + for my $cs_name (@cs_names) { my $cs_name_dir = $this_samp_dir . "/" . $cs_name; if (opendir(my $cs_name_dh, $run_dir . "/" . $cs_name_dir)) { my @cs_ids = grep(/^(\d+)$/, readdir($cs_name_dh)); @@ -332,3 +341,13 @@ for (my $i = 1; $i <= scalar @{ $run{'iterations'} }; $i++) { } } } + + +if ($update_run_json > 0) { + debug_log(sprintf "run data changed, so writing %s\n", $run_file); + my $update_rc = put_json_file($run_file, \%run, $run_schema_file); + if ($update_rc > 0) { + print "Could not update rickshaw-run file\n"; + exit 1; + } +} diff --git a/schema/bench-params.json b/schema/bench-params.json index e3e2f53c..9f487081 100644 --- a/schema/bench-params.json +++ b/schema/bench-params.json @@ -27,6 +27,9 @@ "all" ] }, + "uuid": { + "type": "string" + }, "id": { "type": "string" } diff --git a/schema/run.json b/schema/run.json index 12992e14..18df2be6 100644 --- a/schema/run.json +++ b/schema/run.json @@ -129,6 +129,10 @@ "items": { "type": "object", "properties": { + "iteration-uuid": { + "type": "string", + "pattern": "^.+$" + }, "id": { "type": "string", "pattern": "^.+$" @@ -138,6 +142,10 @@ "items": { "type": "object", "properties": { + "param-uuid": { + "type": "string", + "pattern": "^.+$" + }, "id": { "type": "string", "pattern": "^.+$" @@ -205,6 +213,10 @@ "type": "string", "pattern": "^.+$" }, + "run-uuid": { + "type": "string", + "pattern": "^.+$" + }, "run-id": { "type": "string", "pattern": "^.+$" diff --git a/schema/sample-persistent-ids.json b/schema/sample-persistent-ids.json index 7a14a2e9..4802ec88 100644 --- a/schema/sample-persistent-ids.json +++ b/schema/sample-persistent-ids.json @@ -31,11 +31,12 @@ "id": { "type": "string", "pattern": "^.+$" + }, + "sample-uuid": { + "type": "string", + "pattern": "^.+$" } }, - "required": [ - "id" - ], "additionalProperties": false }, "periods": { @@ -47,15 +48,15 @@ "type": "string", "pattern": "^.+$" }, + "period-uuid": { + "type": "string", + "pattern": "^.+$" + }, "name": { "type": "string", "pattern": "^.+$" } }, - "required": [ - "id", - "name" - ], "additionalProperties": false }, "additionalItems": false From ef00b61a8a0c6e9ee8fb4cd063e858d30ae912a8 Mon Sep 17 00:00:00 2001 From: Andrew Theurer Date: Mon, 11 Nov 2024 18:17:40 -0500 Subject: [PATCH 2/3] Fixes - assign cdm version earlier - fix period persisten id field name --- rickshaw-index | 107 ++++++++++++++++++++++--------------------------- 1 file changed, 49 insertions(+), 58 deletions(-) diff --git a/rickshaw-index b/rickshaw-index index bafb10ad..d3b82ab2 100755 --- a/rickshaw-index +++ b/rickshaw-index @@ -87,6 +87,10 @@ sub add_persistent_uuid { # V7 uses the older field name, 'id' if (exists $$doc_ref{$old_field_name}) { # Already exists, nothing to do + if (not defined $$doc_ref{$old_field_name}) { + print "WHY is this null?" . Dumper $doc_ref; + exit 1; + } } else { $$doc_ref{$old_field_name} = Data::UUID->new->create_str(); $$update_var_ref++; @@ -179,9 +183,6 @@ sub create_es_doc { my $sample_idx = shift; my $period_idx = shift; - - - my %es_doc = ( 'cdm' => \%cdm ); for my $field ($run_id_field, qw(harness host email name source begin end benchmark)) { if (exists $result{$field} and defined $result{$field}) { @@ -792,53 +793,6 @@ if ($fixup_status != 0) { exit $fixup_status; } -# start processing rickshaw-run.json "for real" -debug_log(sprintf "Opening %s for normal processing\n", $run_file); -($file_rc, my $result_ref) = get_json_file($run_file, $result_schema_file); -if ($file_rc > 0 or ! defined $result_ref) { - print "Could not open the rickshaw-run file\n"; - exit 1; -} -if (defined $result_ref) { - %result = %{ $result_ref }; -} else { - printf "Could not find or load rickshaw-run.json in %s, exiting\n", $run_dir; - exit 1; -} - -# add persistent IDs to the result data if it doesn't already exist -if (exists $result{'iterations'}) { - - debug_log(sprintf "Making sure %s has persistent IDs\n", $run_file); - for my $iteration (@{ $result{'iterations'} }) { - add_persistent_uuid($iteration, "iteration", \$update_run_json); - #if (! exists $$iteration{'id'}) { - #$$iteration{'id'} = Data::UUID->new->create_str(); - #debug_log(sprintf "Adding persistent iteration ID %s\n", $$iteration{'id'}); - #$update_run_json++; - #} - - for my $parameter (@{ $$iteration{'params'} }) { - add_persistent_uuid($parameter, "param", \$update_run_json); - #if (! exists $$parameter{'id'}) { - #$$parameter{'id'} = Data::UUID->new->create_str(); - #debug_log(sprintf "Adding persistent parameter ID %s\n", $$parameter{'id'}); - #$update_run_json++; - #} - } - } - - if ($update_run_json > 0) { - debug_log(sprintf "Added %d persistent IDs to %s\n", $update_run_json, $run_file); - debug_log(sprintf "Overwriting %s after persistent ID update\n", $run_file); - my $update_rc = put_json_file($run_file, \%result, $result_schema_file); - if ($update_rc > 0) { - print "Could not add persistent IDs to rickshaw-run file\n"; - exit 1; - } - } -} - # Find the newest CDM version and verify all the required indices are present my $idx_resp_ref = http_request("GET", "localhost:9200", "_cat/indices?format=json", ''); my $latest_ver; @@ -911,6 +865,44 @@ if ($cdm{'ver'} eq 'v7dev') { $metric_id_field = 'metric_desc-uuid'; } +# start processing rickshaw-run.json "for real" +debug_log(sprintf "Opening %s for normal processing\n", $run_file); +($file_rc, my $result_ref) = get_json_file($run_file, $result_schema_file); +if ($file_rc > 0 or ! defined $result_ref) { + print "Could not open the rickshaw-run file\n"; + exit 1; +} +if (defined $result_ref) { + %result = %{ $result_ref }; +} else { + printf "Could not find or load rickshaw-run.json in %s, exiting\n", $run_dir; + exit 1; +} + +# add persistent IDs to the result data if it doesn't already exist +if (exists $result{'iterations'}) { + + debug_log(sprintf "Making sure %s has persistent IDs\n", $run_file); + for my $iteration (@{ $result{'iterations'} }) { + add_persistent_uuid($iteration, "iteration", \$update_run_json); + + for my $parameter (@{ $$iteration{'params'} }) { + add_persistent_uuid($parameter, "param", \$update_run_json); + } + } + + if ($update_run_json > 0) { + debug_log(sprintf "Added %d persistent IDs to %s\n", $update_run_json, $run_file); + debug_log(sprintf "Overwriting %s after persistent ID update\n", $run_file); + my $update_rc = put_json_file($run_file, \%result, $result_schema_file); + if ($update_rc > 0) { + print "Could not add persistent IDs to rickshaw-run file\n"; + exit 1; + } + } +} + + printf "Exporting from %s to OpenSearch documents and POSTing to localhost:9200\n", $run_file; if (exists $result{'run-id'} and defined $result{'run-id'}) { @@ -920,9 +912,6 @@ if (exists $result{'run-id'} and defined $result{'run-id'}) { } add_persistent_uuid(\%result, "run", \$update_run_json); -#if (not exists $result{'id'} or not defined $result{'id'}) { - #$result{'id'} = Data::UUID->new->create_str(); -#} printf "run-uuid: %s\n", $result{$run_id_field}; @@ -1156,15 +1145,17 @@ if (exists $result{'iterations'}) { foreach my $period_id (@{ $$samp_persist_ids_ref{'periods'} }) { if ($period{'name'} eq $$period_id{'name'}) { debug_log(sprintf "Found persistent ID %s for period name %s\n", $$period_id{'id'}, $period{'name'}); - $period{'id'} = $$period_id{'id'}; + # The id field from $samp_persist_ids_ref is always 'id' because we can't + # have a 'rewquired-field' in the json-schema be [id|period-uuid]; + $period{$period_id_field} = $$period_id{'id'}; last; } } - if (! defined $period{'period-uuid'}) { - my %period_id = ( 'name' => $period{'name'}, 'period-uuid' => Data::UUID->new->create_str() ); - debug_log(sprintf "Creating persistent ID %s for period %s\n", $period_id{'period-uuid'}, $period_id{'name'}); + if (! defined $period{$period_id_field}) { + my %period_id = ( 'name' => $period{'name'}, 'id' => Data::UUID->new->create_str() ); + debug_log(sprintf "Creating persistent ID %s for period %s\n", $period_id{'id'}, $period_id{'name'}); push @{ $$samp_persist_ids_ref{'periods'} }, \%period_id; - $period{'period-uuid'} = $period_id{'period-uuid'}; + $period{$period_id_field} = $period_id{'id'}; $update_samp_persist_ids_file++; } From 410b77320ef05a1fc66e006df395e1be5f3c3d64 Mon Sep 17 00:00:00 2001 From: Andrew Theurer Date: Wed, 13 Nov 2024 13:40:28 -0500 Subject: [PATCH 3/3] cleanup --- rickshaw-index | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/rickshaw-index b/rickshaw-index index d3b82ab2..00e0009d 100755 --- a/rickshaw-index +++ b/rickshaw-index @@ -404,7 +404,7 @@ sub write_queued_es_docs { sub wait_for_metric_descs { my @terms = @_; my $attempts = 1; - my $max_attempts = 8; + my $max_attempts = 20; my $submitted_metric_descs = scalar @terms; my $found_metric_descs = 0; @@ -539,11 +539,6 @@ sub index_metrics { debug_log(sprintf "Making sure %s has persistent IDs\n", $metr_json_file); my $update_metric_file = 0; for my $this_metr ( @$metr_ref ) { - #if (! exists $$this_metr{'id'}) { - #$$this_metr{'id'} = Data::UUID->new->create_str(); - #debug_log(sprintf "Adding persistent metric ID %s\n", $$this_metr{'id'}); - #$update_metric_file++; - #} add_persistent_uuid($this_metr, "metric_desc", \$update_metric_file); } if ($update_metric_file > 0) { @@ -623,7 +618,8 @@ sub index_metrics { $ndjson .= sprintf "%s\n", '{ "index": {} }'; $ndjson .= sprintf "%s\n", $metr_data_doc_json; $count++; - if ($count >= 200) { + # Limit the batch size to avoid a http error with too large of a request + if ($count >= 1000) { if ($index_or_queue eq "index") { # OpenSearch docs type metric_data do not contain other sections run, iteration, sample, period, metric_desc, # as this would take up sunstantially more space for potentially millions of documents.