From f94544234bae789561638ecdfd5fcc65bb227d81 Mon Sep 17 00:00:00 2001
From: Tobias Grieger
Date: Tue, 21 Feb 2023 21:41:06 +0100
Subject: [PATCH 1/2] cli: support OpenMetrics in tsdump

Touches https://github.com/cockroachdb/cockroach/issues/97120.

$ cockroach debug tsdump --insecure --format=openmetrics > tsdump.om
$ tail -n 3 tsdump.om
valcount{store="1"} 9841.000000 1677012190.0
valcount{store="1"} 9847.000000 1677012200.0
# EOF
$ docker run -v $PWD/tsdump.om:/tsdump.om -v $PWD/data:/data --entrypoint /bin/promtool prom/prometheus tsdb create-blocks-from openmetrics /tsdump.om /data
BLOCK ULID                  MIN TIME       MAX TIME       DURATION    NUM SAMPLES  NUM CHUNKS  NUM SERIES  SIZE
01GSTVJG4QMBSQ4ACPK80FFDJ9  1676544540000  1676544940001  6m40.001s   10598        1514        1514        261097
01GSTVJGWM2CX1AA56QSSZ9T2R  1677009150000  1677009590001  7m20.001s   68130        1514        1514        305952
01GSTVJHT2TWTZGCPE52AE1T95  1677009600000  1677012200001  43m20.001s  394093       4542        1514        692053
$ ls data
01GSTVJG4QMBSQ4ACPK80FFDJ9  01GSTVJGWM2CX1AA56QSSZ9T2R  01GSTVJHT2TWTZGCPE52AE1T95
$ docker run -p 9090:9090 -v $PWD/data:/data prom/prometheus --storage.tsdb.path=/data --web.enable-admin-api --config.file=/etc/prometheus/prometheus.yml

(screenshot omitted)

It should be relatively straightforward to use docker-compose to also get a
Grafana stack, but I think where the money's at is putting this into a hosted
instance. Unfortunately, the `promtool` method is offline-first, so it doesn't
lend itself readily to that.

Epic: none
Release note: None

cli: allow converting raw tsdumps into openmetrics ones

`./cockroach debug tsdump --format=openmetrics tsdump.gob > tsdump.om`

For demonstration purposes one can then:

- clone dockprom[^1]
- add `uid: v9Zz2K6nz` to the prometheus datasource[^2]
- run:
  ```
  for d in grafana_data prometheus_data; do
    docker volume create --driver local --opt type=none --opt device=$PWD/$d --opt o=bind $d
  done
  promtool tsdb create-blocks-from openmetrics tsdump.om prometheus_data/
  docker-compose up -d
  ```
- log in at http://localhost:3000 (admin/admin)
- import the L2 dashboard[^3]
- view the tsdump![^4]

Note that all of the histograms are missing because the internal timeseries
data doesn't support histograms (it records only a few quantiles).

[^1]: https://github.com/stefanprodan/dockprom
[^2]: grafana/provisioning/datasources/datasource.yml
[^3]: https://grafana.testeng.crdb.io/d/CAvWxELVz/l2-drill-down?orgId=1&from=now-1h&to=now&refresh=1m&var-cluster=grinaker-231&var-instances=All
[^4]: (screenshot omitted)

Epic: none
Release note: None

cli: enable offline tsdump conversion

This commit enables tsdump conversion without a running cluster. Prior to this
commit, a gRPC connection to a tsdb server was required to convert raw tsdump
files to other supported formats.

Release note: None

cli: add more labels to tsdump openmetrics output

This commit adds more labels to the openmetrics tsdump output, for parity with
cloud prom metrics.

Labels with values passed by the command line:

- `cluster`

Labels with values collected from datapoints:

- `node_id`
- `store`

Labels with hard-coded values:

- `cluster_type`
- `job`
- `region`

Labels with zero values:

- `instance`
- `node`
- `organization_id`
- `organization_label`
- `sla_type`
- `tenant_id`

Prior to this commit, only the `node` and `store` labels were reported. For
details, see
https://github.com/cockroachlabs/managed-service/pull/14740#issuecomment-1736134254.
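
For illustration, with these labels a converted datapoint for a hypothetical
`cr.node.sql.query.count` series would come out roughly as follows (the metric
name, value, and label values here are made up; label order comes from Go map
iteration and is not deterministic):

```
sql_query_count{cluster="my-cluster",cluster_type="SELF_HOSTED",job="cockroachdb",region="local",node_id="1",instance="",node="",organization_id="",organization_label="",sla_type="",tenant_id=""} 42.000000 1677012190.0
```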
Release note: None --- pkg/cli/BUILD.bazel | 2 + pkg/cli/debug.go | 4 +- pkg/cli/tsdump.go | 233 +++++++++++++++++++++++++++++++++-------- pkg/cli/tsdump_test.go | 82 +++++++++++++++ pkg/ts/server.go | 11 +- 5 files changed, 285 insertions(+), 47 deletions(-) create mode 100644 pkg/cli/tsdump_test.go diff --git a/pkg/cli/BUILD.bazel b/pkg/cli/BUILD.bazel index ab591fbb5531..51dfd6936e56 100644 --- a/pkg/cli/BUILD.bazel +++ b/pkg/cli/BUILD.bazel @@ -343,6 +343,7 @@ go_test( "start_test.go", "statement_bundle_test.go", "statement_diag_test.go", + "tsdump_test.go", "userfiletable_test.go", "workload_test.go", "zip_helpers_test.go", @@ -402,6 +403,7 @@ go_test( "//pkg/testutils/skip", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", + "//pkg/ts/tspb", "//pkg/util", "//pkg/util/ioctx", "//pkg/util/leaktest", diff --git a/pkg/cli/debug.go b/pkg/cli/debug.go index 4ef6528554f0..4f303d23a0fc 100644 --- a/pkg/cli/debug.go +++ b/pkg/cli/debug.go @@ -1536,9 +1536,11 @@ func init() { f.Var(&debugLogChanSel, "only-channels", "selection of channels to include in the output diagram.") f = debugTimeSeriesDumpCmd.Flags() - f.Var(&debugTimeSeriesDumpOpts.format, "format", "output format (text, csv, tsv, raw)") + f.Var(&debugTimeSeriesDumpOpts.format, "format", "output format (text, csv, tsv, raw, openmetrics)") f.Var(&debugTimeSeriesDumpOpts.from, "from", "oldest timestamp to include (inclusive)") f.Var(&debugTimeSeriesDumpOpts.to, "to", "newest timestamp to include (inclusive)") + f.StringVar(&debugTimeSeriesDumpOpts.clusterLabel, "cluster-label", + "", "prometheus label for cluster name") f = debugSendKVBatchCmd.Flags() f.StringVar(&debugSendKVBatchContext.traceFormat, "trace", debugSendKVBatchContext.traceFormat, diff --git a/pkg/cli/tsdump.go b/pkg/cli/tsdump.go index aab88f9cad02..d1cc3e7ce8b5 100644 --- a/pkg/cli/tsdump.go +++ b/pkg/cli/tsdump.go @@ -14,13 +14,18 @@ import ( "bufio" "context" "encoding/csv" + "encoding/gob" "fmt" "io" "os" + "regexp" + "strings" "time" "github.com/cockroachdb/cockroach/pkg/cli/clierrorplus" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/ts" "github.com/cockroachdb/cockroach/pkg/ts/tspb" "github.com/cockroachdb/cockroach/pkg/ts/tsutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -31,12 +36,14 @@ import ( // TODO(knz): this struct belongs elsewhere. // See: https://github.com/cockroachdb/cockroach/issues/49509 var debugTimeSeriesDumpOpts = struct { - format tsDumpFormat - from, to timestampValue + format tsDumpFormat + from, to timestampValue + clusterLabel string }{ - format: tsDumpText, - from: timestampValue{}, - to: timestampValue(timeutil.Now().Add(24 * time.Hour)), + format: tsDumpText, + from: timestampValue{}, + to: timestampValue(timeutil.Now().Add(24 * time.Hour)), + clusterLabel: "", } var debugTimeSeriesDumpCmd = &cobra.Command{ @@ -47,46 +54,28 @@ Dumps all of the raw timeseries values in a cluster. Only the default resolution is retrieved, i.e. typically datapoints older than the value of the 'timeseries.storage.resolution_10s.ttl' cluster setting will be absent from the output. + +When an input file is provided instead (as an argument), this input file +must previously have been created with the --format=raw switch. The command +will then convert it to the --format requested in the current invocation. 
`, + Args: cobra.RangeArgs(0, 1), RunE: clierrorplus.MaybeDecorateError(func(cmd *cobra.Command, args []string) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - conn, finish, err := getClientGRPCConn(ctx, serverCfg) - if err != nil { - return err - } - defer finish() - - names, err := serverpb.GetInternalTimeseriesNamesFromServer(ctx, conn) - if err != nil { - return err - } - - req := &tspb.DumpRequest{ - StartNanos: time.Time(debugTimeSeriesDumpOpts.from).UnixNano(), - EndNanos: time.Time(debugTimeSeriesDumpOpts.to).UnixNano(), - Names: names, - } - - if debugTimeSeriesDumpOpts.format == tsDumpRaw { - tsClient := tspb.NewTimeSeriesClient(conn) - stream, err := tsClient.DumpRaw(context.Background(), req) - if err != nil { - return err - } - - // Buffer the writes to os.Stdout since we're going to - // be writing potentially a lot of data to it. - w := bufio.NewWriter(os.Stdout) - if err := tsutil.DumpRawTo(stream, w); err != nil { - return err - } - return w.Flush() + var convertFile string + if len(args) > 0 { + convertFile = args[0] } var w tsWriter switch debugTimeSeriesDumpOpts.format { + case tsDumpRaw: + if convertFile != "" { + return errors.Errorf("input file is already in raw format") + } + // Special case, we don't go through the text output code. case tsDumpCSV: w = csvTSWriter{w: csv.NewWriter(os.Stdout)} case tsDumpTSV: @@ -95,18 +84,101 @@ output. w = cw case tsDumpText: w = defaultTSWriter{w: os.Stdout} + case tsDumpOpenMetrics: + w = makeOpenMetricsWriter(os.Stdout) default: return errors.Newf("unknown output format: %v", debugTimeSeriesDumpOpts.format) } - tsClient := tspb.NewTimeSeriesClient(conn) - stream, err := tsClient.Dump(context.Background(), req) - if err != nil { - return err + var recv func() (*tspb.TimeSeriesData, error) + if convertFile == "" { + // To enable conversion without a running cluster, we want to skip + // connecting to the server when converting an existing tsdump. + conn, finish, err := getClientGRPCConn(ctx, serverCfg) + if err != nil { + return err + } + defer finish() + + names, err := serverpb.GetInternalTimeseriesNamesFromServer(ctx, conn) + if err != nil { + return err + } + req := &tspb.DumpRequest{ + StartNanos: time.Time(debugTimeSeriesDumpOpts.from).UnixNano(), + EndNanos: time.Time(debugTimeSeriesDumpOpts.to).UnixNano(), + Names: names, + } + tsClient := tspb.NewTimeSeriesClient(conn) + + if debugTimeSeriesDumpOpts.format == tsDumpRaw { + stream, err := tsClient.DumpRaw(context.Background(), req) + if err != nil { + return err + } + + // Buffer the writes to os.Stdout since we're going to + // be writing potentially a lot of data to it. 
+ w := bufio.NewWriter(os.Stdout) + if err := tsutil.DumpRawTo(stream, w); err != nil { + return err + } + return w.Flush() + } + stream, err := tsClient.Dump(context.Background(), req) + if err != nil { + return err + } + recv = stream.Recv + } else { + f, err := os.Open(args[0]) + if err != nil { + return err + } + type tup struct { + data *tspb.TimeSeriesData + err error + } + + dec := gob.NewDecoder(f) + gob.Register(&roachpb.KeyValue{}) + decodeOne := func() (*tspb.TimeSeriesData, error) { + var v roachpb.KeyValue + err := dec.Decode(&v) + if err != nil { + return nil, err + } + + var data *tspb.TimeSeriesData + dumper := ts.DefaultDumper{Send: func(d *tspb.TimeSeriesData) error { + data = d + return nil + }} + if err := dumper.Dump(&v); err != nil { + return nil, err + } + return data, nil + } + + ch := make(chan tup, 4096) + go func() { + // ch is closed when the process exits, so closing channel here is + // more for extra protection. + defer close(ch) + for { + data, err := decodeOne() + ch <- tup{data, err} + } + }() + + recv = func() (*tspb.TimeSeriesData, error) { + r := <-ch + return r.data, r.err + } } for { - data, err := stream.Recv() + data, err := recv() if err == io.EOF { return w.Flush() } @@ -125,6 +197,80 @@ type tsWriter interface { Flush() error } +type openMetricsWriter struct { + out io.Writer + labels map[string]string +} + +func makeOpenMetricsWriter(out io.Writer) *openMetricsWriter { + // construct labels + labelMap := make(map[string]string) + // Hardcoded values + labelMap["cluster_type"] = "SELF_HOSTED" + labelMap["job"] = "cockroachdb" + labelMap["region"] = "local" + // Zero values + labelMap["instance"] = "" + labelMap["node"] = "" + labelMap["organization_id"] = "" + labelMap["organization_label"] = "" + labelMap["sla_type"] = "" + labelMap["tenant_id"] = "" + // Command values + if debugTimeSeriesDumpOpts.clusterLabel != "" { + labelMap["cluster"] = debugTimeSeriesDumpOpts.clusterLabel + } else if serverCfg.ClusterName != "" { + labelMap["cluster"] = serverCfg.ClusterName + } else { + labelMap["cluster"] = fmt.Sprintf("cluster-debug-%d", timeutil.Now().Unix()) + } + return &openMetricsWriter{out: out, labels: labelMap} +} + +var reCrStoreNode = regexp.MustCompile(`^cr\.([^\.]+)\.(.*)$`) +var rePromTSName = regexp.MustCompile(`[^a-z0-9]`) + +func (w *openMetricsWriter) Emit(data *tspb.TimeSeriesData) error { + name := data.Name + sl := reCrStoreNode.FindStringSubmatch(data.Name) + labelMap := w.labels + labelMap["node_id"] = "0" + if len(sl) != 0 { + storeNodeKey := sl[1] + if storeNodeKey == "node" { + storeNodeKey += "_id" + } + labelMap[storeNodeKey] = data.Source + name = sl[2] + } + var l []string + for k, v := range labelMap { + l = append(l, fmt.Sprintf("%s=%q", k, v)) + } + labels := "{" + strings.Join(l, ",") + "}" + name = rePromTSName.ReplaceAllLiteralString(name, `_`) + for _, pt := range data.Datapoints { + if _, err := fmt.Fprintf( + w.out, + "%s%s %f %d.%d\n", + name, + labels, + pt.Value, + // Convert to Unix Epoch in seconds with preserved precision + // (https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#timestamps). + pt.TimestampNanos/1e9, pt.TimestampNanos%1e9, + ); err != nil { + return err + } + } + return nil +} + +func (w *openMetricsWriter) Flush() error { + fmt.Fprintln(w.out, `# EOF`) + return nil +} + type csvTSWriter struct { w *csv.Writer } @@ -172,6 +318,7 @@ const ( tsDumpCSV tsDumpTSV tsDumpRaw + tsDumpOpenMetrics ) // Type implements the pflag.Value interface. 
@@ -188,6 +335,8 @@ func (m *tsDumpFormat) String() string { return "text" case tsDumpRaw: return "raw" + case tsDumpOpenMetrics: + return "openmetrics" } return "" } @@ -203,6 +352,8 @@ func (m *tsDumpFormat) Set(s string) error { *m = tsDumpTSV case "raw": *m = tsDumpRaw + case "openmetrics": + *m = tsDumpOpenMetrics default: return fmt.Errorf("invalid value for --format: %s", s) } diff --git a/pkg/cli/tsdump_test.go b/pkg/cli/tsdump_test.go new file mode 100644 index 000000000000..ad2bca7646d0 --- /dev/null +++ b/pkg/cli/tsdump_test.go @@ -0,0 +1,82 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package cli + +import ( + "bytes" + "io" + "math/rand" + "strings" + "testing" + + "github.com/cockroachdb/cockroach/pkg/ts/tspb" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/stretchr/testify/require" +) + +func TestDebugTimeSeriesDumpCmd(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + c := NewCLITest(TestCLIParams{}) + defer c.Cleanup() + t.Run("debug tsdump --format=openmetrics", func(t *testing.T) { + out, err := c.RunWithCapture("debug tsdump --format=openmetrics --cluster-name=test-cluster-1 --disable-cluster-name-verification") + require.NoError(t, err) + results := strings.Split(out, "\n")[1:] // Drop first item that contains executed command string. + require.Equal(t, results[len(results)-1], "", "expected last string to be empty (ends with /\n)") + require.Equal(t, results[len(results)-2], "# EOF") + require.Greater(t, len(results), 0) + require.Greater(t, len(results[:len(results)-2]), 0, "expected to have at least one metric") + }) +} + +func TestMakeOpenMetricsWriter(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + var out bytes.Buffer + dataPointsNum := 100 + data := makeTS("cr.test.metric", "source", dataPointsNum) + w := makeOpenMetricsWriter(&out) + err := w.Emit(data) + require.NoError(t, err) + err = w.Flush() + require.NoError(t, err) + + var res []string + for { + s, err := out.ReadString('\n') + if err != nil { + require.ErrorIs(t, err, io.EOF) + break + } + res = append(res, s) + } + require.Equal(t, dataPointsNum+1 /* datapoints + EOF final line */, len(res)) +} + +func makeTS(name, source string, dataPointsNum int) *tspb.TimeSeriesData { + dps := make([]tspb.TimeSeriesDatapoint, dataPointsNum) + for i := range dps { + dps[i] = tspb.TimeSeriesDatapoint{ + TimestampNanos: timeutil.Now().UnixNano(), + Value: rand.Float64(), + } + } + return &tspb.TimeSeriesData{ + Name: name, + Source: source, + Datapoints: dps, + } +} diff --git a/pkg/ts/server.go b/pkg/ts/server.go index 42d72971d4a1..79f68dd5aa5b 100644 --- a/pkg/ts/server.go +++ b/pkg/ts/server.go @@ -379,7 +379,7 @@ func (s *Server) Query( // set up a KV store and write some keys into it (`MakeDataKey`) to do so without // setting up a `*Server`. 
func (s *Server) Dump(req *tspb.DumpRequest, stream tspb.TimeSeries_DumpServer) error { - d := defaultDumper{stream}.Dump + d := DefaultDumper{stream.Send}.Dump return dumpImpl(stream.Context(), s.db.db, req, d) } @@ -421,11 +421,12 @@ func dumpImpl( return nil } -type defaultDumper struct { - stream tspb.TimeSeries_DumpServer +// DefaultDumper translates *roachpb.KeyValue into TimeSeriesData. +type DefaultDumper struct { + Send func(*tspb.TimeSeriesData) error } -func (dd defaultDumper) Dump(kv *roachpb.KeyValue) error { +func (dd DefaultDumper) Dump(kv *roachpb.KeyValue) error { name, source, _, _, err := DecodeDataKey(kv.Key) if err != nil { return err @@ -449,7 +450,7 @@ func (dd defaultDumper) Dump(kv *roachpb.KeyValue) error { tsdata.Datapoints[i].Value = idata.Samples[i].Sum } } - return dd.stream.Send(tsdata) + return dd.Send(tsdata) } type rawDumper struct { From dc9f13213c95ce3179c7c30d3f36603e95782bf2 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Thu, 26 Oct 2023 02:43:18 -0400 Subject: [PATCH 2/2] kv: assert transaction finalization after re-issued EndTxn Informs #111962. Informs #111967. This commit updates the two places where EndTxn requests are re-issued in the TxnCoordSender (one in the txnCommitter, one in the txnSpanRefresher) to ensure that after the retry succeeds and the response is stitched back together, the transaction is finalized. I have no reason to believe that these assertions will fail. Release note: None --- pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go | 4 ++++ pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go index 53acaddc303c..3c39b76f886f 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go @@ -465,6 +465,10 @@ func (tc *txnCommitter) retryTxnCommitAfterFailedParallelCommit( if err := br.Combine(ctx, brSuffix, []int{etIdx}, ba); err != nil { return nil, kvpb.NewError(err) } + if br.Txn == nil || !br.Txn.Status.IsFinalized() { + return nil, kvpb.NewError(errors.AssertionFailedf( + "txn status not finalized after successful retried EndTxn: %v", br.Txn)) + } return br, nil } diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go index a380c9918f4d..9074b42f7dd6 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go @@ -420,6 +420,10 @@ func (sr *txnSpanRefresher) splitEndTxnAndRetrySend( if err := br.Combine(ctx, brSuffix, []int{etIdx}, ba); err != nil { return nil, kvpb.NewError(err) } + if br.Txn == nil || !br.Txn.Status.IsFinalized() { + return nil, kvpb.NewError(errors.AssertionFailedf( + "txn status not finalized after successful retried EndTxn: %v", br.Txn)) + } return br, nil }