Skip to content

Commit

Permalink
feat: port influx inspect report-tsm to 2.x
Browse files Browse the repository at this point in the history
  • Loading branch information
DStrand1 committed Jul 8, 2021
1 parent 0cb0da2 commit 06840b8
Show file tree
Hide file tree
Showing 4 changed files with 361 additions and 0 deletions.
2 changes: 2 additions & 0 deletions cmd/influxd/inspect/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/influxdata/influxdb/v2/cmd/influxd/inspect/dump_tsm"
"github.com/influxdata/influxdb/v2/cmd/influxd/inspect/export_index"
"github.com/influxdata/influxdb/v2/cmd/influxd/inspect/export_lp"
"github.com/influxdata/influxdb/v2/cmd/influxd/inspect/report_tsm"
"github.com/influxdata/influxdb/v2/cmd/influxd/inspect/verify_seriesfile"
"github.com/influxdata/influxdb/v2/cmd/influxd/inspect/verify_tombstone"
"github.com/influxdata/influxdb/v2/cmd/influxd/inspect/verify_tsm"
Expand Down Expand Up @@ -36,6 +37,7 @@ func NewCommand(v *viper.Viper) (*cobra.Command, error) {
base.AddCommand(dump_tsm.NewDumpTSMCommand())
base.AddCommand(dump_tsi.NewDumpTSICommand())
base.AddCommand(delete_tsm.NewDeleteTSMCommand())
base.AddCommand(report_tsm.NewReportTSMCommand())

return base, nil
}
356 changes: 356 additions & 0 deletions cmd/influxd/inspect/report_tsm/report_tsm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,356 @@
package report_tsm

import (
"fmt"
"math"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"text/tabwriter"
"time"

"github.com/influxdata/influxdb/v2/internal/fs"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
"github.com/retailnext/hllpp"
"github.com/spf13/cobra"
)

// args holds the parsed command-line flag values for the report-tsm command.
type args struct {
	dir      string // data directory (or single shard directory) to scan for TSM files
	pattern  string // if non-empty, only process TSM files whose path contains this substring
	detailed bool   // emit per-measurement/field/tag cardinality breakdowns (shard dirs only)
	exact    bool   // count cardinality exactly instead of estimating with HLL++
}

// NewReportTSMCommand builds the "report-tsm" cobra command, which analyzes
// the TSM files within a storage engine directory and reports series
// cardinality and the time range covered by the point data.
func NewReportTSMCommand() *cobra.Command {
	var arguments args
	cmd := &cobra.Command{
		Use:   "report-tsm",
		Short: "Run TSM report",
		Long: `
This command will analyze TSM files within a storage engine directory, reporting
the cardinality within the files as well as the time range that the point data
covers.
This command only interrogates the index within each file, and does not read any
block data. To reduce heap requirements, by default report-tsm estimates the
overall cardinality in the file set by using the HLL++ algorithm. Exact
cardinalities can be determined by using the --exact flag.
For each file, the following is output:
* The full filename;
* The series cardinality within the file;
* The number of series first encountered within the file;
* The min and max timestamp associated with TSM data in the file; and
* The time taken to load the TSM index and apply any tombstones.
The summary section then outputs the total time range and series cardinality for
the fileset. Depending on the --detailed flag, series cardinality is segmented
in the following ways:
* Series cardinality for each organization;
* Series cardinality for each bucket;
* Series cardinality for each measurement;
* Number of field keys for each measurement; and
* Number of tag values for each tag key.`,
		Args: cobra.NoArgs,
		RunE: func(cmd *cobra.Command, args []string) error {
			// --detailed requires the layout of a single shard directory;
			// reject it when the target is a higher-level directory.
			err := arguments.isShardDir(arguments.dir)
			if arguments.detailed && err != nil {
				return fmt.Errorf("--detailed only supported for shard dirs")
			}

			return arguments.Run(cmd)
		},
	}

	// No shorthand letters are used, so StringVar/BoolVar are the idiomatic
	// forms (the original *VarP calls passed an empty shorthand).
	cmd.Flags().StringVar(&arguments.pattern, "pattern", "", "only process TSM files containing pattern")
	cmd.Flags().BoolVar(&arguments.exact, "exact", false, "calculate an exact cardinality count. Warning, may use significant memory...")
	cmd.Flags().BoolVar(&arguments.detailed, "detailed", false, "emit series cardinality segmented by measurements, tag keys and fields. Warning, may take a while.")

	// Default the data directory to <influx dir>/engine/data. A failure here
	// means the environment is unusable (e.g. no resolvable home directory),
	// so panicking during command construction is acceptable.
	dir, err := fs.InfluxDir()
	if err != nil {
		panic(err)
	}
	dir = filepath.Join(dir, "engine/data")
	cmd.Flags().StringVar(&arguments.dir, "data-dir", dir, fmt.Sprintf("use provided data directory (defaults to %s).", dir))

	return cmd
}

// isShardDir reports whether dir's base name looks like a shard directory,
// i.e. parses as a positive integer shard ID. A non-nil error means it does
// not.
func (a *args) isShardDir(dir string) error {
	base := filepath.Base(dir)
	id, err := strconv.Atoi(base)
	if err != nil || id < 1 {
		return fmt.Errorf("not a valid shard dir: %v", dir)
	}

	return nil
}

// Run scans the configured data directory for TSM files and writes a
// per-file report (series counts, time range, index load time) followed by
// a cardinality summary. Cardinality is estimated with HLL++ by default;
// a.exact switches to exact (memory-hungry) counting.
func (a *args) Run(cmd *cobra.Command) error {
	// Select the counter implementation and the matching column title.
	newCounterFn := newHLLCounter
	estTitle := " (est)"
	if a.exact {
		estTitle = ""
		newCounterFn = newExactCounter
	}

	totalSeries := newCounterFn()
	tagCardinalities := map[string]counter{}
	measCardinalities := map[string]counter{}
	fieldCardinalities := map[string]counter{}

	dbCardinalities := map[string]counter{}

	start := time.Now()

	tw := tabwriter.NewWriter(cmd.OutOrStdout(), 8, 2, 1, ' ', 0)
	_, _ = fmt.Fprintln(tw, strings.Join([]string{"DB", "RP", "Shard", "File", "Series", "New" + estTitle, "Min Time", "Max Time", "Load Time"}, "\t"))

	minTime, maxTime := int64(math.MaxInt64), int64(math.MinInt64)
	var fileCount int
	if err := a.walkShardDirs(a.dir, func(db, rp, id, path string) error {
		// BUG FIX: the filter previously skipped files that DID contain the
		// pattern. --pattern is documented as "only process TSM files
		// containing pattern", so skip files that do NOT contain it.
		if a.pattern != "" && !strings.Contains(path, a.pattern) {
			return nil
		}

		file, err := os.OpenFile(path, os.O_RDONLY, 0600)
		if err != nil {
			_, _ = fmt.Fprintf(cmd.ErrOrStderr(), "error: %s: %v. Skipping.\n", path, err)
			return nil
		}

		loadStart := time.Now()
		reader, err := tsm1.NewTSMReader(file)
		if err != nil {
			_, _ = fmt.Fprintf(cmd.ErrOrStderr(), "error: %s: %v. Skipping.\n", file.Name(), err)
			// Best-effort close: on failure the reader may not have taken
			// ownership of the handle, so avoid leaking it. TODO(review):
			// confirm NewTSMReader's ownership semantics on error.
			_ = file.Close()
			return nil
		}
		loadTime := time.Since(loadStart)
		fileCount++

		// Lazily create the per-database counter.
		dbCount := dbCardinalities[db]
		if dbCount == nil {
			dbCount = newCounterFn()
			dbCardinalities[db] = dbCount
		}

		// Snapshot so we can report how many series are new to this DB.
		oldCount := dbCount.Count()

		seriesCount := reader.KeyCount()
		for i := 0; i < seriesCount; i++ {
			key, _ := reader.KeyAt(i)
			totalSeries.Add(key)
			dbCount.Add(key)

			if a.detailed {
				// Keys have the form <series-key>#!~#<field>.
				sep := strings.Index(string(key), "#!~#")
				if sep == -1 {
					// Defensive: skip malformed keys instead of panicking
					// on the out-of-range slice below.
					continue
				}
				seriesKey, field := key[:sep], key[sep+4:]
				measurement, tags := models.ParseKey(seriesKey)

				measCount := measCardinalities[measurement]
				if measCount == nil {
					measCount = newCounterFn()
					measCardinalities[measurement] = measCount
				}
				measCount.Add(key)

				fieldCount := fieldCardinalities[measurement]
				if fieldCount == nil {
					fieldCount = newCounterFn()
					fieldCardinalities[measurement] = fieldCount
				}
				fieldCount.Add(field)

				for _, t := range tags {
					tagCount := tagCardinalities[string(t.Key)]
					if tagCount == nil {
						tagCount = newCounterFn()
						tagCardinalities[string(t.Key)] = tagCount
					}
					tagCount.Add(t.Value)
				}
			}
		}

		// Widen the overall time range to include this file.
		minT, maxT := reader.TimeRange()
		if minT < minTime {
			minTime = minT
		}
		if maxT > maxTime {
			maxTime = maxT
		}
		_ = reader.Close()

		_, _ = fmt.Fprintln(tw, strings.Join([]string{
			db, rp, id,
			filepath.Base(file.Name()),
			strconv.FormatInt(int64(seriesCount), 10),
			strconv.FormatInt(int64(dbCount.Count()-oldCount), 10),
			time.Unix(0, minT).UTC().Format(time.RFC3339Nano),
			time.Unix(0, maxT).UTC().Format(time.RFC3339Nano),
			loadTime.String(),
		}, "\t"))
		if a.detailed {
			// Flush per file so long detailed runs show progress incrementally.
			_ = tw.Flush()
		}
		return nil
	}); err != nil {
		return err
	}

	_ = tw.Flush()

	printArgs{
		fileCount:          fileCount,
		minTime:            minTime,
		maxTime:            maxTime,
		estTitle:           estTitle,
		totalSeries:        totalSeries,
		detailed:           a.detailed,
		tagCardinalities:   tagCardinalities,
		measCardinalities:  measCardinalities,
		fieldCardinalities: fieldCardinalities,
		dbCardinalities:    dbCardinalities,
	}.printSummary(cmd)

	cmd.Printf("Completed in %s\n", time.Since(start))
	return nil
}

// printArgs bundles everything printSummary needs to render the summary
// section of the report.
type printArgs struct {
	fileCount        int     // number of TSM files successfully processed
	minTime, maxTime int64   // overall time range across all files, in Unix nanoseconds
	estTitle         string  // " (est)" when estimating with HLL++, "" for exact counts
	totalSeries      counter // series cardinality across the whole file set
	detailed         bool    // whether to print measurement/field/tag breakdowns

	// Cardinality counters keyed by tag key, measurement name, measurement
	// name (field counts), and database name respectively.
	tagCardinalities   map[string]counter
	measCardinalities  map[string]counter
	fieldCardinalities map[string]counter
	dbCardinalities    map[string]counter
}

// printSummary writes the aggregate portion of the report: file count,
// overall time range and duration, per-database series cardinality, and —
// when detailed — measurement, field, and tag-key breakdowns.
func (p printArgs) printSummary(cmd *cobra.Command) {
	// FIX: "Summary:" previously lacked a trailing newline, so the first
	// statistic was printed on the same line as the heading.
	cmd.Printf("\nSummary:\n")
	cmd.Printf(" Files: %d\n", p.fileCount)
	cmd.Printf(" Time Range: %s - %s\n",
		time.Unix(0, p.minTime).UTC().Format(time.RFC3339Nano),
		time.Unix(0, p.maxTime).UTC().Format(time.RFC3339Nano),
	)
	cmd.Printf(" Duration: %s \n\n", time.Unix(0, p.maxTime).Sub(time.Unix(0, p.minTime)))

	cmd.Printf("Statistics\n")
	cmd.Printf(" Series:\n")
	total := p.totalSeries.Count()
	// Iterate databases in sorted order for deterministic output, matching
	// the detailed sections below (Go map iteration order is random).
	for _, db := range sortKeys(p.dbCardinalities) {
		count := p.dbCardinalities[db].Count()
		cmd.Printf(" - %s%s: %d (%d%%)\n", db, p.estTitle, count, percentOf(count, total))
	}
	cmd.Printf(" Total%s: %d\n", p.estTitle, total)

	if p.detailed {
		cmd.Printf("\n Measurements (est):\n")
		for _, t := range sortKeys(p.measCardinalities) {
			cmd.Printf(" - %v: %d (%d%%)\n", t, p.measCardinalities[t].Count(), percentOf(p.measCardinalities[t].Count(), total))
		}

		cmd.Printf("\n Fields (est):\n")
		for _, t := range sortKeys(p.fieldCardinalities) {
			cmd.Printf(" - %v: %d\n", t, p.fieldCardinalities[t].Count())
		}

		cmd.Printf("\n Tags (est):\n")
		for _, t := range sortKeys(p.tagCardinalities) {
			cmd.Printf(" - %v: %d\n", t, p.tagCardinalities[t].Count())
		}
	}
}

// percentOf returns count as an integer percentage of total, or 0 when total
// is zero (the original divided unconditionally, converting NaN to int for an
// empty file set).
func percentOf(count, total uint64) int {
	if total == 0 {
		return 0
	}
	return int(float64(count) / float64(total) * 100)
}

// sortKeys is a quick helper to return the sorted set of a map's keys
func sortKeys(vals map[string]counter) (keys []string) {
for k := range vals {
keys = append(keys, k)
}
sort.Strings(keys)

return keys
}

// walkShardDirs locates every TSM file beneath root and invokes fn once per
// file with the database, retention policy, shard ID, and file path. Files
// are visited in ascending numeric shard-ID order.
func (a *args) walkShardDirs(root string, fn func(db, rp, id, path string) error) error {
	type tsmFile struct {
		db, rp, id, path string
	}

	// First pass: collect the location of every TSM file under root.
	var files []tsmFile
	walkErr := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if info.IsDir() {
			return nil
		}
		if filepath.Ext(info.Name()) != "."+tsm1.TSMFileExtension {
			return nil
		}

		// Every TSM file must live directly inside a shard directory.
		if err := a.isShardDir(filepath.Dir(path)); err != nil {
			return err
		}

		abs, err := filepath.Abs(path)
		if err != nil {
			return err
		}
		// Expected layout: .../<db>/<rp>/<shard-id>/<file>.tsm
		parts := strings.Split(abs, string(filepath.Separator))
		n := len(parts)
		files = append(files, tsmFile{db: parts[n-4], rp: parts[n-3], id: parts[n-2], path: path})
		return nil
	})
	if walkErr != nil {
		return walkErr
	}

	// Visit shards in numeric ID order for stable report output.
	sort.Slice(files, func(i, j int) bool {
		left, _ := strconv.Atoi(files[i].id)
		right, _ := strconv.Atoi(files[j].id)
		return left < right
	})

	for _, f := range files {
		if err := fn(f.db, f.rp, f.id, f.path); err != nil {
			return err
		}
	}
	return nil
}

// counter abstracts a method of counting keys.
type counter interface {
	// Add records key in the count.
	Add(key []byte)
	// Count returns the number of distinct keys added so far.
	Count() uint64
}

// newHLLCounter returns an approximate counter using HyperLogLogs for cardinality estimation.
// Memory use stays small and bounded regardless of how many keys are added.
func newHLLCounter() counter {
	return hllpp.New()
}

// exactCounter counts distinct keys exactly by retaining every key seen in a
// set; memory use grows with the cardinality of the input.
type exactCounter struct {
	m map[string]struct{}
}

// Add records key in the set of distinct keys.
func (c *exactCounter) Add(key []byte) {
	c.m[string(key)] = struct{}{}
}

// Count returns the number of distinct keys added so far.
func (c *exactCounter) Count() uint64 {
	return uint64(len(c.m))
}

// newExactCounter returns a counter that computes exact cardinality by
// storing every distinct key. Prefer newHLLCounter when an estimate is
// acceptable, since this counter's memory use grows with cardinality.
func newExactCounter() counter {
	return &exactCounter{
		m: make(map[string]struct{}),
	}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ require (
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.9.1
github.com/retailnext/hllpp v1.0.0
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b
github.com/spf13/cast v1.3.0
github.com/spf13/cobra v1.0.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT
github.com/prometheus/procfs v0.0.8 h1:+fpWZdT24pJBiqJdAwYBjPSk+5YmQzYNPYzQsdzLkt8=
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/retailnext/hllpp v1.0.0 h1:7+NffI2mo7lZG78NruEsf3jEnjJ6Z0n1otEyFqdK8zA=
github.com/retailnext/hllpp v1.0.0/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
Expand Down

0 comments on commit 06840b8

Please sign in to comment.