diff --git a/CHANGELOG.md b/CHANGELOG.md index 72f24a51e5b..cfbff217938 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ - [#8853](https://github.com/influxdata/influxdb/pull/8853): Reduce allocations, improve `readEntries` performance by simplifying loop - [#8830](https://github.com/influxdata/influxdb/issues/8830): Separate importer log statements to stdout and stderr. - [#8857](https://github.com/influxdata/influxdb/pull/8857): Improve performance of Bloom Filter in TSI index. +- [#8897](https://github.com/influxdata/influxdb/pull/8897): Add message pack format for query responses. ### Bugfixes diff --git a/Godeps b/Godeps index f4eaf4be234..84196b0e652 100644 --- a/Godeps +++ b/Godeps @@ -14,8 +14,10 @@ github.com/influxdata/usage-client 6d3895376368aa52a3a81d2a16e90f0f52371967 github.com/jwilder/encoding 27894731927e49b0a9023f00312be26733744815 github.com/paulbellamy/ratecounter 5a11f585a31379765c190c033b6ad39956584447 github.com/peterh/liner 88609521dc4b6c858fd4c98b628147da928ce4ac +github.com/philhofer/fwd 1612a298117663d7bc9a760ae20d383413859798 github.com/retailnext/hllpp 38a7bb71b483e855d35010808143beaf05b67f9d github.com/spaolacci/murmur3 0d12bf811670bf6a1a63828dfbd003eded177fce +github.com/tinylib/msgp ad0ff2e232ad2e37faf67087fb24bf8d04a8ce20 github.com/uber-go/atomic 74ca5ec650841aee9f289dce76e928313a37cbc6 github.com/uber-go/zap fbae0281ffd546fa6d1959fec6075ac5da7fb577 golang.org/x/crypto 9477e0b78b9ac3d0b03822fd95422e2fe07627cd diff --git a/LICENSE_OF_DEPENDENCIES.md b/LICENSE_OF_DEPENDENCIES.md index 8f3b434377f..da4ff1d04a8 100644 --- a/LICENSE_OF_DEPENDENCIES.md +++ b/LICENSE_OF_DEPENDENCIES.md @@ -15,8 +15,10 @@ - github.com/google/go-cmp [BSD LICENSE](https://github.com/google/go-cmp/blob/master/LICENSE) - github.com/influxdata/usage-client [MIT LICENSE](https://github.com/influxdata/usage-client/blob/master/LICENSE.txt) - github.com/jwilder/encoding [MIT LICENSE](https://github.com/jwilder/encoding/blob/master/LICENSE) +- github.com/philhofer/fwd [MIT LICENSE](https://github.com/philhofer/fwd/blob/master/LICENSE.md) - github.com/paulbellamy/ratecounter [MIT LICENSE](https://github.com/paulbellamy/ratecounter/blob/master/LICENSE) - github.com/peterh/liner [MIT LICENSE](https://github.com/peterh/liner/blob/master/COPYING) +- github.com/tinylib/msgp [MIT LICENSE](https://github.com/tinylib/msgp/blob/master/LICENSE) - github.com/rakyll/statik [APACHE LICENSE](https://github.com/rakyll/statik/blob/master/LICENSE) - github.com/retailnext/hllpp [BSD LICENSE](https://github.com/retailnext/hllpp/blob/master/LICENSE) - github.com/uber-go/atomic [MIT LICENSE](https://github.com/uber-go/atomic/blob/master/LICENSE.txt) diff --git a/services/httpd/response_writer.go b/services/httpd/response_writer.go index c59535d57f5..4e6239df8eb 100644 --- a/services/httpd/response_writer.go +++ b/services/httpd/response_writer.go @@ -9,6 +9,7 @@ import ( "time" "github.com/influxdata/influxdb/models" + "github.com/tinylib/msgp/msgp" ) // ResponseWriter is an interface for writing a response. @@ -28,6 +29,9 @@ func NewResponseWriter(w http.ResponseWriter, r *http.Request) ResponseWriter { case "application/csv", "text/csv": w.Header().Add("Content-Type", "text/csv") rw.formatter = &csvFormatter{statementID: -1, Writer: w} + case "application/x-msgpack": + w.Header().Add("Content-Type", "application/x-msgpack") + rw.formatter = &msgpackFormatter{Writer: w} case "application/json": fallthrough default: @@ -188,3 +192,105 @@ func (w *csvFormatter) WriteResponse(resp Response) (n int, err error) { } return n, nil } + +type msgpackFormatter struct { + io.Writer +} + +func (f *msgpackFormatter) ContentType() string { + return "application/x-msgpack" +} + +func (f *msgpackFormatter) WriteResponse(resp Response) (n int, err error) { + enc := msgp.NewWriter(f.Writer) + defer enc.Flush() + + enc.WriteMapHeader(1) + if resp.Err != nil { + enc.WriteString("error") + enc.WriteString(err.Error()) + return 0, nil + } else { + enc.WriteString("results") + enc.WriteArrayHeader(uint32(len(resp.Results))) + for _, result := range resp.Results { + if result.Err != nil { + enc.WriteMapHeader(1) + enc.WriteString("error") + enc.WriteString(result.Err.Error()) + continue + } + + sz := 2 + if len(result.Messages) > 0 { + sz++ + } + if result.Partial { + sz++ + } + enc.WriteMapHeader(uint32(sz)) + enc.WriteString("statement_id") + enc.WriteInt(result.StatementID) + if len(result.Messages) > 0 { + enc.WriteString("messages") + enc.WriteArrayHeader(uint32(len(result.Messages))) + for _, msg := range result.Messages { + enc.WriteMapHeader(2) + enc.WriteString("level") + enc.WriteString(msg.Level) + enc.WriteString("text") + enc.WriteString(msg.Text) + } + } + enc.WriteString("series") + enc.WriteArrayHeader(uint32(len(result.Series))) + for _, series := range result.Series { + sz := 2 + if series.Name != "" { + sz++ + } + if len(series.Tags) > 0 { + sz++ + } + if series.Partial { + sz++ + } + enc.WriteMapHeader(uint32(sz)) + if series.Name != "" { + enc.WriteString("name") + enc.WriteString(series.Name) + } + if len(series.Tags) > 0 { + enc.WriteString("tags") + enc.WriteMapHeader(uint32(len(series.Tags))) + for k, v := range series.Tags { + enc.WriteString(k) + enc.WriteString(v) + } + } + enc.WriteString("columns") + enc.WriteArrayHeader(uint32(len(series.Columns))) + for _, col := range series.Columns { + enc.WriteString(col) + } + enc.WriteString("values") + enc.WriteArrayHeader(uint32(len(series.Values))) + for _, values := range series.Values { + enc.WriteArrayHeader(uint32(len(values))) + for _, v := range values { + enc.WriteIntf(v) + } + } + if series.Partial { + enc.WriteString("partial") + enc.WriteBool(series.Partial) + } + } + if result.Partial { + enc.WriteString("partial") + enc.WriteBool(true) + } + } + } + return 0, nil +} diff --git a/services/httpd/response_writer_test.go b/services/httpd/response_writer_test.go index e74781b0bb3..08b262a9145 100644 --- a/services/httpd/response_writer_test.go +++ b/services/httpd/response_writer_test.go @@ -1,16 +1,21 @@ package httpd_test import ( + "bytes" + "encoding/json" + "fmt" "math" "net/http" "net/http/httptest" "net/url" + "strings" "testing" "time" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/query" "github.com/influxdata/influxdb/services/httpd" + "github.com/tinylib/msgp/msgp" ) func TestResponseWriter_CSV(t *testing.T) { @@ -62,3 +67,65 @@ cpu,"host=server01,region=uswest",70,9223372036854775808 t.Errorf("unexpected output:\n\ngot=%v\nwant=%s", got, want) } } + +func TestResponseWriter_MessagePack(t *testing.T) { + header := make(http.Header) + header.Set("Accept", "application/x-msgpack") + r := &http.Request{ + Header: header, + URL: &url.URL{}, + } + w := httptest.NewRecorder() + + writer := httpd.NewResponseWriter(w, r) + writer.WriteResponse(httpd.Response{ + Results: []*query.Result{ + { + StatementID: 0, + Series: []*models.Row{ + { + Name: "cpu", + Tags: map[string]string{ + "host": "server01", + }, + Columns: []string{"time", "value"}, + Values: [][]interface{}{ + {time.Unix(0, 10), float64(2.5)}, + {time.Unix(0, 20), int64(5)}, + {time.Unix(0, 30), nil}, + {time.Unix(0, 40), "foobar"}, + {time.Unix(0, 50), true}, + {time.Unix(0, 60), false}, + {time.Unix(0, 70), uint64(math.MaxInt64 + 1)}, + }, + }, + }, + }, + }, + }) + + // The reader always reads times as time.Local so encode the expected response + // as JSON and insert it into the expected values. + values, err := json.Marshal([][]interface{}{ + {time.Unix(0, 10).Local(), float64(2.5)}, + {time.Unix(0, 20).Local(), int64(5)}, + {time.Unix(0, 30).Local(), nil}, + {time.Unix(0, 40).Local(), "foobar"}, + {time.Unix(0, 50).Local(), true}, + {time.Unix(0, 60).Local(), false}, + {time.Unix(0, 70).Local(), uint64(math.MaxInt64 + 1)}, + }) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + reader := msgp.NewReader(w.Body) + var buf bytes.Buffer + if _, err := reader.WriteToJSON(&buf); err != nil { + t.Fatalf("unexpected error: %s", err) + } + want := fmt.Sprintf(`{"results":[{"statement_id":0,"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value"],"values":%s}]}]}`, string(values)) + if have := strings.TrimSpace(buf.String()); have != want { + t.Fatalf("unexpected output: %s != %s", have, want) + } +}