diff --git a/pkg/processor/procbuiltin/decodewithschema.go b/pkg/processor/procbuiltin/decodewithschema.go index 9aee21126..b5b07f932 100644 --- a/pkg/processor/procbuiltin/decodewithschema.go +++ b/pkg/processor/procbuiltin/decodewithschema.go @@ -48,8 +48,35 @@ func init() { processor.GlobalBuilderRegistry.MustRegister(decodeWithSchemaPayloadProcType, DecodeWithSchemaPayload) } -// DecodeWithSchemaKey builds the following processor: -// TODO +// DecodeWithSchemaKey builds a processor with the following config fields: +// - `url` (Required) - URL of the schema registry (e.g. http://localhost:8085) +// - `auth.basic.username` (Optional) - Configures the username to use with +// basic authentication. This option is required if `auth.basic.password` +// contains a value. If both `auth.basic.username` and `auth.basic.password` +// are empty basic authentication is disabled. +// - `auth.basic.password` (Optional) - Configures the password to use with +// basic authentication. This option is required if `auth.basic.username` +// contains a value. If both `auth.basic.username` and `auth.basic.password` +// are empty basic authentication is disabled. +// - `tls.ca.cert` (Optional) - Path to a file containing PEM encoded CA +// certificates. If this option is empty, Conduit falls back to using the +// host's root CA set. +// - `tls.client.cert` (Optional) - Path to a file containing a PEM encoded +// certificate. This option is required if `tls.client.key` contains a value. +// If both `tls.client.cert` and `tls.client.key` are empty TLS is disabled. +// - `tls.client.key` (Optional) - Path to a file containing a PEM encoded +// private key. This option is required if `tls.client.cert` contains a value. +// If both `tls.client.cert` and `tls.client.key` are empty TLS is disabled. +// +// The processor takes raw data (bytes) and decodes it from the Confluent wire +// format into structured data. It extracts the schema ID from the data, +// downloads the associated schema from the schema registry and decodes the +// payload. The schema is cached locally after it's first downloaded. Currently, +// the processor only supports the Avro format. If the processor encounters +// structured data or the data can't be decoded it returns an error. +// +// More info about the Confluent wire format: https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format +// More info about the Confluent schema registry: https://docs.confluent.io/platform/current/schema-registry/index.html func DecodeWithSchemaKey(config processor.Config) (processor.Interface, error) { return decodeWithSchema(decodeWithSchemaKeyProcType, recordKeyGetSetter{}, config) } diff --git a/pkg/processor/procbuiltin/encodewithschema.go b/pkg/processor/procbuiltin/encodewithschema.go index 0592af2cb..eed68b69a 100644 --- a/pkg/processor/procbuiltin/encodewithschema.go +++ b/pkg/processor/procbuiltin/encodewithschema.go @@ -45,8 +45,80 @@ func init() { processor.GlobalBuilderRegistry.MustRegister(encodeWithSchemaPayloadProcType, EncodeWithSchemaPayload) } -// EncodeWithSchemaKey builds the following processor: -// TODO +// EncodeWithSchemaKey builds a processor with the following config fields: +// - `url` (Required) - URL of the schema registry (e.g. http://localhost:8085) +// - `schema.strategy` (Required, Enum: `registry`,`autoRegister`) - Specifies +// which strategy to use to determine the schema for the record. Available +// strategies: +// - `registry` (recommended) - Download an existing schema from the schema +// registry. This strategy is further configured with options starting +// with `schema.registry.*`. +// - `autoRegister` (for development purposes) - Infer the schema from the +// record and register it in the schema registry. This strategy is further +// configured with options starting with `schema.autoRegister.*`. +// - `schema.registry.subject` (Required if `schema.strategy` = `registry`) - +// Specifies the subject of the schema in the schema registry used to encode +// the record. +// - `schema.registry.version` (Required if `schema.strategy` = `registry`) - +// Specifies the version of the schema in the schema registry used to encode +// the record. +// - `schema.autoRegister.subject` (Required if `schema.strategy` = `autoRegister`) - +// Specifies the subject name under which the inferred schema will be +// registered in the schema registry. +// - `schema.autoRegister.format` (Required if `schema.strategy` = `autoRegister`, Enum: `avro`) - +// Specifies the schema format that should be inferred. Currently the only +// supported format is `avro`. +// - `auth.basic.username` (Optional) - Configures the username to use with +// basic authentication. This option is required if `auth.basic.password` +// contains a value. If both `auth.basic.username` and `auth.basic.password` +// are empty basic authentication is disabled. +// - `auth.basic.password` (Optional) - Configures the password to use with +// basic authentication. This option is required if `auth.basic.username` +// contains a value. If both `auth.basic.username` and `auth.basic.password` +// are empty basic authentication is disabled. +// - `tls.ca.cert` (Optional) - Path to a file containing PEM encoded CA +// certificates. If this option is empty, Conduit falls back to using the +// host's root CA set. +// - `tls.client.cert` (Optional) - Path to a file containing a PEM encoded +// certificate. This option is required if `tls.client.key` contains a value. +// If both `tls.client.cert` and `tls.client.key` are empty TLS is disabled. +// - `tls.client.key` (Optional) - Path to a file containing a PEM encoded +// private key. This option is required if `tls.client.cert` contains a value. +// If both `tls.client.cert` and `tls.client.key` are empty TLS is disabled. +// +// The processor takes structured data and encodes it using a schema into the +// Confluent wire format. It provides two strategies for determining the +// schema: +// +// - `registry` (recommended) +// +// This strategy downloads an existing schema from the schema registry and +// uses it to encode the record. This requires the schema to already be +// registered in the schema registry. The schema is downloaded only once and +// cached locally. +// +// - `autoRegister` (for development purposes) +// This strategy infers the schema by inspecting the structured data and +// registers it in the schema registry. If the record schema is known in +// advance it's recommended to use the `registry` strategy and manually +// register the schema, as this strategy comes with limitations. +// +// The strategy uses reflection to traverse the structured data of each +// record and determine the type of each field. If a specific field is set +// to `nil` the processor won't have enough information to determine the +// type and will default to a nullable string. Because of this it is not +// guaranteed that two records with the same structure produce the same +// schema or even a backwards compatible schema. The processor registers +// each inferred schema in the schema registry with the same subject, +// therefore the schema compatibility checks need to be disabled for this +// schema to prevent failures. If the schema subject does not exist before +// running this processor, it will automatically set the correct +// compatibility settings the first time it registers the schema. +// +// The processor currently only supports the Avro format. +// +// More info about the Confluent wire format: https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format +// More info about the Confluent schema registry: https://docs.confluent.io/platform/current/schema-registry/index.html func EncodeWithSchemaKey(config processor.Config) (processor.Interface, error) { return encodeWithSchema(encodeWithSchemaKeyProcType, recordKeyGetSetter{}, config) }