Skip to content

Commit

Permalink
Make kv stale detection reliable
Browse files Browse the repository at this point in the history
  • Loading branch information
ralsina committed Jul 7, 2023
1 parent 441dc83 commit 3b478ba
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 19 deletions.
15 changes: 9 additions & 6 deletions spec/croupier_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ describe "Task" do
TaskManager.modified << "input"

# Only tasks depending on "input" or that have no inputs should be stale
tasks.values.count(&.stale?).should eq 4
# tasks.values.count(&.stale?).should eq 4
tasks.keys.select { |k| tasks[k].stale? }.should eq ["output1", "output2", "output3", "output4"]
end
end
Expand All @@ -390,23 +390,26 @@ describe "Task" do
end
end

it "should always consider tasks with kv inputs as stale" do
it "should not consider tasks with kv inputs as stale unless modified" do
with_scenario("empty") do
t = Task.new(id: "t", inputs: ["kv://foo"])
t.stale?.should be_true
t.run
t.stale = true
t.stale?.should be_false
TaskManager.modified << "kv://foo"
t.stale?.should be_true
end
end

it "should always consider tasks with kv outputs as stale" do
it "should consider tasks with missing kv outputs as stale" do
with_scenario("empty") do
p = TaskProc.new { "bar" }
t = Task.new(id: "t", inputs: ["foo"], outputs: ["kv://bar"], proc: p)
t.stale?.should be_true
t = Task.new(id: "t", inputs: ["kv://foo"], outputs: ["kv://bar"], proc: p)
t.run
t.stale = true
TaskManager.get("bar").should eq "bar"
t.stale?.should be_false
TaskManager.@_store.delete("bar")
t.stale?.should be_true
end
end
Expand Down
27 changes: 14 additions & 13 deletions src/croupier.cr
Original file line number Diff line number Diff line change
Expand Up @@ -174,19 +174,20 @@ module Croupier
return true if @always_run || @inputs.empty?
# Tasks don't get stale twice
return false unless @stale
# An input is from the k/v store
@inputs.any?(&.lchop?("kv://")) ||
# An output is from the k/v store
@outputs.any?(&.lchop?("kv://")) ||
# An output file is missing
@outputs.any? { |output| !File.exists?(output) } ||
# Any input file is modified
@inputs.any? { |input| TaskManager.modified.includes? input } ||
# Any input file is created by a stale task
@inputs.any? { |input|
TaskManager.tasks.has_key?(input) &&
TaskManager.tasks[input].stale?
}

file_outputs = @outputs.reject(&.lchop?("kv://"))
kv_outputs = @outputs.select(&.lchop?("kv://")).map(&.lchop("kv://"))

# ameba:disable Lint/UselessAssign
(missing_file_outputs = file_outputs.any? { |output| !File.exists?(output) }) ||
# ameba:disable Lint/UselessAssign
(missing_kv_outputs = kv_outputs.any? { |output| !TaskManager.get(output) }) ||
# ameba:disable Lint/UselessAssign
(modified_inputs = inputs.any? { |input| TaskManager.modified.includes? input }) ||
# ameba:disable Lint/UselessAssign
(stale_inputs = @inputs.any? { |input| TaskManager.tasks.has_key?(input) && TaskManager.tasks[input].stale? })

# p! missing_file_outputs, missing_kv_outputs, modified_inputs, stale_inputs
end

# A task is ready if it is stale but all its inputs are not.
Expand Down

0 comments on commit 3b478ba

Please sign in to comment.