113037: cli: support OpenMetrics in tsdump r=koorosh a=koorosh

**This PR duplicates the initial work in #111654**, with an additional commit that adds tests and small fixes.
Only the last commit needs review.
A new PR was needed only because I couldn't push additional changes to the former one.

----
This PR continues #97411 from `@tbg`.

Fixes #97120.

See individual commit messages for details.

----
*From #97411 on the first two commits:*

Touches #97120.

    $ cockroach debug tsdump --insecure --format=openmetrics > tsdump.om

    $ tail -n 3 tsdump.om
    valcount{store="1"} 9841.000000 1677012190.0
    valcount{store="1"} 9847.000000 1677012200.0
    # EOF

    $ docker run -v $PWD/tsdump.om:/tsdump.om -v $PWD/data:/data --entrypoint /bin/promtool prom/prometheus tsdb create-blocks-from openmetrics /tsdump.om /data
    BLOCK ULID                  MIN TIME       MAX TIME       DURATION     NUM SAMPLES  NUM CHUNKS   NUM SERIES   SIZE
    01GSTVJG4QMBSQ4ACPK80FFDJ9  1676544540000  1676544940001  6m40.001s    10598        1514         1514         261097
    01GSTVJGWM2CX1AA56QSSZ9T2R  1677009150000  1677009590001  7m20.001s    68130        1514         1514         305952
    01GSTVJHT2TWTZGCPE52AE1T95  1677009600000  1677012200001  43m20.001s   394093       4542         1514         692053

    $ ls data
    01GSTVJG4QMBSQ4ACPK80FFDJ9	01GSTVJGWM2CX1AA56QSSZ9T2R	01GSTVJHT2TWTZGCPE52AE1T95

    $ docker run -p 9090:9090 -v $PWD/data:/data prom/prometheus --storage.tsdb.path=/data --web.enable-admin-api --config.file=/etc/prometheus/prometheus.yml
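
The sample lines above follow the OpenMetrics text layout: metric name, label set, value, then a timestamp in seconds. A minimal Go sketch of producing such a line follows. The two regular expressions are the ones used in the tsdump change; `splitName` and `formatSample` are hypothetical helper names, and zero-padding the fractional seconds to nine digits is an assumption made here so that sub-second timestamps keep their magnitude (the output above shows a bare `.0`).

```go
package main

import (
	"fmt"
	"regexp"
)

const nanosPerSecond = 1_000_000_000

var (
	// Splits "cr.<kind>.<metric>" into the kind (store or node) and the metric name.
	reCrStoreNode = regexp.MustCompile(`^cr\.([^\.]+)\.(.*)$`)
	// Matches every character that is not a lowercase letter or a digit.
	rePromTSName = regexp.MustCompile(`[^a-z0-9]`)
)

// splitName (hypothetical helper) returns the label key derived from the
// "cr.<kind>." prefix and the sanitized metric name.
func splitName(raw string) (labelKey, name string) {
	name = raw
	if m := reCrStoreNode.FindStringSubmatch(raw); m != nil {
		labelKey, name = m[1], m[2]
		if labelKey == "node" {
			labelKey = "node_id"
		}
	}
	return labelKey, rePromTSName.ReplaceAllLiteralString(name, "_")
}

// formatSample (hypothetical helper) renders one sample line. The timestamp
// is written as whole seconds plus a nine-digit, zero-padded fraction.
func formatSample(raw, source string, value float64, tsNanos int64) string {
	key, name := splitName(raw)
	labels := ""
	if key != "" {
		labels = fmt.Sprintf("{%s=%q}", key, source)
	}
	return fmt.Sprintf("%s%s %f %d.%09d",
		name, labels, value, tsNanos/nanosPerSecond, tsNanos%nanosPerSecond)
}

func main() {
	fmt.Println(formatSample("cr.store.valcount", "1", 9841, 1677012190000000000))
	fmt.Println(formatSample("cr.node.sql.select.count", "3", 1.5, 1050000000))
	fmt.Println("# EOF")
}
```

Without the padding, a remainder of 50,000,000 ns would print as `.50000000` and be read as half a second rather than 50 ms.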

<img width="1689" alt="image" src="https://user-images.githubusercontent.com/5076964/220455288-e29147d7-c2f6-49dc-bb1d-5be3b28d958f.png">

----

The second commit converts an existing raw tsdump so it can be viewed through Grafana/Prometheus:

`./cockroach debug tsdump --format=openmetrics tsdump.gob > tsdump.om`
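
Converting a raw dump amounts to reading a stream of gob-encoded values from a file until the stream ends. The sketch below shows that loop in isolation. The `record` type is a hypothetical stand-in (the real dump stores CockroachDB key/value entries), and `decodeAll` and `roundTrip` are illustrative names, not part of the PR.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"io"
)

// record is a hypothetical stand-in for one entry in a raw dump.
type record struct {
	Key   string
	Value float64
}

// decodeAll reads gob-encoded records from r until the stream ends.
// A clean end of stream is reported by gob as io.EOF and is not an error here.
func decodeAll(r io.Reader) ([]record, error) {
	dec := gob.NewDecoder(r)
	var out []record
	for {
		var rec record
		err := dec.Decode(&rec)
		if err == io.EOF {
			return out, nil
		}
		if err != nil {
			return nil, err
		}
		out = append(out, rec)
	}
}

// roundTrip encodes the records into an in-memory buffer and decodes them
// again, standing in for "write tsdump.gob, then convert it offline".
func roundTrip(in []record) ([]record, error) {
	var buf bytes.Buffer
	enc := gob.NewEncoder(&buf)
	for _, rec := range in {
		if err := enc.Encode(rec); err != nil {
			return nil, err
		}
	}
	return decodeAll(&buf)
}

func main() {
	out, err := roundTrip([]record{{Key: "a", Value: 1}, {Key: "b", Value: 2}})
	if err != nil {
		panic(err)
	}
	fmt.Println(len(out), out[1].Key)
}
```

Stopping the loop on the first error, including `io.EOF`, keeps a reader from spinning after the input is exhausted.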

For demonstration purposes one can then:

- clone dockprom[^1]
- add `uid: v9Zz2K6nz` to the prometheus datasource[^2]
- run:

  ```
  for d in grafana_data prometheus_data; do
    docker volume create --driver local --opt type=none --opt device=$PWD/$d --opt o=bind $d
  done
  promtool tsdb create-blocks-from openmetrics tsdump.om prometheus_data/
  docker-compose up -d
  ```
- open http://localhost:3000 and log in as admin/admin
- import the L2 dashboard[^3]
- view the tsdump![^4]

Note that all of the histograms are missing because the internal timeseries
data doesn't support histograms (it records just a few quantiles).

[^1]: https://github.com/stefanprodan/dockprom
[^2]: grafana/provisioning/datasources/datasource.yml
[^3]: https://grafana.testeng.crdb.io/d/CAvWxELVz/l2-drill-down?orgId=1&from=now-1h&to=now&refresh=1m&var-cluster=grinaker-231&var-instances=All
[^4]: <img width="1664" alt="image" src="https://user-images.githubusercontent.com/5076964/233642565-175fec77-1bff-4a75-af98-b9bdc6748f78.png">

----
*re: additions to #97411:*

The last two commits enable offline conversion of raw tsdump files to OpenMetrics and add labels to the metrics, for parity with cloud metrics.
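
As a rough illustration of attaching a fixed label set to every series, the sketch below merges shared labels with per-series labels and renders them in OpenMetrics syntax. `renderLabels` is a hypothetical helper. Copying into a fresh map (so a label from one series cannot leak into the next) and sorting the keys (for deterministic output) are choices made for this sketch and are not claimed to match the PR's behavior.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// renderLabels (hypothetical helper) merges base and per-series labels into a
// new map, leaving both inputs untouched, and renders them as
// {k1="v1",k2="v2"} with keys in sorted order.
func renderLabels(base, perSeries map[string]string) string {
	merged := make(map[string]string, len(base)+len(perSeries))
	for k, v := range base {
		merged[k] = v
	}
	for k, v := range perSeries {
		merged[k] = v
	}

	keys := make([]string, 0, len(merged))
	for k := range merged {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	parts := make([]string, 0, len(keys))
	for _, k := range keys {
		parts = append(parts, fmt.Sprintf("%s=%q", k, merged[k]))
	}
	return "{" + strings.Join(parts, ",") + "}"
}

func main() {
	base := map[string]string{
		"cluster":      "my-cluster",
		"cluster_type": "SELF_HOSTED",
		"job":          "cockroachdb",
		"region":       "local",
	}
	fmt.Println(renderLabels(base, map[string]string{"node_id": "1"}))
	fmt.Println(renderLabels(base, map[string]string{"store": "1"}))
}
```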

With a single-node, insecure cluster:

```
➜  cockroach git:(tsdump-openmetrics) ./cockroach debug tsdump --insecure --host=localhost:26258 --format=raw --from="2023-09-29 15:27:03" --to="2023-09-29 20:57:03" > tsdump.gob
 
... shut down cluster...

➜  cockroach git:(tsdump-openmetrics) ✗ ./cockroach debug tsdump --format=openmetrics tsdump.gob --cluster-label="my-cluster" > tsdump_converted.om
➜  cockroach git:(tsdump-openmetrics) ✗ open ./tsdump_converted.om

``` 
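
The `--cluster-label` flag used above supplies the value of the `cluster` label. Per the change, an explicit flag value wins, then the configured cluster name, then a generated `cluster-debug-<unix time>` fallback. A small sketch of that precedence follows; `clusterLabel` and its parameters are hypothetical names for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// clusterLabel (hypothetical helper) picks the value for the "cluster" label:
// the explicit flag value first, then the configured cluster name, then a
// generated fallback based on the given Unix time in seconds.
func clusterLabel(flagValue, configuredName string, nowUnix int64) string {
	switch {
	case flagValue != "":
		return flagValue
	case configuredName != "":
		return configuredName
	default:
		return fmt.Sprintf("cluster-debug-%d", nowUnix)
	}
}

func main() {
	now := time.Now().Unix()
	fmt.Println(clusterLabel("my-cluster", "", now))
	fmt.Println(clusterLabel("", "", now))
}
```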

```
admission_admitted_elastic_cpu{instance="",organization_id="",organization_label="",node_id="1",sla_type="",tenant_id="",cluster="my-cluster",cluster_type="SELF_HOSTED",job="cockroachdb",region="local",node=""} 0.000000 1696013730.0
admission_admitted_elastic_cpu{instance="",organization_id="",organization_label="",node_id="1",sla_type="",tenant_id="",cluster="my-cluster",cluster_type="SELF_HOSTED",job="cockroachdb",region="local",node=""} 0.000000 1696013740.0
admission_admitted_elastic_cpu{instance="",organization_id="",organization_label="",node_id="1",sla_type="",tenant_id="",cluster="my-cluster",cluster_type="SELF_HOSTED",job="cockroachdb",region="local",node=""} 0.000000 1696013750.0
....
```

Epic: None

113138: kv: assert transaction finalization after re-issued EndTxn r=nvanbenschoten a=nvanbenschoten

Informs #111962.
Informs #111967.

This commit updates the two places where EndTxn requests are re-issued in the TxnCoordSender (one in the txnCommitter, one in the txnSpanRefresher) to ensure that after the retry succeeds and the response is stitched back together, the transaction is finalized.

I have no reason to believe that these assertions will fail.

Release note: None

Co-authored-by: Tobias Grieger <[email protected]>
Co-authored-by: Nathan VanBenschoten <[email protected]>
3 people committed Oct 26, 2023
3 parents 29dee14 + f945442 + dc9f132 commit 5d881d8
Showing 7 changed files with 293 additions and 47 deletions.
2 changes: 2 additions & 0 deletions pkg/cli/BUILD.bazel
@@ -345,6 +345,7 @@ go_test(
"start_test.go",
"statement_bundle_test.go",
"statement_diag_test.go",
"tsdump_test.go",
"userfiletable_test.go",
"workload_test.go",
"zip_helpers_test.go",
@@ -410,6 +411,7 @@ go_test(
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/ts/tspb",
"//pkg/util",
"//pkg/util/ioctx",
"//pkg/util/leaktest",
4 changes: 3 additions & 1 deletion pkg/cli/debug.go
@@ -1538,9 +1538,11 @@ func init() {
f.Var(&debugLogChanSel, "only-channels", "selection of channels to include in the output diagram.")

f = debugTimeSeriesDumpCmd.Flags()
f.Var(&debugTimeSeriesDumpOpts.format, "format", "output format (text, csv, tsv, raw)")
f.Var(&debugTimeSeriesDumpOpts.format, "format", "output format (text, csv, tsv, raw, openmetrics)")
f.Var(&debugTimeSeriesDumpOpts.from, "from", "oldest timestamp to include (inclusive)")
f.Var(&debugTimeSeriesDumpOpts.to, "to", "newest timestamp to include (inclusive)")
f.StringVar(&debugTimeSeriesDumpOpts.clusterLabel, "cluster-label",
"", "prometheus label for cluster name")

f = debugSendKVBatchCmd.Flags()
f.StringVar(&debugSendKVBatchContext.traceFormat, "trace", debugSendKVBatchContext.traceFormat,
233 changes: 192 additions & 41 deletions pkg/cli/tsdump.go
@@ -14,13 +14,18 @@ import (
"bufio"
"context"
"encoding/csv"
"encoding/gob"
"fmt"
"io"
"os"
"regexp"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/cli/clierrorplus"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/ts"
"github.com/cockroachdb/cockroach/pkg/ts/tspb"
"github.com/cockroachdb/cockroach/pkg/ts/tsutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -31,12 +36,14 @@ import (
// TODO(knz): this struct belongs elsewhere.
// See: https://github.com/cockroachdb/cockroach/issues/49509
var debugTimeSeriesDumpOpts = struct {
format tsDumpFormat
from, to timestampValue
format tsDumpFormat
from, to timestampValue
clusterLabel string
}{
format: tsDumpText,
from: timestampValue{},
to: timestampValue(timeutil.Now().Add(24 * time.Hour)),
format: tsDumpText,
from: timestampValue{},
to: timestampValue(timeutil.Now().Add(24 * time.Hour)),
clusterLabel: "",
}

var debugTimeSeriesDumpCmd = &cobra.Command{
@@ -47,46 +54,28 @@ Dumps all of the raw timeseries values in a cluster. Only the default resolution
is retrieved, i.e. typically datapoints older than the value of the
'timeseries.storage.resolution_10s.ttl' cluster setting will be absent from the
output.
When an input file is provided instead (as an argument), this input file
must previously have been created with the --format=raw switch. The command
will then convert it to the --format requested in the current invocation.
`,
Args: cobra.RangeArgs(0, 1),
RunE: clierrorplus.MaybeDecorateError(func(cmd *cobra.Command, args []string) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

conn, finish, err := getClientGRPCConn(ctx, serverCfg)
if err != nil {
return err
}
defer finish()

names, err := serverpb.GetInternalTimeseriesNamesFromServer(ctx, conn)
if err != nil {
return err
}

req := &tspb.DumpRequest{
StartNanos: time.Time(debugTimeSeriesDumpOpts.from).UnixNano(),
EndNanos: time.Time(debugTimeSeriesDumpOpts.to).UnixNano(),
Names: names,
}

if debugTimeSeriesDumpOpts.format == tsDumpRaw {
tsClient := tspb.NewTimeSeriesClient(conn)
stream, err := tsClient.DumpRaw(context.Background(), req)
if err != nil {
return err
}

// Buffer the writes to os.Stdout since we're going to
// be writing potentially a lot of data to it.
w := bufio.NewWriter(os.Stdout)
if err := tsutil.DumpRawTo(stream, w); err != nil {
return err
}
return w.Flush()
var convertFile string
if len(args) > 0 {
convertFile = args[0]
}

var w tsWriter
switch debugTimeSeriesDumpOpts.format {
case tsDumpRaw:
if convertFile != "" {
return errors.Errorf("input file is already in raw format")
}
// Special case, we don't go through the text output code.
case tsDumpCSV:
w = csvTSWriter{w: csv.NewWriter(os.Stdout)}
case tsDumpTSV:
@@ -95,18 +84,101 @@ output.
w = cw
case tsDumpText:
w = defaultTSWriter{w: os.Stdout}
case tsDumpOpenMetrics:
w = makeOpenMetricsWriter(os.Stdout)
default:
return errors.Newf("unknown output format: %v", debugTimeSeriesDumpOpts.format)
}

tsClient := tspb.NewTimeSeriesClient(conn)
stream, err := tsClient.Dump(context.Background(), req)
if err != nil {
return err
var recv func() (*tspb.TimeSeriesData, error)
if convertFile == "" {
// To enable conversion without a running cluster, we want to skip
// connecting to the server when converting an existing tsdump.
conn, finish, err := getClientGRPCConn(ctx, serverCfg)
if err != nil {
return err
}
defer finish()

names, err := serverpb.GetInternalTimeseriesNamesFromServer(ctx, conn)
if err != nil {
return err
}
req := &tspb.DumpRequest{
StartNanos: time.Time(debugTimeSeriesDumpOpts.from).UnixNano(),
EndNanos: time.Time(debugTimeSeriesDumpOpts.to).UnixNano(),
Names: names,
}
tsClient := tspb.NewTimeSeriesClient(conn)

if debugTimeSeriesDumpOpts.format == tsDumpRaw {
stream, err := tsClient.DumpRaw(context.Background(), req)
if err != nil {
return err
}

// Buffer the writes to os.Stdout since we're going to
// be writing potentially a lot of data to it.
w := bufio.NewWriter(os.Stdout)
if err := tsutil.DumpRawTo(stream, w); err != nil {
return err
}
return w.Flush()
}
stream, err := tsClient.Dump(context.Background(), req)
if err != nil {
return err
}
recv = stream.Recv
} else {
f, err := os.Open(args[0])
if err != nil {
return err
}
type tup struct {
data *tspb.TimeSeriesData
err error
}

dec := gob.NewDecoder(f)
gob.Register(&roachpb.KeyValue{})
decodeOne := func() (*tspb.TimeSeriesData, error) {
var v roachpb.KeyValue
err := dec.Decode(&v)
if err != nil {
return nil, err
}

var data *tspb.TimeSeriesData
dumper := ts.DefaultDumper{Send: func(d *tspb.TimeSeriesData) error {
data = d
return nil
}}
if err := dumper.Dump(&v); err != nil {
return nil, err
}
return data, nil
}

ch := make(chan tup, 4096)
go func() {
// ch is closed when the process exits, so closing channel here is
// more for extra protection.
defer close(ch)
for {
data, err := decodeOne()
ch <- tup{data, err}
}
}()

recv = func() (*tspb.TimeSeriesData, error) {
r := <-ch
return r.data, r.err
}
}

for {
data, err := stream.Recv()
data, err := recv()
if err == io.EOF {
return w.Flush()
}
@@ -125,6 +197,80 @@ type tsWriter interface {
Flush() error
}

type openMetricsWriter struct {
out io.Writer
labels map[string]string
}

func makeOpenMetricsWriter(out io.Writer) *openMetricsWriter {
// construct labels
labelMap := make(map[string]string)
// Hardcoded values
labelMap["cluster_type"] = "SELF_HOSTED"
labelMap["job"] = "cockroachdb"
labelMap["region"] = "local"
// Zero values
labelMap["instance"] = ""
labelMap["node"] = ""
labelMap["organization_id"] = ""
labelMap["organization_label"] = ""
labelMap["sla_type"] = ""
labelMap["tenant_id"] = ""
// Command values
if debugTimeSeriesDumpOpts.clusterLabel != "" {
labelMap["cluster"] = debugTimeSeriesDumpOpts.clusterLabel
} else if serverCfg.ClusterName != "" {
labelMap["cluster"] = serverCfg.ClusterName
} else {
labelMap["cluster"] = fmt.Sprintf("cluster-debug-%d", timeutil.Now().Unix())
}
return &openMetricsWriter{out: out, labels: labelMap}
}

var reCrStoreNode = regexp.MustCompile(`^cr\.([^\.]+)\.(.*)$`)
var rePromTSName = regexp.MustCompile(`[^a-z0-9]`)

func (w *openMetricsWriter) Emit(data *tspb.TimeSeriesData) error {
name := data.Name
sl := reCrStoreNode.FindStringSubmatch(data.Name)
labelMap := w.labels
labelMap["node_id"] = "0"
if len(sl) != 0 {
storeNodeKey := sl[1]
if storeNodeKey == "node" {
storeNodeKey += "_id"
}
labelMap[storeNodeKey] = data.Source
name = sl[2]
}
var l []string
for k, v := range labelMap {
l = append(l, fmt.Sprintf("%s=%q", k, v))
}
labels := "{" + strings.Join(l, ",") + "}"
name = rePromTSName.ReplaceAllLiteralString(name, `_`)
for _, pt := range data.Datapoints {
if _, err := fmt.Fprintf(
w.out,
"%s%s %f %d.%d\n",
name,
labels,
pt.Value,
// Convert to Unix Epoch in seconds with preserved precision
// (https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#timestamps).
pt.TimestampNanos/1e9, pt.TimestampNanos%1e9,
); err != nil {
return err
}
}
return nil
}

func (w *openMetricsWriter) Flush() error {
fmt.Fprintln(w.out, `# EOF`)
return nil
}

type csvTSWriter struct {
w *csv.Writer
}
@@ -172,6 +318,7 @@ const (
tsDumpCSV
tsDumpTSV
tsDumpRaw
tsDumpOpenMetrics
)

// Type implements the pflag.Value interface.
@@ -188,6 +335,8 @@ func (m *tsDumpFormat) String() string {
return "text"
case tsDumpRaw:
return "raw"
case tsDumpOpenMetrics:
return "openmetrics"
}
return ""
}
@@ -203,6 +352,8 @@ func (m *tsDumpFormat) Set(s string) error {
*m = tsDumpTSV
case "raw":
*m = tsDumpRaw
case "openmetrics":
*m = tsDumpOpenMetrics
default:
return fmt.Errorf("invalid value for --format: %s", s)
}