From 75ce6c3de34492bed145fbc0aecd9b3bc154f809 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Linas=20Med=C5=BEi=C5=ABnas?= Date: Mon, 7 Dec 2020 07:44:46 +0200 Subject: [PATCH] [tools] Add benchmarking support to read_data_files (#2986) --- src/cmd/tools/read_data_files/main/main.go | 91 ++++++++++++++++++---- 1 file changed, 74 insertions(+), 17 deletions(-) diff --git a/src/cmd/tools/read_data_files/main/main.go b/src/cmd/tools/read_data_files/main/main.go index d48e9ec561..550d4ff563 100644 --- a/src/cmd/tools/read_data_files/main/main.go +++ b/src/cmd/tools/read_data_files/main/main.go @@ -41,18 +41,35 @@ import ( "go.uber.org/zap" ) -const snapshotType = "snapshot" -const flushType = "flush" +const ( + snapshotType = "snapshot" + flushType = "flush" +) + +type benchmarkMode uint8 + +const ( + // benchmarkNone prints the data read to the standard output and does not measure performance. + benchmarkNone benchmarkMode = iota + + // benchmarkSeries benchmarks time series read performance (skipping datapoint decoding). + benchmarkSeries + + // benchmarkDatapoints benchmarks series read, including datapoint decoding. + benchmarkDatapoints +) func main() { var ( optPathPrefix = getopt.StringLong("path-prefix", 'p', "", "Path prefix [e.g. /var/lib/m3db]") - optNamespace = getopt.StringLong("namespace", 'n', "", "Namespace [e.g. metrics]") + optNamespace = getopt.StringLong("namespace", 'n', "default", "Namespace [e.g. metrics]") optShard = getopt.Uint32Long("shard", 's', 0, "Shard [expected format uint32]") optBlockstart = getopt.Int64Long("block-start", 'b', 0, "Block Start Time [in nsec]") volume = getopt.Int64Long("volume", 'v', 0, "Volume number") fileSetTypeArg = getopt.StringLong("fileset-type", 't', flushType, fmt.Sprintf("%s|%s", flushType, snapshotType)) idFilter = getopt.StringLong("id-filter", 'f', "", "ID Contains Filter (optional)") + benchmark = getopt.StringLong( + "benchmark", 'B', "", "benchmark mode (optional), [series/datapoints]") ) getopt.Parse() @@ -82,12 +99,30 @@ func main() { log.Fatalf("unknown fileset type: %s", *fileSetTypeArg) } + var benchMode benchmarkMode + switch *benchmark { + case "": + case "series": + benchMode = benchmarkSeries + case "datapoints": + benchMode = benchmarkDatapoints + default: + log.Fatalf("unknown benchmark type: %s", *benchmark) + } + bytesPool := tools.NewCheckedBytesPool() bytesPool.Init() encodingOpts := encoding.NewOptions().SetBytesPool(bytesPool) fsOpts := fs.NewOptions().SetFilePathPrefix(*optPathPrefix) + + var ( + seriesCount = 0 + datapointCount = 0 + start = time.Now() + ) + reader, err := fs.NewReader(bytesPool, fsOpts) if err != nil { log.Fatalf("could not create new reader: %v", err) @@ -121,23 +156,45 @@ func main() { continue } - data.IncRef() - iter := m3tsz.NewReaderIterator(bytes.NewReader(data.Bytes()), true, encodingOpts) - for iter.Next() { - dp, _, annotation := iter.Current() - // Use fmt package so it goes to stdout instead of stderr - fmt.Printf("{id: %s, dp: %+v", id.String(), dp) - if len(annotation) > 0 { - fmt.Printf(", annotation: %s", base64.StdEncoding.EncodeToString(annotation)) + if benchMode != benchmarkSeries { + data.IncRef() + + iter := m3tsz.NewReaderIterator(bytes.NewReader(data.Bytes()), true, encodingOpts) + for iter.Next() { + dp, _, annotation := iter.Current() + if benchMode == benchmarkNone { + // Use fmt package so it goes to stdout instead of stderr + fmt.Printf("{id: %s, dp: %+v", id.String(), dp) + if len(annotation) > 0 { + fmt.Printf(", annotation: %s", base64.StdEncoding.EncodeToString(annotation)) + } + fmt.Println("}") + } + datapointCount++ } - fmt.Println("}") - } - if err := iter.Err(); err != nil { - log.Fatalf("unable to iterate original data: %v", err) + if err := iter.Err(); err != nil { + log.Fatalf("unable to iterate original data: %v", err) + } + iter.Close() + + data.DecRef() } - iter.Close() - data.DecRef() data.Finalize() + seriesCount++ + } + + if benchMode != benchmarkNone { + runTime := time.Since(start) + fmt.Printf("Running time: %s\n", runTime) + fmt.Printf("\n%d series read\n", seriesCount) + if runTime > 0 { + fmt.Printf("(%.2f series/second)\n", float64(seriesCount)/runTime.Seconds()) + } + + if benchMode == benchmarkDatapoints { + fmt.Printf("\n%d datapoints decoded\n", datapointCount) + fmt.Printf("(%.2f datapoints/second)\n", float64(datapointCount)/runTime.Seconds()) + } } }