Skip to content

Commit

Permalink
Merge pull request redpanda-data#20820 from r-vasquez/jsonschema-serde
Browse files Browse the repository at this point in the history
rpk: add support to produce/consume with jsonschema
  • Loading branch information
r-vasquez authored Jul 3, 2024
2 parents 7685b6d + 86078b4 commit 413d0c7
Show file tree
Hide file tree
Showing 7 changed files with 571 additions and 4 deletions.
1 change: 1 addition & 0 deletions src/go/rpk/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ require (
github.com/prometheus/common v0.53.0
github.com/rs/xid v1.5.0
github.com/safchain/ethtool v0.3.0
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1
github.com/schollz/progressbar/v3 v3.14.2
github.com/sethgrid/pester v1.2.0
github.com/spf13/afero v1.11.0
Expand Down
4 changes: 4 additions & 0 deletions src/go/rpk/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 h1:rpfIENRNNilwHwZeG5+P150SMrnN
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0=
github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk=
github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
github.com/dlclark/regexp2 v1.11.0 h1:G/nrcoOa7ZXlpoa/91N3X7mM3r8eIlMBBJZvsz/mxKI=
github.com/dlclark/regexp2 v1.11.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8=
github.com/docker/docker v26.1.1+incompatible h1:oI+4kkAgIwwb54b9OC7Xc3hSgu1RlJA/Lln/DF72djQ=
github.com/docker/docker v26.1.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c=
Expand Down Expand Up @@ -219,6 +221,8 @@ github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/safchain/ethtool v0.3.0 h1:gimQJpsI6sc1yIqP/y8GYgiXn/NjgvpM0RNoWLVVmP0=
github.com/safchain/ethtool v0.3.0/go.mod h1:SA9BwrgyAqNo7M+uaL6IYbxpm5wk3L7Mm6ocLW+CJUs=
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1 h1:PKK9DyHxif4LZo+uQSgXNqs0jj5+xZwwfKHgph2lxBw=
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1/go.mod h1:JXeL+ps8p7/KNMjDQk3TCwPpBy0wYklyWTfbkIzdIFU=
github.com/schollz/progressbar/v3 v3.14.2 h1:EducH6uNLIWsr560zSV1KrTeUb/wZGAHqyMFIEa99ks=
github.com/schollz/progressbar/v3 v3.14.2/go.mod h1:aQAZQnhF4JGFtRJiw/eobaXpsqpVQAftEQ+hLGXaRc4=
github.com/sethgrid/pester v1.2.0 h1:adC9RS29rRUef3rIKWPOuP1Jm3/MmB6ke+OhE5giENI=
Expand Down
8 changes: 4 additions & 4 deletions src/go/rpk/pkg/serde/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func newAvroDecoder(codec *goavro.Codec) (serdeFunc, error) {
func generateAvroCodec(ctx context.Context, cl *sr.Client, schema *sr.Schema) (*goavro.Codec, error) {
schemaStr := schema.Schema
if len(schema.References) > 0 {
err := parseReferences(ctx, cl, schema)
err := parseAvroReferences(ctx, cl, schema)
if err != nil {
return nil, fmt.Errorf("unable to parse references: %v", err)
}
Expand All @@ -82,10 +82,10 @@ func generateAvroCodec(ctx context.Context, cl *sr.Client, schema *sr.Schema) (*
return codec, nil
}

// parseReferences uses hamba/avro Parse method to parse every reference. We
// parseAvroReferences uses hamba/avro Parse method to parse every reference. We
// don't need to store the references since the library already cache these
// schemas and use it later for handling references in the parent schema.
func parseReferences(ctx context.Context, cl *sr.Client, schema *sr.Schema) error {
func parseAvroReferences(ctx context.Context, cl *sr.Client, schema *sr.Schema) error {
if len(schema.References) == 0 {
_, err := avro.Parse(schema.Schema)
if err != nil {
Expand All @@ -99,7 +99,7 @@ func parseReferences(ctx context.Context, cl *sr.Client, schema *sr.Schema) erro
return err
}
refSchema := r.Schema
err = parseReferences(ctx, cl, &refSchema)
err = parseAvroReferences(ctx, cl, &refSchema)
if err != nil {
return fmt.Errorf("unable to parse schema with subject %q and version %v: %v", ref.Subject, ref.Version, err)
}
Expand Down
1 change: 1 addition & 0 deletions src/go/rpk/pkg/serde/avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func Test_encodeDecodeAvroRecordNoReferences(t *testing.T) {
noopCl, _ := sr.NewClient()
schema := sr.Schema{
Schema: tt.schema,
Type: sr.TypeAvro,
}
serde, err := NewSerde(context.Background(), noopCl, &schema, tt.schemaID, "")
if tt.expErr {
Expand Down
100 changes: 100 additions & 0 deletions src/go/rpk/pkg/serde/jsonschema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package serde

import (
"bytes"
"context"
"encoding/json"
"fmt"
"strings"

"github.com/santhosh-tekuri/jsonschema/v6"
"github.com/twmb/franz-go/pkg/sr"
)

func compileJsonschema(ctx context.Context, cl *sr.Client, schema *sr.Schema) (*jsonschema.Schema, error) {
c := jsonschema.NewCompiler()
err := parseJsonschemaReferences(ctx, cl, schema, c)
if err != nil {
return nil, fmt.Errorf("unable to parse json schema references: %v", err)
}
sch, err := jsonschema.UnmarshalJSON(strings.NewReader(schema.Schema))
if err != nil {
return nil, fmt.Errorf("unable to unmarshal base schema: %v", err)
}
baseFileName := "redpanda_jsonschema.json"
err = c.AddResource(baseFileName, sch)
if err != nil {
return nil, fmt.Errorf("unable to add base json schema to the compiler resource: %v", err)
}
return c.Compile(baseFileName)
}

func newJsonschemaEncoder(schemaID int, jSchema *jsonschema.Schema) (serdeFunc, error) {
return func(record []byte) ([]byte, error) {
sch, err := jsonschema.UnmarshalJSON(bytes.NewReader(record))
if err != nil {
return nil, fmt.Errorf("unable to unmarshal record: %v", err)
}
err = jSchema.Validate(sch)
if err != nil {
return nil, fmt.Errorf("unable to validate json schema: %v", err)
}
// Append the magic byte + the schema ID bytes.
var serdeHeader sr.ConfluentHeader
h, err := serdeHeader.AppendEncode(nil, schemaID, nil)
if err != nil {
return nil, fmt.Errorf("unable to append header: %v", err)
}
return append(h, record...), nil
}, nil
}

func newJsonschemaDecoder(jSchema *jsonschema.Schema) (serdeFunc, error) {
return func(record []byte) ([]byte, error) {
var recordDec any
err := json.Unmarshal(record, &recordDec)
if err != nil {
return nil, fmt.Errorf("unable to unmarshal record: %v", err)
}
err = jSchema.Validate(recordDec)
if err != nil {
return nil, fmt.Errorf("unable to validate record: %v", err)
}
return record, nil
}, nil
}

func parseJsonschemaReferences(ctx context.Context, cl *sr.Client, schema *sr.Schema, compiler *jsonschema.Compiler) error {
for _, ref := range schema.References {
r, err := cl.SchemaByVersion(ctx, ref.Subject, ref.Version)
if err != nil {
return fmt.Errorf("unable to get schema with subject %q and version %v: %v", ref.Subject, ref.Version, err)
}
refSchema := r.Schema

sch, err := jsonschema.UnmarshalJSON(strings.NewReader(refSchema.Schema))
if err != nil {
return fmt.Errorf("unable to unmarshal json schema %q: %v", ref.Name, err)
}

err = compiler.AddResource(ref.Name, sch)
if err != nil {
return fmt.Errorf("unable to add schema %v: %v", ref.Name, err)
}

err = parseJsonschemaReferences(ctx, cl, &refSchema, compiler)
if err != nil {
return fmt.Errorf("unable to parse schema with subject %q and version %v: %v", ref.Subject, ref.Version, err)
}
}
return nil
}
Loading

0 comments on commit 413d0c7

Please sign in to comment.