diff --git a/pkg/processor/procbuiltin/decodewithschema.go b/pkg/processor/procbuiltin/decodewithschema.go index 9aee21126..b5b07f932 100644 --- a/pkg/processor/procbuiltin/decodewithschema.go +++ b/pkg/processor/procbuiltin/decodewithschema.go @@ -48,8 +48,35 @@ func init() { processor.GlobalBuilderRegistry.MustRegister(decodeWithSchemaPayloadProcType, DecodeWithSchemaPayload) } -// DecodeWithSchemaKey builds the following processor: -// TODO +// DecodeWithSchemaKey builds a processor with the following config fields: +// - `url` (Required) - URL of the schema registry (e.g. http://localhost:8085) +// - `auth.basic.username` (Optional) - Configures the username to use with +// basic authentication. This option is required if `auth.basic.password` +// contains a value. If both `auth.basic.username` and `auth.basic.password` +// are empty basic authentication is disabled. +// - `auth.basic.password` (Optional) - Configures the password to use with +// basic authentication. This option is required if `auth.basic.username` +// contains a value. If both `auth.basic.username` and `auth.basic.password` +// are empty basic authentication is disabled. +// - `tls.ca.cert` (Optional) - Path to a file containing PEM encoded CA +// certificates. If this option is empty, Conduit falls back to using the +// host's root CA set. +// - `tls.client.cert` (Optional) - Path to a file containing a PEM encoded +// certificate. This option is required if `tls.client.key` contains a value. +// If both `tls.client.cert` and `tls.client.key` are empty TLS is disabled. +// - `tls.client.key` (Optional) - Path to a file containing a PEM encoded +// private key. This option is required if `tls.client.cert` contains a value. +// If both `tls.client.cert` and `tls.client.key` are empty TLS is disabled. +// +// The processor takes raw data (bytes) and decodes it from the Confluent wire +// format into structured data. It extracts the schema ID from the data, +// downloads the associated schema from the schema registry and decodes the +// payload. The schema is cached locally after it's first downloaded. Currently, +// the processor only supports the Avro format. If the processor encounters +// structured data or the data can't be decoded it returns an error. +// +// More info about the Confluent wire format: https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format +// More info about the Confluent schema registry: https://docs.confluent.io/platform/current/schema-registry/index.html func DecodeWithSchemaKey(config processor.Config) (processor.Interface, error) { return decodeWithSchema(decodeWithSchemaKeyProcType, recordKeyGetSetter{}, config) } diff --git a/pkg/processor/procbuiltin/encodewithschema.go b/pkg/processor/procbuiltin/encodewithschema.go new file mode 100644 index 000000000..01de3910f --- /dev/null +++ b/pkg/processor/procbuiltin/encodewithschema.go @@ -0,0 +1,239 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package procbuiltin + +import ( + "context" + + "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/processor" + "github.com/conduitio/conduit/pkg/processor/schemaregistry" + "github.com/conduitio/conduit/pkg/record" + "github.com/lovromazgon/franz-go/pkg/sr" + "github.com/rs/zerolog" +) + +const ( + encodeWithSchemaKeyProcType = "encodewithschemakey" + encodeWithSchemaPayloadProcType = "encodewithschemapayload" + + encodeWithSchemaStrategy = "schema.strategy" + encodeWithSchemaPreRegisteredSubject = "schema.preRegistered.subject" + encodeWithSchemaPreRegisteredVersion = "schema.preRegistered.version" + encodeWithSchemaAutoRegisterSubject = "schema.autoRegister.subject" + encodeWithSchemaAutoRegisterFormat = "schema.autoRegister.format" + + encodeWithSchemaStrategyPreRegistered = "preRegistered" + encodeWithSchemaStrategyAutoRegister = "autoRegister" +) + +func init() { + processor.GlobalBuilderRegistry.MustRegister(encodeWithSchemaKeyProcType, EncodeWithSchemaKey) + processor.GlobalBuilderRegistry.MustRegister(encodeWithSchemaPayloadProcType, EncodeWithSchemaPayload) +} + +// EncodeWithSchemaKey builds a processor with the following config fields: +// - `url` (Required) - URL of the schema registry (e.g. http://localhost:8085) +// - `schema.strategy` (Required, Enum: `preRegistered`,`autoRegister`) - Specifies +// which strategy to use to determine the schema for the record. Available +// strategies: +// - `preRegistered` (recommended) - Download an existing schema from the schema +// registry. This strategy is further configured with options starting +// with `schema.preRegistered.*`. +// - `autoRegister` (for development purposes) - Infer the schema from the +// record and register it in the schema registry. This strategy is further +// configured with options starting with `schema.autoRegister.*`. +// - `schema.preRegistered.subject` (Required if `schema.strategy` = `preRegistered`) - +// Specifies the subject of the schema in the schema registry used to encode +// the record. +// - `schema.preRegistered.version` (Required if `schema.strategy` = `preRegistered`) - +// Specifies the version of the schema in the schema registry used to encode +// the record. +// - `schema.autoRegister.subject` (Required if `schema.strategy` = `autoRegister`) - +// Specifies the subject name under which the inferred schema will be +// registered in the schema registry. +// - `schema.autoRegister.format` (Required if `schema.strategy` = `autoRegister`, Enum: `avro`) - +// Specifies the schema format that should be inferred. Currently the only +// supported format is `avro`. +// - `auth.basic.username` (Optional) - Configures the username to use with +// basic authentication. This option is required if `auth.basic.password` +// contains a value. If both `auth.basic.username` and `auth.basic.password` +// are empty basic authentication is disabled. +// - `auth.basic.password` (Optional) - Configures the password to use with +// basic authentication. This option is required if `auth.basic.username` +// contains a value. If both `auth.basic.username` and `auth.basic.password` +// are empty basic authentication is disabled. +// - `tls.ca.cert` (Optional) - Path to a file containing PEM encoded CA +// certificates. If this option is empty, Conduit falls back to using the +// host's root CA set. +// - `tls.client.cert` (Optional) - Path to a file containing a PEM encoded +// certificate. This option is required if `tls.client.key` contains a value. +// If both `tls.client.cert` and `tls.client.key` are empty TLS is disabled. +// - `tls.client.key` (Optional) - Path to a file containing a PEM encoded +// private key. This option is required if `tls.client.cert` contains a value. +// If both `tls.client.cert` and `tls.client.key` are empty TLS is disabled. +// +// The processor takes structured data and encodes it using a schema into the +// Confluent wire format. It provides two strategies for determining the +// schema: +// +// - `preRegistered` (recommended) +// +// This strategy downloads an existing schema from the schema registry and +// uses it to encode the record. This requires the schema to already be +// registered in the schema registry. The schema is downloaded only once and +// cached locally. +// +// - `autoRegister` (for development purposes) +// This strategy infers the schema by inspecting the structured data and +// registers it in the schema registry. If the record schema is known in +// advance it's recommended to use the `preRegistered` strategy and manually +// register the schema, as this strategy comes with limitations. +// +// The strategy uses reflection to traverse the structured data of each +// record and determine the type of each field. If a specific field is set +// to `nil` the processor won't have enough information to determine the +// type and will default to a nullable string. Because of this it is not +// guaranteed that two records with the same structure produce the same +// schema or even a backwards compatible schema. The processor registers +// each inferred schema in the schema registry with the same subject, +// therefore the schema compatibility checks need to be disabled for this +// schema to prevent failures. If the schema subject does not exist before +// running this processor, it will automatically set the correct +// compatibility settings the first time it registers the schema. +// +// The processor currently only supports the Avro format. +// +// More info about the Confluent wire format: https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format +// More info about the Confluent schema registry: https://docs.confluent.io/platform/current/schema-registry/index.html +func EncodeWithSchemaKey(config processor.Config) (processor.Interface, error) { + return encodeWithSchema(encodeWithSchemaKeyProcType, recordKeyGetSetter{}, config) +} + +// EncodeWithSchemaPayload builds the same processor as EncodeWithSchemaKey, +// except that it operates on the field Record.Payload.After. +func EncodeWithSchemaPayload(config processor.Config) (processor.Interface, error) { + return encodeWithSchema(encodeWithSchemaPayloadProcType, recordPayloadGetSetter{}, config) +} + +type encodeWithSchemaConfig struct { + schemaRegistryConfig + strategy schemaregistry.SchemaStrategy +} + +func (c *encodeWithSchemaConfig) Parse(cfg processor.Config) error { + if err := c.schemaRegistryConfig.Parse(cfg); err != nil { + return err + } + return c.parseSchemaStrategy(cfg) +} + +func (c *encodeWithSchemaConfig) parseSchemaStrategy(cfg processor.Config) error { + strategy, err := getConfigFieldString(cfg, encodeWithSchemaStrategy) + if err != nil { + return err + } + + switch strategy { + case encodeWithSchemaStrategyPreRegistered: + return c.parseSchemaStrategyPreRegistered(cfg) + case encodeWithSchemaStrategyAutoRegister: + return c.parseSchemaStrategyAutoRegister(cfg) + default: + return cerrors.Errorf("failed to parse %q: unknown schema strategy %q", encodeWithSchemaStrategy, strategy) + } +} + +func (c *encodeWithSchemaConfig) parseSchemaStrategyPreRegistered(cfg processor.Config) error { + subject, err := getConfigFieldString(cfg, encodeWithSchemaPreRegisteredSubject) + if err != nil { + return err + } + // TODO allow version to be set to "latest" + version, err := getConfigFieldInt64(cfg, encodeWithSchemaPreRegisteredVersion) + if err != nil { + return err + } + c.strategy = schemaregistry.DownloadSchemaStrategy{ + Subject: subject, + Version: int(version), + } + return nil +} + +func (c *encodeWithSchemaConfig) parseSchemaStrategyAutoRegister(cfg processor.Config) error { + subject, err := getConfigFieldString(cfg, encodeWithSchemaAutoRegisterSubject) + if err != nil { + return err + } + format, err := getConfigFieldString(cfg, encodeWithSchemaAutoRegisterFormat) + if err != nil { + return err + } + var schemaType sr.SchemaType + err = schemaType.UnmarshalText([]byte(format)) + if err != nil { + return cerrors.Errorf("failed to parse %q: %w", encodeWithSchemaAutoRegisterSubject, err) + } + c.strategy = schemaregistry.ExtractAndUploadSchemaStrategy{ + Type: schemaType, + Subject: subject, + } + return nil +} + +func (c *encodeWithSchemaConfig) SchemaStrategy() schemaregistry.SchemaStrategy { + return c.strategy +} + +func encodeWithSchema( + processorType string, + getSetter recordDataGetSetter, + config processor.Config, +) (processor.Interface, error) { + var c encodeWithSchemaConfig + err := c.Parse(config) + if err != nil { + return nil, cerrors.Errorf("%s: %w", processorType, err) + } + + // TODO get logger from config or some other place + logger := log.InitLogger(zerolog.InfoLevel, log.FormatCLI) + + client, err := schemaregistry.NewClient(logger, c.ClientOptions()...) + if err != nil { + return nil, cerrors.Errorf("%s: could not create schema registry client: %w", processorType, err) + } + encoder := schemaregistry.NewEncoder(client, logger, &sr.Serde{}, c.SchemaStrategy()) + + return NewFuncWrapper(func(ctx context.Context, r record.Record) (record.Record, error) { + data := getSetter.Get(r) + + switch d := data.(type) { + case record.RawData: + return record.Record{}, cerrors.Errorf("%s: raw data not supported (hint: if your records carry JSON data you can parse them into structured data with the processor `parsejsonpayload`)", processorType) + case record.StructuredData: + rd, err := encoder.Encode(ctx, d) + if err != nil { + return record.Record{}, cerrors.Errorf("%s: %w:", processorType, err) + } + r = getSetter.Set(r, rd) + return r, nil + default: + return record.Record{}, cerrors.Errorf("%s: unexpected data type %T", processorType, data) + } + }), nil +}