From 3b478ba5f5996835e20ab639e0689ac6b43ceb79 Mon Sep 17 00:00:00 2001 From: Roberto Alsina Date: Fri, 7 Jul 2023 16:59:24 -0300 Subject: [PATCH] Make kv stale detection reliable --- spec/croupier_spec.cr | 15 +++++++++------ src/croupier.cr | 27 ++++++++++++++------------- 2 files changed, 23 insertions(+), 19 deletions(-) diff --git a/spec/croupier_spec.cr b/spec/croupier_spec.cr index 8a3f5eb..d510a2a 100644 --- a/spec/croupier_spec.cr +++ b/spec/croupier_spec.cr @@ -374,7 +374,7 @@ describe "Task" do TaskManager.modified << "input" # Only tasks depending on "input" or that have no inputs should be stale - tasks.values.count(&.stale?).should eq 4 + # tasks.values.count(&.stale?).should eq 4 tasks.keys.select { |k| tasks[k].stale? }.should eq ["output1", "output2", "output3", "output4"] end end @@ -390,23 +390,26 @@ describe "Task" do end end - it "should always consider tasks with kv inputs as stale" do + it "should not consider tasks with kv inputs as stale unless modified" do with_scenario("empty") do t = Task.new(id: "t", inputs: ["kv://foo"]) - t.stale?.should be_true t.run t.stale = true + t.stale?.should be_false + TaskManager.modified << "kv://foo" t.stale?.should be_true end end - it "should always consider tasks with kv outputs as stale" do + it "should consider tasks with missing kv outputs as stale" do with_scenario("empty") do p = TaskProc.new { "bar" } - t = Task.new(id: "t", inputs: ["foo"], outputs: ["kv://bar"], proc: p) - t.stale?.should be_true + t = Task.new(id: "t", inputs: ["kv://foo"], outputs: ["kv://bar"], proc: p) t.run t.stale = true + TaskManager.get("bar").should eq "bar" + t.stale?.should be_false + TaskManager.@_store.delete("bar") t.stale?.should be_true end end diff --git a/src/croupier.cr b/src/croupier.cr index bbe1a01..08e3fb0 100644 --- a/src/croupier.cr +++ b/src/croupier.cr @@ -174,19 +174,20 @@ module Croupier return true if @always_run || @inputs.empty? # Tasks don't get stale twice return false unless @stale - # An input is from the k/v store - @inputs.any?(&.lchop?("kv://")) || - # An output is from the k/v store - @outputs.any?(&.lchop?("kv://")) || - # An output file is missing - @outputs.any? { |output| !File.exists?(output) } || - # Any input file is modified - @inputs.any? { |input| TaskManager.modified.includes? input } || - # Any input file is created by a stale task - @inputs.any? { |input| - TaskManager.tasks.has_key?(input) && - TaskManager.tasks[input].stale? - } + + file_outputs = @outputs.reject(&.lchop?("kv://")) + kv_outputs = @outputs.select(&.lchop?("kv://")).map(&.lchop("kv://")) + + # ameba:disable Lint/UselessAssign + (missing_file_outputs = file_outputs.any? { |output| !File.exists?(output) }) || + # ameba:disable Lint/UselessAssign + (missing_kv_outputs = kv_outputs.any? { |output| !TaskManager.get(output) }) || + # ameba:disable Lint/UselessAssign + (modified_inputs = inputs.any? { |input| TaskManager.modified.includes? input }) || + # ameba:disable Lint/UselessAssign + (stale_inputs = @inputs.any? { |input| TaskManager.tasks.has_key?(input) && TaskManager.tasks[input].stale? }) + + # p! missing_file_outputs, missing_kv_outputs, modified_inputs, stale_inputs end # A task is ready if it is stale but all its inputs are not.