add processor documentations
lovromazgon committed Jul 14, 2023
1 parent 3b8bdf4 commit 5350aa8
Showing 2 changed files with 103 additions and 4 deletions.
31 changes: 29 additions & 2 deletions pkg/processor/procbuiltin/decodewithschema.go
@@ -48,8 +48,35 @@ func init() {
	processor.GlobalBuilderRegistry.MustRegister(decodeWithSchemaPayloadProcType, DecodeWithSchemaPayload)
}

// DecodeWithSchemaKey builds the following processor:
// TODO
// DecodeWithSchemaKey builds a processor with the following config fields:
// - `url` (Required) - URL of the schema registry (e.g. http://localhost:8085)
// - `auth.basic.username` (Optional) - Configures the username to use with
// basic authentication. This option is required if `auth.basic.password`
// contains a value. If both `auth.basic.username` and `auth.basic.password`
// are empty, basic authentication is disabled.
// - `auth.basic.password` (Optional) - Configures the password to use with
// basic authentication. This option is required if `auth.basic.username`
// contains a value. If both `auth.basic.username` and `auth.basic.password`
// are empty, basic authentication is disabled.
// - `tls.ca.cert` (Optional) - Path to a file containing PEM-encoded CA
// certificates. If this option is empty, Conduit falls back to using the
// host's root CA set.
// - `tls.client.cert` (Optional) - Path to a file containing a PEM-encoded
// certificate. This option is required if `tls.client.key` contains a value.
// If both `tls.client.cert` and `tls.client.key` are empty, TLS is disabled.
// - `tls.client.key` (Optional) - Path to a file containing a PEM-encoded
// private key. This option is required if `tls.client.cert` contains a value.
// If both `tls.client.cert` and `tls.client.key` are empty, TLS is disabled.
//
// The processor takes raw data (bytes) and decodes it from the Confluent wire
// format into structured data. It extracts the schema ID from the data,
// downloads the associated schema from the schema registry and decodes the
// payload. The schema is cached locally after it's first downloaded. Currently,
// the processor only supports the Avro format. If the input is already
// structured data or can't be decoded, the processor returns an error.
//
// More info about the Confluent wire format: https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
// More info about the Confluent schema registry: https://docs.confluent.io/platform/current/schema-registry/index.html
func DecodeWithSchemaKey(config processor.Config) (processor.Interface, error) {
	return decodeWithSchema(decodeWithSchemaKeyProcType, recordKeyGetSetter{}, config)
}
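
The decoding steps described in the comment above start with reading the Confluent wire format header: one magic byte (`0`), a 4-byte big-endian schema ID, then the encoded payload. The sketch below illustrates only that header step. `parseWireFormat` is a hypothetical helper written for this example and is not taken from Conduit; the schema download, caching, and Avro decoding that the processor performs are left out.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// parseWireFormat is a hypothetical helper (not part of Conduit) that splits
// a Confluent wire format message into its schema ID and encoded payload.
func parseWireFormat(data []byte) (schemaID int, payload []byte, err error) {
	// Header layout: 1 magic byte followed by a 4-byte big-endian schema ID.
	const headerLen = 5
	if len(data) < headerLen {
		return 0, nil, errors.New("data too short for Confluent wire format")
	}
	if data[0] != 0 {
		return 0, nil, fmt.Errorf("unexpected magic byte %#x", data[0])
	}
	id := binary.BigEndian.Uint32(data[1:headerLen])
	return int(id), data[headerLen:], nil
}

func main() {
	// Magic byte 0, schema ID 42, then the Avro encoding of the string "foo"
	// (zigzag-encoded length 3 = 0x06, followed by the UTF-8 bytes).
	msg := []byte{0x00, 0x00, 0x00, 0x00, 0x2a, 0x06, 'f', 'o', 'o'}

	id, payload, err := parseWireFormat(msg)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(id, len(payload)) // prints "42 4"
}
```

With the schema ID in hand, a decoder would look up the schema in the registry (or a local cache) and use it to decode the remaining payload bytes.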
76 changes: 74 additions & 2 deletions pkg/processor/procbuiltin/encodewithschema.go
@@ -45,8 +45,80 @@ func init() {
	processor.GlobalBuilderRegistry.MustRegister(encodeWithSchemaPayloadProcType, EncodeWithSchemaPayload)
}

// EncodeWithSchemaKey builds the following processor:
// TODO
// EncodeWithSchemaKey builds a processor with the following config fields:
// - `url` (Required) - URL of the schema registry (e.g. http://localhost:8085)
// - `schema.strategy` (Required, Enum: `registry`,`autoRegister`) - Specifies
// which strategy to use to determine the schema for the record. Available
// strategies:
// - `registry` (recommended) - Download an existing schema from the schema
// registry. This strategy is further configured with options starting
// with `schema.registry.*`.
// - `autoRegister` (for development purposes) - Infer the schema from the
// record and register it in the schema registry. This strategy is further
// configured with options starting with `schema.autoRegister.*`.
// - `schema.registry.subject` (Required if `schema.strategy` = `registry`) -
// Specifies the subject of the schema in the schema registry used to encode
// the record.
// - `schema.registry.version` (Required if `schema.strategy` = `registry`) -
// Specifies the version of the schema in the schema registry used to encode
// the record.
// - `schema.autoRegister.subject` (Required if `schema.strategy` = `autoRegister`) -
// Specifies the subject name under which the inferred schema will be
// registered in the schema registry.
// - `schema.autoRegister.format` (Required if `schema.strategy` = `autoRegister`, Enum: `avro`) -
// Specifies the schema format that should be inferred. Currently the only
// supported format is `avro`.
// - `auth.basic.username` (Optional) - Configures the username to use with
// basic authentication. This option is required if `auth.basic.password`
// contains a value. If both `auth.basic.username` and `auth.basic.password`
// are empty, basic authentication is disabled.
// - `auth.basic.password` (Optional) - Configures the password to use with
// basic authentication. This option is required if `auth.basic.username`
// contains a value. If both `auth.basic.username` and `auth.basic.password`
// are empty, basic authentication is disabled.
// - `tls.ca.cert` (Optional) - Path to a file containing PEM-encoded CA
// certificates. If this option is empty, Conduit falls back to using the
// host's root CA set.
// - `tls.client.cert` (Optional) - Path to a file containing a PEM-encoded
// certificate. This option is required if `tls.client.key` contains a value.
// If both `tls.client.cert` and `tls.client.key` are empty, TLS is disabled.
// - `tls.client.key` (Optional) - Path to a file containing a PEM-encoded
// private key. This option is required if `tls.client.cert` contains a value.
// If both `tls.client.cert` and `tls.client.key` are empty, TLS is disabled.
//
// The processor takes structured data and encodes it using a schema into the
// Confluent wire format. It provides two strategies for determining the
// schema:
//
// - `registry` (recommended)
//
// This strategy downloads an existing schema from the schema registry and
// uses it to encode the record. This requires the schema to already be
// registered in the schema registry. The schema is downloaded only once and
// cached locally.
//
// - `autoRegister` (for development purposes)
//
// This strategy infers the schema by inspecting the structured data and
// registers it in the schema registry. If the record schema is known in
// advance it's recommended to use the `registry` strategy and manually
// register the schema, as this strategy comes with limitations.
//
// The strategy uses reflection to traverse the structured data of each
// record and determine the type of each field. If a specific field is set
// to `nil` the processor won't have enough information to determine the
// type and will default to a nullable string. Because of this, it is not
// guaranteed that two records with the same structure produce the same
// schema or even a backwards compatible schema. The processor registers
// each inferred schema in the schema registry under the same subject, so
// schema compatibility checks need to be disabled for that subject to
// prevent failures. If the subject does not exist before running this
// processor, the processor automatically sets the correct compatibility
// settings the first time it registers the schema.
//
// The processor currently only supports the Avro format.
//
// More info about the Confluent wire format: https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
// More info about the Confluent schema registry: https://docs.confluent.io/platform/current/schema-registry/index.html
func EncodeWithSchemaKey(config processor.Config) (processor.Interface, error) {
	return encodeWithSchema(encodeWithSchemaKeyProcType, recordKeyGetSetter{}, config)
}
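
The `autoRegister` limitation described in the comment above (a `nil` field falls back to a nullable string, so two records with the same structure can yield different schemas) can be illustrated with a small sketch. `inferFieldTypes` is a hypothetical, heavily simplified stand-in written for this example. It uses a type switch over a flat map instead of the reflection-based traversal the comment describes, and it returns Avro type names as strings rather than a real schema.

```go
package main

import "fmt"

// inferFieldTypes is a hypothetical, simplified stand-in for schema inference
// (not Conduit's implementation). It maps each field of a flat record to an
// Avro type name based on the Go type of the field's value.
func inferFieldTypes(record map[string]any) map[string]string {
	types := make(map[string]string, len(record))
	for name, value := range record {
		switch value.(type) {
		case nil:
			// No type information is available for a nil value, so fall
			// back to a nullable string, as the doc comment describes.
			types[name] = `["null","string"]`
		case string:
			types[name] = "string"
		case bool:
			types[name] = "boolean"
		case int32:
			types[name] = "int"
		case int, int64:
			types[name] = "long"
		case float64:
			types[name] = "double"
		default:
			types[name] = fmt.Sprintf("unsupported(%T)", value)
		}
	}
	return types
}

func main() {
	// Two records with the same structure; only the second has a nil field.
	first := inferFieldTypes(map[string]any{"id": 1, "age": 30})
	second := inferFieldTypes(map[string]any{"id": 2, "age": nil})

	// The inferred type of "age" differs between the two records.
	fmt.Println(first["age"], second["age"]) // prints: long ["null","string"]
}
```

Because `long` and a nullable string are not compatible types, registering both inferred schemas under one subject would fail a compatibility check, which is why the comment recommends the `registry` strategy when the schema is known in advance.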