Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add message pack formatter #8897

Merged
merged 1 commit into from
Oct 2, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
- [#8853](https://github.com/influxdata/influxdb/pull/8853): Reduce allocations, improve `readEntries` performance by simplifying loop
- [#8830](https://github.com/influxdata/influxdb/issues/8830): Separate importer log statements to stdout and stderr.
- [#8857](https://github.com/influxdata/influxdb/pull/8857): Improve performance of Bloom Filter in TSI index.
- [#8897](https://github.com/influxdata/influxdb/pull/8897): Add message pack format for query responses.

### Bugfixes

Expand Down
2 changes: 2 additions & 0 deletions Godeps
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ github.com/influxdata/usage-client 6d3895376368aa52a3a81d2a16e90f0f52371967
github.com/jwilder/encoding 27894731927e49b0a9023f00312be26733744815
github.com/paulbellamy/ratecounter 5a11f585a31379765c190c033b6ad39956584447
github.com/peterh/liner 88609521dc4b6c858fd4c98b628147da928ce4ac
github.com/philhofer/fwd 1612a298117663d7bc9a760ae20d383413859798
github.com/retailnext/hllpp 38a7bb71b483e855d35010808143beaf05b67f9d
github.com/spaolacci/murmur3 0d12bf811670bf6a1a63828dfbd003eded177fce
github.com/tinylib/msgp ad0ff2e232ad2e37faf67087fb24bf8d04a8ce20
github.com/uber-go/atomic 74ca5ec650841aee9f289dce76e928313a37cbc6
github.com/uber-go/zap fbae0281ffd546fa6d1959fec6075ac5da7fb577
golang.org/x/crypto 9477e0b78b9ac3d0b03822fd95422e2fe07627cd
Expand Down
2 changes: 2 additions & 0 deletions LICENSE_OF_DEPENDENCIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
- github.com/google/go-cmp [BSD LICENSE](https://github.com/google/go-cmp/blob/master/LICENSE)
- github.com/influxdata/usage-client [MIT LICENSE](https://github.com/influxdata/usage-client/blob/master/LICENSE.txt)
- github.com/jwilder/encoding [MIT LICENSE](https://github.com/jwilder/encoding/blob/master/LICENSE)
- github.com/philhofer/fwd [MIT LICENSE](https://github.com/philhofer/fwd/blob/master/LICENSE.md)
- github.com/paulbellamy/ratecounter [MIT LICENSE](https://github.com/paulbellamy/ratecounter/blob/master/LICENSE)
- github.com/peterh/liner [MIT LICENSE](https://github.com/peterh/liner/blob/master/COPYING)
- github.com/tinylib/msgp [MIT LICENSE](https://github.com/tinylib/msgp/blob/master/LICENSE)
- github.com/rakyll/statik [APACHE LICENSE](https://github.com/rakyll/statik/blob/master/LICENSE)
- github.com/retailnext/hllpp [BSD LICENSE](https://github.com/retailnext/hllpp/blob/master/LICENSE)
- github.com/uber-go/atomic [MIT LICENSE](https://github.com/uber-go/atomic/blob/master/LICENSE.txt)
Expand Down
106 changes: 106 additions & 0 deletions services/httpd/response_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/influxdata/influxdb/models"
"github.com/tinylib/msgp/msgp"
)

// ResponseWriter is an interface for writing a response.
Expand All @@ -28,6 +29,9 @@ func NewResponseWriter(w http.ResponseWriter, r *http.Request) ResponseWriter {
case "application/csv", "text/csv":
w.Header().Add("Content-Type", "text/csv")
rw.formatter = &csvFormatter{statementID: -1, Writer: w}
case "application/x-msgpack":
w.Header().Add("Content-Type", "application/x-msgpack")
rw.formatter = &msgpackFormatter{Writer: w}
case "application/json":
fallthrough
default:
Expand Down Expand Up @@ -188,3 +192,105 @@ func (w *csvFormatter) WriteResponse(resp Response) (n int, err error) {
}
return n, nil
}

type msgpackFormatter struct {
io.Writer
}

func (f *msgpackFormatter) ContentType() string {
return "application/x-msgpack"
}

func (f *msgpackFormatter) WriteResponse(resp Response) (n int, err error) {
enc := msgp.NewWriter(f.Writer)
defer enc.Flush()

enc.WriteMapHeader(1)
if resp.Err != nil {
enc.WriteString("error")
enc.WriteString(err.Error())
return 0, nil
} else {
enc.WriteString("results")
enc.WriteArrayHeader(uint32(len(resp.Results)))
for _, result := range resp.Results {
if result.Err != nil {
enc.WriteMapHeader(1)
enc.WriteString("error")
enc.WriteString(result.Err.Error())
continue
}

sz := 2
if len(result.Messages) > 0 {
sz++
}
if result.Partial {
sz++
}
enc.WriteMapHeader(uint32(sz))
enc.WriteString("statement_id")
enc.WriteInt(result.StatementID)
if len(result.Messages) > 0 {
enc.WriteString("messages")
enc.WriteArrayHeader(uint32(len(result.Messages)))
for _, msg := range result.Messages {
enc.WriteMapHeader(2)
enc.WriteString("level")
enc.WriteString(msg.Level)
enc.WriteString("text")
enc.WriteString(msg.Text)
}
}
enc.WriteString("series")
enc.WriteArrayHeader(uint32(len(result.Series)))
for _, series := range result.Series {
sz := 2
if series.Name != "" {
sz++
}
if len(series.Tags) > 0 {
sz++
}
if series.Partial {
sz++
}
enc.WriteMapHeader(uint32(sz))
if series.Name != "" {
enc.WriteString("name")
enc.WriteString(series.Name)
}
if len(series.Tags) > 0 {
enc.WriteString("tags")
enc.WriteMapHeader(uint32(len(series.Tags)))
for k, v := range series.Tags {
enc.WriteString(k)
enc.WriteString(v)
}
}
enc.WriteString("columns")
enc.WriteArrayHeader(uint32(len(series.Columns)))
for _, col := range series.Columns {
enc.WriteString(col)
}
enc.WriteString("values")
enc.WriteArrayHeader(uint32(len(series.Values)))
for _, values := range series.Values {
enc.WriteArrayHeader(uint32(len(values)))
for _, v := range values {
enc.WriteIntf(v)
}
}
if series.Partial {
enc.WriteString("partial")
enc.WriteBool(series.Partial)
}
}
if result.Partial {
enc.WriteString("partial")
enc.WriteBool(true)
}
}
}
return 0, nil
}
67 changes: 67 additions & 0 deletions services/httpd/response_writer_test.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
package httpd_test

import (
"bytes"
"encoding/json"
"fmt"
"math"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
"time"

"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/services/httpd"
"github.com/tinylib/msgp/msgp"
)

func TestResponseWriter_CSV(t *testing.T) {
Expand Down Expand Up @@ -62,3 +67,65 @@ cpu,"host=server01,region=uswest",70,9223372036854775808
t.Errorf("unexpected output:\n\ngot=%v\nwant=%s", got, want)
}
}

func TestResponseWriter_MessagePack(t *testing.T) {
header := make(http.Header)
header.Set("Accept", "application/x-msgpack")
r := &http.Request{
Header: header,
URL: &url.URL{},
}
w := httptest.NewRecorder()

writer := httpd.NewResponseWriter(w, r)
writer.WriteResponse(httpd.Response{
Results: []*query.Result{
{
StatementID: 0,
Series: []*models.Row{
{
Name: "cpu",
Tags: map[string]string{
"host": "server01",
},
Columns: []string{"time", "value"},
Values: [][]interface{}{
{time.Unix(0, 10), float64(2.5)},
{time.Unix(0, 20), int64(5)},
{time.Unix(0, 30), nil},
{time.Unix(0, 40), "foobar"},
{time.Unix(0, 50), true},
{time.Unix(0, 60), false},
{time.Unix(0, 70), uint64(math.MaxInt64 + 1)},
},
},
},
},
},
})

// The reader always reads times as time.Local so encode the expected response
// as JSON and insert it into the expected values.
values, err := json.Marshal([][]interface{}{
{time.Unix(0, 10).Local(), float64(2.5)},
{time.Unix(0, 20).Local(), int64(5)},
{time.Unix(0, 30).Local(), nil},
{time.Unix(0, 40).Local(), "foobar"},
{time.Unix(0, 50).Local(), true},
{time.Unix(0, 60).Local(), false},
{time.Unix(0, 70).Local(), uint64(math.MaxInt64 + 1)},
})
if err != nil {
t.Fatalf("unexpected error: %s", err)
}

reader := msgp.NewReader(w.Body)
var buf bytes.Buffer
if _, err := reader.WriteToJSON(&buf); err != nil {
t.Fatalf("unexpected error: %s", err)
}
want := fmt.Sprintf(`{"results":[{"statement_id":0,"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value"],"values":%s}]}]}`, string(values))
if have := strings.TrimSpace(buf.String()); have != want {
t.Fatalf("unexpected output: %s != %s", have, want)
}
}