Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

add capability to process bigtable indices with mt-index-cat and re-organize code #1909

Merged
merged 5 commits into from
Sep 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 134 additions & 49 deletions cmd/mt-index-cat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/grafana/metrictank/cmd/mt-index-cat/out"
"github.com/grafana/metrictank/conf"
"github.com/grafana/metrictank/idx/bigtable"
"github.com/grafana/metrictank/idx/cassandra"
"github.com/grafana/metrictank/idx/memory"
"github.com/grafana/metrictank/logger"
Expand Down Expand Up @@ -47,13 +48,15 @@ func main() {
var verbose bool
var limit int
var partitionStr string
var btTotalPartitions int

globalFlags := flag.NewFlagSet("global config flags", flag.ExitOnError)
globalFlags.StringVar(&addr, "addr", "http://localhost:6060", "graphite/metrictank address")
globalFlags.StringVar(&prefix, "prefix", "", "only show metrics that have this prefix")
globalFlags.StringVar(&substr, "substr", "", "only show metrics that have this substring")
globalFlags.StringVar(&suffix, "suffix", "", "only show metrics that have this suffix")
globalFlags.StringVar(&partitionStr, "partitions", "*", "only show metrics from the comma separated list of partitions or * for all")
globalFlags.IntVar(&btTotalPartitions, "bt-total-partitions", -1, "total number of partitions (when using bigtable and partitions='*')")
globalFlags.StringVar(&regexStr, "regex", "", "only show metrics that match this regex")
globalFlags.StringVar(&tags, "tags", "", "tag filter. empty (default), 'some', 'none', 'valid', or 'invalid'")
globalFlags.StringVar(&from, "from", "30min", "for vegeta outputs, will generate requests for data starting from now minus... eg '30min', '5h', '14d', etc. or a unix timestamp")
Expand All @@ -63,6 +66,7 @@ func main() {
globalFlags.BoolVar(&verbose, "verbose", false, "print stats to stderr")

cassFlags := cassandra.ConfigSetup()
btFlags := bigtable.ConfigSetup()

outputs := []string{"dump", "list", "vegeta-render", "vegeta-render-patterns"}

Expand All @@ -84,9 +88,13 @@ func main() {
fmt.Println(" 'valid' only show metrics whose tags (if any) are valid")
fmt.Println(" 'invalid' only show metrics that have one or more invalid tags")
fmt.Println()
fmt.Printf("idxtype: only 'cass' supported for now\n\n")
fmt.Printf("idxtype: 'cass' (cassandra) or 'bt' (bigtable)\n\n")
fmt.Printf("cass config flags:\n\n")
cassFlags.PrintDefaults()
fmt.Printf("\n\n")
fmt.Printf("bigtable config flags:\n\n")
btFlags.PrintDefaults()
fmt.Printf("\n\n")
fmt.Println()
fmt.Println("output:")
fmt.Println()
Expand All @@ -102,12 +110,18 @@ func main() {
fmt.Println(" roundDuration: formats an integer-seconds duration using aggressive rounding. for the purpose of getting an idea of overal metrics age")
fmt.Println()
fmt.Println()
fmt.Println("EXAMPLES:")
fmt.Println("Cassandra Examples:")
fmt.Println("mt-index-cat -from 60min cass -hosts cassandra:9042 list")
fmt.Println("mt-index-cat -from 60min cass -hosts cassandra:9042 'sumSeries({{.Name | pattern}})'")
fmt.Println("mt-index-cat -from 60min cass -hosts cassandra:9042 'GET http://localhost:6060/render?target=sumSeries({{.Name | pattern}})&from=-6h\\nX-Org-Id: 1\\n\\n'")
fmt.Println("mt-index-cat cass -hosts cassandra:9042 -timeout 60s '{{.LastUpdate | age | roundDuration}}\\n' | sort | uniq -c")
fmt.Println("mt-index-cat cass -hosts localhost:9042 -schema-file ../../scripts/config/schema-idx-cassandra.toml '{{.Name | patternCustom 15 \"pass\" 40 \"1rcnw\" 15 \"2rcnw\" 10 \"3rcnw\" 10 \"3rccw\" 10 \"2rccw\"}}\\n'")
fmt.Println()
fmt.Println()
fmt.Println("Bigtable Examples:")
fmt.Println("mt-index-cat -max-stale 0 -bt-total-partitions 128 bt -gcp-project your_project -bigtable-instance the_bt_instance -table-name metric_idx -create-cf false list")
fmt.Println("mt-index-cat -max-stale 768h -partitions 1,2,3 bt -gcp-project your_project -bigtable-instance the_bt_instance -table-name metric_idx -create-cf false '{{.NameWithTags}} {{.Id}} {{.OrgId}} {{.LastUpdate}} {{.Partition}}\n'")

}

if len(os.Args) == 2 && (os.Args[1] == "-h" || os.Args[1] == "--help") {
Expand Down Expand Up @@ -139,13 +153,24 @@ func main() {
os.Exit(-1)
}
var cassI int
var btI int
for i, v := range os.Args {
if v == "cass" {
cassI = i
}
if v == "bt" {
btI = i
}
}
if cassI == 0 {
log.Println("only indextype 'cass' supported")

if cassI > 0 && btI > 0 {
log.Println("you may only use one index type at a time, either 'cass' or 'bt'")
flag.Usage()
os.Exit(1)
}

if cassI == 0 && btI == 0 {
log.Println("you must use one index type, either 'cass' or 'bt'")
flag.Usage()
os.Exit(1)
}
Expand All @@ -156,9 +181,21 @@ func main() {
os.Exit(1)
}

globalFlags.Parse(os.Args[1:cassI])
cassFlags.Parse(os.Args[cassI+1 : len(os.Args)-1])
cassandra.CliConfig.Enabled = true
var idxFlagPos int
if cassI > 0 {
idxFlagPos = cassI
} else {
idxFlagPos = btI
}

globalFlags.Parse(os.Args[1:idxFlagPos])
if cassI > 0 {
cassFlags.Parse(os.Args[idxFlagPos+1 : len(os.Args)-1])
cassandra.CliConfig.Enabled = true
} else {
btFlags.Parse(os.Args[idxFlagPos+1 : len(os.Args)-1])
bigtable.CliConfig.Enabled = true
}

if regexStr != "" {
var err error
Expand All @@ -184,8 +221,17 @@ func main() {
show = out.Template(format)
}

idx := cassandra.New(cassandra.CliConfig)
err := idx.InitBare()
var cassIdx *cassandra.CasIdx
var btIdx *bigtable.BigtableIdx
var err error

if cassI > 0 {
cassIdx = cassandra.New(cassandra.CliConfig)
err = cassIdx.InitBare()
} else {
btIdx = bigtable.New(bigtable.CliConfig)
err = btIdx.InitBare()
}
perror(err)

// from should either be a unix timestamp, or a specification that graphite/metrictank will recognize.
Expand Down Expand Up @@ -237,53 +283,92 @@ func main() {
}
}

var defs []schema.MetricDefinition
if len(partitions) == 0 {
defs = idx.Load(nil, time.Now())
} else {
defs = idx.LoadPartitions(partitions, nil, time.Now())
}
// set this after doing the query, to assure age can't possibly be negative unless if clocks are misconfigured.
out.QueryTime = time.Now().Unix()
total := len(defs)
shown := 0

for _, d := range defs {
// note that prefix and substr can be "", meaning filter disabled.
// the conditions handle this fine as well.
if !strings.HasPrefix(d.Name, prefix) {
continue
}
if !strings.HasSuffix(d.Name, suffix) {
continue
}
if !strings.Contains(d.Name, substr) {
continue
}
if tags == "none" && len(d.Tags) != 0 {
continue
}
if tags == "some" && len(d.Tags) == 0 {
continue
// if partitionStr is set to all (*) and we are using bigtable then we must
// ensure that we know the total number of partitions
if partitionStr == "*" && btI > 0 {
if btTotalPartitions == -1 {
log.Println("When selecting all partitions with bigtable you must specify the total number of partitions for the instance")
flag.Usage()
os.Exit(-1)
} else {
for i := 0; i < btTotalPartitions; i++ {
partitions = append(partitions, int32(i))
}
}
if regex != nil && !regex.MatchString(d.Name) {
continue
}

var total int
var shown int

processDefs := func(defs []schema.MetricDefinition) {
total += len(defs)
if shown >= limit && limit > 0 {
fmt.Fprintf(os.Stderr, "Limit (%d) reached while processing Metric Definitions", limit)
return
}
if tags == "valid" || tags == "invalid" {
err := schema.ValidateTags(d.Tags)
for _, d := range defs {
// note that prefix and substr can be "", meaning filter disabled.
// the conditions handle this fine as well.
if !strings.HasPrefix(d.Name, prefix) {
continue
}
if !strings.HasSuffix(d.Name, suffix) {
continue
}
if !strings.Contains(d.Name, substr) {
continue
}
if tags == "none" && len(d.Tags) != 0 {
continue
}
if tags == "some" && len(d.Tags) == 0 {
continue
}
if regex != nil && !regex.MatchString(d.Name) {
continue
}
if tags == "valid" || tags == "invalid" {
err := schema.ValidateTags(d.Tags)

// skip the metric if the validation result is not what we want
if (err == nil) != (tags == "valid") {
// skip the metric if the validation result is not what we want
if (err == nil) != (tags == "valid") {
continue
}
}
if cutoffMin != 0 && d.LastUpdate >= cutoffMin {
continue
}
show(d)
shown++
if shown >= limit && limit > 0 {
fmt.Fprintf(os.Stderr, "Limit (%d) reached while processing Metric Definitions", limit)
return
}
}
if cutoffMin != 0 && d.LastUpdate >= cutoffMin {
continue
}

var defs []schema.MetricDefinition
if len(partitions) == 0 {
if cassI > 0 {
defs = cassIdx.Load(nil, time.Now())
// set this after doing the query, to assure age can't possibly be negative unless if clocks are misconfigured.
out.QueryTime = time.Now().Unix()
processDefs(defs)
}
show(d)
shown += 1
if shown == limit {
break
} else {
if cassI > 0 {
defs = cassIdx.LoadPartitions(partitions, nil, time.Now())
// set this after doing the query, to assure age can't possibly be negative unless if clocks are misconfigured.
out.QueryTime = time.Now().Unix()
processDefs(defs)
} else {
now := time.Now()
for _, p := range partitions {
defs = btIdx.LoadPartition(p, nil, now)
// set this after doing the query, to assure age can't possibly be negative unless if clocks are misconfigured.
out.QueryTime = time.Now().Unix()
processDefs(defs)
}
}
}

Expand Down
40 changes: 38 additions & 2 deletions docs/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ global config flags:

-addr string
graphite/metrictank address (default "http://localhost:6060")
-bt-total-partitions int
total number of partitions (when using bigtable and partitions='*') (default -1)
-from string
for vegeta outputs, will generate requests for data starting from now minus... eg '30min', '5h', '14d', etc. or a unix timestamp (default "30min")
-limit int
Expand Down Expand Up @@ -221,7 +223,7 @@ tags filter:
'valid' only show metrics whose tags (if any) are valid
'invalid' only show metrics that have one or more invalid tags

idxtype: only 'cass' supported for now
idxtype: 'cass' (cassandra) or 'bt' (bigtable)

cass config flags:

Expand Down Expand Up @@ -276,6 +278,34 @@ cass config flags:
-write-queue-size int
Max number of metricDefs allowed to be unwritten to cassandra (default 100000)


bigtable config flags:

-bigtable-instance string
Name of bigtable instance (default "default")
-create-cf
enable the creation of the table and column families (default true)
-enabled

-gcp-project string
Name of GCP project the bigtable cluster resides in (default "default")
-prune-interval duration
Interval at which the index should be checked for stale series. (default 3h0m0s)
-table-name string
Name of bigtable table used for metricDefs (default "metrics")
-update-bigtable-index
synchronize index changes to bigtable. not all your nodes need to do this. (default true)
-update-interval duration
frequency at which we should update the metricDef lastUpdate field, use 0s for instant updates (default 3h0m0s)
-write-concurrency int
Number of writer threads to use (default 5)
-write-max-flush-size int
Max number of metricDefs in each batch write to bigtable (default 10000)
-write-queue-size int
Max number of metricDefs allowed to be unwritten to bigtable. Must be larger then write-max-flush-size (default 100000)



output:

* presets: dump|list|vegeta-render|vegeta-render-patterns
Expand All @@ -297,12 +327,18 @@ output:
roundDuration: formats an integer-seconds duration using aggressive rounding. for the purpose of getting an idea of overal metrics age


EXAMPLES:
Cassandra Examples:
mt-index-cat -from 60min cass -hosts cassandra:9042 list
mt-index-cat -from 60min cass -hosts cassandra:9042 'sumSeries({{.Name | pattern}})'
mt-index-cat -from 60min cass -hosts cassandra:9042 'GET http://localhost:6060/render?target=sumSeries({{.Name | pattern}})&from=-6h\nX-Org-Id: 1\n\n'
mt-index-cat cass -hosts cassandra:9042 -timeout 60s '{{.LastUpdate | age | roundDuration}}\n' | sort | uniq -c
mt-index-cat cass -hosts localhost:9042 -schema-file ../../scripts/config/schema-idx-cassandra.toml '{{.Name | patternCustom 15 "pass" 40 "1rcnw" 15 "2rcnw" 10 "3rcnw" 10 "3rccw" 10 "2rccw"}}\n'


Bigtable Examples:
mt-index-cat -max-stale 0 -bt-total-partitions 128 bt -gcp-project your_project -bigtable-instance the_bt_instance -table-name metric_idx -create-cf false list
mt-index-cat -max-stale 768h -partitions 1,2,3 bt -gcp-project your_project -bigtable-instance the_bt_instance -table-name metric_idx -create-cf false '{{.NameWithTags}} {{.Id}} {{.OrgId}} {{.LastUpdate}} {{.Partition}}
'
```


Expand Down
3 changes: 2 additions & 1 deletion idx/bigtable/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewIdxConfig() *IdxConfig {

var CliConfig = NewIdxConfig()

func ConfigSetup() {
func ConfigSetup() *flag.FlagSet {
btIdx := flag.NewFlagSet("bigtable-idx", flag.ExitOnError)

btIdx.BoolVar(&CliConfig.Enabled, "enabled", CliConfig.Enabled, "")
Expand All @@ -73,6 +73,7 @@ func ConfigSetup() {
btIdx.BoolVar(&CliConfig.CreateCF, "create-cf", CliConfig.CreateCF, "enable the creation of the table and column families")

globalconf.Register("bigtable-idx", btIdx, flag.ExitOnError)
return btIdx
}

func ConfigProcess() {
Expand Down