Skip to content

Commit

Permalink
new: support custom encoding (#77)
Browse files Browse the repository at this point in the history
* new: support for arbitrary wire encoding

* working checkpoint

* use elemental.Encoding

* fixed: test
  • Loading branch information
primalmotion authored Apr 16, 2019
1 parent 63fafb4 commit 319f152
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 36 deletions.
16 changes: 13 additions & 3 deletions internal/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package push
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"sync"
Expand Down Expand Up @@ -40,6 +39,8 @@ type subscription struct {
currentTokenLock sync.RWMutex
unregisterTokenNotifier func(string)
registerTokenNotifier func(string, func(string))
readEncoding elemental.EncodingType
writeEncoding elemental.EncodingType
}

// NewSubscriber creates a new Subscription.
Expand All @@ -50,9 +51,15 @@ func NewSubscriber(
registerTokenNotifier func(string, func(string)),
unregisterTokenNotifier func(string),
tlsConfig *tls.Config,
headers http.Header,
recursive bool,
) manipulate.Subscriber {

readEncoding, writeEncoding, err := elemental.EncodingFromHeaders(headers)
if err != nil {
panic(err)
}

return &subscription{
id: uuid.Must(uuid.NewV4()).String(),
url: url,
Expand All @@ -67,12 +74,15 @@ func NewSubscriber(
status: make(chan manipulate.SubscriberStatus, statusChSize),
filters: make(chan *elemental.PushFilter, filterChSize),
currentFilterLock: sync.RWMutex{},
readEncoding: readEncoding,
writeEncoding: writeEncoding,
config: wsc.Config{
PongWait: 10 * time.Second,
WriteWait: 10 * time.Second,
PingPeriod: 5 * time.Second,
ReadChanSize: 2048,
TLSConfig: tlsConfig,
Headers: headers,
},
}
}
Expand Down Expand Up @@ -181,7 +191,7 @@ func (s *subscription) listen(ctx context.Context) {

case filter := <-s.filters:

filterData, err = json.Marshal(filter)
filterData, err = elemental.Encode(s.writeEncoding, filter)
if err != nil {
s.publishError(err)
continue
Expand All @@ -192,7 +202,7 @@ func (s *subscription) listen(ctx context.Context) {
case data := <-s.conn.Read():

event := &elemental.Event{}
if err = json.Unmarshal(data, event); err != nil {
if err = elemental.Decode(s.readEncoding, data, event); err != nil {
s.publishError(err)
continue
}
Expand Down
24 changes: 13 additions & 11 deletions maniphttp/manipulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"net/http"
"net/url"
Expand Down Expand Up @@ -44,6 +43,7 @@ type httpManipulator struct {
tokenManager manipulate.TokenManager
globalHeaders http.Header
transport *http.Transport
encoding elemental.EncodingType
}

// New returns a maniphttp.Manipulator configured according to the given suite of Option.
Expand All @@ -60,6 +60,7 @@ func New(ctx context.Context, url string, options ...Option) (manipulate.Manipul
renewNotifiers: map[string]func(string){},
ctx: ctx,
url: url,
encoding: elemental.EncodingTypeJSON,
}

// Apply the options.
Expand Down Expand Up @@ -145,7 +146,7 @@ func (s *httpManipulator) RetrieveMany(mctx manipulate.Context, dest elemental.I

if response.StatusCode != http.StatusNoContent {
defer response.Body.Close() // nolint: errcheck
if err := decodeData(response, dest); err != nil {
if err := decodeData(response, s.encoding, dest); err != nil {
sp.SetTag("error", true)
sp.LogFields(log.Error(err))
return err
Expand Down Expand Up @@ -187,7 +188,7 @@ func (s *httpManipulator) Retrieve(mctx manipulate.Context, object elemental.Ide

if response.StatusCode != http.StatusNoContent {
defer response.Body.Close() // nolint: errcheck
if err := decodeData(response, &object); err != nil {
if err := decodeData(response, s.encoding, object); err != nil {
sp.SetTag("error", true)
sp.LogFields(log.Error(err))
return err
Expand Down Expand Up @@ -224,7 +225,7 @@ func (s *httpManipulator) Create(mctx manipulate.Context, object elemental.Ident
return manipulate.NewErrCannotBuildQuery(err.Error())
}

data, err := json.Marshal(object)
data, err := elemental.Encode(s.encoding, object)
if err != nil {
sp.SetTag("error", true)
sp.LogFields(log.Error(err))
Expand All @@ -240,7 +241,7 @@ func (s *httpManipulator) Create(mctx manipulate.Context, object elemental.Ident

if response.StatusCode != http.StatusNoContent {
defer response.Body.Close() // nolint: errcheck
if err := decodeData(response, &object); err != nil {
if err := decodeData(response, s.encoding, object); err != nil {
sp.SetTag("error", true)
sp.LogFields(log.Error(err))
return err
Expand Down Expand Up @@ -286,7 +287,7 @@ func (s *httpManipulator) Update(mctx manipulate.Context, object elemental.Ident
return manipulate.NewErrCannotBuildQuery(err.Error())
}

data, err := json.Marshal(object)
data, err := elemental.Encode(s.encoding, object)
if err != nil {
sp.SetTag("error", true)
sp.LogFields(log.Error(err))
Expand All @@ -302,7 +303,7 @@ func (s *httpManipulator) Update(mctx manipulate.Context, object elemental.Ident

if response.StatusCode != http.StatusNoContent {
defer response.Body.Close() // nolint: errcheck
if err := decodeData(response, &object); err != nil {
if err := decodeData(response, s.encoding, object); err != nil {
sp.SetTag("error", true)
sp.LogFields(log.Error(err))
return err
Expand Down Expand Up @@ -347,7 +348,7 @@ func (s *httpManipulator) Delete(mctx manipulate.Context, object elemental.Ident

if response.StatusCode != http.StatusNoContent {
defer response.Body.Close() // nolint: errcheck
if err := decodeData(response, &object); err != nil {
if err := decodeData(response, s.encoding, object); err != nil {
sp.SetTag("error", true)
sp.LogFields(log.Error(err))
return err
Expand Down Expand Up @@ -407,7 +408,8 @@ func (s *httpManipulator) prepareHeaders(request *http.Request, mctx manipulate.
request.Header[k] = v
}

request.Header.Set("Content-Type", "application/json; charset=UTF-8")
request.Header.Set("Content-Type", string(s.encoding))
request.Header.Set("Accept", string(s.encoding))
request.Header.Set("Accept-Encoding", "gzip")

if ns != "" {
Expand Down Expand Up @@ -585,10 +587,10 @@ func (s *httpManipulator) send(mctx manipulate.Context, method string, requrl st

if response.StatusCode < 200 || response.StatusCode >= 300 {

es := []elemental.Error{}
es := elemental.Errors{}

defer response.Body.Close() // nolint: errcheck
if err := decodeData(response, &es); err != nil {
if err := decodeData(response, s.encoding, &es); err != nil {
return nil, err
}

Expand Down
21 changes: 10 additions & 11 deletions maniphttp/manipulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@ import (
"testing"
"time"

"go.aporeto.io/manipulate/internal/idempotency"

. "github.com/smartystreets/goconvey/convey"
"go.aporeto.io/elemental"
testmodel "go.aporeto.io/elemental/test/model"
"go.aporeto.io/manipulate"
"go.aporeto.io/manipulate/internal/idempotency"
"go.aporeto.io/manipulate/internal/tracing"
"go.aporeto.io/manipulate/maniptest"
)
Expand Down Expand Up @@ -54,9 +53,6 @@ func TestHTTP_NewSHTTPm(t *testing.T) {
})
}

/*
Privates
*/
func TestHTTP_makeAuthorizationHeaders(t *testing.T) {

Convey("Given I create a new HTTP manipulator", t, func() {
Expand Down Expand Up @@ -399,7 +395,7 @@ func TestHTTP_Retrieve(t *testing.T) {
Convey("Then error should not be nil", func() {
So(err, ShouldNotBeNil)
So(err.(elemental.Errors).Code(), ShouldEqual, 422)
So(err.(elemental.Errors)[0].(elemental.Error).Description, ShouldEqual, "nope.")
So(err.(elemental.Errors)[0].Description, ShouldEqual, "nope.")
})
})
})
Expand Down Expand Up @@ -548,7 +544,7 @@ func TestHTTP_Delete(t *testing.T) {

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
fmt.Fprint(w, `[{"ID": "yyy"}]`)
w.Write([]byte(`{"ID":"yyy"}`)) // nolint
}))
defer ts.Close()

Expand All @@ -559,10 +555,13 @@ func TestHTTP_Delete(t *testing.T) {
list := testmodel.NewList()
list.ID = "xxx"

_ = m.Delete(nil, list)
err := m.Delete(nil, list)
if err != nil {
panic(err)
}

Convey("Then ID should 'xxx'", func() {
So(list.Identifier(), ShouldEqual, "xxx")
Convey("Then ID should 'yyy'", func() {
So(list.Identifier(), ShouldEqual, "yyy")
})
})

Expand Down Expand Up @@ -1212,7 +1211,7 @@ func TestHTTP_send(t *testing.T) {
Convey("Then err should not be nil", func() {
So(err, ShouldNotBeNil)
So(err, ShouldHaveSameTypeAs, manipulate.ErrCannotUnmarshal{})
So(err.Error(), ShouldEqual, `Unable to unmarshal data: invalid character '\n' in string literal. original data:
So(err.Error(), ShouldEqual, `Unable to unmarshal data: unable to decode application/json: EOF. original data:
[{"code": 423, "]
`)
})
Expand Down
8 changes: 8 additions & 0 deletions maniphttp/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/tls"
"net/http"

"go.aporeto.io/elemental"
"go.aporeto.io/manipulate"
)

Expand Down Expand Up @@ -88,3 +89,10 @@ func OptionDisableBuiltInRetry() Option {
m.disableAutoRetry = true
}
}

// OptionEncoding sets the encoding/decoding type to use.
func OptionEncoding(enc elemental.EncodingType) Option {
return func(m *httpManipulator) {
m.encoding = enc
}
}
7 changes: 7 additions & 0 deletions maniphttp/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"

. "github.com/smartystreets/goconvey/convey"
"go.aporeto.io/elemental"
)

type testTokenManager struct{}
Expand Down Expand Up @@ -76,4 +77,10 @@ func TestManipHttp_Optionions(t *testing.T) {
OptionDisableBuiltInRetry()(m)
So(m.disableAutoRetry, ShouldBeTrue)
})

Convey("Calling OptionEncoding should work", t, func() {
m := &httpManipulator{}
OptionEncoding(elemental.EncodingTypeMSGPACK)(m)
So(m.encoding, ShouldEqual, elemental.EncodingTypeMSGPACK)
})
}
5 changes: 5 additions & 0 deletions maniphttp/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package maniphttp
import (
"crypto/tls"
"fmt"
"net/http"
"strings"

"go.aporeto.io/manipulate"
Expand Down Expand Up @@ -75,6 +76,10 @@ func NewSubscriber(manipulator manipulate.Manipulator, options ...SubscriberOpti
m.registerRenewNotifier,
m.unregisterRenewNotifier,
cfg.tlsConfig,
http.Header{
"Content-Type": []string{string(m.encoding)},
"Accept": []string{string(m.encoding)},
},
cfg.recursive,
)
}
Expand Down
7 changes: 4 additions & 3 deletions maniphttp/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io"
"io/ioutil"
Expand All @@ -16,6 +15,8 @@ import (
"sync"
"time"

"go.aporeto.io/elemental"

"go.aporeto.io/manipulate"
"go.aporeto.io/manipulate/maniphttp/internal/compiler"
)
Expand Down Expand Up @@ -67,7 +68,7 @@ func addQueryParameters(req *http.Request, ctx manipulate.Context) error {
return nil
}

func decodeData(r *http.Response, dest interface{}) (err error) {
func decodeData(r *http.Response, encodingType elemental.EncodingType, dest interface{}) (err error) {

if r.Body == nil {
return manipulate.NewErrCannotUnmarshal("nil reader")
Expand Down Expand Up @@ -96,7 +97,7 @@ func decodeData(r *http.Response, dest interface{}) (err error) {
return manipulate.NewErrCannotUnmarshal(fmt.Sprintf("unable to read data: %s", err.Error()))
}

if err = json.Unmarshal(data, dest); err != nil {
if err = elemental.Decode(encodingType, data, dest); err != nil {
return manipulate.NewErrCannotUnmarshal(fmt.Sprintf("%s. original data:\n%s", err.Error(), string(data)))
}

Expand Down
Loading

0 comments on commit 319f152

Please sign in to comment.