Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(prepared) correct check for consensus timeout #24

Merged
merged 4 commits into from
Jan 16, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .ci/run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
set -e

if [ "$OPENRESTY_TESTS" != "yes" ]; then
busted -v --coverage -o gtest
make lint
busted -v --coverage -o gtest --repeat 3
luacov-coveralls -i cassandra
else
prove -l t
Expand Down
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ env:
- LUA=lua5.3
- LUA=luajit-2.0
- LUA=luajit-2.1
- OPENRESTY_TESTS: yes
- OPENRESTY_TESTS: "yes"
LUA: luajit-2.1
before_install:
- bash .ci/setup_cassandra.sh
Expand Down
6 changes: 3 additions & 3 deletions spec/integration/cassandra_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ describe("spawn_session()", function()
describe("execute()", function()
after_each(function()
-- drop keyspace in case tests failed
session:execute("DROP KEYSPACE resty_cassandra_spec")
utils.drop_keyspace(session, "resty_cassandra_spec")
end)
it("should require argument #1 to be a string", function()
assert.has_error(function()
Expand Down Expand Up @@ -200,7 +200,7 @@ describe("spawn_session()", function()
assert.falsy(err)
assert.is_table(res)

local res, err = session:execute("DROP KEYSPACE resty_cassandra_spec")
res, err = session:execute("DROP KEYSPACE resty_cassandra_spec")
assert.falsy(err)
assert.is_table(res)
assert.equal(0, #res)
Expand All @@ -221,7 +221,7 @@ describe("spawn_session()", function()
assert.falsy(err)

_, err = session:execute [[
CREATE TABLE resty_cassandra_spec.fixture_table(
CREATE TABLE IF NOT EXISTS resty_cassandra_spec.fixture_table(
id uuid PRIMARY KEY,
value varchar
)
Expand Down
6 changes: 1 addition & 5 deletions spec/spec_utils.lua
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,7 @@ function _M.create_keyspace(session, keyspace)
end

function _M.drop_keyspace(session, keyspace)
local res, err = session:execute("DROP KEYSPACE "..keyspace)
if err then
error(err)
end
return res
session:execute("DROP KEYSPACE "..keyspace)
end

local delta = 0.0000001
Expand Down
23 changes: 16 additions & 7 deletions src/cassandra.lua
Original file line number Diff line number Diff line change
Expand Up @@ -575,15 +575,20 @@ end
function RequestHandler:wait_for_schema_consensus()
log.info("Waiting for schema consensus")

local match, err
local match, t_diff, err
local start = time_utils.get_time()

repeat
time_utils.wait(0.5)
match, err = check_schema_consensus(self)
until match or err ~= nil or (time_utils.get_time() - start) < self.options.protocol_options.max_schema_consensus_wait
t_diff = time_utils.get_time() - start
until match or err ~= nil or t_diff >= self.options.protocol_options.max_schema_consensus_wait

return err
if err ~= nil then
return err
elseif not match then
log.err("Waiting for schema consensus timed out. "..t_diff.." > "..self.options.protocol_options.max_schema_consensus_wait)
end
end

function RequestHandler:send_on_next_coordinator(request)
Expand Down Expand Up @@ -757,7 +762,7 @@ local function prepare_query(request_handler, query)
local prepared_key_lock = prepared_key.."_lock"
local lock, lock_err, elapsed = lock_mutex(request_handler.options.prepared_shm, prepared_key_lock)
if lock_err then
return nil, lock_err
return nil, "Could not create lock for prepare request: "..lock_err
end

if elapsed and elapsed == 0 then
Expand All @@ -767,26 +772,30 @@ local function prepare_query(request_handler, query)
local res, err = request_handler:send(prepare_request)
if err then
return nil, err
elseif res.query_id == nil then
return nil, "Could not retrieve query id from prepare request"
end
query_id = res.query_id
local ok, cache_err = cache.set_prepared_query_id(request_handler.options, query, query_id)
if not ok then
return nil, cache_err
return nil, "Could not insert query id in cache for prepared query: "..cache_err
end
log.info("Query prepared for host "..request_handler.coordinator.address)
else
-- once the lock is resolved, all other workers can retry to get the query, and should
-- instantly succeed. We then skip the preparation part.
query_id, cache_err = cache.get_prepared_query_id(request_handler.options, query)
if cache_err then
return nil, cache_err
return nil, "Could not get query id from cache for prepared query: "..cache_err
elseif query_id == nil then
return nil, "No query id found in cache for prepared query"
end
end

-- UNLOCK MUTEX
lock_err = unlock_mutex(lock)
if lock_err then
return nil, "Error unlocking mutex for query preparation: "..lock_err
return nil, "Error unlocking mutex for query for prepare request: "..lock_err
end
end

Expand Down
2 changes: 1 addition & 1 deletion src/cassandra/options.lua
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ local DEFAULTS = {
},
protocol_options = {
default_port = 9042,
max_schema_consensus_wait = 5000
max_schema_consensus_wait = 10000
},
socket_options = {
connect_timeout = 1000, -- ms
Expand Down