Skip to content

Commit

Permalink
receive: Add block shipping
Browse files Browse the repository at this point in the history
  • Loading branch information
brancz committed Apr 18, 2019
1 parent 9fad981 commit 4b72fe8
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 79 deletions.
64 changes: 63 additions & 1 deletion cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/component"
"github.com/improbable-eng/thanos/pkg/objstore/client"
"github.com/improbable-eng/thanos/pkg/receive"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/shipper"
"github.com/improbable-eng/thanos/pkg/store"
"github.com/improbable-eng/thanos/pkg/store/storepb"
"github.com/oklog/run"
Expand All @@ -20,6 +23,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/tsdb"
"github.com/prometheus/tsdb/labels"
"google.golang.org/grpc"
"gopkg.in/alecthomas/kingpin.v2"
)
Expand All @@ -36,7 +40,16 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application, name stri
dataDir := cmd.Flag("tsdb.path", "Data directory of TSDB.").
Default("./data").String()

labelStrs := cmd.Flag("labels", "External labels to announce. This flag will be removed in the future when handling multiple tsdb instances is added.").PlaceHolder("key=\"value\"").Strings()

objStoreConfig := regCommonObjStoreFlags(cmd, "", false)

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
lset, err := parseFlagLabels(*labelStrs)
if err != nil {
return errors.Wrap(err, "parse labels")
}

return runReceive(
g,
logger,
Expand All @@ -49,6 +62,8 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application, name stri
*httpMetricsBindAddr,
*remoteWriteAddress,
*dataDir,
objStoreConfig,
lset,
)
}
}
Expand All @@ -65,6 +80,8 @@ func runReceive(
httpMetricsBindAddr string,
remoteWriteAddress string,
dataDir string,
objStoreConfig *pathOrContent,
lset labels.Labels,
) error {
logger = log.With(logger, "component", "receive")
level.Warn(logger).Log("msg", "setting up receive; the Thanos receive component is EXPERIMENTAL, it may break significantly without notice")
Expand Down Expand Up @@ -189,7 +206,7 @@ func runReceive(
}

db := localStorage.Get()
tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), reg, db, component.Receive, nil)
tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), reg, db, component.Receive, lset)

opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA)
if err != nil {
Expand Down Expand Up @@ -225,6 +242,51 @@ func runReceive(
},
)
}

confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

upload := true
if len(confContentYaml) == 0 {
level.Info(logger).Log("msg", "No supported bucket was configured, uploads will be disabled")
upload = false
}

if upload {
// The background shipper continuously scans the data directory and uploads
// new blocks to Google Cloud Storage or an S3-compatible storage service.
bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Sidecar.String())
if err != nil {
return err
}

// Ensure we close up everything properly.
defer func() {
if err != nil {
runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
}
}()

s := shipper.New(logger, reg, dataDir, bkt, func() labels.Labels { return lset }, metadata.ReceiveSource)

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
if uploaded, err := s.Sync(ctx); err != nil {
level.Warn(logger).Log("err", err, "uploaded", uploaded)
}

return nil
})
}, func(error) {
cancel()
})
}

level.Info(logger).Log("msg", "starting receiver")

return nil
Expand Down
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ require (
github.com/fortytw2/leaktest v1.2.0
github.com/fsnotify/fsnotify v1.4.7
github.com/go-kit/kit v0.8.0
github.com/gobuffalo/envy v1.6.15 // indirect
github.com/gogo/protobuf v1.2.0
github.com/gohugoio/hugo v0.54.0 // indirect
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db
github.com/google/martian v2.1.0+incompatible // indirect
github.com/googleapis/gax-go v2.0.2+incompatible // indirect
Expand All @@ -23,7 +21,7 @@ require (
github.com/hashicorp/memberlist v0.1.0
github.com/julienschmidt/httprouter v1.1.0 // indirect
github.com/lovoo/gcloud-opentracing v0.3.0
github.com/miekg/dns v1.0.8
github.com/miekg/dns v1.1.4
github.com/minio/minio-go v0.0.0-20190131015406-c8a261de75c1
github.com/mozillazg/go-cos v0.11.0
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223
Expand Down
Loading

0 comments on commit 4b72fe8

Please sign in to comment.