Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

block (UI and API): Add mark deletion and no compaction UI #4620

Merged
merged 12 commits into from
Nov 6, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#4736](https://github.com/thanos-io/thanos/pull/4736) S3: Add capability to use custom AWS STS Endpoint.
- [#4764](https://github.com/thanos-io/thanos/pull/4764) Compactor: add `block-viewer.global.sync-block-timeout` flag to set the timeout of synchronization block metas.
- [#4801](https://github.com/thanos-io/thanos/pull/4801) Compactor: added Prometheus metrics for tracking the progress of compaction and downsampling.
- [#4444](https://github.com/thanos-io/thanos/pull/4444) UI: add mark deletion and no compaction to the Block UI.

### Fixed

Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func runCompact(
"/loaded",
component,
)
api = blocksAPI.NewBlocksAPI(logger, conf.webConf.disableCORS, conf.label, flagsMap)
api = blocksAPI.NewBlocksAPI(logger, conf.webConf.disableCORS, conf.label, flagsMap, bkt)
sy *compact.Syncer
)
{
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ func runStore(

// Configure Request Logging for HTTP calls.
logMiddleware := logging.NewHTTPServerMiddleware(logger, httpLogOpts...)
api := blocksAPI.NewBlocksAPI(logger, conf.webConfig.disableCORS, "", flagsMap)
api := blocksAPI.NewBlocksAPI(logger, conf.webConfig.disableCORS, "", flagsMap, bkt)
api.Register(r.WithPrefix("/api/v1"), tracer, logger, ins, logMiddleware)

metaFetcher.UpdateOnChange(func(blocks []metadata.Meta, err error) {
Expand Down
22 changes: 11 additions & 11 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,17 @@ func registerBucketWeb(app extkingpin.AppClause, objStoreConfig *extflag.PathOrC

flagsMap := getFlagsMap(cmd.Flags())

api := v1.NewBlocksAPI(logger, tbc.webDisableCORS, tbc.label, flagsMap)
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Bucket.String())
if err != nil {
return errors.Wrap(err, "bucket client")
}

api := v1.NewBlocksAPI(logger, tbc.webDisableCORS, tbc.label, flagsMap, bkt)

// Configure Request Logging for HTTP calls.
opts := []logging.Option{logging.WithDecider(func(_ string, _ error) logging.Decision {
Expand All @@ -575,16 +585,6 @@ func registerBucketWeb(app extkingpin.AppClause, objStoreConfig *extflag.PathOrC
level.Warn(logger).Log("msg", "Refresh interval should be at least 2 times the timeout")
}

confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Bucket.String())
if err != nil {
return errors.Wrap(err, "bucket client")
}

relabelContentYaml, err := selectorRelabelConf.Content()
if err != nil {
return errors.Wrap(err, "get content of relabel configuration")
Expand Down
66 changes: 65 additions & 1 deletion pkg/api/blocks/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,18 @@ import (
"time"

"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/route"
"github.com/thanos-io/thanos/pkg/api"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/logging"
"github.com/thanos-io/thanos/pkg/objstore"
)

// BlocksAPI is a very simple API used by Thanos Block Viewer.
Expand All @@ -23,6 +29,7 @@ type BlocksAPI struct {
globalBlocksInfo *BlocksInfo
loadedBlocksInfo *BlocksInfo
disableCORS bool
bkt objstore.Bucket
}

type BlocksInfo struct {
Expand All @@ -32,8 +39,27 @@ type BlocksInfo struct {
Err error `json:"err"`
}

type ActionType int32

const (
Deletion ActionType = iota
NoCompaction
Unknown
)

func parse(s string) ActionType {
switch s {
case "DELETION":
return Deletion
case "NO_COMPACTION":
return NoCompaction
default:
return Unknown
}
}

// NewBlocksAPI creates a simple API to be used by Thanos Block Viewer.
func NewBlocksAPI(logger log.Logger, disableCORS bool, label string, flagsMap map[string]string) *BlocksAPI {
func NewBlocksAPI(logger log.Logger, disableCORS bool, label string, flagsMap map[string]string, bkt objstore.Bucket) *BlocksAPI {
return &BlocksAPI{
baseAPI: api.NewBaseAPI(logger, disableCORS, flagsMap),
logger: logger,
Expand All @@ -46,6 +72,7 @@ func NewBlocksAPI(logger log.Logger, disableCORS bool, label string, flagsMap ma
Label: label,
},
disableCORS: disableCORS,
bkt: bkt,
}
}

Expand All @@ -55,6 +82,43 @@ func (bapi *BlocksAPI) Register(r *route.Router, tracer opentracing.Tracer, logg
instr := api.GetInstr(tracer, logger, ins, logMiddleware, bapi.disableCORS)

r.Get("/blocks", instr("blocks", bapi.blocks))
r.Post("/blocks/mark", instr("blocks_mark", bapi.markBlock))
}

func (bapi *BlocksAPI) markBlock(r *http.Request) (interface{}, []error, *api.ApiError) {
idParam := r.FormValue("id")
actionParam := r.FormValue("action")
detailParam := r.FormValue("detail")

if idParam == "" {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.New("ID cannot be empty")}
}

if actionParam == "" {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.New("Action cannot be empty")}
}

id, err := ulid.Parse(idParam)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Errorf("ULID %q is not valid: %v", idParam, err)}
}

actionType := parse(actionParam)
switch actionType {
case Deletion:
err := block.MarkForDeletion(r.Context(), bapi.logger, bapi.bkt, id, detailParam, promauto.With(nil).NewCounter(prometheus.CounterOpts{}))
yeya24 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
case NoCompaction:
err := block.MarkForNoCompact(r.Context(), bapi.logger, bapi.bkt, id, metadata.ManualNoCompactReason, detailParam, promauto.With(nil).NewCounter(prometheus.CounterOpts{}))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
default:
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Errorf("not supported marker %v", actionParam)}
}
return nil, nil, nil
}

func (bapi *BlocksAPI) blocks(r *http.Request) (interface{}, []error, *api.ApiError) {
Expand Down
187 changes: 187 additions & 0 deletions pkg/api/blocks/v1_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package v1

import (
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"path"
"reflect"
"strings"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/pkg/labels"
baseAPI "github.com/thanos-io/thanos/pkg/api"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/testutil"
"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
)

func TestMain(m *testing.M) {
testutil.TolerantVerifyLeakMain(m)
}

type endpointTestCase struct {
endpoint baseAPI.ApiFunc
params map[string]string
query url.Values
method string
response interface{}
errType baseAPI.ErrorType
}
type responeCompareFunction func(interface{}, interface{}) bool

func testEndpoint(t *testing.T, test endpointTestCase, name string, responseCompareFunc responeCompareFunction) bool {
return t.Run(name, func(t *testing.T) {
// Build a context with the correct request params.
ctx := context.Background()
for p, v := range test.params {
ctx = route.WithParam(ctx, p, v)
}

reqURL := "http://example.com"
params := test.query.Encode()

var body io.Reader
if test.method == http.MethodPost {
body = strings.NewReader(params)
} else if test.method == "" {
test.method = "ANY"
reqURL += "?" + params
}

req, err := http.NewRequest(test.method, reqURL, body)
if err != nil {
t.Fatal(err)
}

if body != nil {
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
}

resp, _, apiErr := test.endpoint(req.WithContext(ctx))
if apiErr != nil {
if test.errType == baseAPI.ErrorNone {
t.Fatalf("Unexpected error: %s", apiErr)
}
if test.errType != apiErr.Typ {
t.Fatalf("Expected error of type %q but got type %q", test.errType, apiErr.Typ)
}
return
}
if test.errType != baseAPI.ErrorNone {
t.Fatalf("Expected error of type %q but got none", test.errType)
}

if !responseCompareFunc(resp, test.response) {
t.Fatalf("Response does not match, expected:\n%+v\ngot:\n%+v", test.response, resp)
}
})
}

func TestMarkBlockEndpoint(t *testing.T) {
ctx := context.Background()
tmpDir, err := ioutil.TempDir("", "test-read-mark")
testutil.Ok(t, err)

// create block
b1, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
{{Name: "a", Value: "1"}},
{{Name: "a", Value: "2"}},
{{Name: "a", Value: "3"}},
{{Name: "a", Value: "4"}},
{{Name: "b", Value: "1"}},
}, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124, metadata.NoneFunc)
testutil.Ok(t, err)

// upload block
bkt := objstore.WithNoopInstr(objstore.NewInMemBucket())
logger := log.NewNopLogger()
testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(tmpDir, b1.String()), metadata.NoneFunc))

now := time.Now()
api := &BlocksAPI{
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
},
logger: logger,
globalBlocksInfo: &BlocksInfo{
Blocks: []metadata.Meta{},
Label: "foo",
},
loadedBlocksInfo: &BlocksInfo{
Blocks: []metadata.Meta{},
Label: "foo",
},
disableCORS: true,
bkt: bkt,
}

var tests = []endpointTestCase{
// Empty ID
{
endpoint: api.markBlock,
query: url.Values{
"id": []string{""},
},
errType: baseAPI.ErrorBadData,
},
// Empty action
{
endpoint: api.markBlock,
query: url.Values{
"id": []string{ulid.MustNew(1, nil).String()},
"action": []string{""},
},
errType: baseAPI.ErrorBadData,
},
// invalid ULID
{
endpoint: api.markBlock,
query: url.Values{
"id": []string{"invalid_id"},
"action": []string{"DELETION"},
},
errType: baseAPI.ErrorBadData,
},
// invalid action
{
endpoint: api.markBlock,
query: url.Values{
"id": []string{ulid.MustNew(2, nil).String()},
"action": []string{"INVALID_ACTION"},
},
errType: baseAPI.ErrorBadData,
},
{
endpoint: api.markBlock,
query: url.Values{
"id": []string{b1.String()},
"action": []string{"DELETION"},
},
response: nil,
},
}

for i, test := range tests {
if ok := testEndpoint(t, test, fmt.Sprintf("#%d %s", i, test.query.Encode()), reflect.DeepEqual); !ok {
return
}
}

file := path.Join(tmpDir, b1.String())
_, err = os.Stat(file)
testutil.Ok(t, err)
}
Loading