Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generate "parallelizable" plans for compaction #1

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ var (
return nil, nil, err
}

planner := compact.NewTSDBBasedPlanner(logger, cfg.BlockRanges.ToMilliseconds())
planner, _ := NewPlanner(logger, cfg.BlockRanges.ToMilliseconds(), "default")
ac1214 marked this conversation as resolved.
Show resolved Hide resolved
return compactor, planner, nil
}
)
Expand Down
58 changes: 58 additions & 0 deletions pkg/compactor/planner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package compactor

import (
"context"
"errors"
"fmt"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
)

var (
errPlannerNotImplemented = errors.New("planner type not implemented")
)

type Planner struct {
blocksPlanner compact.Planner
logger log.Logger
}

func NewPlanner(logger log.Logger, ranges []int64, plannerType string) (*Planner, error) {
if logger == nil {
logger = log.NewNopLogger()
}

var blocksPlanner compact.Planner
switch plannerType {
case "parallel":
ac1214 marked this conversation as resolved.
Show resolved Hide resolved
blocksPlanner = NewParallelPlanner(logger, ranges)
case "default":
blocksPlanner = compact.NewTSDBBasedPlanner(logger, ranges)
default:
return nil, errPlannerNotImplemented
}

p := &Planner{
logger: logger,
blocksPlanner: blocksPlanner,
}

return p, nil
}

func (p *Planner) Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) {
return p.blocksPlanner.Plan(ctx, metasByMinTime)
}

func (p *Planner) PrintPlan(ctx context.Context, metasByMinTime []*metadata.Meta) {
toCompact, err := p.Plan(ctx, metasByMinTime)

if err != nil {
return
}

level.Info(p.logger).Log("msg", "Compaction plan generated: ", "plan", fmt.Sprintf("%v", toCompact))
}
58 changes: 58 additions & 0 deletions pkg/compactor/planner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package compactor

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/thanos-io/thanos/pkg/block/metadata"
)

type PlannerMock struct {
mock.Mock
}

func (p *PlannerMock) Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) {
args := p.Called(ctx, metasByMinTime)
return args.Get(0).([]*metadata.Meta), args.Error(1)
}

func TestPlanner_ShouldReturnParallelPlanner(t *testing.T) {
var ranges []int64
p, _ := NewPlanner(nil, ranges, "parallel")

parallelPlanner := &ParallelPlanner{}

assert.IsType(t, parallelPlanner, p.blocksPlanner)
}

func TestPlanner_ShouldReturnDefaultPlanner(t *testing.T) {
var ranges []int64
p, _ := NewPlanner(nil, ranges, "default")

parallelPlanner := &ParallelPlanner{}

assert.IsType(t, parallelPlanner, p.blocksPlanner)
}

func TestPlanner_ShouldErrorOnNonExistentPlanner(t *testing.T) {
var ranges []int64
_, err := NewPlanner(nil, ranges, "non-existent")

assert.ErrorIs(t, err, errPlannerNotImplemented)
}

func TestPlanner_PlanShouldCallBlocksPlannerPlan(t *testing.T) {
blockPlannerMock := &PlannerMock{}
blockPlannerMock.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil)

p := &Planner{
blocksPlanner: blockPlannerMock,
}

blockMetas := []*metadata.Meta{}
p.Plan(context.Background(), blockMetas)

blockPlannerMock.AssertCalled(t, "Plan", context.Background(), blockMetas)
}