Skip to content

Commit

Permalink
sidecar: Added support for streaming, chunked remote read.
Browse files Browse the repository at this point in the history
Fixes: #488

Signed-off-by: Bartek Plotka <[email protected]>

# Conflicts:
#	Makefile
#	pkg/store/prompb/remote.pb.go
  • Loading branch information
bwplotka committed Aug 22, 2019
1 parent d047ea7 commit 431aaf5
Show file tree
Hide file tree
Showing 28 changed files with 309 additions and 2,654 deletions.
18 changes: 5 additions & 13 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,14 @@ jobs:
- run: make format
- run:
name: "Run all tests"
# TODO(bplotka): Setup some S3 tests for CI.
# TODO(bwplotka): Setup some S3 tests for CI.
# taskset sets CPU affinity to 2 (current CPU limit)
command: |
if [ -z ${GCP_PROJECT} ]; then
export THANOS_SKIP_GCS_TESTS="true"
echo "Skipping GCS tests."
taskset 2 make test-local
exit
fi
export THANOS_SKIP_S3_AWS_TESTS="true"
echo "Skipping AWS tests."
export THANOS_SKIP_AZURE_TESTS="true"
echo "Skipping Azure tests."
export THANOS_SKIP_SWIFT_TESTS="true"
echo "Skipping SWIFT tests."
export THANOS_SKIP_TENCENT_COS_TESTS="true"
echo "Skipping TENCENT COS tests."
make test
taskset 2 make test-only-gcs
# Cross build is needed for publish_release but needs to be done outside of docker.
cross_build:
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ tarballs-release: $(PROMU)
.PHONY: test
test: check-git test-deps
@echo ">> running all tests. Do export THANOS_SKIP_GCS_TESTS='true' or/and THANOS_SKIP_S3_AWS_TESTS='true' or/and THANOS_SKIP_AZURE_TESTS='true' and/or THANOS_SKIP_SWIFT_TESTS='true' and/or THANOS_SKIP_TENCENT_COS_TESTS='true' if you want to skip e2e tests against real store buckets"
THANOS_TEST_PROMETHEUS_VERSIONS="$(PROM_VERSIONS)" THANOS_TEST_ALERTMANAGER_PATH="alertmanager-$(ALERTMANAGER_VERSION)" go test $(shell go list ./... | grep -v /vendor/ | grep -v /benchmark/);
THANOS_TEST_PROMETHEUS_VERSIONS="$(PROM_VERSIONS)" THANOS_TEST_ALERTMANAGER_PATH="alertmanager-$(ALERTMANAGER_VERSION)" go test $(shell go list ./... | grep -v /vendor/);

.PHONY: test-only-gcs
test-only-gcs: export THANOS_SKIP_S3_AWS_TESTS = true
Expand Down Expand Up @@ -237,7 +237,7 @@ web: web-pre-process $(HUGO)
#
# to debug big allocations during linting.
lint: check-git $(GOLANGCILINT) $(MISSPELL)
@echo ">> linting all of the Go files"
@echo ">> linting all of the Go files GOGC=${GOGC}""
@$(GOLANGCILINT) run -v --enable goimports --enable goconst --skip-dirs vendor
@echo ">> detecting misspells"
@find . -type f | grep -v vendor/ | grep -vE '\./\..*' | xargs $(MISSPELL) -error
Expand Down
3 changes: 1 addition & 2 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,8 +399,7 @@ func runQuery(
}

ins := extpromhttp.NewInstrumentationMiddleware(reg)

ui.NewQueryUI(logger, stores, flagsMap).Register(router.WithPrefix(webRoutePrefix), ins)
ui.NewQueryUI(logger, reg, stores, flagsMap).Register(router.WithPrefix(webRoutePrefix), ins)

api := v1.NewAPI(logger, reg, engine, queryableCreator, enableAutodownsampling, enablePartialResponse)

Expand Down
4 changes: 2 additions & 2 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,9 +529,9 @@ func runRule(

ins := extpromhttp.NewInstrumentationMiddleware(reg)

ui.NewRuleUI(logger, ruleMgrs, alertQueryURL.String(), flagsMap).Register(router.WithPrefix(webRoutePrefix), ins)
ui.NewRuleUI(logger, reg, ruleMgrs, alertQueryURL.String(), flagsMap).Register(router.WithPrefix(webRoutePrefix), ins)

api := v1.NewAPI(logger, ruleMgrs)
api := v1.NewAPI(logger, reg, ruleMgrs)
api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger, ins)

mux := http.NewServeMux()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ require (
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v1.1.0
github.com/prometheus/common v0.6.0
github.com/prometheus/prometheus v0.0.0-20190814090039-b5c833ca2194
github.com/prometheus/prometheus v1.8.2-0.20190819201610-48b2c9c8eae2 // v1.8.2 is misleading as Prometheus does not have v2 module. This is pointing to one commit after 2.12.0.
github.com/uber-go/atomic v1.4.0 // indirect
github.com/uber/jaeger-client-go v2.16.0+incompatible
github.com/uber/jaeger-lib v2.0.0+incompatible
Expand Down
43 changes: 40 additions & 3 deletions go.sum

Large diffs are not rendered by default.

4 changes: 0 additions & 4 deletions pkg/promclient/promclient_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
func TestIsWALFileAccesible_e2e(t *testing.T) {
testutil.ForeachPrometheus(t, func(t testing.TB, p *testutil.Prometheus) {
testutil.Ok(t, p.Start())
defer func() { testutil.Ok(t, p.Stop()) }()

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
Expand All @@ -43,7 +42,6 @@ global:
`))

testutil.Ok(t, p.Start())
defer func() { testutil.Ok(t, p.Stop()) }()

u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)
Expand All @@ -60,7 +58,6 @@ global:
func TestConfiguredFlags_e2e(t *testing.T) {
testutil.ForeachPrometheus(t, func(t testing.TB, p *testutil.Prometheus) {
testutil.Ok(t, p.Start())
defer func() { testutil.Ok(t, p.Stop()) }()

u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)
Expand Down Expand Up @@ -96,7 +93,6 @@ func TestSnapshot_e2e(t *testing.T) {
testutil.Ok(t, err)

testutil.Ok(t, p.Start())
defer func() { testutil.Ok(t, p.Stop()) }()

u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)
Expand Down
5 changes: 4 additions & 1 deletion pkg/query/api/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ type API struct {
rangeQueryDuration prometheus.Histogram
enableAutodownsampling bool
enablePartialResponse bool
now func() time.Time
reg prometheus.Registerer

now func() time.Time
}

// NewAPI returns an initialized API type.
Expand Down Expand Up @@ -143,6 +145,7 @@ func NewAPI(
rangeQueryDuration: rangeQueryDuration,
enableAutodownsampling: enableAutodownsampling,
enablePartialResponse: enablePartialResponse,
reg: reg,

now: time.Now,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/prompb"
promtsdb "github.com/prometheus/prometheus/storage/tsdb"
terrors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/prompb"
)

// Options for the web Handler.
Expand Down
2 changes: 1 addition & 1 deletion pkg/receive/hashring.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"sort"
"sync"

"github.com/thanos-io/thanos/pkg/store/prompb"
"github.com/prometheus/prometheus/prompb"

"github.com/cespare/xxhash"
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/receive/hashring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package receive
import (
"testing"

"github.com/thanos-io/thanos/pkg/store/prompb"
"github.com/prometheus/prometheus/prompb"
)

func TestHash(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/receive/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package receive
import (
"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/thanos-io/thanos/pkg/store/prompb"
"github.com/prometheus/prometheus/prompb"

"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
Expand Down
2 changes: 1 addition & 1 deletion pkg/reloader/reloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func TestReloader_RuleApply(t *testing.T) {
testutil.Ok(t, ioutil.WriteFile(path.Join(dir2, "rule3-source.yaml"), []byte("rule3"), os.ModePerm))
testutil.Ok(t, ioutil.WriteFile(path.Join(dir2, "rule-dir", "rule4.yaml"), []byte("rule4"), os.ModePerm))

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
g := sync.WaitGroup{}
g.Add(1)
go func() {
Expand Down
5 changes: 5 additions & 0 deletions pkg/rule/api/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"time"

"github.com/NYTimes/gziphandler"
"github.com/prometheus/client_golang/prometheus"

"github.com/go-kit/kit/log"
opentracing "github.com/opentracing/opentracing-go"
"github.com/prometheus/common/route"
Expand All @@ -22,16 +24,19 @@ type API struct {
logger log.Logger
now func() time.Time
ruleRetriever RulesRetriever
reg prometheus.Registerer
}

func NewAPI(
logger log.Logger,
reg prometheus.Registerer,
ruleRetriever RulesRetriever,
) *API {
return &API{
logger: logger,
now: time.Now,
ruleRetriever: ruleRetriever,
reg: reg,
}
}

Expand Down
9 changes: 5 additions & 4 deletions pkg/rule/api/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ import (
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/tsdb"

"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/tsdb"
qapi "github.com/thanos-io/thanos/pkg/query/api"
thanosrule "github.com/thanos-io/thanos/pkg/rule"
)
Expand Down Expand Up @@ -164,6 +164,7 @@ func TestEndpoints(t *testing.T) {
algr.RuleGroups()
api := NewAPI(
nil,
prometheus.DefaultRegisterer,
algr,
)
testEndpoints(t, api)
Expand Down
1 change: 1 addition & 0 deletions pkg/rule/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ func (m *Managers) Update(dataDir string, evalInterval time.Duration, files []st
continue
}
// We add external labels in `pkg/alert.Queue`.
// TODO(bwplotka): Investigate if we should put ext labels here or not.
if err := updater.Update(evalInterval, fs, nil); err != nil {
errs = append(errs, err)
continue
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ func TestBucketStore_Info(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

dir, err := ioutil.TempDir("", "prometheus-test")
dir, err := ioutil.TempDir("", "bucketstore-test")
testutil.Ok(t, err)

bucketStore, err := NewBucketStore(nil, nil, nil, dir, noopCache{}, 2e5, 0, 0, false, 20)
Expand Down
Loading

0 comments on commit 431aaf5

Please sign in to comment.