Skip to content

Commit

Permalink
Make sure statements that are added to a batch are prepared
Browse files Browse the repository at this point in the history
When executing a prepared statement it will make sure that it's prepared on the connection before sending the request, but when a prepared statement is added to a batch it doesn't do this check, instead it raises a NotPreparedError. With this patch the batch will prepare any unprepared statements before sending the batch request.

This fixes iconara#117
  • Loading branch information
iconara authored and JasonK committed Sep 4, 2014
1 parent 5410372 commit 343a272
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 38 deletions.
57 changes: 34 additions & 23 deletions lib/cql/client/batch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -86,33 +86,31 @@ def add(*args)

def execute(options=nil)
options = @execute_options_decoder.decode_options(@options, options)
connection = nil
attempts = 0
begin
connection = @connection_manager.random_connection
request = Protocol::BatchRequest.new(BATCH_TYPES[@type], options[:consistency], options[:trace])
@parts.each do |cql_or_statement, *bound_args|
if cql_or_statement.is_a?(String)
type_hints = nil
if bound_args.last.is_a?(Hash) && bound_args.last.include?(:type_hints)
bound_args = bound_args.dup
type_hints = bound_args.pop[:type_hints]
end
request.add_query(cql_or_statement, bound_args, type_hints)
else
cql_or_statement.add_to_batch(request, connection, bound_args)
end
end
rescue NotPreparedError
attempts += 1
if attempts < 3
retry
connection = @connection_manager.random_connection
request = Protocol::BatchRequest.new(BATCH_TYPES[@type], options[:consistency], options[:trace])
unprepared_statements = nil
@parts.each do |part, *bound_args|
if part.is_a?(String) || part.prepared?(connection)
add_part(connection, request, part, bound_args)
else
raise
unprepared_statements ||= []
unprepared_statements << [part, bound_args]
end
end
@parts = []
@request_runner.execute(connection, request, options[:timeout])
if unprepared_statements.nil?
@request_runner.execute(connection, request, options[:timeout])
else
fs = unprepared_statements.map do |statement, _|
statement.prepare(connection)
end
Future.all(*fs).flat_map do
unprepared_statements.each do |statement, bound_args|
add_part(connection, request, statement, bound_args)
end
@request_runner.execute(connection, request, options[:timeout])
end
end
end

private
Expand All @@ -122,6 +120,19 @@ def execute(options=nil)
:unlogged => Protocol::BatchRequest::UNLOGGED_TYPE,
:counter => Protocol::BatchRequest::COUNTER_TYPE,
}.freeze

def add_part(connection, request, part, bound_args)
if part.is_a?(String)
type_hints = nil
if bound_args.last.is_a?(Hash) && bound_args.last.include?(:type_hints)
bound_args = bound_args.dup
type_hints = bound_args.pop[:type_hints]
end
request.add_query(part, bound_args, type_hints)
else
part.add_to_batch(request, connection, bound_args)
end
end
end

# @private
Expand Down
5 changes: 5 additions & 0 deletions lib/cql/client/prepared_statement.rb
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ def prepare(connection)
f.map(self)
end

# @private
def prepared?(connection)
!!connection[self]
end

# @private
def add_to_batch(batch, connection, bound_args)
statement_id = connection[self]
Expand Down
31 changes: 16 additions & 15 deletions spec/cql/client/batch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ def last_timeout
end
end

before do
prepared_statement.stub(:prepared?).and_return(true)
end

it 'creates a BATCH request and executes it on a random connection' do
batch.execute.value
connection.should have_received(:send_request).with(an_instance_of(Protocol::BatchRequest), nil)
Expand Down Expand Up @@ -104,22 +108,19 @@ def last_timeout
encoded_frame.to_s.should include(Protocol::QueryRequest.encode_values(Protocol::CqlByteBuffer.new, [3], [:int]))
end

it 'tries again when a prepared statement raises NotPreparedError' do
connection1 = double(:connection1)
connection2 = double(:connection2)
connection2.stub(:send_request).and_return(Cql::Future.resolved(Protocol::VoidResultResponse.new(nil)))
connection_manager.stub(:random_connection).and_return(connection1, connection2)
prepared_statement.stub(:add_to_batch).with(anything, connection1, anything).and_raise(NotPreparedError)
prepared_statement.stub(:add_to_batch).with(anything, connection2, anything)
batch.add(prepared_statement, 3, 'foo')
expect { batch.execute.value }.to_not raise_error
end

it 'gives up when the prepared statement has raised NotPreparedError three times' do
prepared_statement.stub(:add_to_batch).with(anything, connection, anything).and_raise(NotPreparedError)
it 'prepares the statement on the selected connection when NotPreparedError is raised' do
connection = double(:connection)
connection.stub(:send_request).and_return(Cql::Future.resolved(Protocol::VoidResultResponse.new(nil)))
connection_manager.stub(:random_connection).and_return(connection)
prepared_statement.stub(:prepared?).with(connection).and_return(false)
prepared_statement.stub(:add_to_batch)
prepared_statement.stub(:prepare) do |c|
c.should equal(connection), 'expected #prepare to be called with the connection returned by the connection manager'
prepared_statement.stub(:prepared?).with(connection).and_return(true)
Future.resolved(prepared_statement)
end
batch.add(prepared_statement, 3, 'foo')
expect { batch.execute.value }.to raise_error(NotPreparedError)
prepared_statement.should have_received(:add_to_batch).exactly(3).times
batch.execute.value.should be_a(VoidResult)
end

it 'returns a future that resolves to the response' do
Expand Down

0 comments on commit 343a272

Please sign in to comment.