Skip to content

Commit

Permalink
receive: Add block shipping
Browse files Browse the repository at this point in the history
  • Loading branch information
brancz committed Apr 15, 2019
1 parent dda20f0 commit d3a9d45
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 91 deletions.
33 changes: 32 additions & 1 deletion cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import (
"context"
"fmt"
"net"
"strings"
"sync"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/component"
"github.com/improbable-eng/thanos/pkg/receive"
"github.com/improbable-eng/thanos/pkg/runutil"
Expand All @@ -20,6 +22,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/tsdb"
"github.com/prometheus/tsdb/labels"
"google.golang.org/grpc"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)
Expand All @@ -36,6 +39,10 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application, name stri
dataDir := cmd.Flag("tsdb.path", "Data directory of TSDB.").
Default("./data").String()

externalLabels := cmd.Flag("external-labels", "External labels to announce. Comma separated list of key=value pairs. This flag will be removed in the future when handling multiple tsdb instances is added.").PlaceHolder("key1=value1,key2=value2").String()

objStoreConfig := regCommonObjStoreFlags(cmd, "", false)

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
return runReceive(
g,
Expand All @@ -49,6 +56,8 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application, name stri
*httpMetricsBindAddr,
*remoteWriteAddress,
*dataDir,
objStoreConfig,
*externalLabels,
)
}
}
Expand All @@ -65,10 +74,26 @@ func runReceive(
httpMetricsBindAddr string,
remoteWriteAddress string,
dataDir string,
objStoreConfig *pathOrContent,
externalLabelsString string,
) error {
logger = log.With(logger, "component", "receive")
level.Warn(logger).Log("msg", "setting up receive; the Thanos receive component is EXPERIMENTAL, it may break significantly without notice")

externalLabels := labels.Labels{}
labelStrings := strings.Split(externalLabelsString, ",")
for _, labelString := range labelStrings {
keyValueStrings := strings.Split(labelString, "=")
if len(keyValueStrings) != 2 {
return errors.Errorf("external labels are malformed: a list of labels passed to the --external-labels flag must be comma separated and labels in key=value form. Got a label: %s", labelString)
}
externalLabels = append(externalLabels, labels.Label{
Name: keyValueStrings[0],
Value: keyValueStrings[1],
})
}
level.Debug(logger).Log("external_labels", externalLabels.String())

tsdbCfg := &tsdb.Options{
Retention: model.Duration(time.Hour * 24 * 15),
NoLockfile: true,
Expand Down Expand Up @@ -189,7 +214,7 @@ func runReceive(
}

db := localStorage.Get()
tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), reg, db, component.Receive, nil)
tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), reg, db, component.Receive, externalLabels)

opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA)
if err != nil {
Expand Down Expand Up @@ -225,6 +250,12 @@ func runReceive(
},
)
}

err := addShipper(logger, reg, g, metadata.ReceiveSource, &promMetadata{labels: externalLabels}, dataDir, objStoreConfig, false)
if err != nil {
return errors.Wrap(err, "failed to add block shipper to run group")
}

level.Info(logger).Log("msg", "starting receiver")

return nil
Expand Down
24 changes: 12 additions & 12 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,16 @@ func runSidecar(
})
}

err := addShipper(logger, reg, g, metadata.SidecarSource, m, dataDir, objStoreConfig, uploadCompacted)
if err != nil {
return errors.Wrap(err, "failed to add block shipper to run group")
}

level.Info(logger).Log("msg", "starting sidecar", "peer", peer.Name())
return nil
}

func addShipper(logger log.Logger, reg prometheus.Registerer, g *run.Group, sourceType metadata.SourceType, m *promMetadata, dataDir string, objStoreConfig *pathOrContent, uploadCompacted bool) error {
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
Expand Down Expand Up @@ -265,36 +275,26 @@ func runSidecar(

var s *shipper.Shipper
if uploadCompacted {
s, err = shipper.NewWithCompacted(ctx, logger, reg, dataDir, bkt, m.Labels, metadata.SidecarSource, m.promURL)
s, err = shipper.NewWithCompacted(ctx, logger, reg, dataDir, bkt, m.Labels, sourceType, m.promURL)
if err != nil {
return errors.Wrap(err, "create shipper")
}
} else {
s = shipper.New(logger, reg, dataDir, bkt, m.Labels, metadata.SidecarSource)
s = shipper.New(logger, reg, dataDir, bkt, m.Labels, sourceType)
}

return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
if uploaded, err := s.Sync(ctx); err != nil {
level.Warn(logger).Log("err", err, "uploaded", uploaded)
}

minTime, _, err := s.Timestamps()
if err != nil {
level.Warn(logger).Log("msg", "reading timestamps failed", "err", err)
} else {
m.UpdateTimestamps(minTime, math.MaxInt64)

mint, maxt := m.Timestamps()
peer.SetTimestamps(mint, maxt)
}
return nil
})
}, func(error) {
cancel()
})
}

level.Info(logger).Log("msg", "starting sidecar", "peer", peer.Name())
return nil
}

Expand Down
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ require (
github.com/fortytw2/leaktest v1.2.0
github.com/fsnotify/fsnotify v1.4.7
github.com/go-kit/kit v0.8.0
github.com/gobuffalo/envy v1.6.15 // indirect
github.com/gogo/protobuf v1.2.0
github.com/gohugoio/hugo v0.54.0 // indirect
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db
github.com/google/martian v2.1.0+incompatible // indirect
github.com/googleapis/gax-go v2.0.2+incompatible // indirect
Expand Down
Loading

0 comments on commit d3a9d45

Please sign in to comment.