Skip to content

Commit

Permalink
fix(dao) complete update to 0.4.0 + fix #681
Browse files Browse the repository at this point in the history
rate-limiting plugins don't use a batch statement anymore to increment
the counters, which used to overload a coordinator and generate a
warning. Since the new driver uses load balancing policies, we increment
the counters with regular queries.
  • Loading branch information
thibaultcha committed Dec 17, 2015
1 parent a0238fa commit e289c37
Show file tree
Hide file tree
Showing 10 changed files with 65 additions and 58 deletions.
2 changes: 1 addition & 1 deletion kong-0.5.4-1.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ dependencies = {
"yaml ~> 1.1.2-1",
"lapis ~> 1.3.1-1",
"stringy ~> 0.4-1",
"lua-cassandra ~> 0.3.6-0",
"lua-cassandra ~> 0.4.0-0",
"multipart ~> 0.2-1",
"lua-path ~> 0.2.3-1",
"lua-cjson ~> 2.1.0-1",
Expand Down
8 changes: 4 additions & 4 deletions kong/dao/cassandra/factory.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@
-- Also provides helper methods for preparing queries among the DAOs, migrating the
-- database and dropping it.

local log = require "cassandra.log"
log.set_lvl("ERR")

local constants = require "kong.constants"
local cassandra = require "cassandra"
local DaoError = require "kong.dao.error"
local stringy = require "stringy"
local Object = require "classic"
local utils = require "kong.tools.utils"

-- Silent outside of ngx_lua logging
cassandra.set_log_level("QUIET")

local CassandraFactory = Object:extend()

-- Shorthand for accessing one of the underlying DAOs
Expand Down Expand Up @@ -106,7 +106,7 @@ function CassandraFactory:get_session_options()
contact_points = self._properties.contact_points,
keyspace = self._properties.keyspace,
query_options = {
prepare = false -- todo
prepare = true
}
}

Expand Down
26 changes: 21 additions & 5 deletions kong/plugins/rate-limiting/daos.lua
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
local cassandra = require "cassandra"
local BaseDao = require "kong.dao.cassandra.base_dao"
local cassandra = require "cassandra"
local timestamp = require "kong.tools.timestamp"

local ngx_log = ngx.log
local ngx_err = ngx.ERR
local tostring = tostring

local RateLimitingMetrics = BaseDao:extend()

function RateLimitingMetrics:new(properties)
Expand All @@ -26,19 +30,31 @@ end

function RateLimitingMetrics:increment(api_id, identifier, current_timestamp, value)
local periods = timestamp.get_timestamps(current_timestamp)
local batch = cassandra:BatchStatement(cassandra.batch_types.COUNTER)
local options = self._factory:get_session_options()
local session, err = cassandra.spawn_session(options)
if err then
ngx_log(ngx_err, "[rate-limiting] could not spawn session to Cassandra: "..tostring(err))
return
end

local ok = true
for period, period_date in pairs(periods) do
batch:add(self.queries.increment_counter, {
local res, err = session:execute(self.queries.increment_counter, {
cassandra.counter(value),
cassandra.uuid(api_id),
identifier,
cassandra.timestamp(period_date),
period
})
if not res then
ok = false
ngx_log(ngx_err, "[rate-limiting] could not increment counter for period '"..period.."': ", tostring(err))
end
end

return RateLimitingMetrics.super._execute(self, batch)
session:set_keep_alive()

return ok
end

function RateLimitingMetrics:find_one(api_id, identifier, current_timestamp, period)
Expand Down Expand Up @@ -86,4 +102,4 @@ function RateLimitingMetrics:find_by_keys()
error("ratelimiting_metrics:find_by_keys() not supported", 2)
end

return { ratelimiting_metrics = RateLimitingMetrics }
return {ratelimiting_metrics = RateLimitingMetrics}
24 changes: 20 additions & 4 deletions kong/plugins/response-ratelimiting/daos.lua
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
local cassandra = require "cassandra"
local BaseDao = require "kong.dao.cassandra.base_dao"
local cassandra = require "cassandra"
local timestamp = require "kong.tools.timestamp"

local ngx_log = ngx.log
local ngx_err = ngx.ERR
local tostring = tostring

local ResponseRateLimitingMetrics = BaseDao:extend()

function ResponseRateLimitingMetrics:new(properties)
Expand All @@ -26,19 +30,31 @@ end

function ResponseRateLimitingMetrics:increment(api_id, identifier, current_timestamp, value, name)
local periods = timestamp.get_timestamps(current_timestamp)
local batch = cassandra:BatchStatement(cassandra.batch_types.COUNTER)
local options = self._factory:get_session_options()
local session, err = cassandra.spawn_session(options)
if err then
ngx_log(ngx_err, "[response-rate-limiting] could not spawn session to Cassandra: "..tostring(err))
return
end

local ok = true
for period, period_date in pairs(periods) do
batch:add(self.queries.increment_counter, {
local res, err = session:execute(self.queries.increment_counter, {
cassandra.counter(value),
cassandra.uuid(api_id),
identifier,
cassandra.timestamp(period_date),
name.."_"..period
})
if not res then
ok = false
ngx_log(ngx_err, "[response-rate-limiting] could not increment counter for period '"..period.."': ", tostring(err))
end
end

return ResponseRateLimitingMetrics.super._execute(self, batch)
session:set_keep_alive()

return ok
end

function ResponseRateLimitingMetrics:find_one(api_id, identifier, current_timestamp, period, name)
Expand Down
7 changes: 3 additions & 4 deletions spec/integration/dao/cassandra/base_dao_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,13 @@ describe("Cassandra", function()
setup(function()
spec_helper.prepare_db()

local cluster, err = cassandra.spawn_cluster {
local err
session, err = cassandra.spawn_session {
shm = "factory_specs",
keyspace = configuration.dao_config.keyspace,
contact_points = configuration.dao_config.contact_points
}
assert.falsy(err)

session, err = cluster:spawn_session {keyspace = configuration.dao_config.keyspace}
assert.falsy(err)
end)

teardown(function()
Expand Down
12 changes: 0 additions & 12 deletions spec/integration/dao/cassandra/cascade_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@ local dao_factory = env.dao_factory
dao_factory:load_plugins({"keyauth", "basicauth", "oauth2"})

describe("Cassandra cascade delete", function()

setup(function()
spec_helper.prepare_db()
end)

describe("API -> plugins", function()
local api, untouched_api

Expand All @@ -34,11 +32,9 @@ describe("Cassandra cascade delete", function()
api = fixtures.api[1]
untouched_api = fixtures.api[2]
end)

teardown(function()
spec_helper.drop_db()
end)

it("should delete foreign plugins when deleting an API", function()
local ok, err = dao_factory.apis:delete(api)
assert.falsy(err)
Expand Down Expand Up @@ -84,11 +80,9 @@ describe("Cassandra cascade delete", function()
consumer = fixtures.consumer[1]
untouched_consumer = fixtures.consumer[2]
end)

teardown(function()
spec_helper.drop_db()
end)

it("should delete foreign plugins when deleting a Consumer", function()
local ok, err = dao_factory.consumers:delete(consumer)
assert.falsy(err)
Expand Down Expand Up @@ -126,11 +120,9 @@ describe("Cassandra cascade delete", function()
consumer = fixtures.consumer[1]
untouched_consumer = fixtures.consumer[2]
end)

teardown(function()
spec_helper.drop_db()
end)

it("should delete foreign keyauth_credentials when deleting a Consumer", function()
local ok, err = dao_factory.consumers:delete(consumer)
assert.falsy(err)
Expand Down Expand Up @@ -167,11 +159,9 @@ describe("Cassandra cascade delete", function()
consumer = fixtures.consumer[1]
untouched_consumer = fixtures.consumer[2]
end)

teardown(function()
spec_helper.drop_db()
end)

it("should delete foreign basicauth_credentials when deleting a Consumer", function()
local ok, err = dao_factory.consumers:delete(consumer)
assert.falsy(err)
Expand Down Expand Up @@ -225,11 +215,9 @@ describe("Cassandra cascade delete", function()
}
assert.falsy(err)
end)

teardown(function()
spec_helper.drop_db()
end)

it("should delete foreign oauth2_credentials and tokens when deleting a Consumer", function()
local ok, err = dao_factory.consumers:delete(consumer)
assert.falsy(err)
Expand Down
8 changes: 2 additions & 6 deletions spec/integration/dao/cassandra/migrations_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,15 @@ local test_cassandra_properties = test_configuration.dao_config
test_cassandra_properties.keyspace = FIXTURES.keyspace

local test_dao = DAO(test_cassandra_properties)
local cluster, err = cassandra.spawn_cluster {
local session, err = cassandra.spawn_session {
shm = "factory_specs",
--keyspace = test_configuration.dao_config.keyspace,
contact_points = test_configuration.dao_config.contact_points
}
if err then
error(err)
end

local session, err = cluster:spawn_session {keyspace = test_configuration.dao_config.keyspace}
if err then
error(err)
end

local function has_table(state, arguments)
local rows, err = session:execute("SELECT columnfamily_name FROM system.schema_columnfamilies WHERE keyspace_name = ?", {FIXTURES.keyspace})
if err then
Expand Down
17 changes: 7 additions & 10 deletions spec/plugins/rate-limiting/daos_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ describe("Rate Limiting Metrics", function()
for period, period_date in pairs(periods) do
local metric, err = ratelimiting_metrics:find_one(api_id, identifier, current_timestamp, period)
assert.falsy(err)
assert.are.same(nil, metric)
assert.same(nil, metric)
end
end)

Expand All @@ -30,15 +30,14 @@ describe("Rate Limiting Metrics", function()
local periods = timestamp.get_timestamps(current_timestamp)

-- First increment
local ok, err = ratelimiting_metrics:increment(api_id, identifier, current_timestamp, 1)
assert.falsy(err)
local ok = ratelimiting_metrics:increment(api_id, identifier, current_timestamp, 1)
assert.True(ok)

-- First select
for period, period_date in pairs(periods) do
local metric, err = ratelimiting_metrics:find_one(api_id, identifier, current_timestamp, period)
assert.falsy(err)
assert.are.same({
assert.same({
api_id = api_id,
identifier = identifier,
period = period,
Expand All @@ -48,15 +47,14 @@ describe("Rate Limiting Metrics", function()
end

-- Second increment
local ok, err = ratelimiting_metrics:increment(api_id, identifier, current_timestamp, 1)
assert.falsy(err)
local ok = ratelimiting_metrics:increment(api_id, identifier, current_timestamp, 1)
assert.True(ok)

-- Second select
for period, period_date in pairs(periods) do
local metric, err = ratelimiting_metrics:find_one(api_id, identifier, current_timestamp, period)
assert.falsy(err)
assert.are.same({
assert.same({
api_id = api_id,
identifier = identifier,
period = period,
Expand All @@ -70,8 +68,7 @@ describe("Rate Limiting Metrics", function()
periods = timestamp.get_timestamps(current_timestamp)

-- Third increment
local ok, err = ratelimiting_metrics:increment(api_id, identifier, current_timestamp, 1)
assert.falsy(err)
local ok = ratelimiting_metrics:increment(api_id, identifier, current_timestamp, 1)
assert.True(ok)

-- Third select with 1 second delay
Expand All @@ -85,7 +82,7 @@ describe("Rate Limiting Metrics", function()

local metric, err = ratelimiting_metrics:find_one(api_id, identifier, current_timestamp, period)
assert.falsy(err)
assert.are.same({
assert.same({
api_id = api_id,
identifier = identifier,
period = period,
Expand Down
17 changes: 7 additions & 10 deletions spec/plugins/response-ratelimiting/daos_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,14 @@ describe("Rate Limiting Metrics", function()
local periods = timestamp.get_timestamps(current_timestamp)

-- First increment
local ok, err = response_ratelimiting_metrics:increment(api_id, identifier, current_timestamp, 1, "video")
assert.falsy(err)
local ok = response_ratelimiting_metrics:increment(api_id, identifier, current_timestamp, 1, "video")
assert.True(ok)

-- First select
for period, period_date in pairs(periods) do
local metric, err = response_ratelimiting_metrics:find_one(api_id, identifier, current_timestamp, period, "video")
assert.falsy(err)
assert.are.same({
assert.same({
api_id = api_id,
identifier = identifier,
period = "video_"..period,
Expand All @@ -48,15 +47,14 @@ describe("Rate Limiting Metrics", function()
end

-- Second increment
local ok, err = response_ratelimiting_metrics:increment(api_id, identifier, current_timestamp, 1, "video")
assert.falsy(err)
local ok = response_ratelimiting_metrics:increment(api_id, identifier, current_timestamp, 1, "video")
assert.True(ok)

-- Second select
for period, period_date in pairs(periods) do
local metric, err = response_ratelimiting_metrics:find_one(api_id, identifier, current_timestamp, period, "video")
assert.falsy(err)
assert.are.same({
assert.same({
api_id = api_id,
identifier = identifier,
period = "video_"..period,
Expand All @@ -70,8 +68,7 @@ describe("Rate Limiting Metrics", function()
periods = timestamp.get_timestamps(current_timestamp)

-- Third increment
local ok, err = response_ratelimiting_metrics:increment(api_id, identifier, current_timestamp, 1, "video")
assert.falsy(err)
local ok = response_ratelimiting_metrics:increment(api_id, identifier, current_timestamp, 1, "video")
assert.True(ok)

-- Third select with 1 second delay
Expand All @@ -85,7 +82,7 @@ describe("Rate Limiting Metrics", function()

local metric, err = response_ratelimiting_metrics:find_one(api_id, identifier, current_timestamp, period, "video")
assert.falsy(err)
assert.are.same({
assert.same({
api_id = api_id,
identifier = identifier,
period = "video_"..period,
Expand All @@ -103,4 +100,4 @@ describe("Rate Limiting Metrics", function()
assert.has_error(response_ratelimiting_metrics.find_by_keys, "ratelimiting_metrics:find_by_keys() not supported")
end)

end) -- describe rate limiting metrics
end) -- describe rate limiting metrics
2 changes: 0 additions & 2 deletions spec/unit/tools/config_loader_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ describe("Configuration validation", function()
assert.truthy(errors)
assert.equal("must be a number", errors.proxy_port)
assert.equal("must be a string", errors.database)
assert.equal("must be a number", errors["databases_available.cassandra.timeout"])
assert.equal("must be a boolean", errors["databases_available.cassandra.ssl"])
assert.falsy(errors.ssl_cert_path)
assert.falsy(errors.ssl_key_path)
end)
Expand Down

0 comments on commit e289c37

Please sign in to comment.