Skip to content

Commit

Permalink
[CLIENT-3195] Fix ruby client does not return failed nodes on timeout.
Browse files Browse the repository at this point in the history
  • Loading branch information
khaf committed Dec 17, 2024
1 parent fb54d1d commit dcd9078
Show file tree
Hide file tree
Showing 24 changed files with 113 additions and 101 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

All notable changes to this project will be documented in this file.

## [4.2.0] 2024-12-18

- **Fixes**
- [CLIENT-3195] Fix ruby client does not return failed nodes on timeout.

## [4.1.0] 2024-10-22

- **New Features**
Expand Down
44 changes: 22 additions & 22 deletions lib/aerospike/aerospike_exception.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,79 +21,79 @@
module Aerospike
module Exceptions
class Aerospike < StandardError
attr_reader :result_code
attr_reader :result_code, :failed_nodes

def initialize(result_code, message = nil)
def initialize(result_code, message = nil, failed_nodes = nil)
@result_code = result_code
@failed_nodes = failed_nodes
message ||= ResultCode.message(result_code)
super(message)
end
end

class Timeout < Aerospike
attr_reader :timeout, :iterations, :failed_nodes, :failed_connections
attr_reader :timeout, :iterations, :failed_connections

def initialize(timeout, iterations, failed_nodes=nil, failed_connections=nil)
@timeout = timeout
@iterations = iterations
@failed_nodes = failed_nodes
@failed_connections = failed_connections

super(ResultCode::TIMEOUT)
super(ResultCode::TIMEOUT, nil, failed_nodes)
end
end

class InvalidCredentials < Aerospike
def initialize(msg = nil)
super(ResultCode::NOT_AUTHENTICATED, msg)
def initialize(msg = nil, node=nil)
super(ResultCode::NOT_AUTHENTICATED, msg, [node])
end
end

class Serialize < Aerospike
def initialize(msg=nil)
super(ResultCode::SERIALIZE_ERROR, msg)
super(ResultCode::SERIALIZE_ERROR, msg, [node])
end
end

class Parse < Aerospike
def initialize(msg=nil)
super(ResultCode::PARSE_ERROR, msg)
def initialize(msg=nil, node=nil)
super(ResultCode::PARSE_ERROR, msg, [node])
end
end

class Connection < Aerospike
def initialize(msg=nil)
super(ResultCode::SERVER_NOT_AVAILABLE, msg)
def initialize(msg=nil, node=nil)
super(ResultCode::SERVER_NOT_AVAILABLE, msg, [node])
end
end

class InvalidNode < Aerospike
def initialize(msg=nil)
super(ResultCode::INVALID_NODE_ERROR, msg)
def initialize(msg=nil, node=nil)
super(ResultCode::INVALID_NODE_ERROR, msg, [node])
end
end

class ScanTerminated < Aerospike
def initialize(msg=nil)
super(ResultCode::SCAN_TERMINATED, msg)
def initialize(msg=nil, node=nil)
super(ResultCode::SCAN_TERMINATED, msg, [node])
end
end

class QueryTerminated < Aerospike
def initialize(msg=nil)
super(ResultCode::QUERY_TERMINATED, msg)
def initialize(msg=nil, node=nil)
super(ResultCode::QUERY_TERMINATED, msg, [node])
end
end

class CommandRejected < Aerospike
def initialize(msg=nil)
super(ResultCode::COMMAND_REJECTED, msg)
def initialize(msg=nil, node=nil)
super(ResultCode::COMMAND_REJECTED, msg, [node])
end
end

class InvalidNamespace < Aerospike
def initialize(msg=nil)
super(ResultCode::INVALID_NAMESPACE, msg)
def initialize(msg=nil, node=nil)
super(ResultCode::INVALID_NAMESPACE, msg, [node])
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/aerospike/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ def truncate(namespace, set_name = nil, before_last_update = nil, options = {})

response = send_info_command(policy, str_cmd, node).upcase
return if response == "OK"
raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_ERROR, "Truncate failed: #{response}")
raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_ERROR, "Truncate failed: #{response}", [node])
end

#-------------------------------------------------------
Expand Down
30 changes: 15 additions & 15 deletions lib/aerospike/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def batch_read_node(partition, replica_policy)
when Aerospike::Replica::RANDOM
random_node
else
raise Aerospike::Exceptions::InvalidNode("invalid policy.replica value")
raise Aerospike::Exceptions::InvalidNode.new("invalid policy.replica value")
end
end

Expand All @@ -147,21 +147,21 @@ def read_node(partition, replica_policy, seq)
when Aerospike::Replica::RANDOM
random_node
else
raise Aerospike::Exceptions::InvalidNode("invalid policy.replica value")
raise Aerospike::Exceptions::InvalidNode.new("invalid policy.replica value")
end
end

# Returns a node on the cluster for read operations
def master_node(partition)
partition_map = partitions
replica_array = partition_map[partition.namespace]
raise Aerospike::Exceptions::InvalidNamespace("namespace not found in the partition map") unless replica_array
raise Aerospike::Exceptions::InvalidNamespace.new("namespace not found in the partition map") unless replica_array

node_array = replica_array.get[0]
raise Aerospike::Exceptions::InvalidNamespace("namespace not found in the partition map") unless node_array
raise Aerospike::Exceptions::InvalidNamespace.new("namespace not found in the partition map") unless node_array

node = node_array.get[partition.partition_id]
raise Aerospike::Exceptions::InvalidNode if !node || !node.active?
raise Aerospike::Exceptions::InvalidNode.new("no active node found") if !node || !node.active?

node
end
Expand All @@ -170,7 +170,7 @@ def master_node(partition)
def rack_node(partition, seq)
partition_map = partitions
replica_array = partition_map[partition.namespace]
raise Aerospike::Exceptions::InvalidNamespace("namespace not found in the partition map") unless replica_array
raise Aerospike::Exceptions::InvalidNamespace.new("namespace not found in the partition map") unless replica_array

replica_array = replica_array.get

Expand All @@ -195,14 +195,14 @@ def rack_node(partition, seq)

return fallback if fallback

raise Aerospike::Exceptions::InvalidNode
raise Aerospike::Exceptions::InvalidNode.new("no active node found")
end

# Returns a node on the cluster for read operations
def master_proles_node(partition)
partition_map = partitions
replica_array = partition_map[partition.namespace]
raise Aerospike::Exceptions::InvalidNamespace("namespace not found in the partition map") unless replica_array
raise Aerospike::Exceptions::InvalidNamespace.new("namespace not found in the partition map") unless replica_array

replica_array = replica_array.get

Expand All @@ -214,14 +214,14 @@ def master_proles_node(partition)
return node if node && node.active?
end

raise Aerospike::Exceptions::InvalidNode
raise Aerospike::Exceptions::InvalidNode.new("no active node found")
end

# Returns a random node on the cluster
def sequence_node(partition, seq)
partition_map = partitions
replica_array = partition_map[partition.namespace]
raise Aerospike::Exceptions::InvalidNamespace("namespace not found in the partition map") unless replica_array
raise Aerospike::Exceptions::InvalidNamespace.new("namespace not found in the partition map") unless replica_array

replica_array = replica_array.get

Expand All @@ -233,7 +233,7 @@ def sequence_node(partition, seq)
return node if node && node.active?
end

raise Aerospike::Exceptions::InvalidNode
raise Aerospike::Exceptions::InvalidNode.new("node active node found")
end

def get_node_for_key(replica_policy, key, is_write: false)
Expand All @@ -251,10 +251,10 @@ def node_partitions(node, namespace)

partition_map = partitions
replica_array = partition_map[namespace]
raise Aerospike::Exceptions::InvalidNamespace("namespace not found in the partition map") unless replica_array
raise Aerospike::Exceptions::InvalidNamespace.new("namespace not found in the partition map") unless replica_array

node_array = replica_array.get[0]
raise Aerospike::Exceptions::InvalidNamespace("namespace not found in the partition map") unless node_array
raise Aerospike::Exceptions::InvalidNamespace.new("namespace not found in the partition map") unless node_array


pid = 0
Expand All @@ -281,7 +281,7 @@ def random_node

i = i.succ
end
raise Aerospike::Exceptions::InvalidNode
raise Aerospike::Exceptions::InvalidNode.new("no active node found")
end

# Returns a list of all nodes in the cluster
Expand All @@ -296,7 +296,7 @@ def nodes
def get_node_by_name(node_name)
node = find_node_by_name(node_name)

raise Aerospike::Exceptions::InvalidNode unless node
raise Aerospike::Exceptions::InvalidNode.new("node `#{node_name}` not found") unless node

node
end
Expand Down
8 changes: 5 additions & 3 deletions lib/aerospike/cluster/partition_parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def update_partitions(current_map)

info = info_map[REPLICAS_ALL]
if !info || info.length == 0
raise Aerospike::Exceptions::Connection.new("#{REPLICAS_ALL} response for node #{@node.name} is empty")
raise Aerospike::Exceptions::Connection.new("#{REPLICAS_ALL} response for node #{@node.name} is empty", @node)
end

@buffer = info
Expand Down Expand Up @@ -112,7 +112,8 @@ def parse_name
if namespace.length <= 0 || namespace.length >= 32
response = get_truncated_response
raise Aerospike::Exceptions::Parse.new(
"Invalid partition namespace #{namespace}. Response=#{response}"
"Invalid partition namespace #{namespace}. Response=#{response}",
@node
)
end

Expand All @@ -133,7 +134,8 @@ def parse_replica_count
if count < 0 || count > 4096
response = get_truncated_response
raise Aerospike::Exceptions::Parse.new(
"Invalid partition count #{count}. Response=#{response}"
"Invalid partition count #{count}. Response=#{response}",
@node
)
end

Expand Down
10 changes: 6 additions & 4 deletions lib/aerospike/cluster/rack_parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def update_racks

info = info_map[RACK_IDS]
if !info || info.length == 0
raise Aerospike::Exceptions::Connection.new("#{RACK_IDS} response for node #{@node.name} is empty")
raise Aerospike::Exceptions::Connection.new("#{RACK_IDS} response for node #{@node.name} is empty", @node)
end

@buffer = info
Expand All @@ -54,7 +54,7 @@ def update_racks
namespace = parse_name
rack_id = parse_rack_id

@racks = {} if !@racks
@racks ||= {}
@racks[namespace] = rack_id
end

Expand All @@ -76,7 +76,8 @@ def parse_name
if namespace.length <= 0 || namespace.length >= 32
response = get_truncated_response
raise Aerospike::Exceptions::Parse.new(
"Invalid rack namespace #{namespace}. Response=#{response}"
"Invalid rack namespace #{namespace}. Response=#{response}",
@node
)
end

Expand All @@ -97,7 +98,8 @@ def parse_rack_id
if rack_id < 0
response = get_truncated_response
raise Aerospike::Exceptions::Parse.new(
"Invalid rack_id #{rack_id}. Response=#{response}"
"Invalid rack_id #{rack_id}. Response=#{response}",
@node
)
end

Expand Down
2 changes: 1 addition & 1 deletion lib/aerospike/command/batch_index_exists_command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def parse_row(result_code)
op_count = @data_buffer.read_int16(20)

if op_count > 0
raise Aerospike::Exceptions::Parse.new('Received bins that were not requested!')
raise Aerospike::Exceptions::Parse.new('Received bins that were not requested!', @node)
end

skip_key(field_count)
Expand Down
9 changes: 7 additions & 2 deletions lib/aerospike/command/command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ def set_query(cluster, policy, statement, background, node_partitions)
if operations

unless background
raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::PARAMETER_ERROR)
raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::PARAMETER_ERROR, nil, [@node])
end

operations.each do |operation|
Expand Down Expand Up @@ -685,6 +685,7 @@ def set_query(cluster, policy, statement, background, node_partitions)

def execute
iterations = 0
failed_nodes = []

# set timeout outside the loop
limit = Time.now + @policy.timeout
Expand All @@ -705,6 +706,7 @@ def execute
@node = get_node
@conn = @node.get_connection(@policy.timeout)
rescue => e
failed_nodes << @node if @node
if @node
# Socket connection error has occurred. Decrease health and retry.
@node.decrease_health
Expand All @@ -724,6 +726,7 @@ def execute
begin
write_buffer
rescue => e
failed_nodes << @node if @node
Aerospike.logger.error(e)

# All runtime exceptions are considered fatal. Do not retry.
Expand All @@ -738,6 +741,7 @@ def execute
begin
@conn.write(@data_buffer, @data_offset)
rescue => e
failed_nodes << @node if @node
# IO errors are considered temporary anomalies. Retry.
# Close socket to flush out possible garbage. Do not put back in pool.
@conn.close if @conn
Expand All @@ -753,6 +757,7 @@ def execute
begin
parse_result
rescue => e
failed_nodes << @node if @node
case e
# do not log the following exceptions
when Aerospike::Exceptions::ScanTerminated
Expand Down Expand Up @@ -783,7 +788,7 @@ def execute
end # while

# execution timeout
raise Aerospike::Exceptions::Timeout.new(limit, iterations)
raise Aerospike::Exceptions::Timeout.new(limit, iterations, failed_nodes)
end

protected
Expand Down
4 changes: 2 additions & 2 deletions lib/aerospike/command/delete_command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ def parse_result

if result_code == Aerospike::ResultCode::FILTERED_OUT
if @policy.fail_on_filtered_out
raise Aerospike::Exceptions::Aerospike.new(result_code)
raise Aerospike::Exceptions::Aerospike.new(result_code, nil, [@node])
end
@existed = true
return
end

raise Aerospike::Exceptions::Aerospike.new(result_code)
raise Aerospike::Exceptions::Aerospike.new(result_code, nil, [@node])
end

end # class
Expand Down
Loading

0 comments on commit dcd9078

Please sign in to comment.