diff --git a/cdc/sink/codec/schema_registry.go b/cdc/sink/codec/schema_registry.go index 305db1c62f7..b5b302c4dc3 100644 --- a/cdc/sink/codec/schema_registry.go +++ b/cdc/sink/codec/schema_registry.go @@ -52,8 +52,9 @@ type schemaCacheEntry struct { } type registerRequest struct { - Schema string `json:"schema"` - SchemaType string `json:"schemaType"` + Schema string `json:"schema"` + // Commented out for compatibility with Confluent 5.4.x + // SchemaType string `json:"schemaType"` } type registerResponse struct { @@ -112,8 +113,9 @@ var regexRemoveSpaces = regexp.MustCompile(`\s`) func (m *AvroSchemaManager) Register(ctx context.Context, tableName model.TableName, codec *goavro.Codec) error { // The Schema Registry expects the JSON to be without newline characters reqBody := registerRequest{ - Schema: regexRemoveSpaces.ReplaceAllString(codec.Schema(), ""), - SchemaType: "AVRO", + Schema: regexRemoveSpaces.ReplaceAllString(codec.Schema(), ""), + // Commented out for compatibility with Confluent 5.4.x + // SchemaType: "AVRO", } payload, err := json.Marshal(&reqBody) if err != nil { @@ -277,15 +279,25 @@ func httpRetry(ctx context.Context, credential *security.Credential, r *http.Req var ( err error resp *http.Response + data []byte ) expBackoff := backoff.NewExponentialBackOff() expBackoff.MaxInterval = time.Second * 30 httpCli, err := httputil.NewClient(credential) + + if r.Body != nil { + data, err = ioutil.ReadAll(r.Body) + _ = r.Body.Close() + } + if err != nil { return nil, errors.Trace(err) } for { + if data != nil { + r.Body = ioutil.NopCloser(bytes.NewReader(data)) + } resp, err = httpCli.Do(r) if err != nil { diff --git a/cdc/sink/codec/schema_registry_test.go b/cdc/sink/codec/schema_registry_test.go index e9018586571..994a2d5d25f 100644 --- a/cdc/sink/codec/schema_registry_test.go +++ b/cdc/sink/codec/schema_registry_test.go @@ -14,6 +14,7 @@ package codec import ( + "bytes" "context" "encoding/json" "io/ioutil" @@ -71,7 +72,7 @@ func startHTTPInterceptForTestingRegistry(c *check.C) { return nil, err } - c.Assert(reqData.SchemaType, check.Equals, "AVRO") + // c.Assert(reqData.SchemaType, check.Equals, "AVRO") var respData registerResponse registry.mu.Lock() @@ -139,6 +140,18 @@ func startHTTPInterceptForTestingRegistry(c *check.C) { return httpmock.NewStringResponse(200, ""), nil }) + failCounter := 0 + httpmock.RegisterResponder("POST", `=~^http://127.0.0.1:8081/may-fail`, + func(req *http.Request) (*http.Response, error) { + data, _ := ioutil.ReadAll(req.Body) + c.Assert(len(data), check.Greater, 0) + c.Assert(int64(len(data)), check.Equals, req.ContentLength) + if failCounter < 3 { + failCounter++ + return httpmock.NewStringResponse(422, ""), nil + } + return httpmock.NewStringResponse(200, ""), nil + }) } func stopHTTPInterceptForTestingRegistry() { @@ -274,3 +287,16 @@ func (s *AvroSchemaRegistrySuite) TestSchemaRegistryIdempotent(c *check.C) { c.Assert(err, check.IsNil) } } + +func (s *AvroSchemaRegistrySuite) TestHTTPRetry(c *check.C) { + payload := []byte("test") + req, err := http.NewRequest("POST", "http://127.0.0.1:8081/may-fail", bytes.NewReader(payload)) + c.Assert(err, check.IsNil) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + + resp, err := httpRetry(ctx, nil, req, false) + c.Assert(err, check.IsNil) + _ = resp.Body.Close() +}