Skip to content

Commit

Permalink
BigTable storage backend for Cortex (#468)
Browse files Browse the repository at this point in the history
* Cortex Bigtable backend.

- Currently stores chunks in BigTable only.
- Minimal amount of refactoring, had to:
  - break out the storage client factory into its own package to avoid import loops.
  - export a few methods on Chunk (ExternalKey and Decode).
- Add stack traces to some of the chunk store errors, and log them on queries.
- Instrument the BigTable gRPC client.

Also, fix some logic in the previous BigTable storage engine:

- Return correct part of the row key as the range key
- Correctly stop reading bigtable results when trying to read to the end of a 'row'.
- Log the range key we read from bigtable.
- Chunks are being written before calculating their checksum, which shouldn't happen.
- Don't return empty chunks for BigTable backend.

* Review feedback

* Instrument streaming gRPC BigTable calls too.

* Chunks don't nessecarily come from S3 anymore.

* When fetching chunks, the should be returned if processingErr == nil.
  • Loading branch information
tomwilkie authored and jml committed Jul 13, 2017
1 parent aea9137 commit fa9cf5a
Show file tree
Hide file tree
Showing 172 changed files with 27,867 additions and 405 deletions.
24 changes: 15 additions & 9 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ update-gazelle:
gazelle -go_prefix github.com/weaveworks/cortex -external vendored \
-build_file_name BUILD.bazel

update-vendor:
dep ensure && dep prune
git status | grep BUILD.bazel | cut -d' ' -f 5 | xargs git checkout HEAD

bazel:
bazel build //cmd/...

Expand Down
1 change: 1 addition & 0 deletions cmd/ingester/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
visibility = ["//visibility:private"],
deps = [
"//pkg/chunk:go_default_library",
"//pkg/chunk/storage:go_default_library",
"//pkg/ingester:go_default_library",
"//pkg/ingester/client:go_default_library",
"//pkg/util:go_default_library",
Expand Down
5 changes: 3 additions & 2 deletions cmd/ingester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/server"
"github.com/weaveworks/cortex/pkg/chunk"
"github.com/weaveworks/cortex/pkg/chunk/storage"
"github.com/weaveworks/cortex/pkg/ingester"
"github.com/weaveworks/cortex/pkg/ingester/client"
"github.com/weaveworks/cortex/pkg/util"
Expand All @@ -25,8 +26,8 @@ func main() {
},
}
chunkStoreConfig chunk.StoreConfig
storageConfig chunk.StorageClientConfig
schemaConfig chunk.SchemaConfig
storageConfig storage.Config
ingesterConfig ingester.Config
)
// Ingester needs to know our gRPC listen port.
Expand All @@ -41,7 +42,7 @@ func main() {
}
defer server.Shutdown()

storageClient, err := chunk.NewStorageClient(storageConfig, schemaConfig)
storageClient, err := storage.NewStorageClient(storageConfig, schemaConfig)
if err != nil {
log.Fatalf("Error initializing storage client: %v", err)
}
Expand Down
1 change: 1 addition & 0 deletions cmd/lite/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
visibility = ["//visibility:private"],
deps = [
"//pkg/chunk:go_default_library",
"//pkg/chunk/storage:go_default_library",
"//pkg/distributor:go_default_library",
"//pkg/ingester:go_default_library",
"//pkg/ingester/client:go_default_library",
Expand Down
12 changes: 7 additions & 5 deletions cmd/lite/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/weaveworks/common/server"
"github.com/weaveworks/common/user"
"github.com/weaveworks/cortex/pkg/chunk"
"github.com/weaveworks/cortex/pkg/chunk/storage"
"github.com/weaveworks/cortex/pkg/distributor"
"github.com/weaveworks/cortex/pkg/ingester"
"github.com/weaveworks/cortex/pkg/ingester/client"
Expand All @@ -37,13 +38,14 @@ func main() {
distributorConfig distributor.Config
ingesterConfig ingester.Config
schemaConfig chunk.SchemaConfig
storageConfig chunk.StorageClientConfig
unauthenticated bool
storageConfig storage.Config

unauthenticated bool
)
// Ingester needs to know our gRPC listen port.
ingesterConfig.ListenPort = &serverConfig.GRPCListenPort
util.RegisterFlags(&serverConfig, &chunkStoreConfig, &distributorConfig,
&ingesterConfig, &schemaConfig, &storageConfig)
&ingesterConfig, &storageConfig, &schemaConfig)
flag.BoolVar(&unauthenticated, "unauthenticated", false, "Set to true to disable multitenancy.")
flag.Parse()

Expand All @@ -53,7 +55,7 @@ func main() {
}
defer server.Shutdown()

storageClient, err := chunk.NewStorageClient(storageConfig, schemaConfig)
storageClient, err := storage.NewStorageClient(storageConfig, schemaConfig)
if err != nil {
log.Fatalf("Error initializing storage client: %v", err)
}
Expand Down Expand Up @@ -85,7 +87,7 @@ func main() {
prometheus.MustRegister(ingester)
defer ingester.Shutdown()

tableClient, err := chunk.NewDynamoDBTableClient(storageConfig.AWSStorageConfig.DynamoDBConfig)
tableClient, err := storage.NewTableClient(storageConfig)
if err != nil {
log.Fatalf("Error initializing DynamoDB table client: %v", err)
}
Expand Down
1 change: 1 addition & 0 deletions cmd/querier/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
visibility = ["//visibility:private"],
deps = [
"//pkg/chunk:go_default_library",
"//pkg/chunk/storage:go_default_library",
"//pkg/distributor:go_default_library",
"//pkg/querier:go_default_library",
"//pkg/ring:go_default_library",
Expand Down
7 changes: 4 additions & 3 deletions cmd/querier/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/server"
"github.com/weaveworks/cortex/pkg/chunk"
"github.com/weaveworks/cortex/pkg/chunk/storage"
"github.com/weaveworks/cortex/pkg/distributor"
"github.com/weaveworks/cortex/pkg/querier"
"github.com/weaveworks/cortex/pkg/ring"
Expand All @@ -33,11 +34,11 @@ func main() {
ringConfig ring.Config
distributorConfig distributor.Config
chunkStoreConfig chunk.StoreConfig
storageConfig chunk.StorageClientConfig
schemaConfig chunk.SchemaConfig
storageConfig storage.Config
)
util.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig,
&chunkStoreConfig, &storageConfig, &schemaConfig)
&chunkStoreConfig, &schemaConfig, &storageConfig)
flag.Parse()

r, err := ring.New(ringConfig)
Expand All @@ -60,7 +61,7 @@ func main() {
defer server.Shutdown()
server.HTTP.Handle("/ring", r)

storageClient, err := chunk.NewStorageClient(storageConfig, schemaConfig)
storageClient, err := storage.NewStorageClient(storageConfig, schemaConfig)
if err != nil {
log.Fatalf("Error initializing storage client: %v", err)
}
Expand Down
1 change: 1 addition & 0 deletions cmd/ruler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
visibility = ["//visibility:private"],
deps = [
"//pkg/chunk:go_default_library",
"//pkg/chunk/storage:go_default_library",
"//pkg/distributor:go_default_library",
"//pkg/ring:go_default_library",
"//pkg/ruler:go_default_library",
Expand Down
5 changes: 3 additions & 2 deletions cmd/ruler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/server"
"github.com/weaveworks/cortex/pkg/chunk"
"github.com/weaveworks/cortex/pkg/chunk/storage"
"github.com/weaveworks/cortex/pkg/distributor"
"github.com/weaveworks/cortex/pkg/ring"
"github.com/weaveworks/cortex/pkg/ruler"
Expand All @@ -28,14 +29,14 @@ func main() {
distributorConfig distributor.Config
rulerConfig ruler.Config
chunkStoreConfig chunk.StoreConfig
storageConfig chunk.StorageClientConfig
schemaConfig chunk.SchemaConfig
storageConfig storage.Config
)
util.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig,
&rulerConfig, &chunkStoreConfig, &storageConfig, &schemaConfig)
flag.Parse()

storageClient, err := chunk.NewStorageClient(storageConfig, schemaConfig)
storageClient, err := storage.NewStorageClient(storageConfig, schemaConfig)
if err != nil {
log.Fatalf("Error initializing storage client: %v", err)
}
Expand Down
1 change: 1 addition & 0 deletions cmd/table-manager/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
visibility = ["//visibility:private"],
deps = [
"//pkg/chunk:go_default_library",
"//pkg/chunk/storage:go_default_library",
"//pkg/util:go_default_library",
"//vendor/github.com/prometheus/common/log:go_default_library",
"//vendor/github.com/weaveworks/common/middleware:go_default_library",
Expand Down
9 changes: 5 additions & 4 deletions cmd/table-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/server"
"github.com/weaveworks/cortex/pkg/chunk"
"github.com/weaveworks/cortex/pkg/chunk/storage"
"github.com/weaveworks/cortex/pkg/util"
)

Expand All @@ -21,13 +22,13 @@ func main() {
},
}

dynamoDBConfig = chunk.DynamoDBConfig{}
schemaConfig chunk.SchemaConfig
storageConfig storage.Config
schemaConfig chunk.SchemaConfig
)
util.RegisterFlags(&serverConfig, &dynamoDBConfig, &schemaConfig)
util.RegisterFlags(&serverConfig, &storageConfig, &schemaConfig)
flag.Parse()

tableClient, err := chunk.NewDynamoDBTableClient(dynamoDBConfig)
tableClient, err := storage.NewTableClient(storageConfig)
if err != nil {
log.Fatalf("Error initializing DynamoDB table client: %v", err)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/chunk/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ go_library(
"//vendor/github.com/aws/aws-sdk-go/service/s3/s3iface:go_default_library",
"//vendor/github.com/bradfitz/gomemcache/memcache:go_default_library",
"//vendor/github.com/golang/snappy:go_default_library",
"//vendor/github.com/pkg/errors:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/github.com/prometheus/common/log:go_default_library",
"//vendor/github.com/prometheus/common/model:go_default_library",
Expand Down
28 changes: 9 additions & 19 deletions pkg/chunk/aws_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"io/ioutil"
"math/rand"
"net/url"
"strconv"
"strings"
"time"

Expand All @@ -22,7 +21,6 @@ import (
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"golang.org/x/net/context"

"github.com/weaveworks/common/instrument"
Expand Down Expand Up @@ -57,7 +55,7 @@ var (
Help: "Time spent doing DynamoDB requests.",

// DynamoDB latency seems to range from a few ms to a few sec and is
// important. So use 8 buckets from 64us to 8s.
// important. So use 8 buckets from 128us to 2s.
Buckets: prometheus.ExponentialBuckets(0.000128, 4, 8),
}, []string{"operation", "status_code"})
dynamoConsumedCapacity = prometheus.NewCounterVec(prometheus.CounterOpts{
Expand Down Expand Up @@ -430,7 +428,7 @@ func (a awsStorageClient) getS3Chunk(ctx context.Context, chunk Chunk) (Chunk, e
var err error
resp, err = a.S3.GetObjectWithContext(ctx, &s3.GetObjectInput{
Bucket: aws.String(a.bucketName),
Key: aws.String(chunk.externalKey()),
Key: aws.String(chunk.ExternalKey()),
})
return err
})
Expand All @@ -442,7 +440,7 @@ func (a awsStorageClient) getS3Chunk(ctx context.Context, chunk Chunk) (Chunk, e
if err != nil {
return Chunk{}, err
}
if err := chunk.decode(buf); err != nil {
if err := chunk.Decode(buf); err != nil {
return Chunk{}, err
}
return chunk, nil
Expand All @@ -456,9 +454,9 @@ func (a awsStorageClient) getDynamoDBChunks(ctx context.Context, chunks []Chunk)
outstanding := dynamoDBReadRequest{}
chunksByKey := map[string]Chunk{}
for _, chunk := range chunks {
key := chunk.externalKey()
key := chunk.ExternalKey()
chunksByKey[key] = chunk
tableName := a.chunkTableFor(chunk.From)
tableName := a.schemaCfg.ChunkTables.TableFor(chunk.From)
outstanding.Add(tableName, key, placeholder)
}

Expand Down Expand Up @@ -542,7 +540,7 @@ func processChunkResponse(response *dynamodb.BatchGetItemOutput, chunksByKey map
return nil, fmt.Errorf("Got response from DynamoDB with no value: %+v", item)
}

if err := chunk.decode(buf.B); err != nil {
if err := chunk.Decode(buf.B); err != nil {
return nil, err
}

Expand All @@ -561,17 +559,17 @@ func (a awsStorageClient) PutChunks(ctx context.Context, chunks []Chunk) error {

for i := range chunks {
// Encode the chunk first - checksum is calculated as a side effect.
buf, err := chunks[i].encode()
buf, err := chunks[i].Encode()
if err != nil {
return err
}
key := chunks[i].externalKey()
key := chunks[i].ExternalKey()

if !a.schemaCfg.ChunkTables.From.IsSet() || chunks[i].From.Before(a.schemaCfg.ChunkTables.From.Time) {
s3ChunkKeys = append(s3ChunkKeys, key)
s3ChunkBufs = append(s3ChunkBufs, buf)
} else {
table := a.chunkTableFor(chunks[i].From)
table := a.schemaCfg.ChunkTables.TableFor(chunks[i].From)
dynamoDBWrites.Add(table, key, placeholder, buf)
}
}
Expand All @@ -588,14 +586,6 @@ func (a awsStorageClient) PutChunks(ctx context.Context, chunks []Chunk) error {
return a.BatchWrite(ctx, dynamoDBWrites)
}

func (a awsStorageClient) chunkTableFor(t model.Time) string {
var (
periodSecs = int64(a.schemaCfg.ChunkTables.Period / time.Second)
table = t.Unix() / periodSecs
)
return a.schemaCfg.ChunkTables.Prefix + strconv.Itoa(int(table))
}

func (a awsStorageClient) putS3Chunks(ctx context.Context, keys []string, bufs [][]byte) error {
incomingErrors := make(chan error)
for i := range bufs {
Expand Down
Loading

0 comments on commit fa9cf5a

Please sign in to comment.