Skip to content

Commit

Permalink
Fix bugs in Avro implementation (#813)
Browse files Browse the repository at this point in the history
Co-authored-by: ti-srebot <[email protected]>
  • Loading branch information
liuzix and ti-srebot authored Aug 3, 2020
1 parent 9f55889 commit efd1df7
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 5 deletions.
20 changes: 16 additions & 4 deletions cdc/sink/codec/schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ type schemaCacheEntry struct {
}

type registerRequest struct {
Schema string `json:"schema"`
SchemaType string `json:"schemaType"`
Schema string `json:"schema"`
// Commented out for compatibility with Confluent 5.4.x
// SchemaType string `json:"schemaType"`
}

type registerResponse struct {
Expand Down Expand Up @@ -112,8 +113,9 @@ var regexRemoveSpaces = regexp.MustCompile(`\s`)
func (m *AvroSchemaManager) Register(ctx context.Context, tableName model.TableName, codec *goavro.Codec) error {
// The Schema Registry expects the JSON to be without newline characters
reqBody := registerRequest{
Schema: regexRemoveSpaces.ReplaceAllString(codec.Schema(), ""),
SchemaType: "AVRO",
Schema: regexRemoveSpaces.ReplaceAllString(codec.Schema(), ""),
// Commented out for compatibility with Confluent 5.4.x
// SchemaType: "AVRO",
}
payload, err := json.Marshal(&reqBody)
if err != nil {
Expand Down Expand Up @@ -277,15 +279,25 @@ func httpRetry(ctx context.Context, credential *security.Credential, r *http.Req
var (
err error
resp *http.Response
data []byte
)

expBackoff := backoff.NewExponentialBackOff()
expBackoff.MaxInterval = time.Second * 30
httpCli, err := httputil.NewClient(credential)

if r.Body != nil {
data, err = ioutil.ReadAll(r.Body)
_ = r.Body.Close()
}

if err != nil {
return nil, errors.Trace(err)
}
for {
if data != nil {
r.Body = ioutil.NopCloser(bytes.NewReader(data))
}
resp, err = httpCli.Do(r)

if err != nil {
Expand Down
28 changes: 27 additions & 1 deletion cdc/sink/codec/schema_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package codec

import (
"bytes"
"context"
"encoding/json"
"io/ioutil"
Expand Down Expand Up @@ -71,7 +72,7 @@ func startHTTPInterceptForTestingRegistry(c *check.C) {
return nil, err
}

c.Assert(reqData.SchemaType, check.Equals, "AVRO")
// c.Assert(reqData.SchemaType, check.Equals, "AVRO")

var respData registerResponse
registry.mu.Lock()
Expand Down Expand Up @@ -139,6 +140,18 @@ func startHTTPInterceptForTestingRegistry(c *check.C) {
return httpmock.NewStringResponse(200, ""), nil
})

failCounter := 0
httpmock.RegisterResponder("POST", `=~^http://127.0.0.1:8081/may-fail`,
func(req *http.Request) (*http.Response, error) {
data, _ := ioutil.ReadAll(req.Body)
c.Assert(len(data), check.Greater, 0)
c.Assert(int64(len(data)), check.Equals, req.ContentLength)
if failCounter < 3 {
failCounter++
return httpmock.NewStringResponse(422, ""), nil
}
return httpmock.NewStringResponse(200, ""), nil
})
}

func stopHTTPInterceptForTestingRegistry() {
Expand Down Expand Up @@ -274,3 +287,16 @@ func (s *AvroSchemaRegistrySuite) TestSchemaRegistryIdempotent(c *check.C) {
c.Assert(err, check.IsNil)
}
}

func (s *AvroSchemaRegistrySuite) TestHTTPRetry(c *check.C) {
payload := []byte("test")
req, err := http.NewRequest("POST", "http://127.0.0.1:8081/may-fail", bytes.NewReader(payload))
c.Assert(err, check.IsNil)

ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()

resp, err := httpRetry(ctx, nil, req, false)
c.Assert(err, check.IsNil)
_ = resp.Body.Close()
}

0 comments on commit efd1df7

Please sign in to comment.