From 9ec979f382403595f80a7f58c207b80c9b6e7e2f Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 2 May 2017 12:50:43 +0100 Subject: [PATCH 001/660] Bazel-ify Cortex (#342) * Move proto to ingester/client. * Move build to build-image * Add BUILD files for bazel * Move all non-main packages under pkg/ * Update circle.yml for the new build-image * Update configs-itegration-test * Documentation for the bazel builds. --- BUILD | 80 ++++++ aws_storage_client.go | 567 +++++++++++++++++++++++++++++++++++++ aws_storage_client_test.go | 419 +++++++++++++++++++++++++++ by_key.go | 114 ++++++++ by_key_test.go | 99 +++++++ chunk.go | 320 +++++++++++++++++++++ chunk_cache.go | 219 ++++++++++++++ chunk_cache_test.go | 114 ++++++++ chunk_store.go | 526 ++++++++++++++++++++++++++++++++++ chunk_store_test.go | 424 +++++++++++++++++++++++++++ chunk_test.go | 123 ++++++++ inmemory_storage_client.go | 280 ++++++++++++++++++ memcache_client.go | 106 +++++++ schema.go | 538 +++++++++++++++++++++++++++++++++++ schema_config.go | 329 +++++++++++++++++++++ schema_config_test.go | 321 +++++++++++++++++++++ schema_test.go | 468 ++++++++++++++++++++++++++++++ schema_util.go | 142 ++++++++++ schema_util_test.go | 85 ++++++ storage_client.go | 64 +++++ table_manager.go | 342 ++++++++++++++++++++++ table_manager_test.go | 178 ++++++++++++ 22 files changed, 5858 insertions(+) create mode 100644 BUILD create mode 100644 aws_storage_client.go create mode 100644 aws_storage_client_test.go create mode 100644 by_key.go create mode 100644 by_key_test.go create mode 100644 chunk.go create mode 100644 chunk_cache.go create mode 100644 chunk_cache_test.go create mode 100644 chunk_store.go create mode 100644 chunk_store_test.go create mode 100644 chunk_test.go create mode 100644 inmemory_storage_client.go create mode 100644 memcache_client.go create mode 100644 schema.go create mode 100644 schema_config.go create mode 100644 schema_config_test.go create mode 100644 schema_test.go create mode 100644 schema_util.go create mode 100644 schema_util_test.go create mode 100644 storage_client.go create mode 100644 table_manager.go create mode 100644 table_manager_test.go diff --git a/BUILD b/BUILD new file mode 100644 index 0000000000000..646f2a670ede6 --- /dev/null +++ b/BUILD @@ -0,0 +1,80 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = [ + "aws_storage_client.go", + "by_key.go", + "chunk.go", + "chunk_cache.go", + "chunk_store.go", + "inmemory_storage_client.go", + "memcache_client.go", + "schema.go", + "schema_config.go", + "schema_util.go", + "storage_client.go", + "table_manager.go", + ], + visibility = ["//visibility:public"], + deps = [ + "//pkg/util:go_default_library", + "//vendor/github.com/aws/aws-sdk-go/aws:go_default_library", + "//vendor/github.com/aws/aws-sdk-go/aws/awserr:go_default_library", + "//vendor/github.com/aws/aws-sdk-go/aws/credentials:go_default_library", + "//vendor/github.com/aws/aws-sdk-go/aws/request:go_default_library", + "//vendor/github.com/aws/aws-sdk-go/aws/session:go_default_library", + "//vendor/github.com/aws/aws-sdk-go/service/dynamodb:go_default_library", + "//vendor/github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface:go_default_library", + "//vendor/github.com/aws/aws-sdk-go/service/s3:go_default_library", + "//vendor/github.com/aws/aws-sdk-go/service/s3/s3iface:go_default_library", + "//vendor/github.com/bradfitz/gomemcache/memcache:go_default_library", + "//vendor/github.com/golang/snappy:go_default_library", + 
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library", + "//vendor/github.com/prometheus/common/log:go_default_library", + "//vendor/github.com/prometheus/common/model:go_default_library", + "//vendor/github.com/prometheus/prometheus/promql:go_default_library", + "//vendor/github.com/prometheus/prometheus/storage/local/chunk:go_default_library", + "//vendor/github.com/prometheus/prometheus/storage/metric:go_default_library", + "//vendor/github.com/weaveworks/common/errors:go_default_library", + "//vendor/github.com/weaveworks/common/instrument:go_default_library", + "//vendor/github.com/weaveworks/common/mtime:go_default_library", + "//vendor/github.com/weaveworks/common/user:go_default_library", + "//vendor/golang.org/x/net/context:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = [ + "aws_storage_client_test.go", + "by_key_test.go", + "chunk_cache_test.go", + "chunk_store_test.go", + "chunk_test.go", + "schema_config_test.go", + "schema_test.go", + "schema_util_test.go", + "table_manager_test.go", + ], + library = ":go_default_library", + deps = [ + "//pkg/util:go_default_library", + "//vendor/github.com/aws/aws-sdk-go/aws:go_default_library", + "//vendor/github.com/aws/aws-sdk-go/aws/awserr:go_default_library", + "//vendor/github.com/aws/aws-sdk-go/aws/request:go_default_library", + "//vendor/github.com/aws/aws-sdk-go/service/dynamodb:go_default_library", + "//vendor/github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface:go_default_library", + "//vendor/github.com/bradfitz/gomemcache/memcache:go_default_library", + "//vendor/github.com/prometheus/common/log:go_default_library", + "//vendor/github.com/prometheus/common/model:go_default_library", + "//vendor/github.com/prometheus/prometheus/storage/local/chunk:go_default_library", + "//vendor/github.com/prometheus/prometheus/storage/metric:go_default_library", + "//vendor/github.com/stretchr/testify/assert:go_default_library", + "//vendor/github.com/stretchr/testify/require:go_default_library", + "//vendor/github.com/weaveworks/common/mtime:go_default_library", + "//vendor/github.com/weaveworks/common/test:go_default_library", + "//vendor/github.com/weaveworks/common/user:go_default_library", + "//vendor/golang.org/x/net/context:go_default_library", + ], +) diff --git a/aws_storage_client.go b/aws_storage_client.go new file mode 100644 index 0000000000000..a0c4c3afa58c1 --- /dev/null +++ b/aws_storage_client.go @@ -0,0 +1,567 @@ +package chunk + +import ( + "bytes" + "flag" + "fmt" + "io/ioutil" + "math/rand" + "net/url" + "strings" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/dynamodb" + "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3iface" + "github.com/prometheus/client_golang/prometheus" + "golang.org/x/net/context" + + "github.com/weaveworks/common/instrument" + "github.com/weaveworks/cortex/pkg/util" +) + +const ( + hashKey = "h" + rangeKey = "r" + valueKey = "c" + + // For dynamodb errors + tableNameLabel = "table" + errorReasonLabel = "error" + otherError = "other" + + provisionedThroughputExceededException = "ProvisionedThroughputExceededException" + + // Backoff for dynamoDB requests, to match AWS lib - see: + // 
https://github.com/aws/aws-sdk-go/blob/master/service/dynamodb/customizations.go + minBackoff = 50 * time.Millisecond + maxBackoff = 50 * time.Second + maxRetries = 20 + + // See http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Limits.html. + dynamoMaxBatchSize = 25 +) + +var ( + dynamoRequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "cortex", + Name: "dynamo_request_duration_seconds", + Help: "Time spent doing DynamoDB requests.", + + // DynamoDB latency seems to range from a few ms to a few sec and is + // important. So use 8 buckets from 64us to 8s. + Buckets: prometheus.ExponentialBuckets(0.000128, 4, 8), + }, []string{"operation", "status_code"}) + dynamoConsumedCapacity = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "dynamo_consumed_capacity_total", + Help: "The capacity units consumed by operation.", + }, []string{"operation"}) + dynamoFailures = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "dynamo_failures_total", + Help: "The total number of errors while storing chunks to the chunk store.", + }, []string{tableNameLabel, errorReasonLabel}) + s3RequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "cortex", + Name: "s3_request_duration_seconds", + Help: "Time spent doing S3 requests.", + Buckets: []float64{.025, .05, .1, .25, .5, 1, 2}, + }, []string{"operation", "status_code"}) +) + +func init() { + prometheus.MustRegister(dynamoRequestDuration) + prometheus.MustRegister(dynamoConsumedCapacity) + prometheus.MustRegister(dynamoFailures) + prometheus.MustRegister(s3RequestDuration) +} + +// DynamoDBConfig specifies config for a DynamoDB database. +type DynamoDBConfig struct { + DynamoDB util.URLValue +} + +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *DynamoDBConfig) RegisterFlags(f *flag.FlagSet) { + f.Var(&cfg.DynamoDB, "dynamodb.url", "DynamoDB endpoint URL with escaped Key and Secret encoded. "+ + "If only region is specified as a host, proper endpoint will be deduced. Use inmemory:/// to use a mock in-memory implementation.") +} + +// AWSStorageConfig specifies config for storing data on AWS. +type AWSStorageConfig struct { + DynamoDBConfig + S3 util.URLValue +} + +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *AWSStorageConfig) RegisterFlags(f *flag.FlagSet) { + cfg.DynamoDBConfig.RegisterFlags(f) + f.Var(&cfg.S3, "s3.url", "S3 endpoint URL with escaped Key and Secret encoded. "+ + "If only region is specified as a host, proper endpoint will be deduced. Use inmemory:/// to use a mock in-memory implementation.") +} + +type awsStorageClient struct { + DynamoDB dynamodbiface.DynamoDBAPI + S3 s3iface.S3API + bucketName string + + // queryRequestFn exists for mocking, so we don't have to write a whole load + // of boilerplate. + queryRequestFn func(ctx context.Context, input *dynamodb.QueryInput) dynamoDBRequest +} + +// NewAWSStorageClient makes a new AWS-backed StorageClient. 
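A minimal sketch of how the flags and constructor above fit together, assuming placeholder URLs and a hypothetical helper name; none of this is part of the patch itself:

func exampleNewAWSStorageClient() (StorageClient, error) {
	var cfg AWSStorageConfig
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	cfg.RegisterFlags(fs)
	// Region-only hosts let the SDK derive the real endpoint; hosts containing
	// a "." are used verbatim (see awsConfigFromURL below).
	if err := fs.Parse([]string{
		"-dynamodb.url=dynamodb://key:secret@us-east-1",
		"-s3.url=s3://key:secret@us-east-1/example-chunk-bucket",
	}); err != nil {
		return nil, err
	}
	return NewAWSStorageClient(cfg)
}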
+func NewAWSStorageClient(cfg AWSStorageConfig) (StorageClient, error) { + dynamoDB, err := dynamoClientFromURL(cfg.DynamoDB.URL) + if err != nil { + return nil, err + } + + if cfg.S3.URL == nil { + return nil, fmt.Errorf("no URL specified for S3") + } + s3Config, err := awsConfigFromURL(cfg.S3.URL) + if err != nil { + return nil, err + } + s3Client := s3.New(session.New(s3Config)) + bucketName := strings.TrimPrefix(cfg.S3.URL.Path, "/") + + storageClient := awsStorageClient{ + DynamoDB: dynamoDB, + S3: s3Client, + bucketName: bucketName, + } + storageClient.queryRequestFn = storageClient.queryRequest + return storageClient, nil +} + +func (a awsStorageClient) NewWriteBatch() WriteBatch { + return dynamoDBWriteBatch(map[string][]*dynamodb.WriteRequest{}) +} + +// batchWrite writes requests to the underlying storage, handling retires and backoff. +func (a awsStorageClient) BatchWrite(ctx context.Context, input WriteBatch) error { + outstanding := input.(dynamoDBWriteBatch) + unprocessed := map[string][]*dynamodb.WriteRequest{} + backoff, numRetries := minBackoff, 0 + for dictLen(outstanding)+dictLen(unprocessed) > 0 && numRetries < maxRetries { + reqs := map[string][]*dynamodb.WriteRequest{} + takeReqs(unprocessed, reqs, dynamoMaxBatchSize) + takeReqs(outstanding, reqs, dynamoMaxBatchSize) + var resp *dynamodb.BatchWriteItemOutput + + err := instrument.TimeRequestHistogram(ctx, "DynamoDB.BatchWriteItem", dynamoRequestDuration, func(ctx context.Context) error { + var err error + resp, err = a.DynamoDB.BatchWriteItemWithContext(ctx, &dynamodb.BatchWriteItemInput{ + RequestItems: reqs, + ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal), + }) + return err + }) + for _, cc := range resp.ConsumedCapacity { + dynamoConsumedCapacity.WithLabelValues("DynamoDB.BatchWriteItem"). + Add(float64(*cc.CapacityUnits)) + } + + if err != nil { + for tableName := range reqs { + recordDynamoError(tableName, err) + } + } + + // If there are unprocessed items, backoff and retry those items. + if unprocessedItems := resp.UnprocessedItems; unprocessedItems != nil && dictLen(unprocessedItems) > 0 { + takeReqs(unprocessedItems, unprocessed, -1) + time.Sleep(backoff) + backoff = nextBackoff(backoff) + continue + } + + // If we get provisionedThroughputExceededException, then no items were processed, + // so back off and retry all. + if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == provisionedThroughputExceededException { + takeReqs(reqs, unprocessed, -1) + time.Sleep(backoff) + backoff = nextBackoff(backoff) + numRetries++ + continue + } + + // All other errors are fatal. 
+ if err != nil { + return err + } + + backoff = minBackoff + numRetries = 0 + } + + if valuesLeft := dictLen(outstanding) + dictLen(unprocessed); valuesLeft > 0 { + return fmt.Errorf("failed to write chunk after %d retries, %d values remaining", numRetries, valuesLeft) + } + return nil +} + +func (a awsStorageClient) QueryPages(ctx context.Context, query IndexQuery, callback func(result ReadBatch, lastPage bool) (shouldContinue bool)) error { + input := &dynamodb.QueryInput{ + TableName: aws.String(query.TableName), + KeyConditions: map[string]*dynamodb.Condition{ + hashKey: { + AttributeValueList: []*dynamodb.AttributeValue{ + {S: aws.String(query.HashValue)}, + }, + ComparisonOperator: aws.String(dynamodb.ComparisonOperatorEq), + }, + }, + ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal), + } + + if query.RangeValuePrefix != nil { + input.KeyConditions[rangeKey] = &dynamodb.Condition{ + AttributeValueList: []*dynamodb.AttributeValue{ + {B: query.RangeValuePrefix}, + }, + ComparisonOperator: aws.String(dynamodb.ComparisonOperatorBeginsWith), + } + } else if query.RangeValueStart != nil { + input.KeyConditions[rangeKey] = &dynamodb.Condition{ + AttributeValueList: []*dynamodb.AttributeValue{ + {B: query.RangeValueStart}, + }, + ComparisonOperator: aws.String(dynamodb.ComparisonOperatorGe), + } + } + + // Filters + if query.ValueEqual != nil { + input.FilterExpression = aws.String(fmt.Sprintf("%s = :v", valueKey)) + input.ExpressionAttributeValues = map[string]*dynamodb.AttributeValue{ + ":v": { + B: query.ValueEqual, + }, + } + } + + request := a.queryRequestFn(ctx, input) + backoff := minBackoff + for page := request; page != nil; page = page.NextPage() { + err := instrument.TimeRequestHistogram(ctx, "DynamoDB.QueryPages", dynamoRequestDuration, func(_ context.Context) error { + return page.Send() + }) + + if cc := page.Data().(*dynamodb.QueryOutput).ConsumedCapacity; cc != nil { + dynamoConsumedCapacity.WithLabelValues("DynamoDB.QueryPages"). 
+ Add(float64(*cc.CapacityUnits)) + } + + if err != nil { + recordDynamoError(*input.TableName, err) + + if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == provisionedThroughputExceededException { + time.Sleep(backoff) + backoff = nextBackoff(backoff) + continue + } + + return fmt.Errorf("QueryPages error: table=%v, err=%v", *input.TableName, err) + } + + queryOutput := page.Data().(*dynamodb.QueryOutput) + if getNextPage := callback(dynamoDBReadBatch(queryOutput.Items), !page.HasNextPage()); !getNextPage { + if err != nil { + return fmt.Errorf("QueryPages error: table=%v, err=%v", *input.TableName, page.Error()) + } + return nil + } + + backoff = minBackoff + } + + return nil +} + +type dynamoDBRequest interface { + NextPage() dynamoDBRequest + Send() error + Data() interface{} + Error() error + HasNextPage() bool +} + +func (a awsStorageClient) queryRequest(ctx context.Context, input *dynamodb.QueryInput) dynamoDBRequest { + req, _ := a.DynamoDB.QueryRequest(input) + req.SetContext(ctx) + return dynamoDBRequestAdapter{req} +} + +type dynamoDBRequestAdapter struct { + request *request.Request +} + +func (a dynamoDBRequestAdapter) NextPage() dynamoDBRequest { + next := a.request.NextPage() + if next == nil { + return nil + } + return dynamoDBRequestAdapter{next} +} + +func (a dynamoDBRequestAdapter) Data() interface{} { + return a.request.Data +} + +func (a dynamoDBRequestAdapter) Send() error { + return a.request.Send() +} + +func (a dynamoDBRequestAdapter) Error() error { + return a.request.Error +} + +func (a dynamoDBRequestAdapter) HasNextPage() bool { + return a.request.HasNextPage() +} + +func (a awsStorageClient) GetChunk(ctx context.Context, key string) ([]byte, error) { + var resp *s3.GetObjectOutput + err := instrument.TimeRequestHistogram(ctx, "S3.GetObject", s3RequestDuration, func(ctx context.Context) error { + var err error + resp, err = a.S3.GetObjectWithContext(ctx, &s3.GetObjectInput{ + Bucket: aws.String(a.bucketName), + Key: aws.String(key), + }) + return err + }) + if err != nil { + return nil, err + } + defer resp.Body.Close() + buf, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + return buf, nil +} + +func (a awsStorageClient) PutChunk(ctx context.Context, key string, buf []byte) error { + return instrument.TimeRequestHistogram(ctx, "S3.PutObject", s3RequestDuration, func(ctx context.Context) error { + _, err := a.S3.PutObjectWithContext(ctx, &s3.PutObjectInput{ + Body: bytes.NewReader(buf), + Bucket: aws.String(a.bucketName), + Key: aws.String(key), + }) + return err + }) +} + +type dynamoDBWriteBatch map[string][]*dynamodb.WriteRequest + +func (b dynamoDBWriteBatch) Add(tableName, hashValue string, rangeValue []byte, value []byte) { + item := map[string]*dynamodb.AttributeValue{ + hashKey: {S: aws.String(hashValue)}, + rangeKey: {B: rangeValue}, + } + + if value != nil { + item[valueKey] = &dynamodb.AttributeValue{B: value} + } + + b[tableName] = append(b[tableName], &dynamodb.WriteRequest{ + PutRequest: &dynamodb.PutRequest{ + Item: item, + }, + }) +} + +type dynamoDBReadBatch []map[string]*dynamodb.AttributeValue + +func (b dynamoDBReadBatch) Len() int { + return len(b) +} + +func (b dynamoDBReadBatch) RangeValue(i int) []byte { + return b[i][rangeKey].B +} + +func (b dynamoDBReadBatch) Value(i int) []byte { + chunkValue, ok := b[i][valueKey] + if !ok { + return nil + } + return chunkValue.B +} + +type dynamoTableClient struct { + DynamoDB dynamodbiface.DynamoDBAPI +} + +// newDynamoTableClient makes a new DynamoTableClient. 
+func newDynamoTableClient(cfg DynamoDBConfig) (DynamoTableClient, error) { + dynamoDB, err := dynamoClientFromURL(cfg.DynamoDB.URL) + if err != nil { + return nil, err + } + return dynamoTableClient{ + DynamoDB: dynamoDB, + }, nil +} + +func (d dynamoTableClient) ListTables(ctx context.Context) ([]string, error) { + table := []string{} + err := instrument.TimeRequestHistogram(ctx, "DynamoDB.ListTablesPages", dynamoRequestDuration, func(_ context.Context) error { + return d.DynamoDB.ListTablesPages(&dynamodb.ListTablesInput{}, func(resp *dynamodb.ListTablesOutput, _ bool) bool { + for _, s := range resp.TableNames { + table = append(table, *s) + } + return true + }) + }) + return table, err +} + +func (d dynamoTableClient) CreateTable(ctx context.Context, name string, readCapacity, writeCapacity int64) error { + return instrument.TimeRequestHistogram(ctx, "DynamoDB.CreateTable", dynamoRequestDuration, func(_ context.Context) error { + input := &dynamodb.CreateTableInput{ + TableName: aws.String(name), + AttributeDefinitions: []*dynamodb.AttributeDefinition{ + { + AttributeName: aws.String(hashKey), + AttributeType: aws.String(dynamodb.ScalarAttributeTypeS), + }, + { + AttributeName: aws.String(rangeKey), + AttributeType: aws.String(dynamodb.ScalarAttributeTypeB), + }, + }, + KeySchema: []*dynamodb.KeySchemaElement{ + { + AttributeName: aws.String(hashKey), + KeyType: aws.String(dynamodb.KeyTypeHash), + }, + { + AttributeName: aws.String(rangeKey), + KeyType: aws.String(dynamodb.KeyTypeRange), + }, + }, + ProvisionedThroughput: &dynamodb.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(readCapacity), + WriteCapacityUnits: aws.Int64(writeCapacity), + }, + } + _, err := d.DynamoDB.CreateTable(input) + return err + }) +} + +func (d dynamoTableClient) DescribeTable(ctx context.Context, name string) (readCapacity, writeCapacity int64, status string, err error) { + var out *dynamodb.DescribeTableOutput + instrument.TimeRequestHistogram(ctx, "DynamoDB.DescribeTable", dynamoRequestDuration, func(_ context.Context) error { + out, err = d.DynamoDB.DescribeTable(&dynamodb.DescribeTableInput{ + TableName: aws.String(name), + }) + readCapacity = *out.Table.ProvisionedThroughput.ReadCapacityUnits + writeCapacity = *out.Table.ProvisionedThroughput.WriteCapacityUnits + status = *out.Table.TableStatus + return err + }) + return +} + +func (d dynamoTableClient) UpdateTable(ctx context.Context, name string, readCapacity, writeCapacity int64) error { + return instrument.TimeRequestHistogram(ctx, "DynamoDB.UpdateTable", dynamoRequestDuration, func(_ context.Context) error { + _, err := d.DynamoDB.UpdateTable(&dynamodb.UpdateTableInput{ + TableName: aws.String(name), + ProvisionedThroughput: &dynamodb.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(readCapacity), + WriteCapacityUnits: aws.Int64(writeCapacity), + }, + }) + return err + }) +} + +func nextBackoff(lastBackoff time.Duration) time.Duration { + // Based on the "Decorrelated Jitter" approach from https://www.awsarchitectureblog.com/2015/03/backoff.html + // sleep = min(cap, random_between(base, sleep * 3)) + backoff := minBackoff + time.Duration(rand.Int63n(int64((lastBackoff*3)-minBackoff))) + if backoff > maxBackoff { + backoff = maxBackoff + } + return backoff +} + +func recordDynamoError(tableName string, err error) { + if awsErr, ok := err.(awserr.Error); ok { + dynamoFailures.WithLabelValues(tableName, awsErr.Code()).Add(float64(1)) + } else { + dynamoFailures.WithLabelValues(tableName, otherError).Add(float64(1)) + } +} + +func 
dictLen(b map[string][]*dynamodb.WriteRequest) int { + result := 0 + for _, reqs := range b { + result += len(reqs) + } + return result +} + +// Fill 'to' with WriteRequests from 'from' until 'to' has at most max requests. Remove those requests from 'from'. +func takeReqs(from, to map[string][]*dynamodb.WriteRequest, max int) { + outLen, inLen := dictLen(to), dictLen(from) + toFill := inLen + if max > 0 { + toFill = util.Min(inLen, max-outLen) + } + for toFill > 0 { + for tableName, fromReqs := range from { + taken := util.Min(len(fromReqs), toFill) + if taken > 0 { + to[tableName] = append(to[tableName], fromReqs[:taken]...) + from[tableName] = fromReqs[taken:] + toFill -= taken + } + } + } +} + +// dynamoClientFromURL creates a new DynamoDB client from a URL. +func dynamoClientFromURL(awsURL *url.URL) (dynamodbiface.DynamoDBAPI, error) { + if awsURL == nil { + return nil, fmt.Errorf("no URL specified for DynamoDB") + } + config, err := awsConfigFromURL(awsURL) + if err != nil { + return nil, err + } + return dynamodb.New(session.New(config)), nil +} + +// awsConfigFromURL returns AWS config from given URL. It expects escaped AWS Access key ID & Secret Access Key to be +// encoded in the URL. It also expects region specified as a host (letting AWS generate full endpoint) or fully valid +// endpoint with dummy region assumed (e.g for URLs to emulated services). +func awsConfigFromURL(awsURL *url.URL) (*aws.Config, error) { + if awsURL.User == nil { + return nil, fmt.Errorf("must specify escaped Access Key & Secret Access in URL") + } + + password, _ := awsURL.User.Password() + creds := credentials.NewStaticCredentials(awsURL.User.Username(), password, "") + config := aws.NewConfig(). + WithCredentials(creds). + WithMaxRetries(0) // We do our own retries, so we can monitor them + if strings.Contains(awsURL.Host, ".") { + return config.WithEndpoint(fmt.Sprintf("http://%s", awsURL.Host)).WithRegion("dummy"), nil + } + + // Let AWS generate default endpoint based on region passed as a host in URL. 
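	// For reference, the two accepted URL forms (also exercised by
	// TestAWSConfigFromURL below) resolve as follows:
	//
	//	s3://abc:123@s3.default.svc.cluster.local:4569 -> endpoint http://s3.default.svc.cluster.local:4569, region "dummy"
	//	s3://key:secret@eu-west-2/bucket1              -> region "eu-west-2", endpoint generated by the SDK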
+ return config.WithRegion(awsURL.Host), nil +} diff --git a/aws_storage_client_test.go b/aws_storage_client_test.go new file mode 100644 index 0000000000000..640582b1b8ff0 --- /dev/null +++ b/aws_storage_client_test.go @@ -0,0 +1,419 @@ +package chunk + +import ( + "bytes" + "fmt" + "net/url" + "sort" + "sync" + "testing" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go/service/dynamodb" + "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface" + "github.com/prometheus/common/log" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/net/context" +) + +type mockDynamoDBClient struct { + dynamodbiface.DynamoDBAPI + + mtx sync.RWMutex + unprocessed int + provisionedErr int + tables map[string]*mockDynamoDBTable +} + +type mockDynamoDBTable struct { + items map[string][]mockDynamoDBItem +} + +type mockDynamoDBItem map[string]*dynamodb.AttributeValue + +func newMockDynamoDB(unprocessed int, provisionedErr int) *mockDynamoDBClient { + return &mockDynamoDBClient{ + tables: map[string]*mockDynamoDBTable{}, + unprocessed: unprocessed, + provisionedErr: provisionedErr, + } +} + +func (m *mockDynamoDBClient) createTable(name string) { + m.mtx.Lock() + defer m.mtx.Unlock() + m.tables[name] = &mockDynamoDBTable{ + items: map[string][]mockDynamoDBItem{}, + } +} + +func (m *mockDynamoDBClient) BatchWriteItemWithContext(_ aws.Context, input *dynamodb.BatchWriteItemInput, _ ...request.Option) (*dynamodb.BatchWriteItemOutput, error) { + m.mtx.Lock() + defer m.mtx.Unlock() + + resp := &dynamodb.BatchWriteItemOutput{ + UnprocessedItems: map[string][]*dynamodb.WriteRequest{}, + } + + if m.provisionedErr > 0 { + m.provisionedErr-- + return resp, awserr.New(provisionedThroughputExceededException, "", nil) + } + + for tableName, writeRequests := range input.RequestItems { + table, ok := m.tables[tableName] + if !ok { + return &dynamodb.BatchWriteItemOutput{}, fmt.Errorf("table not found") + } + + for _, writeRequest := range writeRequests { + if m.unprocessed > 0 { + m.unprocessed-- + resp.UnprocessedItems[tableName] = append(resp.UnprocessedItems[tableName], writeRequest) + continue + } + + hashValue := *writeRequest.PutRequest.Item[hashKey].S + rangeValue := writeRequest.PutRequest.Item[rangeKey].B + + items := table.items[hashValue] + + // insert in order + i := sort.Search(len(items), func(i int) bool { + return bytes.Compare(items[i][rangeKey].B, rangeValue) >= 0 + }) + if i >= len(items) || !bytes.Equal(items[i][rangeKey].B, rangeValue) { + items = append(items, nil) + copy(items[i+1:], items[i:]) + } else { + return &dynamodb.BatchWriteItemOutput{}, fmt.Errorf("Duplicate entry") + } + items[i] = writeRequest.PutRequest.Item + + table.items[hashValue] = items + } + } + return resp, nil +} + +func (m *mockDynamoDBClient) queryRequest(_ context.Context, input *dynamodb.QueryInput) dynamoDBRequest { + result := &dynamodb.QueryOutput{ + Items: []map[string]*dynamodb.AttributeValue{}, + } + + // Required filters + hashValue := *input.KeyConditions[hashKey].AttributeValueList[0].S + + // Optional filters + var ( + rangeValueFilter []byte + rangeValueFilterType string + ) + if c, ok := input.KeyConditions[rangeKey]; ok { + rangeValueFilter = c.AttributeValueList[0].B + rangeValueFilterType = *c.ComparisonOperator + } + + // Filter by HashValue, RangeValue and Value if it exists + items := m.tables[*input.TableName].items[hashValue] + for _, item := range items { + 
rangeValue := item[rangeKey].B + if rangeValueFilterType == dynamodb.ComparisonOperatorGe && bytes.Compare(rangeValue, rangeValueFilter) < 0 { + continue + } + if rangeValueFilterType == dynamodb.ComparisonOperatorBeginsWith && !bytes.HasPrefix(rangeValue, rangeValueFilter) { + continue + } + + if item[valueKey] != nil { + value := item[valueKey].B + + // Apply filterExpression if it exists (supporting only v = :v) + if input.FilterExpression != nil { + if *input.FilterExpression == fmt.Sprintf("%s = :v", valueKey) { + filterValue := input.ExpressionAttributeValues[":v"].B + if !bytes.Equal(value, filterValue) { + continue + } + } else { + log.Warnf("Unsupported FilterExpression: %s", *input.FilterExpression) + } + } + } + + result.Items = append(result.Items, item) + } + + return &dynamoDBMockRequest{ + result: result, + } +} + +type dynamoDBMockRequest struct { + result *dynamodb.QueryOutput +} + +func (m *dynamoDBMockRequest) NextPage() dynamoDBRequest { + return m +} +func (m *dynamoDBMockRequest) Send() error { + return nil +} +func (m *dynamoDBMockRequest) Data() interface{} { + return m.result +} +func (m *dynamoDBMockRequest) Error() error { + return nil +} +func (m *dynamoDBMockRequest) HasNextPage() bool { + return false +} + +func TestDynamoDBClient(t *testing.T) { + dynamoDB := newMockDynamoDB(0, 0) + client := awsStorageClient{ + DynamoDB: dynamoDB, + queryRequestFn: dynamoDB.queryRequest, + } + batch := client.NewWriteBatch() + for i := 0; i < 30; i++ { + batch.Add("table", fmt.Sprintf("hash%d", i), []byte(fmt.Sprintf("range%d", i)), nil) + } + dynamoDB.createTable("table") + + err := client.BatchWrite(context.Background(), batch) + require.NoError(t, err) + + for i := 0; i < 30; i++ { + entry := IndexQuery{ + TableName: "table", + HashValue: fmt.Sprintf("hash%d", i), + } + var have []IndexEntry + err := client.QueryPages(context.Background(), entry, func(read ReadBatch, lastPage bool) bool { + for i := 0; i < read.Len(); i++ { + have = append(have, IndexEntry{ + RangeValue: read.RangeValue(i), + }) + } + return !lastPage + }) + require.NoError(t, err) + require.Equal(t, []IndexEntry{ + {RangeValue: []byte(fmt.Sprintf("range%d", i))}, + }, have) + } +} + +func TestDynamoDBClientQueryPages(t *testing.T) { + dynamoDB := newMockDynamoDB(0, 0) + client := awsStorageClient{ + DynamoDB: dynamoDB, + queryRequestFn: dynamoDB.queryRequest, + } + + entries := []IndexEntry{ + { + TableName: "table", + HashValue: "foo", + RangeValue: []byte("bar:1"), + Value: []byte("10"), + }, + { + TableName: "table", + HashValue: "foo", + RangeValue: []byte("bar:2"), + Value: []byte("20"), + }, + { + TableName: "table", + HashValue: "foo", + RangeValue: []byte("bar:3"), + Value: []byte("30"), + }, + { + TableName: "table", + HashValue: "foo", + RangeValue: []byte("baz:1"), + Value: []byte("10"), + }, + { + TableName: "table", + HashValue: "foo", + RangeValue: []byte("baz:2"), + Value: []byte("20"), + }, + { + TableName: "table", + HashValue: "flip", + RangeValue: []byte("bar:1"), + Value: []byte("abc"), + }, + { + TableName: "table", + HashValue: "flip", + RangeValue: []byte("bar:2"), + Value: []byte("abc"), + }, + { + TableName: "table", + HashValue: "flip", + RangeValue: []byte("bar:3"), + Value: []byte("abc"), + }, + } + + tests := []struct { + name string + query IndexQuery + want []IndexEntry + }{ + { + "check HashValue only", + IndexQuery{ + TableName: "table", + HashValue: "flip", + }, + []IndexEntry{entries[5], entries[6], entries[7]}, + }, + { + "check RangeValueStart", + IndexQuery{ + 
TableName: "table", + HashValue: "foo", + RangeValueStart: []byte("bar:2"), + }, + []IndexEntry{entries[1], entries[2], entries[3], entries[4]}, + }, + { + "check RangeValuePrefix", + IndexQuery{ + TableName: "table", + HashValue: "foo", + RangeValuePrefix: []byte("baz:"), + }, + []IndexEntry{entries[3], entries[4]}, + }, + { + "check ValueEqual", + IndexQuery{ + TableName: "table", + HashValue: "foo", + RangeValuePrefix: []byte("bar"), + ValueEqual: []byte("20"), + }, + []IndexEntry{entries[1]}, + }, + } + + batch := client.NewWriteBatch() + for _, entry := range entries { + batch.Add(entry.TableName, entry.HashValue, entry.RangeValue, entry.Value) + } + dynamoDB.createTable("table") + + err := client.BatchWrite(context.Background(), batch) + require.NoError(t, err) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var have []IndexEntry + err := client.QueryPages(context.Background(), tt.query, func(read ReadBatch, lastPage bool) bool { + for i := 0; i < read.Len(); i++ { + have = append(have, IndexEntry{ + TableName: tt.query.TableName, + HashValue: tt.query.HashValue, + RangeValue: read.RangeValue(i), + Value: read.Value(i), + }) + } + return !lastPage + }) + require.NoError(t, err) + require.Equal(t, tt.want, have) + }) + } +} + +func TestAWSConfigFromURL(t *testing.T) { + for _, tc := range []struct { + url string + expectedKey string + expectedSecret string + expectedRegion string + expectedEp string + + expectedNotSpecifiedUserErr bool + }{ + { + "s3://abc:123@s3.default.svc.cluster.local:4569", + "abc", + "123", + "dummy", + "http://s3.default.svc.cluster.local:4569", + false, + }, + { + "dynamodb://user:pass@dynamodb.default.svc.cluster.local:8000/cortex", + "user", + "pass", + "dummy", + "http://dynamodb.default.svc.cluster.local:8000", + false, + }, + { + // Not escaped password. + "s3://abc:123/@s3.default.svc.cluster.local:4569", + "", + "", + "", + "", + true, + }, + { + // Not escaped username. + "s3://abc/:123@s3.default.svc.cluster.local:4569", + "", + "", + "", + "", + true, + }, + { + "s3://keyWithEscapedSlashAtTheEnd%2F:%24%2C%26%2C%2B%2C%27%2C%2F%2C%3A%2C%3B%2C%3D%2C%3F%2C%40@eu-west-2/bucket1", + "keyWithEscapedSlashAtTheEnd/", + "$,&,+,',/,:,;,=,?,@", + "eu-west-2", + "", + false, + }, + } { + parsedURL, err := url.Parse(tc.url) + require.NoError(t, err) + + cfg, err := awsConfigFromURL(parsedURL) + if tc.expectedNotSpecifiedUserErr { + require.Error(t, err) + continue + } + require.NoError(t, err) + + require.NotNil(t, cfg.Credentials) + val, err := cfg.Credentials.Get() + require.NoError(t, err) + + assert.Equal(t, tc.expectedKey, val.AccessKeyID) + assert.Equal(t, tc.expectedSecret, val.SecretAccessKey) + + require.NotNil(t, cfg.Region) + assert.Equal(t, tc.expectedRegion, *cfg.Region) + + if tc.expectedEp != "" { + require.NotNil(t, cfg.Endpoint) + assert.Equal(t, tc.expectedEp, *cfg.Endpoint) + } + } +} diff --git a/by_key.go b/by_key.go new file mode 100644 index 0000000000000..243ddf3aff127 --- /dev/null +++ b/by_key.go @@ -0,0 +1,114 @@ +package chunk + +// ByKey allow you to sort chunks by ID +type ByKey []Chunk + +func (cs ByKey) Len() int { return len(cs) } +func (cs ByKey) Swap(i, j int) { cs[i], cs[j] = cs[j], cs[i] } +func (cs ByKey) Less(i, j int) bool { return cs[i].externalKey() < cs[j].externalKey() } + +// unique will remove duplicates from the input. +// list must be sorted. 
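As an illustration (not part of this patch), unique and merge below both rely on their inputs already being in externalKey order, so a hypothetical caller sorts first; this sketch assumes the standard library sort package is imported:

func exampleDedupe(a, b []Chunk) ByKey {
	// Sort by external key first; unique and merge depend on that order.
	sort.Sort(ByKey(a))
	sort.Sort(ByKey(b))
	return merge(unique(a), unique(b))
}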
+func unique(cs ByKey) ByKey { + if len(cs) == 0 { + return ByKey{} + } + + result := make(ByKey, 1, len(cs)) + result[0] = cs[0] + i, j := 0, 1 + for j < len(cs) { + if result[i].externalKey() == cs[j].externalKey() { + j++ + continue + } + result = append(result, cs[j]) + i++ + j++ + } + return result +} + +// merge will merge & dedupe two lists of chunks. +// list musts be sorted and not contain dupes. +func merge(a, b ByKey) ByKey { + result := make(ByKey, 0, len(a)+len(b)) + i, j := 0, 0 + for i < len(a) && j < len(b) { + if a[i].externalKey() < b[j].externalKey() { + result = append(result, a[i]) + i++ + } else if a[i].externalKey() > b[j].externalKey() { + result = append(result, b[j]) + j++ + } else { + result = append(result, a[i]) + i++ + j++ + } + } + for ; i < len(a); i++ { + result = append(result, a[i]) + } + for ; j < len(b); j++ { + result = append(result, b[j]) + } + return result +} + +// nWayUnion will merge and dedupe n lists of chunks. +// lists must be sorted and not contain dupes. +func nWayUnion(sets []ByKey) ByKey { + l := len(sets) + switch l { + case 0: + return ByKey{} + case 1: + return sets[0] + case 2: + return merge(sets[0], sets[1]) + default: + var ( + split = l / 2 + left = nWayUnion(sets[:split]) + right = nWayUnion(sets[split:]) + ) + return nWayUnion([]ByKey{left, right}) + } +} + +// nWayIntersect will interesct n sorted lists of chunks. +func nWayIntersect(sets []ByKey) ByKey { + l := len(sets) + switch l { + case 0: + return ByKey{} + case 1: + return sets[0] + case 2: + var ( + left, right = sets[0], sets[1] + i, j = 0, 0 + result = []Chunk{} + ) + for i < len(left) && j < len(right) { + if left[i].externalKey() == right[j].externalKey() { + result = append(result, left[i]) + } + + if left[i].externalKey() < right[j].externalKey() { + i++ + } else { + j++ + } + } + return result + default: + var ( + split = l / 2 + left = nWayIntersect(sets[:split]) + right = nWayIntersect(sets[split:]) + ) + return nWayIntersect([]ByKey{left, right}) + } +} diff --git a/by_key_test.go b/by_key_test.go new file mode 100644 index 0000000000000..0e3a1bb3823df --- /dev/null +++ b/by_key_test.go @@ -0,0 +1,99 @@ +package chunk + +import ( + "reflect" + "testing" + + "github.com/stretchr/testify/assert" +) + +func c(id string) Chunk { + return Chunk{UserID: id} +} + +func TestUnique(t *testing.T) { + for _, tc := range []struct { + in ByKey + want ByKey + }{ + {nil, ByKey{}}, + {ByKey{c("a"), c("a")}, ByKey{c("a")}}, + {ByKey{c("a"), c("a"), c("b"), c("b"), c("c")}, ByKey{c("a"), c("b"), c("c")}}, + {ByKey{c("a"), c("b"), c("c")}, ByKey{c("a"), c("b"), c("c")}}, + } { + have := unique(tc.in) + if !reflect.DeepEqual(tc.want, have) { + assert.Equal(t, tc.want, have) + } + } +} + +func TestMerge(t *testing.T) { + type args struct { + a ByKey + b ByKey + } + for _, tc := range []struct { + args args + want ByKey + }{ + {args{ByKey{}, ByKey{}}, ByKey{}}, + {args{ByKey{c("a")}, ByKey{}}, ByKey{c("a")}}, + {args{ByKey{}, ByKey{c("b")}}, ByKey{c("b")}}, + {args{ByKey{c("a")}, ByKey{c("b")}}, ByKey{c("a"), c("b")}}, + { + args{ByKey{c("a"), c("c")}, ByKey{c("a"), c("b"), c("d")}}, + ByKey{c("a"), c("b"), c("c"), c("d")}, + }, + } { + have := merge(tc.args.a, tc.args.b) + if !reflect.DeepEqual(tc.want, have) { + assert.Equal(t, tc.want, have) + } + } +} + +func TestNWayUnion(t *testing.T) { + for _, tc := range []struct { + in []ByKey + want ByKey + }{ + {nil, ByKey{}}, + {[]ByKey{{c("a")}}, ByKey{c("a")}}, + {[]ByKey{{c("a")}, {c("a")}}, ByKey{c("a")}}, + {[]ByKey{{c("a")}, {}}, 
ByKey{c("a")}}, + {[]ByKey{{}, {c("b")}}, ByKey{c("b")}}, + {[]ByKey{{c("a")}, {c("b")}}, ByKey{c("a"), c("b")}}, + { + []ByKey{{c("a"), c("c"), c("e")}, {c("c"), c("d")}, {c("b")}}, + ByKey{c("a"), c("b"), c("c"), c("d"), c("e")}, + }, + { + []ByKey{{c("c"), c("d")}, {c("b")}, {c("a"), c("c"), c("e")}}, + ByKey{c("a"), c("b"), c("c"), c("d"), c("e")}, + }, + } { + have := nWayUnion(tc.in) + if !reflect.DeepEqual(tc.want, have) { + assert.Equal(t, tc.want, have) + } + } +} + +func TestNWayIntersect(t *testing.T) { + for _, tc := range []struct { + in []ByKey + want ByKey + }{ + {nil, ByKey{}}, + {[]ByKey{{c("a"), c("b"), c("c")}}, []Chunk{c("a"), c("b"), c("c")}}, + {[]ByKey{{c("a"), c("b"), c("c")}, {c("a"), c("c")}}, ByKey{c("a"), c("c")}}, + {[]ByKey{{c("a"), c("b"), c("c")}, {c("a"), c("c")}, {c("b")}}, ByKey{}}, + {[]ByKey{{c("a"), c("b"), c("c")}, {c("a"), c("c")}, {c("a")}}, ByKey{c("a")}}, + } { + have := nWayIntersect(tc.in) + if !reflect.DeepEqual(tc.want, have) { + assert.Equal(t, tc.want, have) + } + } +} diff --git a/chunk.go b/chunk.go new file mode 100644 index 0000000000000..d331f165296ef --- /dev/null +++ b/chunk.go @@ -0,0 +1,320 @@ +package chunk + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "fmt" + "hash/crc32" + "io" + "strconv" + "strings" + + "github.com/golang/snappy" + "github.com/prometheus/common/model" + prom_chunk "github.com/prometheus/prometheus/storage/local/chunk" + + "github.com/weaveworks/common/errors" + "github.com/weaveworks/cortex/pkg/util" +) + +// Errors that decode can return +const ( + ErrInvalidChunkID = errors.Error("invalid chunk ID") + ErrInvalidChecksum = errors.Error("invalid chunk checksum") + ErrWrongMetadata = errors.Error("wrong chunk metadata") +) + +var castagnoliTable = crc32.MakeTable(crc32.Castagnoli) + +// Chunk contains encoded timeseries data +type Chunk struct { + // These two fields will be missing from older chunks (as will the hash). + // On fetch we will initialise these fields from the DynamoDB key. + Fingerprint model.Fingerprint `json:"fingerprint"` + UserID string `json:"userID"` + + // These fields will be in all chunks, including old ones. + From model.Time `json:"from"` + Through model.Time `json:"through"` + Metric model.Metric `json:"metric"` + + // The hash is not written to the external storage either. We use + // crc32, Castagnoli table. See http://www.evanjones.ca/crc32c.html. + // For old chunks, ChecksumSet will be false. + ChecksumSet bool `json:"-"` + Checksum uint32 `json:"-"` + + // We never use Delta encoding (the zero value), so if this entry is + // missing, we default to DoubleDelta. + Encoding prom_chunk.Encoding `json:"encoding"` + Data prom_chunk.Chunk `json:"-"` + + // This flag is used for very old chunks, where the metadata is read out + // of the index. + metadataInIndex bool +} + +// NewChunk creates a new chunk +func NewChunk(userID string, fp model.Fingerprint, metric model.Metric, c prom_chunk.Chunk, from, through model.Time) Chunk { + return Chunk{ + Fingerprint: fp, + UserID: userID, + From: from, + Through: through, + Metric: metric, + Encoding: c.Encoding(), + Data: c, + } +} + +// parseExternalKey is used to construct a partially-populated chunk from the +// key in DynamoDB. This chunk can then be used to calculate the key needed +// to fetch the Chunk data from Memcache/S3, and then fully populate the chunk +// with decode(). +// +// Pre-checksums, the keys written to DynamoDB looked like +// `::` (aka the ID), and the key for +// memcache and S3 was `/::. 
+// Finger prints and times were written in base-10. +// +// Post-checksums, externals keys become the same across DynamoDB, Memcache +// and S3. Numbers become hex encoded. Keys look like: +// `/:::`. +func parseExternalKey(userID, externalKey string) (Chunk, error) { + if !strings.Contains(externalKey, "/") { + return parseLegacyChunkID(userID, externalKey) + } + chunk, err := parseNewExternalKey(externalKey) + if err != nil { + return Chunk{}, err + } + if chunk.UserID != userID { + return Chunk{}, ErrWrongMetadata + } + return chunk, nil +} + +func parseLegacyChunkID(userID, key string) (Chunk, error) { + parts := strings.Split(key, ":") + if len(parts) != 3 { + return Chunk{}, ErrInvalidChunkID + } + fingerprint, err := strconv.ParseUint(parts[0], 10, 64) + if err != nil { + return Chunk{}, err + } + from, err := strconv.ParseInt(parts[1], 10, 64) + if err != nil { + return Chunk{}, err + } + through, err := strconv.ParseInt(parts[2], 10, 64) + if err != nil { + return Chunk{}, err + } + return Chunk{ + UserID: userID, + Fingerprint: model.Fingerprint(fingerprint), + From: model.Time(from), + Through: model.Time(through), + }, nil +} + +func parseNewExternalKey(key string) (Chunk, error) { + parts := strings.Split(key, "/") + if len(parts) != 2 { + return Chunk{}, ErrInvalidChunkID + } + userID := parts[0] + hexParts := strings.Split(parts[1], ":") + if len(hexParts) != 4 { + return Chunk{}, ErrInvalidChunkID + } + fingerprint, err := strconv.ParseUint(hexParts[0], 16, 64) + if err != nil { + return Chunk{}, err + } + from, err := strconv.ParseInt(hexParts[1], 16, 64) + if err != nil { + return Chunk{}, err + } + through, err := strconv.ParseInt(hexParts[2], 16, 64) + if err != nil { + return Chunk{}, err + } + checksum, err := strconv.ParseUint(hexParts[3], 16, 32) + if err != nil { + return Chunk{}, err + } + return Chunk{ + UserID: userID, + Fingerprint: model.Fingerprint(fingerprint), + From: model.Time(from), + Through: model.Time(through), + Checksum: uint32(checksum), + ChecksumSet: true, + }, nil +} + +// externalKey returns the key you can use to fetch this chunk from external +// storage. For newer chunks, this key includes a checksum. +func (c *Chunk) externalKey() string { + // Some chunks have a checksum stored in dynamodb, some do not. We must + // generate keys appropriately. + if c.ChecksumSet { + // This is the inverse of parseNewExternalKey. + return fmt.Sprintf("%s/%x:%x:%x:%x", c.UserID, uint64(c.Fingerprint), int64(c.From), int64(c.Through), c.Checksum) + } + // This is the inverse of parseLegacyExternalKey, with "/" prepended. + // Legacy chunks had the user ID prefix on s3/memcache, but not in DynamoDB. + // See comment on parseExternalKey. + return fmt.Sprintf("%s/%d:%d:%d", c.UserID, uint64(c.Fingerprint), int64(c.From), int64(c.Through)) +} + +// encode writes the chunk out to a big write buffer, then calculates the checksum. +func (c *Chunk) encode() ([]byte, error) { + var buf bytes.Buffer + + // Write 4 empty bytes first - we will come back and put the len in here. + metadataLenBytes := [4]byte{} + if _, err := buf.Write(metadataLenBytes[:]); err != nil { + return nil, err + } + + // Encode chunk metadata into snappy-compressed buffer + if err := json.NewEncoder(snappy.NewWriter(&buf)).Encode(c); err != nil { + return nil, err + } + + // Write the metadata length back at the start of the buffer. 
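	// The encoded buffer is laid out as:
	//
	//	[4-byte metadata length][snappy-compressed JSON metadata][4-byte data length][marshalled chunk data]
	//
	// buf.Len() below includes the 4-byte prefix itself, so the stored
	// metadata length covers the prefix plus the compressed metadata; the
	// checksum computed at the end of this function is CRC32 (Castagnoli)
	// over the entire buffer.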
+ binary.BigEndian.PutUint32(metadataLenBytes[:], uint32(buf.Len())) + copy(buf.Bytes(), metadataLenBytes[:]) + + // Write the data length + dataLenBytes := [4]byte{} + binary.BigEndian.PutUint32(dataLenBytes[:], uint32(prom_chunk.ChunkLen)) + if _, err := buf.Write(dataLenBytes[:]); err != nil { + return nil, err + } + + // And now the chunk data + if err := c.Data.Marshal(&buf); err != nil { + return nil, err + } + + // Now work out the checksum + output := buf.Bytes() + c.ChecksumSet = true + c.Checksum = crc32.Checksum(output, castagnoliTable) + return output, nil +} + +// decode the chunk from the given buffer, and confirm the chunk is the one we +// expected. +func (c *Chunk) decode(input []byte) error { + // Legacy chunks were written with metadata in the index. + if c.metadataInIndex { + var err error + c.Data, err = prom_chunk.NewForEncoding(prom_chunk.DoubleDelta) + if err != nil { + return err + } + return c.Data.UnmarshalFromBuf(input) + } + + // First, calculate the checksum of the chunk and confirm it matches + // what we expected. + if c.ChecksumSet && c.Checksum != crc32.Checksum(input, castagnoliTable) { + return ErrInvalidChecksum + } + + // Now unmarshal the chunk metadata. + r := bytes.NewReader(input) + var metadataLen uint32 + if err := binary.Read(r, binary.BigEndian, &metadataLen); err != nil { + return err + } + var tempMetadata Chunk + err := json.NewDecoder(snappy.NewReader(&io.LimitedReader{ + N: int64(metadataLen), + R: r, + })).Decode(&tempMetadata) + if err != nil { + return err + } + + // Next, confirm the chunks matches what we expected. Easiest way to do this + // is to compare what the decoded data thinks its external ID would be, but + // we don't write the checksum to s3, so we have to copy the checksum in. + if c.ChecksumSet { + tempMetadata.Checksum, tempMetadata.ChecksumSet = c.Checksum, c.ChecksumSet + if c.externalKey() != tempMetadata.externalKey() { + return ErrWrongMetadata + } + } + *c = tempMetadata + + // Flag indicates if metadata was written to index, and if false implies + // we should read a header of the chunk containing the metadata. Exists + // for backwards compatibility with older chunks, which did not have header. + if c.Encoding == prom_chunk.Delta { + c.Encoding = prom_chunk.DoubleDelta + } + + // Finally, unmarshal the actual chunk data. + c.Data, err = prom_chunk.NewForEncoding(c.Encoding) + if err != nil { + return err + } + + var dataLen uint32 + if err := binary.Read(r, binary.BigEndian, &dataLen); err != nil { + return err + } + + return c.Data.Unmarshal(&io.LimitedReader{ + N: int64(dataLen), + R: r, + }) +} + +// ChunksToMatrix converts a slice of chunks into a model.Matrix. +func ChunksToMatrix(chunks []Chunk) (model.Matrix, error) { + // Group chunks by series, sort and dedupe samples. 
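	// Chunks belonging to the same series share the fingerprint derived from
	// their metric, so one SampleStream is built per fingerprint.
	// util.MergeSamples (defined outside this patch hunk) is assumed to fold
	// each chunk's samples into the stream in timestamp order, dropping
	// duplicates that arise from overlapping chunks.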
+ sampleStreams := map[model.Fingerprint]*model.SampleStream{} + for _, c := range chunks { + fp := c.Metric.Fingerprint() + ss, ok := sampleStreams[fp] + if !ok { + ss = &model.SampleStream{ + Metric: c.Metric, + } + sampleStreams[fp] = ss + } + + samples, err := c.samples() + if err != nil { + return nil, err + } + + ss.Values = util.MergeSamples(ss.Values, samples) + } + + matrix := make(model.Matrix, 0, len(sampleStreams)) + for _, ss := range sampleStreams { + matrix = append(matrix, ss) + } + + return matrix, nil +} + +func (c *Chunk) samples() ([]model.SamplePair, error) { + it := c.Data.NewIterator() + // TODO(juliusv): Pre-allocate this with the right length again once we + // add a method upstream to get the number of samples in a chunk. + var samples []model.SamplePair + for it.Scan() { + samples = append(samples, it.Value()) + } + return samples, nil +} diff --git a/chunk_cache.go b/chunk_cache.go new file mode 100644 index 0000000000000..044535f2aaabc --- /dev/null +++ b/chunk_cache.go @@ -0,0 +1,219 @@ +package chunk + +import ( + "flag" + "sync" + "time" + + "github.com/bradfitz/gomemcache/memcache" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/log" + "github.com/weaveworks/common/instrument" + "golang.org/x/net/context" +) + +var ( + memcacheRequests = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "memcache_requests_total", + Help: "Total count of chunks requested from memcache.", + }) + + memcacheHits = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "memcache_hits_total", + Help: "Total count of chunks found in memcache.", + }) + + memcacheCorrupt = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "memcache_corrupt_chunks_total", + Help: "Total count of corrupt chunks found in memcache.", + }) + + memcacheDroppedWriteBack = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "memcache_dropped_write_back", + Help: "Total count of dropped write backs to memcache.", + }) + + memcacheRequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "cortex", + Name: "memcache_request_duration_seconds", + Help: "Total time spent in seconds doing memcache requests.", + // Memecache requests are very quick: smallest bucket is 16us, biggest is 1s + Buckets: prometheus.ExponentialBuckets(0.000016, 4, 8), + }, []string{"method", "status_code"}) +) + +func init() { + prometheus.MustRegister(memcacheRequests) + prometheus.MustRegister(memcacheHits) + prometheus.MustRegister(memcacheCorrupt) + prometheus.MustRegister(memcacheRequestDuration) +} + +// Memcache caches things +type Memcache interface { + GetMulti(keys []string) (map[string]*memcache.Item, error) + Set(item *memcache.Item) error +} + +// CacheConfig is config to make a Cache +type CacheConfig struct { + Expiration time.Duration + WriteBackGoroutines int + WriteBackBuffer int + memcacheConfig MemcacheConfig +} + +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *CacheConfig) RegisterFlags(f *flag.FlagSet) { + f.DurationVar(&cfg.Expiration, "memcached.expiration", 0, "How long chunks stay in the memcache.") + f.IntVar(&cfg.WriteBackGoroutines, "memcache.write-back-goroutines", 10, "How many goroutines to use to write back to memcache.") + f.IntVar(&cfg.WriteBackBuffer, "memcache.write-back-buffer", 10000, "How many chunks to buffer for background write back.") + cfg.memcacheConfig.RegisterFlags(f) +} + +// Cache type 
caches chunks +type Cache struct { + cfg CacheConfig + memcache Memcache + + wg sync.WaitGroup + quit chan struct{} + bgWrites chan backgroundWrite +} + +type backgroundWrite struct { + key string + buf []byte +} + +// NewCache makes a new Cache +func NewCache(cfg CacheConfig) *Cache { + var memcache Memcache + if cfg.memcacheConfig.Host != "" { + memcache = NewMemcacheClient(cfg.memcacheConfig) + } + c := &Cache{ + cfg: cfg, + memcache: memcache, + quit: make(chan struct{}), + bgWrites: make(chan backgroundWrite, cfg.WriteBackBuffer), + } + c.wg.Add(cfg.WriteBackGoroutines) + for i := 0; i < cfg.WriteBackGoroutines; i++ { + go c.writeBackLoop() + } + return c +} + +// Stop the background flushing goroutines. +func (c *Cache) Stop() { + close(c.quit) + c.wg.Wait() +} + +func memcacheStatusCode(err error) string { + // See https://godoc.org/github.com/bradfitz/gomemcache/memcache#pkg-variables + switch err { + case nil: + return "200" + case memcache.ErrCacheMiss: + return "404" + case memcache.ErrMalformedKey: + return "400" + default: + return "500" + } +} + +// FetchChunkData gets chunks from the chunk cache. +func (c *Cache) FetchChunkData(ctx context.Context, chunks []Chunk) (found []Chunk, missing []Chunk, err error) { + if c.memcache == nil { + return nil, chunks, nil + } + + memcacheRequests.Add(float64(len(chunks))) + + keys := make([]string, 0, len(chunks)) + for _, chunk := range chunks { + keys = append(keys, chunk.externalKey()) + } + + var items map[string]*memcache.Item + err = instrument.TimeRequestHistogramStatus(ctx, "Memcache.Get", memcacheRequestDuration, memcacheStatusCode, func(_ context.Context) error { + var err error + items, err = c.memcache.GetMulti(keys) + return err + }) + if err != nil { + return nil, chunks, err + } + + for i, externalKey := range keys { + item, ok := items[externalKey] + if !ok { + missing = append(missing, chunks[i]) + continue + } + + if err := chunks[i].decode(item.Value); err != nil { + memcacheCorrupt.Inc() + log.Errorf("Failed to decode chunk from cache: %v", err) + missing = append(missing, chunks[i]) + continue + } + + found = append(found, chunks[i]) + } + + memcacheHits.Add(float64(len(found))) + return found, missing, nil +} + +// StoreChunk serializes and stores a chunk in the chunk cache. 
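A minimal usage sketch (not part of the patch; warmCache is a hypothetical name): callers encode the chunk once and use its external key as the cache key, mirroring TestChunkCache below and the chunk store's putChunk further down.

func warmCache(ctx context.Context, cache *Cache, chunks []Chunk) error {
	for i := range chunks {
		buf, err := chunks[i].encode()
		if err != nil {
			return err
		}
		// The external key doubles as the memcache key.
		if err := cache.StoreChunk(ctx, chunks[i].externalKey(), buf); err != nil {
			return err
		}
	}
	return nil
}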
+func (c *Cache) StoreChunk(ctx context.Context, key string, buf []byte) error { + if c.memcache == nil { + return nil + } + + return instrument.TimeRequestHistogramStatus(ctx, "Memcache.Put", memcacheRequestDuration, memcacheStatusCode, func(_ context.Context) error { + item := memcache.Item{ + Key: key, + Value: buf, + Expiration: int32(c.cfg.Expiration.Seconds()), + } + return c.memcache.Set(&item) + }) +} + +// BackgroundWrite writes chunks for the cache in the background +func (c *Cache) BackgroundWrite(key string, buf []byte) { + bgWrite := backgroundWrite{ + key: key, + buf: buf, + } + select { + case c.bgWrites <- bgWrite: + default: + memcacheDroppedWriteBack.Inc() + } +} + +func (c *Cache) writeBackLoop() { + defer c.wg.Done() + + for { + select { + case bgWrite := <-c.bgWrites: + err := c.StoreChunk(context.Background(), bgWrite.key, bgWrite.buf) + if err != nil { + log.Errorf("Error writing to memcache: %v", err) + } + case <-c.quit: + return + } + } +} diff --git a/chunk_cache_test.go b/chunk_cache_test.go new file mode 100644 index 0000000000000..e66440ddace88 --- /dev/null +++ b/chunk_cache_test.go @@ -0,0 +1,114 @@ +package chunk + +import ( + "math/rand" + "sync" + "testing" + + "github.com/bradfitz/gomemcache/memcache" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/storage/local/chunk" + "github.com/stretchr/testify/require" + "golang.org/x/net/context" +) + +type mockMemcache struct { + sync.RWMutex + contents map[string][]byte +} + +func newMockMemcache() *mockMemcache { + return &mockMemcache{ + contents: map[string][]byte{}, + } +} + +func (m *mockMemcache) GetMulti(keys []string) (map[string]*memcache.Item, error) { + m.RLock() + defer m.RUnlock() + result := map[string]*memcache.Item{} + for _, k := range keys { + if c, ok := m.contents[k]; ok { + result[k] = &memcache.Item{ + Value: c, + } + } + } + return result, nil +} + +func (m *mockMemcache) Set(item *memcache.Item) error { + m.Lock() + defer m.Unlock() + m.contents[item.Key] = item.Value + return nil +} + +func TestChunkCache(t *testing.T) { + c := Cache{ + memcache: newMockMemcache(), + } + + const ( + chunkLen = 13 * 3600 // in seconds + ) + + // put 100 chunks from 0 to 99 + keys := []string{} + chunks := []Chunk{} + for i := 0; i < 100; i++ { + ts := model.TimeFromUnix(int64(i * chunkLen)) + promChunk, _ := chunk.New().Add(model.SamplePair{ + Timestamp: ts, + Value: model.SampleValue(i), + }) + chunk := NewChunk( + userID, + model.Fingerprint(1), + model.Metric{ + model.MetricNameLabel: "foo", + "bar": "baz", + }, + promChunk[0], + ts, + ts.Add(chunkLen), + ) + + buf, err := chunk.encode() + require.NoError(t, err) + + key := chunk.externalKey() + err = c.StoreChunk(context.Background(), key, buf) + require.NoError(t, err) + + keys = append(keys, key) + chunks = append(chunks, chunk) + } + + for i := 0; i < 100; i++ { + index := rand.Intn(len(keys)) + key := keys[index] + + chunk, err := parseExternalKey(userID, key) + require.NoError(t, err) + + found, missing, err := c.FetchChunkData(context.Background(), []Chunk{chunk}) + require.NoError(t, err) + require.Empty(t, missing) + require.Len(t, found, 1) + require.Equal(t, chunks[index], found[0]) + } + + // test getting them all + receivedChunks := []Chunk{} + for i := 0; i < len(keys); i++ { + chunk, err := parseExternalKey(userID, keys[i]) + require.NoError(t, err) + receivedChunks = append(receivedChunks, chunk) + } + found, missing, err := c.FetchChunkData(context.Background(), receivedChunks) + require.NoError(t, err) + 
require.Empty(t, missing) + require.Len(t, found, len(keys)) + require.Equal(t, chunks, receivedChunks) +} diff --git a/chunk_store.go b/chunk_store.go new file mode 100644 index 0000000000000..5248f4ebd3fbe --- /dev/null +++ b/chunk_store.go @@ -0,0 +1,526 @@ +package chunk + +import ( + "encoding/json" + "flag" + "fmt" + "sort" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/log" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/storage/metric" + "golang.org/x/net/context" + + "github.com/weaveworks/common/user" + "github.com/weaveworks/cortex/pkg/util" +) + +var ( + indexEntriesPerChunk = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: "cortex", + Name: "chunk_store_index_entries_per_chunk", + Help: "Number of entries written to storage per chunk.", + Buckets: prometheus.ExponentialBuckets(1, 2, 5), + }) + rowWrites = util.NewHashBucketHistogram(util.HashBucketHistogramOpts{ + HistogramOpts: prometheus.HistogramOpts{ + Namespace: "cortex", + Name: "chunk_store_row_writes_distribution", + Help: "Distribution of writes to individual storage rows", + Buckets: prometheus.DefBuckets, + }, + HashBuckets: 1024, + }) +) + +func init() { + prometheus.MustRegister(indexEntriesPerChunk) + prometheus.MustRegister(rowWrites) +} + +// StoreConfig specifies config for a ChunkStore +type StoreConfig struct { + SchemaConfig + CacheConfig + + // For injecting different schemas in tests. + schemaFactory func(cfg SchemaConfig) Schema +} + +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *StoreConfig) RegisterFlags(f *flag.FlagSet) { + cfg.SchemaConfig.RegisterFlags(f) + cfg.CacheConfig.RegisterFlags(f) +} + +// Store implements Store +type Store struct { + cfg StoreConfig + + storage StorageClient + cache *Cache + schema Schema +} + +// NewStore makes a new ChunkStore +func NewStore(cfg StoreConfig, storage StorageClient) (*Store, error) { + var schema Schema + var err error + if cfg.schemaFactory == nil { + schema, err = newCompositeSchema(cfg.SchemaConfig) + } else { + schema = cfg.schemaFactory(cfg.SchemaConfig) + } + if err != nil { + return nil, err + } + + return &Store{ + cfg: cfg, + storage: storage, + schema: schema, + cache: NewCache(cfg.CacheConfig), + }, nil +} + +// Stop any background goroutines (ie in the cache.) +func (c *Store) Stop() { + c.cache.Stop() +} + +// Put implements ChunkStore +func (c *Store) Put(ctx context.Context, chunks []Chunk) error { + userID, err := user.Extract(ctx) + if err != nil { + return err + } + + // Encode the chunk first - checksum is calculated as a side effect. + bufs := [][]byte{} + keys := []string{} + for i := range chunks { + encoded, err := chunks[i].encode() + if err != nil { + return err + } + bufs = append(bufs, encoded) + keys = append(keys, chunks[i].externalKey()) + } + + err = c.putChunks(ctx, keys, bufs) + if err != nil { + return err + } + + return c.updateIndex(ctx, userID, chunks) +} + +// putChunks writes a collection of chunks to S3 in parallel. +func (c *Store) putChunks(ctx context.Context, keys []string, bufs [][]byte) error { + incomingErrors := make(chan error) + for i := range bufs { + go func(i int) { + incomingErrors <- c.putChunk(ctx, keys[i], bufs[i]) + }(i) + } + + var lastErr error + for range keys { + err := <-incomingErrors + if err != nil { + lastErr = err + } + } + return lastErr +} + +// putChunk puts a chunk into S3. 
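+// A failed write to the chunk cache is only logged; the result of the write
+// to the backing store is what is returned to the caller.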
+func (c *Store) putChunk(ctx context.Context, key string, buf []byte) error { + err := c.storage.PutChunk(ctx, key, buf) + if err != nil { + return err + } + + if err := c.cache.StoreChunk(ctx, key, buf); err != nil { + log.Warnf("Could not store %v in chunk cache: %v", key, err) + } + return nil +} + +func (c *Store) updateIndex(ctx context.Context, userID string, chunks []Chunk) error { + writeReqs, err := c.calculateDynamoWrites(userID, chunks) + if err != nil { + return err + } + + return c.storage.BatchWrite(ctx, writeReqs) +} + +// calculateDynamoWrites creates a set of batched WriteRequests to dynamo for all +// the chunks it is given. +func (c *Store) calculateDynamoWrites(userID string, chunks []Chunk) (WriteBatch, error) { + seenIndexEntries := map[string]struct{}{} + + writeReqs := c.storage.NewWriteBatch() + for _, chunk := range chunks { + metricName, err := util.ExtractMetricNameFromMetric(chunk.Metric) + if err != nil { + return nil, err + } + + entries, err := c.schema.GetWriteEntries(chunk.From, chunk.Through, userID, metricName, chunk.Metric, chunk.externalKey()) + if err != nil { + return nil, err + } + indexEntriesPerChunk.Observe(float64(len(entries))) + + // Remove duplicate entries based on tableName:hashValue:rangeValue + unseenEntries := []IndexEntry{} + for _, entry := range entries { + key := fmt.Sprintf("%s:%s:%x", entry.TableName, entry.HashValue, entry.RangeValue) + if _, ok := seenIndexEntries[key]; !ok { + seenIndexEntries[key] = struct{}{} + unseenEntries = append(unseenEntries, entry) + } + } + + for _, entry := range unseenEntries { + rowWrites.Observe(entry.HashValue, 1) + writeReqs.Add(entry.TableName, entry.HashValue, entry.RangeValue, entry.Value) + } + } + return writeReqs, nil +} + +// Get implements ChunkStore +func (c *Store) Get(ctx context.Context, from, through model.Time, allMatchers ...*metric.LabelMatcher) ([]Chunk, error) { + if through < from { + return nil, fmt.Errorf("invalid query, through < from (%d < %d)", through, from) + } + + filters, matchers := util.SplitFiltersAndMatchers(allMatchers) + + // Fetch chunk descriptors (just ID really) from storage + chunks, err := c.lookupChunksByMatchers(ctx, from, through, matchers) + if err != nil { + return nil, promql.ErrStorage(err) + } + + // Filter out chunks that are not in the selected time range. + filtered := make([]Chunk, 0, len(chunks)) + for _, chunk := range chunks { + if chunk.Through < from || through < chunk.From { + continue + } + filtered = append(filtered, chunk) + } + + // Now fetch the actual chunk data from Memcache / S3 + fromCache, missing, err := c.cache.FetchChunkData(ctx, filtered) + if err != nil { + log.Warnf("Error fetching from cache: %v", err) + } + + fromS3, err := c.fetchChunkData(ctx, missing) + if err != nil { + return nil, promql.ErrStorage(err) + } + + if err = c.writeBackCache(ctx, fromS3); err != nil { + log.Warnf("Could not store chunks in chunk cache: %v", err) + } + + // TODO instead of doing this sort, propagate an index and assign chunks + // into the result based on that index. + allChunks := append(fromCache, fromS3...) 
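+ // Sort the combined cache and store results by external key before applying
+ // the label filters below.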
+ sort.Sort(ByKey(allChunks)) + + // Filter out chunks + filteredChunks := make([]Chunk, 0, len(allChunks)) +outer: + for _, chunk := range allChunks { + for _, filter := range filters { + if !filter.Match(chunk.Metric[filter.Name]) { + continue outer + } + } + + filteredChunks = append(filteredChunks, chunk) + } + + return filteredChunks, nil +} + +func (c *Store) lookupChunksByMatchers(ctx context.Context, from, through model.Time, matchers []*metric.LabelMatcher) ([]Chunk, error) { + metricNameMatcher, matchers, ok := util.ExtractMetricNameMatcherFromMatchers(matchers) + + // Only lookup by metric name if the matcher is of type equal, otherwise we + // have to fetch chunks for all metric names as other metric names could match. + if ok && metricNameMatcher.Type == metric.Equal { + return c.lookupChunksByMetricName(ctx, from, through, matchers, metricNameMatcher.Value) + } + + userID, err := user.Extract(ctx) + if err != nil { + return nil, err + } + + // If there is no metric name, we want return chunks for all metric names + metricNameQueries, err := c.schema.GetReadQueries(from, through, userID) + if err != nil { + return nil, err + } + metricNameEntries, err := c.lookupEntriesByQueries(ctx, metricNameQueries) + if err != nil { + return nil, err + } + + incomingChunkSets := make(chan ByKey) + incomingErrors := make(chan error) + skippedMetricNames := 0 + + for _, metricNameEntry := range metricNameEntries { + metricName, err := parseMetricNameRangeValue(metricNameEntry.RangeValue, metricNameEntry.Value) + if err != nil { + return nil, err + } + + // We are fetching all metric name chunks, however if there is a metricNameMatcher, + // we only want metric names that match + if ok && !metricNameMatcher.Match(metricName) { + skippedMetricNames++ + continue + } + + go func(metricName model.LabelValue) { + chunks, err := c.lookupChunksByMetricName(ctx, from, through, matchers, metricName) + if err != nil { + incomingErrors <- err + } else { + incomingChunkSets <- chunks + } + }(metricName) + } + + var chunkSets []ByKey + var lastErr error + for i := 0; i < (len(metricNameEntries) - skippedMetricNames); i++ { + select { + case incoming := <-incomingChunkSets: + chunkSets = append(chunkSets, incoming) + case err := <-incomingErrors: + lastErr = err + } + } + + return nWayUnion(chunkSets), lastErr +} + +func (c *Store) lookupChunksByMetricName(ctx context.Context, from, through model.Time, matchers []*metric.LabelMatcher, metricName model.LabelValue) ([]Chunk, error) { + userID, err := user.Extract(ctx) + if err != nil { + return nil, err + } + + // Just get chunks for metric if there are no matchers + if len(matchers) == 0 { + queries, err := c.schema.GetReadQueriesForMetric(from, through, userID, metricName) + if err != nil { + return nil, err + } + + entries, err := c.lookupEntriesByQueries(ctx, queries) + if err != nil { + return nil, err + } + + return c.convertIndexEntriesToChunks(ctx, entries, nil) + } + + // Otherwise get chunks which include other matchers + incomingChunkSets := make(chan ByKey) + incomingErrors := make(chan error) + for _, matcher := range matchers { + go func(matcher *metric.LabelMatcher) { + // Lookup IndexQuery's + var queries []IndexQuery + var err error + if matcher.Type != metric.Equal { + queries, err = c.schema.GetReadQueriesForMetricLabel(from, through, userID, metricName, matcher.Name) + } else { + queries, err = c.schema.GetReadQueriesForMetricLabelValue(from, through, userID, metricName, matcher.Name, matcher.Value) + } + if err != nil { + incomingErrors 
<- err + return + } + + // Lookup IndexEntry's + entries, err := c.lookupEntriesByQueries(ctx, queries) + if err != nil { + incomingErrors <- err + return + } + + // Convert IndexEntry's into chunks + chunks, err := c.convertIndexEntriesToChunks(ctx, entries, matcher) + if err != nil { + incomingErrors <- err + } else { + incomingChunkSets <- chunks + } + }(matcher) + } + + // Receive chunkSets from all matchers + var chunkSets []ByKey + var lastErr error + for i := 0; i < len(matchers); i++ { + select { + case incoming := <-incomingChunkSets: + chunkSets = append(chunkSets, incoming) + case err := <-incomingErrors: + lastErr = err + } + } + + // Merge chunkSets in order because we wish to keep label series together consecutively + return nWayIntersect(chunkSets), lastErr +} + +func (c *Store) lookupEntriesByQueries(ctx context.Context, queries []IndexQuery) ([]IndexEntry, error) { + incomingEntries := make(chan []IndexEntry) + incomingErrors := make(chan error) + for _, query := range queries { + go func(query IndexQuery) { + entries, err := c.lookupEntriesByQuery(ctx, query) + if err != nil { + incomingErrors <- err + } else { + incomingEntries <- entries + } + }(query) + } + + // Combine the results into one slice + var entries []IndexEntry + var lastErr error + for i := 0; i < len(queries); i++ { + select { + case incoming := <-incomingEntries: + entries = append(entries, incoming...) + case err := <-incomingErrors: + lastErr = err + } + } + + return entries, lastErr +} + +func (c *Store) lookupEntriesByQuery(ctx context.Context, query IndexQuery) ([]IndexEntry, error) { + var entries []IndexEntry + + if err := c.storage.QueryPages(ctx, query, func(resp ReadBatch, lastPage bool) (shouldContinue bool) { + for i := 0; i < resp.Len(); i++ { + entries = append(entries, IndexEntry{ + TableName: query.TableName, + HashValue: query.HashValue, + RangeValue: resp.RangeValue(i), + Value: resp.Value(i), + }) + } + return !lastPage + }); err != nil { + log.Errorf("Error querying storage: %v", err) + return nil, err + } + + return entries, nil +} + +func (c *Store) convertIndexEntriesToChunks(ctx context.Context, entries []IndexEntry, matcher *metric.LabelMatcher) (ByKey, error) { + userID, err := user.Extract(ctx) + if err != nil { + return nil, err + } + + var chunkSet ByKey + + for _, entry := range entries { + chunkKey, labelValue, metadataInIndex, err := parseChunkTimeRangeValue(entry.RangeValue, entry.Value) + if err != nil { + return nil, err + } + + chunk, err := parseExternalKey(userID, chunkKey) + if err != nil { + return nil, err + } + + // This can be removed in Dev 2017, 13 months after the last chunks + // was written with metadata in the index. 
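+ // Older index entries carried the chunk metadata as JSON in the value
+ // column; decode it here so those chunks remain readable.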
+ if metadataInIndex && entry.Value != nil { + if err := json.Unmarshal(entry.Value, &chunk); err != nil { + return nil, err + } + chunk.metadataInIndex = true + } + + if matcher != nil && !matcher.Match(labelValue) { + log.Debug("Dropping chunk for non-matching metric ", chunk.Metric) + continue + } + chunkSet = append(chunkSet, chunk) + } + + // Return chunks sorted and deduped because they will be merged with other sets + sort.Sort(chunkSet) + return unique(chunkSet), nil +} + +func (c *Store) fetchChunkData(ctx context.Context, chunkSet []Chunk) ([]Chunk, error) { + incomingChunks := make(chan Chunk) + incomingErrors := make(chan error) + for _, chunk := range chunkSet { + go func(chunk Chunk) { + buf, err := c.storage.GetChunk(ctx, chunk.externalKey()) + if err != nil { + incomingErrors <- err + return + } + if err := chunk.decode(buf); err != nil { + incomingErrors <- err + return + } + incomingChunks <- chunk + }(chunk) + } + + chunks := []Chunk{} + errors := []error{} + for i := 0; i < len(chunkSet); i++ { + select { + case chunk := <-incomingChunks: + chunks = append(chunks, chunk) + case err := <-incomingErrors: + errors = append(errors, err) + } + } + if len(errors) > 0 { + return nil, errors[0] + } + return chunks, nil +} + +func (c *Store) writeBackCache(_ context.Context, chunks []Chunk) error { + for i := range chunks { + encoded, err := chunks[i].encode() + if err != nil { + return err + } + c.cache.BackgroundWrite(chunks[i].externalKey(), encoded) + } + return nil +} diff --git a/chunk_store_test.go b/chunk_store_test.go new file mode 100644 index 0000000000000..53225fb2d19db --- /dev/null +++ b/chunk_store_test.go @@ -0,0 +1,424 @@ +package chunk + +import ( + "fmt" + "math/rand" + "reflect" + "testing" + "time" + + "github.com/prometheus/common/log" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/storage/local/chunk" + "github.com/prometheus/prometheus/storage/metric" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/net/context" + + "github.com/weaveworks/common/test" + "github.com/weaveworks/common/user" +) + +// newTestStore creates a new Store for testing. 
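+// It is backed by MockStorage, with the required tables created up front.
+// Tests can inject a schema via StoreConfig, e.g.:
+//
+//	store := newTestChunkStore(t, StoreConfig{schemaFactory: v7Schema})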
+func newTestChunkStore(t *testing.T, cfg StoreConfig) *Store { + storage := NewMockStorage() + tableManager, err := NewDynamoTableManager(TableManagerConfig{}, storage) + require.NoError(t, err) + err = tableManager.syncTables(context.Background()) + require.NoError(t, err) + store, err := NewStore(cfg, storage) + require.NoError(t, err) + return store +} + +func TestChunkStore(t *testing.T) { + ctx := user.Inject(context.Background(), userID) + now := model.Now() + chunk1 := dummyChunkFor(model.Metric{ + model.MetricNameLabel: "foo", + "bar": "baz", + "toms": "code", + "flip": "flop", + }) + chunk2 := dummyChunkFor(model.Metric{ + model.MetricNameLabel: "foo", + "bar": "beep", + "toms": "code", + }) + + schemas := []struct { + name string + fn func(cfg SchemaConfig) Schema + }{ + {"v1 schema", v1Schema}, + {"v2 schema", v2Schema}, + {"v3 schema", v3Schema}, + {"v4 schema", v4Schema}, + {"v5 schema", v5Schema}, + {"v6 schema", v6Schema}, + {"v7 schema", v7Schema}, + } + + nameMatcher := mustNewLabelMatcher(metric.Equal, model.MetricNameLabel, "foo") + + for _, tc := range []struct { + query string + expect []Chunk + matchers []*metric.LabelMatcher + }{ + { + `foo`, + []Chunk{chunk1, chunk2}, + []*metric.LabelMatcher{nameMatcher}, + }, + { + `foo{flip=""}`, + []Chunk{chunk2}, + []*metric.LabelMatcher{nameMatcher, mustNewLabelMatcher(metric.Equal, "flip", "")}, + }, + { + `foo{bar="baz"}`, + []Chunk{chunk1}, + []*metric.LabelMatcher{nameMatcher, mustNewLabelMatcher(metric.Equal, "bar", "baz")}, + }, + { + `foo{bar="beep"}`, + []Chunk{chunk2}, + []*metric.LabelMatcher{nameMatcher, mustNewLabelMatcher(metric.Equal, "bar", "beep")}, + }, + { + `foo{toms="code"}`, + []Chunk{chunk1, chunk2}, + []*metric.LabelMatcher{nameMatcher, mustNewLabelMatcher(metric.Equal, "toms", "code")}, + }, + { + `foo{bar!="baz"}`, + []Chunk{chunk2}, + []*metric.LabelMatcher{nameMatcher, mustNewLabelMatcher(metric.NotEqual, "bar", "baz")}, + }, + { + `foo{bar=~"beep|baz"}`, + []Chunk{chunk1, chunk2}, + []*metric.LabelMatcher{nameMatcher, mustNewLabelMatcher(metric.RegexMatch, "bar", "beep|baz")}, + }, + { + `foo{toms="code", bar=~"beep|baz"}`, + []Chunk{chunk1, chunk2}, + []*metric.LabelMatcher{nameMatcher, mustNewLabelMatcher(metric.Equal, "toms", "code"), mustNewLabelMatcher(metric.RegexMatch, "bar", "beep|baz")}, + }, + { + `foo{toms="code", bar="baz"}`, + []Chunk{chunk1}, []*metric.LabelMatcher{nameMatcher, mustNewLabelMatcher(metric.Equal, "toms", "code"), mustNewLabelMatcher(metric.Equal, "bar", "baz")}, + }, + } { + for _, schema := range schemas { + t.Run(fmt.Sprintf("%s / %s", tc.query, schema.name), func(t *testing.T) { + log.Infoln("========= Running query", tc.query, "with schema", schema.name) + store := newTestChunkStore(t, StoreConfig{ + schemaFactory: schema.fn, + }) + + if err := store.Put(ctx, []Chunk{chunk1, chunk2}); err != nil { + t.Fatal(err) + } + + chunks, err := store.Get(ctx, now.Add(-time.Hour), now, tc.matchers...) 
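+ // The query should succeed and, once checksums are zeroed below, match the
+ // expected chunks exactly.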
+ require.NoError(t, err) + + // Zero out the checksums, as the inputs above didn't have the checksums calculated + for i := range chunks { + chunks[i].Checksum = 0 + chunks[i].ChecksumSet = false + } + + if !reflect.DeepEqual(tc.expect, chunks) { + t.Fatalf("%s: wrong chunks - %s", tc.query, test.Diff(tc.expect, chunks)) + } + }) + } + } +} + +// TestChunkStoreMetricNames tests no metric name queries supported from v7Schema +func TestChunkStoreMetricNames(t *testing.T) { + ctx := user.Inject(context.Background(), userID) + now := model.Now() + + foo1Chunk1 := dummyChunkFor(model.Metric{ + model.MetricNameLabel: "foo1", + "bar": "baz", + "toms": "code", + "flip": "flop", + }) + foo1Chunk2 := dummyChunkFor(model.Metric{ + model.MetricNameLabel: "foo1", + "bar": "beep", + "toms": "code", + }) + foo2Chunk := dummyChunkFor(model.Metric{ + model.MetricNameLabel: "foo2", + "bar": "beep", + "toms": "code", + }) + foo3Chunk := dummyChunkFor(model.Metric{ + model.MetricNameLabel: "foo3", + "bar": "beep", + "toms": "code", + }) + + schemas := []struct { + name string + fn func(cfg SchemaConfig) Schema + }{ + {"v7 schema", v7Schema}, + } + + for _, tc := range []struct { + query string + expect []Chunk + matchers []*metric.LabelMatcher + }{ + { + `foo1`, + []Chunk{foo1Chunk1, foo1Chunk2}, + []*metric.LabelMatcher{mustNewLabelMatcher(metric.Equal, model.MetricNameLabel, "foo1")}, + }, + { + `foo2`, + []Chunk{foo2Chunk}, + []*metric.LabelMatcher{mustNewLabelMatcher(metric.Equal, model.MetricNameLabel, "foo2")}, + }, + { + `foo3`, + []Chunk{foo3Chunk}, + []*metric.LabelMatcher{mustNewLabelMatcher(metric.Equal, model.MetricNameLabel, "foo3")}, + }, + + // When name matcher is used without Equal, start matching all metric names + // however still filter out metric names which do not match query + { + `{__name__!="foo1"}`, + []Chunk{foo3Chunk, foo2Chunk}, + []*metric.LabelMatcher{mustNewLabelMatcher(metric.NotEqual, model.MetricNameLabel, "foo1")}, + }, + { + `{__name__=~"foo1|foo2"}`, + []Chunk{foo1Chunk1, foo2Chunk, foo1Chunk2}, + []*metric.LabelMatcher{mustNewLabelMatcher(metric.RegexMatch, model.MetricNameLabel, "foo1|foo2")}, + }, + + // No metric names + { + `{bar="baz"}`, + []Chunk{foo1Chunk1}, + []*metric.LabelMatcher{mustNewLabelMatcher(metric.Equal, "bar", "baz")}, + }, + { + `{bar="beep"}`, + []Chunk{foo3Chunk, foo2Chunk, foo1Chunk2}, // doesn't match foo1 chunk1 + []*metric.LabelMatcher{mustNewLabelMatcher(metric.Equal, "bar", "beep")}, + }, + { + `{flip=""}`, + []Chunk{foo3Chunk, foo2Chunk, foo1Chunk2}, // doesn't match foo1 chunk1 as it has a flip value + []*metric.LabelMatcher{mustNewLabelMatcher(metric.Equal, "flip", "")}, + }, + { + `{bar!="beep"}`, + []Chunk{foo1Chunk1}, + []*metric.LabelMatcher{mustNewLabelMatcher(metric.NotEqual, "bar", "beep")}, + }, + { + `{bar=~"beep|baz"}`, + []Chunk{foo3Chunk, foo1Chunk1, foo2Chunk, foo1Chunk2}, + []*metric.LabelMatcher{mustNewLabelMatcher(metric.RegexMatch, "bar", "beep|baz")}, + }, + { + `{toms="code", bar=~"beep|baz"}`, + []Chunk{foo3Chunk, foo1Chunk1, foo2Chunk, foo1Chunk2}, + []*metric.LabelMatcher{mustNewLabelMatcher(metric.Equal, "toms", "code"), mustNewLabelMatcher(metric.RegexMatch, "bar", "beep|baz")}, + }, + { + `{toms="code", bar="baz"}`, + []Chunk{foo1Chunk1}, + []*metric.LabelMatcher{mustNewLabelMatcher(metric.Equal, "toms", "code"), mustNewLabelMatcher(metric.Equal, "bar", "baz")}, + }, + } { + for _, schema := range schemas { + t.Run(fmt.Sprintf("%s / %s", tc.query, schema.name), func(t *testing.T) { + log.Infoln("========= Running 
query", tc.query, "with schema", schema.name) + store := newTestChunkStore(t, StoreConfig{ + schemaFactory: schema.fn, + }) + + if err := store.Put(ctx, []Chunk{foo1Chunk1, foo1Chunk2, foo2Chunk, foo3Chunk}); err != nil { + t.Fatal(err) + } + + chunks, err := store.Get(ctx, now.Add(-time.Hour), now, tc.matchers...) + require.NoError(t, err) + + // Zero out the checksums, as the inputs above didn't have the checksums calculated + for i := range chunks { + chunks[i].Checksum = 0 + chunks[i].ChecksumSet = false + } + + if !reflect.DeepEqual(tc.expect, chunks) { + t.Fatalf("%s: wrong chunks - %s", tc.query, test.Diff(tc.expect, chunks)) + } + }) + } + } +} + +func mustNewLabelMatcher(matchType metric.MatchType, name model.LabelName, value model.LabelValue) *metric.LabelMatcher { + matcher, err := metric.NewLabelMatcher(matchType, name, value) + if err != nil { + panic(err) + } + return matcher +} + +func TestChunkStoreRandom(t *testing.T) { + ctx := user.Inject(context.Background(), userID) + schemas := []struct { + name string + fn func(cfg SchemaConfig) Schema + store *Store + }{ + {name: "v1 schema", fn: v1Schema}, + {name: "v2 schema", fn: v2Schema}, + {name: "v3 schema", fn: v3Schema}, + {name: "v4 schema", fn: v4Schema}, + {name: "v5 schema", fn: v5Schema}, + {name: "v6 schema", fn: v6Schema}, + {name: "v7 schema", fn: v7Schema}, + } + + for i := range schemas { + schemas[i].store = newTestChunkStore(t, StoreConfig{ + schemaFactory: schemas[i].fn, + }) + } + + // put 100 chunks from 0 to 99 + const chunkLen = 13 * 3600 // in seconds + for i := 0; i < 100; i++ { + ts := model.TimeFromUnix(int64(i * chunkLen)) + chunks, _ := chunk.New().Add(model.SamplePair{ + Timestamp: ts, + Value: model.SampleValue(float64(i)), + }) + chunk := NewChunk( + userID, + model.Fingerprint(1), + model.Metric{ + model.MetricNameLabel: "foo", + "bar": "baz", + }, + chunks[0], + ts, + ts.Add(chunkLen*time.Second), + ) + for _, s := range schemas { + err := s.store.Put(ctx, []Chunk{chunk}) + require.NoError(t, err) + } + } + + // pick two random numbers and do a query + for i := 0; i < 100; i++ { + start := rand.Int63n(100 * chunkLen) + end := start + rand.Int63n((100*chunkLen)-start) + assert.True(t, start < end) + + startTime := model.TimeFromUnix(start) + endTime := model.TimeFromUnix(end) + + for _, s := range schemas { + chunks, err := s.store.Get(ctx, startTime, endTime, + mustNewLabelMatcher(metric.Equal, model.MetricNameLabel, "foo"), + mustNewLabelMatcher(metric.Equal, "bar", "baz"), + ) + require.NoError(t, err) + + // We need to check that each chunk is in the time range + for _, chunk := range chunks { + assert.False(t, chunk.From.After(endTime)) + assert.False(t, chunk.Through.Before(startTime)) + samples, err := chunk.samples() + assert.NoError(t, err) + assert.Equal(t, 1, len(samples)) + // TODO verify chunk contents + } + + // And check we got all the chunks we want + numChunks := (end / chunkLen) - (start / chunkLen) + 1 + assert.Equal(t, int(numChunks), len(chunks), s.name) + } + } +} + +func TestChunkStoreLeastRead(t *testing.T) { + // Test we don't read too much from the index + ctx := user.Inject(context.Background(), userID) + store := newTestChunkStore(t, StoreConfig{ + schemaFactory: v6Schema, + }) + + // Put 24 chunks 1hr chunks in the store + const chunkLen = 60 // in seconds + for i := 0; i < 24; i++ { + ts := model.TimeFromUnix(int64(i * chunkLen)) + chunks, _ := chunk.New().Add(model.SamplePair{ + Timestamp: ts, + Value: model.SampleValue(float64(i)), + }) + chunk := NewChunk( + 
userID, + model.Fingerprint(1), + model.Metric{ + model.MetricNameLabel: "foo", + "bar": "baz", + }, + chunks[0], + ts, + ts.Add(chunkLen*time.Second), + ) + log.Infof("Loop %d", i) + err := store.Put(ctx, []Chunk{chunk}) + require.NoError(t, err) + } + + // pick a random numbers and do a query to end of row + for i := 1; i < 24; i++ { + start := int64(i * chunkLen) + end := int64(24 * chunkLen) + assert.True(t, start <= end) + + startTime := model.TimeFromUnix(start) + endTime := model.TimeFromUnix(end) + + chunks, err := store.Get(ctx, startTime, endTime, + mustNewLabelMatcher(metric.Equal, model.MetricNameLabel, "foo"), + mustNewLabelMatcher(metric.Equal, "bar", "baz"), + ) + if err != nil { + t.Fatal(t, err) + } + + // We need to check that each chunk is in the time range + for _, chunk := range chunks { + assert.False(t, chunk.From.After(endTime)) + assert.False(t, chunk.Through.Before(startTime)) + samples, err := chunk.samples() + assert.NoError(t, err) + assert.Equal(t, 1, len(samples)) + } + + // And check we got all the chunks we want + numChunks := 24 - (start / chunkLen) + 1 + assert.Equal(t, int(numChunks), len(chunks)) + } +} diff --git a/chunk_test.go b/chunk_test.go new file mode 100644 index 0000000000000..5b71b68579548 --- /dev/null +++ b/chunk_test.go @@ -0,0 +1,123 @@ +package chunk + +import ( + "fmt" + "testing" + "time" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/storage/local/chunk" + "github.com/stretchr/testify/require" +) + +const userID = "userID" + +func dummyChunk() Chunk { + return dummyChunkFor(model.Metric{ + model.MetricNameLabel: "foo", + "bar": "baz", + "toms": "code", + }) +} + +func dummyChunkFor(metric model.Metric) Chunk { + now := model.Now() + cs, _ := chunk.New().Add(model.SamplePair{Timestamp: now, Value: 0}) + chunk := NewChunk( + userID, + metric.Fingerprint(), + metric, + cs[0], + now.Add(-time.Hour), + now, + ) + return chunk +} + +func TestChunkCodec(t *testing.T) { + for i, c := range []struct { + chunk Chunk + err error + f func(*Chunk, []byte) + }{ + // Basic round trip + {chunk: dummyChunk()}, + + // Checksum should fail + { + chunk: dummyChunk(), + err: ErrInvalidChecksum, + f: func(_ *Chunk, buf []byte) { buf[4]++ }, + }, + + // Checksum should fail + { + chunk: dummyChunk(), + err: ErrInvalidChecksum, + f: func(c *Chunk, _ []byte) { c.Checksum = 123 }, + }, + + // Metadata test should fail + { + chunk: dummyChunk(), + err: ErrWrongMetadata, + f: func(c *Chunk, _ []byte) { c.Fingerprint++ }, + }, + + // Metadata test should fail + { + chunk: dummyChunk(), + err: ErrWrongMetadata, + f: func(c *Chunk, _ []byte) { c.UserID = "foo" }, + }, + } { + t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) { + buf, err := c.chunk.encode() + require.NoError(t, err) + + have, err := parseExternalKey(userID, c.chunk.externalKey()) + require.NoError(t, err) + + if c.f != nil { + c.f(&have, buf) + } + + err = have.decode(buf) + require.Equal(t, err, c.err) + + if c.err == nil { + require.Equal(t, have, c.chunk) + } + }) + } +} + +func TestParseExternalKey(t *testing.T) { + for _, c := range []struct { + key string + chunk Chunk + err error + }{ + {key: "2:1484661279394:1484664879394", chunk: Chunk{ + UserID: userID, + Fingerprint: model.Fingerprint(2), + From: model.Time(1484661279394), + Through: model.Time(1484664879394), + }}, + + {key: userID + "/2:270d8f00:270d8f00:f84c5745", chunk: Chunk{ + UserID: userID, + Fingerprint: model.Fingerprint(2), + From: model.Time(655200000), + Through: model.Time(655200000), + 
ChecksumSet: true, + Checksum: 4165752645, + }}, + + {key: "invalidUserID/2:270d8f00:270d8f00:f84c5745", chunk: Chunk{}, err: ErrWrongMetadata}, + } { + chunk, err := parseExternalKey(userID, c.key) + require.Equal(t, c.err, err) + require.Equal(t, c.chunk, chunk) + } +} diff --git a/inmemory_storage_client.go b/inmemory_storage_client.go new file mode 100644 index 0000000000000..e1458f7c3228d --- /dev/null +++ b/inmemory_storage_client.go @@ -0,0 +1,280 @@ +package chunk + +import ( + "bytes" + "fmt" + "sort" + "sync" + + "github.com/aws/aws-sdk-go/service/dynamodb" + "github.com/prometheus/common/log" + "golang.org/x/net/context" +) + +// MockStorage is a fake in-memory StorageClient. +type MockStorage struct { + mtx sync.RWMutex + tables map[string]*mockTable + objects map[string][]byte +} + +type mockTable struct { + items map[string][]mockItem + write, read int64 +} + +type mockItem struct { + rangeValue []byte + value []byte +} + +// NewMockStorage creates a new MockStorage. +func NewMockStorage() *MockStorage { + return &MockStorage{ + tables: map[string]*mockTable{}, + objects: map[string][]byte{}, + } +} + +// ListTables implements StorageClient. +func (m *MockStorage) ListTables(_ context.Context) ([]string, error) { + m.mtx.RLock() + defer m.mtx.RUnlock() + + var tableNames []string + for tableName := range m.tables { + func(tableName string) { + tableNames = append(tableNames, tableName) + }(tableName) + } + return tableNames, nil +} + +// CreateTable implements StorageClient. +func (m *MockStorage) CreateTable(_ context.Context, name string, read, write int64) error { + m.mtx.Lock() + defer m.mtx.Unlock() + + if _, ok := m.tables[name]; ok { + return fmt.Errorf("table already exists") + } + + m.tables[name] = &mockTable{ + items: map[string][]mockItem{}, + write: write, + read: read, + } + + return nil +} + +// DescribeTable implements StorageClient. +func (m *MockStorage) DescribeTable(_ context.Context, name string) (readCapacity, writeCapacity int64, status string, err error) { + m.mtx.RLock() + defer m.mtx.RUnlock() + + table, ok := m.tables[name] + if !ok { + return 0, 0, "", fmt.Errorf("not found") + } + + return table.read, table.write, dynamodb.TableStatusActive, nil +} + +// UpdateTable implements StorageClient. +func (m *MockStorage) UpdateTable(_ context.Context, name string, readCapacity, writeCapacity int64) error { + m.mtx.Lock() + defer m.mtx.Unlock() + + table, ok := m.tables[name] + if !ok { + return fmt.Errorf("not found") + } + + table.read = readCapacity + table.write = writeCapacity + + return nil +} + +// NewWriteBatch implements StorageClient. +func (m *MockStorage) NewWriteBatch() WriteBatch { + return &mockWriteBatch{} +} + +// BatchWrite implements StorageClient. 
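+// Items for each hash key are kept sorted by range value; duplicate writes
+// within a batch, or rewrites of anything other than a metric name entry,
+// return an error.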
+func (m *MockStorage) BatchWrite(_ context.Context, batch WriteBatch) error { + m.mtx.Lock() + defer m.mtx.Unlock() + + mockBatch := *batch.(*mockWriteBatch) + seenWrites := map[string]bool{} + + for _, req := range mockBatch { + table, ok := m.tables[req.tableName] + if !ok { + return fmt.Errorf("table not found") + } + + // Check for duplicate writes by RangeKey in same batch + key := fmt.Sprintf("%s:%s:%x", req.tableName, req.hashValue, req.rangeValue) + if _, ok := seenWrites[key]; ok { + return fmt.Errorf("Dupe write in batch") + } + seenWrites[key] = true + + log.Debugf("Write %s/%x", req.hashValue, req.rangeValue) + + items := table.items[req.hashValue] + + // insert in order + i := sort.Search(len(items), func(i int) bool { + return bytes.Compare(items[i].rangeValue, req.rangeValue) >= 0 + }) + if i >= len(items) || !bytes.Equal(items[i].rangeValue, req.rangeValue) { + items = append(items, mockItem{}) + copy(items[i+1:], items[i:]) + } else { + // Return error if duplicate write and not metric name entry + itemComponents := decodeRangeKey(items[i].rangeValue) + if !bytes.Equal(itemComponents[3], metricNameRangeKeyV1) { + return fmt.Errorf("Dupe write") + } + } + items[i] = mockItem{ + rangeValue: req.rangeValue, + value: req.value, + } + + table.items[req.hashValue] = items + } + return nil +} + +// QueryPages implements StorageClient. +func (m *MockStorage) QueryPages(_ context.Context, query IndexQuery, callback func(result ReadBatch, lastPage bool) (shouldContinue bool)) error { + m.mtx.RLock() + defer m.mtx.RUnlock() + + table, ok := m.tables[query.TableName] + if !ok { + return fmt.Errorf("table not found") + } + + items, ok := table.items[query.HashValue] + if !ok { + return nil + } + + if query.RangeValuePrefix != nil { + log.Debugf("Lookup prefix %s/%x (%d)", query.HashValue, query.RangeValuePrefix, len(items)) + + // the smallest index i in [0, n) at which f(i) is true + i := sort.Search(len(items), func(i int) bool { + if bytes.Compare(items[i].rangeValue, query.RangeValuePrefix) > 0 { + return true + } + return bytes.HasPrefix(items[i].rangeValue, query.RangeValuePrefix) + }) + j := sort.Search(len(items)-i, func(j int) bool { + if bytes.Compare(items[i+j].rangeValue, query.RangeValuePrefix) < 0 { + return false + } + return !bytes.HasPrefix(items[i+j].rangeValue, query.RangeValuePrefix) + }) + + log.Debugf(" found range [%d:%d)", i, i+j) + if i > len(items) || j == 0 { + return nil + } + items = items[i : i+j] + + } else if query.RangeValueStart != nil { + log.Debugf("Lookup range %s/%x -> ... (%d)", query.HashValue, query.RangeValueStart, len(items)) + + // the smallest index i in [0, n) at which f(i) is true + i := sort.Search(len(items), func(i int) bool { + return bytes.Compare(items[i].rangeValue, query.RangeValueStart) >= 0 + }) + + log.Debugf(" found range [%d)", i) + if i > len(items) { + return nil + } + items = items[i:] + + } else { + log.Debugf("Lookup %s/* (%d)", query.HashValue, len(items)) + } + + // Filters + if query.ValueEqual != nil { + log.Debugf("Filter Value EQ = %s", query.ValueEqual) + + filtered := make([]mockItem, 0) + for _, v := range items { + if bytes.Equal(v.value, query.ValueEqual) { + filtered = append(filtered, v) + } + } + items = filtered + } + + result := mockReadBatch{} + for _, item := range items { + result = append(result, item) + } + + callback(result, true) + return nil +} + +// PutChunk implements S3Client. 
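+// Chunks are kept in a plain in-memory map keyed by the external chunk key.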
+func (m *MockStorage) PutChunk(_ context.Context, key string, buf []byte) error { + m.mtx.Lock() + defer m.mtx.Unlock() + + m.objects[key] = buf + return nil +} + +// GetChunk implements S3Client. +func (m *MockStorage) GetChunk(_ context.Context, key string) ([]byte, error) { + m.mtx.RLock() + defer m.mtx.RUnlock() + + buf, ok := m.objects[key] + if !ok { + return nil, fmt.Errorf("%v not found", key) + } + + return buf, nil +} + +type mockWriteBatch []struct { + tableName, hashValue string + rangeValue []byte + value []byte +} + +func (b *mockWriteBatch) Add(tableName, hashValue string, rangeValue []byte, value []byte) { + *b = append(*b, struct { + tableName, hashValue string + rangeValue []byte + value []byte + }{tableName, hashValue, rangeValue, value}) +} + +type mockReadBatch []mockItem + +func (b mockReadBatch) Len() int { + return len(b) +} + +func (b mockReadBatch) RangeValue(i int) []byte { + return b[i].rangeValue +} + +func (b mockReadBatch) Value(i int) []byte { + return b[i].value +} diff --git a/memcache_client.go b/memcache_client.go new file mode 100644 index 0000000000000..1b5b976d4a19f --- /dev/null +++ b/memcache_client.go @@ -0,0 +1,106 @@ +package chunk + +import ( + "flag" + "fmt" + "net" + "sort" + "sync" + "time" + + "github.com/bradfitz/gomemcache/memcache" + "github.com/prometheus/common/log" +) + +// MemcacheClient is a memcache client that gets its server list from SRV +// records, and periodically updates that ServerList. +type MemcacheClient struct { + *memcache.Client + serverList *memcache.ServerList + hostname string + service string + + quit chan struct{} + wait sync.WaitGroup +} + +// MemcacheConfig defines how a MemcacheClient should be constructed. +type MemcacheConfig struct { + Host string + Service string + Timeout time.Duration + UpdateInterval time.Duration +} + +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *MemcacheConfig) RegisterFlags(f *flag.FlagSet) { + f.StringVar(&cfg.Host, "memcached.hostname", "", "Hostname for memcached service to use when caching chunks. If empty, no memcached will be used.") + f.StringVar(&cfg.Service, "memcached.service", "memcached", "SRV service used to discover memcache servers.") + f.DurationVar(&cfg.Timeout, "memcached.timeout", 100*time.Millisecond, "Maximum time to wait before giving up on memcached requests.") + f.DurationVar(&cfg.UpdateInterval, "memcached.update-interval", 1*time.Minute, "Period with which to poll DNS for memcache servers.") +} + +// NewMemcacheClient creates a new MemcacheClient that gets its server list +// from SRV and updates the server list on a regular basis. +func NewMemcacheClient(cfg MemcacheConfig) *MemcacheClient { + var servers memcache.ServerList + client := memcache.NewFromSelector(&servers) + client.Timeout = cfg.Timeout + + newClient := &MemcacheClient{ + Client: client, + serverList: &servers, + hostname: cfg.Host, + service: cfg.Service, + quit: make(chan struct{}), + } + err := newClient.updateMemcacheServers() + if err != nil { + log.Errorf("Error setting memcache servers to '%v': %v", cfg.Host, err) + } + + newClient.wait.Add(1) + go newClient.updateLoop(cfg.UpdateInterval) + return newClient +} + +// Stop the memcache client. 
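+// This closes the quit channel and waits for the background server-list
+// refresher to finish.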
+func (c *MemcacheClient) Stop() { + close(c.quit) + c.wait.Wait() +} + +func (c *MemcacheClient) updateLoop(updateInterval time.Duration) error { + defer c.wait.Done() + ticker := time.NewTicker(updateInterval) + var err error + for { + select { + case <-ticker.C: + err = c.updateMemcacheServers() + if err != nil { + log.Warnf("Error updating memcache servers: %v", err) + } + case <-c.quit: + ticker.Stop() + } + } +} + +// updateMemcacheServers sets a memcache server list from SRV records. SRV +// priority & weight are ignored. +func (c *MemcacheClient) updateMemcacheServers() error { + _, addrs, err := net.LookupSRV(c.service, "tcp", c.hostname) + if err != nil { + return err + } + var servers []string + for _, srv := range addrs { + servers = append(servers, fmt.Sprintf("%s:%d", srv.Target, srv.Port)) + } + // ServerList deterministically maps keys to _index_ of the server list. + // Since DNS returns records in different order each time, we sort to + // guarantee best possible match between nodes. + sort.Strings(servers) + return c.serverList.SetServers(servers...) +} diff --git a/schema.go b/schema.go new file mode 100644 index 0000000000000..298c4bedb39f6 --- /dev/null +++ b/schema.go @@ -0,0 +1,538 @@ +package chunk + +import ( + "crypto/sha1" + "errors" + "fmt" + "strings" + + "github.com/prometheus/common/model" + "github.com/weaveworks/cortex/pkg/util" +) + +var ( + chunkTimeRangeKeyV1 = []byte{'1'} + chunkTimeRangeKeyV2 = []byte{'2'} + chunkTimeRangeKeyV3 = []byte{'3'} + chunkTimeRangeKeyV4 = []byte{'4'} + chunkTimeRangeKeyV5 = []byte{'5'} + metricNameRangeKeyV1 = []byte{'6'} +) + +// Errors +var ( + ErrNoMetricNameNotSupported = errors.New("metric name required for pre-v7 schemas") +) + +// Schema interface defines methods to calculate the hash and range keys needed +// to write or read chunks from the external index. +type Schema interface { + // When doing a write, use this method to return the list of entries you should write to. + GetWriteEntries(from, through model.Time, userID string, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) + + // When doing a read, use these methods to return the list of entries you should query + GetReadQueries(from, through model.Time, userID string) ([]IndexQuery, error) + GetReadQueriesForMetric(from, through model.Time, userID string, metricName model.LabelValue) ([]IndexQuery, error) + GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName model.LabelValue, labelName model.LabelName) ([]IndexQuery, error) + GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName model.LabelValue, labelName model.LabelName, labelValue model.LabelValue) ([]IndexQuery, error) +} + +// IndexQuery describes a query for entries +type IndexQuery struct { + TableName string + HashValue string + + // One of RangeValuePrefix or RangeValueStart might be set: + // - If RangeValuePrefix is not nil, must read all keys with that prefix. + // - If RangeValueStart is not nil, must read all keys from there onwards. + // - If neither is set, must read all keys for that row. + RangeValuePrefix []byte + RangeValueStart []byte + + // Filters for querying + ValueEqual []byte +} + +// IndexEntry describes an entry in the chunk index +type IndexEntry struct { + TableName string + HashValue string + + // For writes, RangeValue will always be set. + RangeValue []byte + + // New for v6 schema, label value is not written as part of the range key. 
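+ // Instead the label value is carried in this field.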
+ Value []byte +} + +// v1Schema was: +// - hash key: :: +// - range key: