Skip to content

Commit

Permalink
Add Cassandra table options flag (grafana#2575)
Browse files Browse the repository at this point in the history
* add the cassandra.table-with flag

Signed-off-by: Kyeongwon Seo <[email protected]>

* update CHANGELOG and docs

Signed-off-by: Kyeongwon Seo <[email protected]>

* add PR number to CHANGELOG

Signed-off-by: Kyeongwon Seo <[email protected]>

* remove blacklisted package import

Signed-off-by: Kyeongwon Seo <[email protected]>

* fix import with goimports

Signed-off-by: Kyeongwon Seo <[email protected]>

* rename table_with to table_options and improve the flag usage

Signed-off-by: Kyeongwon Seo <[email protected]>

* add a description about configuring table options to documentation: Running Cortex with Cassandra

Signed-off-by: Kyeongwon Seo <[email protected]>

* clean white noise

Signed-off-by: Kyeongwon Seo <[email protected]>
  • Loading branch information
kwSeo authored Jun 8, 2020
1 parent 8e53379 commit a4aba35
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 7 deletions.
2 changes: 2 additions & 0 deletions cassandra/storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Config struct {
QueryConcurrency int `yaml:"query_concurrency"`
NumConnections int `yaml:"num_connections"`
ConvictHosts bool `yaml:"convict_hosts_on_failure"`
TableOptions string `yaml:"table_options"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
Expand Down Expand Up @@ -75,6 +76,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.QueryConcurrency, "cassandra.query-concurrency", 0, "Limit number of concurrent queries to Cassandra. (Default is 0: no limit)")
f.IntVar(&cfg.NumConnections, "cassandra.num-connections", 2, "Number of TCP connections per host.")
f.BoolVar(&cfg.ConvictHosts, "cassandra.convict-hosts-on-failure", true, "Convict hosts of being down on failure.")
f.StringVar(&cfg.TableOptions, "cassandra.table-options", "", "Table options used to create index or chunk tables. This value is used as plain text in the table `WITH` like this, \"CREATE TABLE <generated_by_cortex> (...) WITH <cassandra.table-options>\". For details, see https://cortexmetrics.io/docs/production/cassandra. (Default = \"\": use default table options of your Cassandra)")
}

func (cfg *Config) Validate() error {
Expand Down
23 changes: 16 additions & 7 deletions cassandra/table_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,8 @@ func (c *tableClient) ListTables(ctx context.Context) ([]string, error) {
}

func (c *tableClient) CreateTable(ctx context.Context, desc chunk.TableDesc) error {
err := c.session.Query(fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
hash text,
range blob,
value blob,
PRIMARY KEY (hash, range)
)`, desc.Name)).WithContext(ctx).Exec()
query := c.getCreateTableQuery(&desc)
err := c.session.Query(query).WithContext(ctx).Exec()
return errors.WithStack(err)
}

Expand All @@ -69,3 +64,17 @@ func (c *tableClient) UpdateTable(ctx context.Context, current, expected chunk.T
func (c *tableClient) Stop() {
c.session.Close()
}

func (c *tableClient) getCreateTableQuery(desc *chunk.TableDesc) (query string) {
query = fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
hash text,
range blob,
value blob,
PRIMARY KEY (hash, range)
)`, desc.Name)
if c.cfg.TableOptions != "" {
query = fmt.Sprintf("%s WITH %s", query, c.cfg.TableOptions)
}
return
}
48 changes: 48 additions & 0 deletions cassandra/table_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package cassandra

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
)

func TestTableClient_getCreateTableQuery_default(t *testing.T) {
client := &tableClient{
cfg: Config{},
}
desc, _, _ := client.DescribeTable(context.Background(), "test_table")
query := client.getCreateTableQuery(&desc)
assert.Equal(
t,
`
CREATE TABLE IF NOT EXISTS test_table (
hash text,
range blob,
value blob,
PRIMARY KEY (hash, range)
)`,
query,
)
}

func TestTableClient_getCreateTableQuery_withOptions(t *testing.T) {
client := &tableClient{
cfg: Config{
TableOptions: "CLUSTERING ORDER BY (range DESC) AND compaction = { 'class' : 'LeveledCompactionStrategy' }",
},
}
desc, _, _ := client.DescribeTable(context.Background(), "test_table")
query := client.getCreateTableQuery(&desc)
assert.Equal(
t,
`
CREATE TABLE IF NOT EXISTS test_table (
hash text,
range blob,
value blob,
PRIMARY KEY (hash, range)
) WITH CLUSTERING ORDER BY (range DESC) AND compaction = { 'class' : 'LeveledCompactionStrategy' }`,
query,
)
}

0 comments on commit a4aba35

Please sign in to comment.