Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Y24-120 Part-2 Bug taken link error when creating sequencing batches #4340

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
8d6d55e
Add migration for unique-together index on ancestor and descendant
yoldas Sep 12, 2024
5896e32
Applied migration to db schema
yoldas Sep 12, 2024
ab1f587
Include column names in the unique index
yoldas Sep 23, 2024
775d85e
Apply migration to schema
yoldas Sep 23, 2024
1b0fb4e
Override create_edge on asset_links to handle race conditions
yoldas Sep 23, 2024
e601915
Merge branch 'develop' into 4133-y24-120-bug-taken-link-error-when-cr…
yoldas Sep 23, 2024
d720c8e
Bubble up non-duplication errors
yoldas Sep 23, 2024
6b9b83d
Remove custom exception class definitions from test
yoldas Sep 23, 2024
480e3d3
Update comment about return values
yoldas Sep 23, 2024
937460d
Merge branch 'develop' into 4133-y24-120-bug-taken-link-error-when-cr…
yoldas Sep 23, 2024
dc6a704
Add guard against nil before converting link to edge
yoldas Sep 24, 2024
80d377a
Replace the recover proecedure with auditing as it is wrong to put du…
yoldas Sep 24, 2024
bac2c98
Rubocop
yoldas Sep 24, 2024
03124f4
Clean the database after tests because of the new DB connections in t…
yoldas Sep 24, 2024
dee9a7e
Removed after block
yoldas Sep 24, 2024
c4a13a8
Add after all cleanup
yoldas Sep 24, 2024
5862b9f
Merge branch 'develop' into 4133-y24-120-bug-taken-link-error-when-cr…
yoldas Sep 24, 2024
165dd5e
Delete test asset link and labware records
yoldas Sep 24, 2024
6f50b2c
Debug side effect
yoldas Sep 24, 2024
f04d1cf
Debug 2
yoldas Sep 24, 2024
bc857a2
Revert debug
yoldas Sep 24, 2024
0aec888
Debug handles race condition at find_link
yoldas Sep 24, 2024
a69265a
Debug2 handles race condition at find_link
yoldas Sep 24, 2024
4ebd702
Rewrite test for unique validation error
yoldas Sep 25, 2024
9ea98ae
Rewrite example handles unique constraint violation
yoldas Sep 25, 2024
b393fdd
Update tests for create_edge method
yoldas Sep 25, 2024
0ac25f7
Deleted old version of the tests
yoldas Sep 25, 2024
ca87241
Remove redundant require
yoldas Sep 25, 2024
068d0b9
Merge branch 'develop' into 4133-y24-120-bug-taken-link-error-when-cr…
yoldas Sep 25, 2024
ca3a8eb
Merge branch 'develop' into 4133-y24-120-bug-taken-link-error-when-cr…
yoldas Sep 27, 2024
80954a0
Add UAT action to generate randomised FluidX barcodes
yoldas Oct 2, 2024
5bb6bb8
Fill errors collection instead of raising exception
yoldas Oct 2, 2024
8531eae
Continue index in the next iteration
yoldas Oct 2, 2024
f7a918a
Separate random part and index with zero
yoldas Oct 2, 2024
e3bfcb2
Merge branch 'develop' into 4133-y24-120-bug-taken-link-error-when-cr…
yoldas Oct 2, 2024
e81b4a0
Deleted file as it does not belong to this PR
yoldas Oct 2, 2024
c9a058a
Rubocop
yoldas Oct 2, 2024
a9ff092
Deleting the part-1 test as it is conflicting with part-2
yoldas Oct 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions app/models/asset_link.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,81 @@
end
end
end

# Creates an edge between the ancestor and descendant nodes using save.
#
# This method first attempts to find an existing link between the ancestor
# and descendant. If no link is found, it builds a new edge and saves it.
# If a link is found, it makes the link an edge and saves it.
#
# This method is overridden to handle race conditions in finding an
# existing link and has_duplicates validation. It also assumes that there
# is a unique-together index on ancestor_id and descendant_id columns.
#
# @param ancestor [Dag::Standard::EndPoint] The ancestor node.
# @param descendant [Dag::Standard::EndPoint] The descendant node.
# @return [Boolean] Returns true if the edge is successfully created or
# already exists, false otherwise.
# @raise [ActiveRecord::RecordNotUnique] Re-raises any exception if it is
# not a constraint violation that involves ancestor_id and descendant_id
# columns.
def self.create_edge(ancestor, descendant)
# Two processes try to find an existing link.
link = find_link(ancestor, descendant)
# Either or both may find no link and try to create a new edge.
if link.nil?
edge = build_edge(ancestor, descendant)
result = save_edge_or_handle_error(edge)
return result unless result.nil? # Bubble up.
# Losing process finds the edge created by the winning process.
link = find_link(ancestor, descendant)

Check warning on line 106 in app/models/asset_link.rb

View check run for this annotation

Codecov / codecov/patch

app/models/asset_link.rb#L106

Added line #L106 was not covered by tests
end

return if link.nil?

link.make_direct
link.changed? ? link.save : true
end

# Saves the edge between the ancestor and descendant nodes or handles errors.
#
# @param edge [AssetLink] The edge object containing the errors.
# @return [Boolean] Returns true if the edge is successfully saved,
# nil if the error is unique validation or constraint violation,
# false if the error is another validation error.
# @raise [ActiveRecord::RecordNotUnique] Re-raises an exception if the
# exception caught is not a unique constraint violation.
def self.save_edge_or_handle_error(edge)
# Winning process successfully saves the edge (direct link).
return true if edge.save
# has_duplicate validation may see it for the losing process before
# hitting the DB.
return false unless unique_validation_error?(edge) # Bubble up.
edge.errors.clear # Clear all errors and use the existing link.

Check warning on line 129 in app/models/asset_link.rb

View check run for this annotation

Codecov / codecov/patch

app/models/asset_link.rb#L129

Added line #L129 was not covered by tests
rescue ActiveRecord::RecordNotUnique => e
# Unique constraint violation is triggered for the losing process after
# hitting the DB.
raise unless unique_violation_error?(edge, e) # Bubble up.

Check warning on line 133 in app/models/asset_link.rb

View check run for this annotation

Codecov / codecov/patch

app/models/asset_link.rb#L133

Added line #L133 was not covered by tests
end

# Checks if the validation error includes a specific message indicating a
# unique link already exists.
#
# @param edge [AssetLink] The edge object containing the errors.
# @return [Boolean] Returns true if the errors include the message "Link
# already exists between these points", false otherwise.
def self.unique_validation_error?(edge)
edge.errors[:base].include?('Link already exists between these points')
end

# Checks if the unique constraint violation involves the specified columns.
#
# @param edge [AssetLink] The edge object containing the column names.
# @param exception [ActiveRecord::RecordNotUnique] The exception raised due
# to the unique constraint violation.
# @return [Boolean] Returns true if the exception message includes both the
# ancestor and descendant column names, false otherwise.
def self.unique_violation_error?(edge, exception)
[edge.ancestor_id_column_name, edge.descendant_id_column_name].all? { |col| exception.message.include?(col) }

Check warning on line 154 in app/models/asset_link.rb

View check run for this annotation

Codecov / codecov/patch

app/models/asset_link.rb#L154

Added line #L154 was not covered by tests
end
end
26 changes: 26 additions & 0 deletions db/migrate/20240912000109_add_unique_index_to_asset_links.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# frozen_string_literal: true

# This migration adds a unique-together index on ancestor_id and descendant_id
# in order to prevent duplicate links between the same ancestor and descendant
# labware.
#
# Before this migration, the database allowed duplicate asset_links between the
# same ancestor and descendant labware. Therefore, the migration will fail if
# there are any duplicate links in the database. To fix this, they must be
# removed before running the migration using the rake task:
#
# bundle exec rake 'support:remove_duplicate_asset_links[csv_file_path]'
#
# The rake task will write the removed records into a CSV file that can be used
# for auditing purposes.
#
# Note that the column names in the index name below is used for finding the
# reason of the database unique constraint violation by the AssetLink model.
class AddUniqueIndexToAssetLinks < ActiveRecord::Migration[6.1]
def change
add_index :asset_links,
%i[ancestor_id descendant_id],
unique: true,
name: 'index_asset_links_on_ancestor_id_and_descendant_id'
end
end
1 change: 1 addition & 0 deletions db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
t.integer "count"
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index ["ancestor_id", "descendant_id"], name: "index_asset_links_on_ancestor_id_and_descendant_id", unique: true
t.index ["ancestor_id", "direct"], name: "index_asset_links_on_ancestor_id_and_direct"
t.index ["descendant_id", "direct"], name: "index_asset_links_on_descendant_id_and_direct"
end
Expand Down
280 changes: 280 additions & 0 deletions spec/models/asset_links_create_edge_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
# frozen_string_literal: true

require 'rails_helper'

RSpec.describe AssetLink, type: :model do
# rubocop:disable RSpec/InstanceVariable,Metrics/MethodLength,RSpec/ExampleLength,RSpec/MultipleExpectations
# Test the overridden create_edge class method.
describe '.create_edge' do
# Wait for child processes to finish.
#
# @param pids [Array<Integer>] Process IDs
# @raise [Timeout::Error] If a child process does not finish within 10 seconds.
# @return [void]
def wait_for_child_processes(pids)
exits = []
pids
.each
.with_index(1) do |pid, index|
Timeout.timeout(10) do
_pid, status = Process.wait2(pid)
exits << status.exitstatus
end
rescue Timeout::Error
raise StandardError, "parent: Timeout waiting for child#{index} to finish"
end

expect(exits).to eq([0, 0])
end

# Delete created records.
after do
@ancestor&.destroy
@descendant&.destroy
@edge&.destroy
end

it 'handles race condition at find_link' do
# In this example, the first and second processes are forked from the
# main process and they are in race condition to find an existing link
# between the ancestor and descendant and create an edge if no link is
# found. Both first and second processes finds no link, but the second
# process creates the edge first. The first also tries to create the
# edge based on the result of the find_link method. Only one link must
# be created.

# Parent
ActiveRecord::Base.connection.reconnect!
@ancestor = ancestor = create(:labware)
@descendant = descendant = create(:labware)
ActiveRecord::Base.connection.commit_db_transaction

# Create a duplex pipe for IPC.
first_socket, second_socket = UNIXSocket.pair

# First child
pid1 =
fork do
ActiveRecord::Base.connection.reconnect!

find_link_call_count = 0
# Patch find_link method
allow(described_class).to receive(:find_link).and_wrap_original do |m, *args|
find_link_call_count += 1
link = m.call(*args)
if find_link_call_count == 1
expect(link).to be_nil # No link found
message = 'paused'
first_socket.send(message, 0) # Notify the second child
message = 'child1: Timeout waiting for resume message'
raise StandardError, message unless first_socket.wait_readable(10)
message = 'resume' # Wait for the second child to create the edge
first_socket.recv(message.size)
elsif find_link_call_count == 2
expect(link).not_to be_nil # Found one now
end
link
end
described_class.create_edge(ancestor, descendant)

ActiveRecord::Base.connection.close
end

# Second child
pid2 =
fork do
ActiveRecord::Base.connection.reconnect!
message = 'child2: Timeout waiting for paused message'
raise StandardError, message unless second_socket.wait_readable(10)
message = 'paused' # Wait for the first child to call find_link
second_socket.recv(message.size)
described_class.create_edge(ancestor, descendant)
message = 'resume' # Notify the first child
second_socket.send(message, 0)
ActiveRecord::Base.connection.close
end

# Parent
wait_for_child_processes([pid1, pid2])

expect(described_class.where(ancestor:, descendant:).count).to eq(1)
@edge = edge = described_class.last
expect(edge.ancestor).to eq(ancestor)
expect(edge.descendant).to eq(descendant)
expect(edge.direct).to be_truthy
expect(edge.count).to eq(1)

ActiveRecord::Base.connection.close
end

it 'handles unique validation error' do
# In this example, the first and second processes are forked from the
# main process. Neither of them finds and existing link between the
# ancestor and descendant. Both try to create the edge. One of them
# succeeds and the other one fails due to the has_duplicates validation.
# Failing process should use the existing link.

# Parent
ActiveRecord::Base.connection.reconnect!
@ancestor = ancestor = create(:labware)
@descendant = descendant = create(:labware)
ActiveRecord::Base.connection.commit_db_transaction

# Create a duplex pipe for IPC.
first_socket, second_socket = UNIXSocket.pair

# First child
pid1 =
fork do
ActiveRecord::Base.connection.reconnect!
find_link_call_count = 0
allow(described_class).to receive(:find_link).and_wrap_original do |m, *args|
find_link_call_count += 1
link = m.call(*args)
if find_link_call_count == 1
expect(link).to be_nil # No link found
message = 'paused' # Notify the second child
first_socket.send(message, 0)
message = 'child1: Timeout waiting for resume message'
raise StandardError, message unless first_socket.wait_readable(10)
message = 'resume' # Wait for the second child to create the edge
first_socket.recv(message.size)
end
link
end

# Patch unique_validation_error? method
unique_validation_error_return_value = nil
allow(described_class).to receive(:unique_validation_error?).and_wrap_original do |m, *args|
unique_validation_error_return_value = m.call(*args)
end

result = described_class.create_edge(ancestor, descendant)
expect(result).to be_truthy
expect(described_class).to have_received(:unique_validation_error?)
expect(unique_validation_error_return_value).to be_truthy

ActiveRecord::Base.connection.close
end

# Second child
pid2 =
fork do
ActiveRecord::Base.connection.reconnect!
message = 'child2: Timeout waiting for paused message'
raise StandardError, message unless second_socket.wait_readable(10)
message = 'paused' # Wait for the first child to call find_link
second_socket.recv(message.size)
described_class.create_edge(ancestor, descendant)
message = 'resume' # Notify the first child
second_socket.send(message, 0)
ActiveRecord::Base.connection.close
end

# Parent
wait_for_child_processes([pid1, pid2])

expect(described_class.where(ancestor:, descendant:).count).to eq(1)
@edge = edge = described_class.last
expect(edge.ancestor).to eq(ancestor)
expect(edge.descendant).to eq(descendant)
expect(edge.direct).to be_truthy
expect(edge.count).to eq(1)
end

it 'handles unique constraint violation' do
# In this example, the first and second processes are forked from the
# main process. Neither of them finds and existing link between the
# ancestor and descendant. Both try to create the edge. One of them
# succeeds and the other one fails due to the database unique constraint
# violation. Failing process should use the existing link.

# Parent
ActiveRecord::Base.connection.reconnect!
@ancestor = ancestor = create(:labware)
@descendant = descendant = create(:labware)
ActiveRecord::Base.connection.commit_db_transaction

# Create a duplex pipe for IPC.
first_socket, second_socket = UNIXSocket.pair

# First child
pid1 =
fork do
ActiveRecord::Base.connection.reconnect!
find_link_call_count = 0
allow(described_class).to receive(:find_link).and_wrap_original do |m, *args|
find_link_call_count += 1
link = m.call(*args)
if find_link_call_count == 1
expect(link).to be_nil
message = 'paused' # Notify the second child
first_socket.send(message, 0)
message = 'child1: Timeout waiting for resume message'
raise StandardError, message unless first_socket.wait_readable(10)
message = 'resume' # Wait for the second child to create the edge
first_socket.recv(message.size)
end
link
end

# Patch has_duplicates method.
# rubocop:disable RSpec/AnyInstance
allow_any_instance_of(Dag::CreateCorrectnessValidator).to receive(:has_duplicates).and_return(false)
# rubocop:enable RSpec/AnyInstance

# Patch save_edge_or_handle_error method.
allow(described_class).to receive(:save_edge_or_handle_error).and_wrap_original do |method, *args|
edge = args[0]
message =
"Duplicate entry '#{ancestor.id}-#{descendant.id}' " \
"for key 'index_asset_links_on_ancestor_id_and_descendant_id'"
exception = ActiveRecord::RecordNotUnique.new(message)
allow(edge).to receive(:save).and_raise(exception)

method.call(*args)
end

# Patch unique_violation_error? method.
unique_violation_error_return_value = nil
allow(described_class).to receive(:unique_violation_error?).and_wrap_original do |m, *args|
unique_violation_error_return_value = m.call(*args)
end

result = described_class.create_edge(ancestor, descendant)
expect(result).to be_truthy
expect(described_class).to have_received(:unique_violation_error?)
expect(unique_violation_error_return_value).to be_truthy

ActiveRecord::Base.connection.close
end

# Second child
pid2 =
fork do
ActiveRecord::Base.connection.reconnect!
message = 'child2: Timeout waiting for paused message'
raise StandardError, message unless second_socket.wait_readable(10)
message = 'paused' # Wait for the first child to call find_link
second_socket.recv(message.size)
described_class.create_edge(ancestor, descendant)
message = 'resume' # Notify the first child
second_socket.send(message, 0)
ActiveRecord::Base.connection.close
end

# Parent
wait_for_child_processes([pid1, pid2])

expect(described_class.where(ancestor:, descendant:).count).to eq(1)
@edge = edge = described_class.last
expect(edge.ancestor).to eq(ancestor)
expect(edge.descendant).to eq(descendant)
expect(edge.direct).to be_truthy
expect(edge.count).to eq(1)

ActiveRecord::Base.connection.close
end
end
# rubocop:enable RSpec/InstanceVariable,Metrics/MethodLength,RSpec/ExampleLength,RSpec/MultipleExpectations
end
Loading
Loading