Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Perf] Clone record without reflection #1247

Merged
merged 2 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ require (
github.com/hashicorp/go-hclog v1.5.0
github.com/hashicorp/go-plugin v1.5.2
github.com/jackc/pgx/v5 v5.4.3
github.com/jinzhu/copier v0.4.0
github.com/jpillora/backoff v1.0.0
github.com/lovromazgon/franz-go/pkg/sr v0.0.0-20230630140346-bb9ce3f90f4a
github.com/matryer/is v1.4.1
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -435,8 +435,6 @@ github.com/jcmturner/gofork v0.0.0-20180107083740-2aebee971930/go.mod h1:MK8+TM0
github.com/jdx/go-netrc v1.0.0 h1:QbLMLyCZGj0NA8glAhxUpf1zDg6cxnWgMBbjq40W0gQ=
github.com/jdx/go-netrc v1.0.0/go.mod h1:Gh9eFQJnoTNIRHXl2j5bJXA1u84hQWJWgGh569zF3v8=
github.com/jhump/protoreflect v1.15.3 h1:6SFRuqU45u9hIZPJAoZ8c28T3nK64BNdp9w6jFonzls=
github.com/jinzhu/copier v0.4.0 h1:w3ciUoD19shMCRargcpm0cm91ytaBhDvuRpz1ODO/U8=
github.com/jinzhu/copier v0.4.0/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg=
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
Expand Down
3 changes: 1 addition & 2 deletions pkg/processor/procjs/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"testing"
"time"

sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/conduitio/conduit/pkg/foundation/cchan"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/processor"
Expand Down Expand Up @@ -414,7 +413,7 @@ func TestJSProcessor_Inspect(t *testing.T) {
Position: record.Position("test-pos"),
Operation: record.OperationUpdate,
Metadata: record.Metadata{"test": "true"},
Key: sdk.RawData("test-key"),
Key: record.RawData{Raw: []byte("test-key")},
Payload: record.Change{},
}
recOut, err := underTest.Process(ctx, recIn)
Expand Down
74 changes: 53 additions & 21 deletions pkg/record/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package record

import (
"bytes"
"encoding/base64"
"encoding/json"
"fmt"
Expand All @@ -25,7 +26,6 @@ import (

"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/record/schema"
"github.com/jinzhu/copier"
)

const (
Expand Down Expand Up @@ -149,27 +149,39 @@ func (r Record) mapData(d Data) interface{} {
}

func (r Record) Clone() Record {
clone := Record{}
// todo copier uses reflection under the hood
// we should optimize it, because Clone() is on a hot path.
// https://github.com/ConduitIO/conduit/issues/885
err := copier.CopyWithOption(
&clone,
&r,
copier.Option{DeepCopy: true, IgnoreEmpty: true},
var (
metadata map[string]string
key Data
payloadBefore Data
payloadAfter Data
)
if err != nil {
// At the moment, a clone error cannot happen because of the input data
// (i.e. the record itself).
// It can only happen if the clone destination is not addressable
// or if reflect.ValueOf(&r) is invalid.
// The first should never happen, because &Record{} is always addressable.
// The second, reflect.ValueOf(&r) is invalid if &r is nil, which in our case
// is also not possible.
// Hence, if the copier returns an error, it's a bug related to how we use copier
// which would cause all pipelines to fail anyway, so we panic here.
// This also makes the method signature simpler.
panic(cerrors.Errorf("record clone error: %w", err))

if r.Metadata != nil {
metadata = make(map[string]string, len(r.Metadata))
for k, v := range r.Metadata {
metadata[k] = v
}
}

if r.Key != nil {
key = r.Key.Clone()
}
if r.Payload.Before != nil {
payloadBefore = r.Payload.Before.Clone()
}
if r.Payload.After != nil {
payloadAfter = r.Payload.After.Clone()
}

clone := Record{
Position: bytes.Clone(r.Position),
Operation: r.Operation,
Metadata: metadata,
Key: key,
Payload: Change{
Before: payloadBefore,
After: payloadAfter,
},
}
return clone
}
Expand Down Expand Up @@ -204,6 +216,7 @@ func (p Position) String() string {
// Data are RawData and StructuredData.
type Data interface {
Bytes() []byte
Clone() Data
}

// StructuredData contains data in form of a map with string keys and arbitrary
Expand All @@ -220,6 +233,18 @@ func (d StructuredData) Bytes() []byte {
return b
}

func (d StructuredData) Clone() Data {
cloned := make(map[string]any, len(d))
lovromazgon marked this conversation as resolved.
Show resolved Hide resolved
for k, v := range d {
if vmap, ok := v.(map[string]any); ok {
cloned[k] = StructuredData(vmap).Clone()
} else {
cloned[k] = v
}
}
return StructuredData(cloned)
}

// RawData contains unstructured data in form of a byte slice.
type RawData struct {
Raw []byte
Expand All @@ -239,3 +264,10 @@ func (d *RawData) UnmarshalText() ([]byte, error) {
func (d RawData) Bytes() []byte {
return d.Raw
}

func (d RawData) Clone() Data {
return RawData{
Raw: bytes.Clone(d.Raw),
Schema: d.Schema, // this field is currently unused, we don't care about cloning it atm
}
}
Loading