diff --git a/module.go b/module.go index c497cce..21f4b0c 100644 --- a/module.go +++ b/module.go @@ -65,9 +65,10 @@ func init() { type ( Kafka struct { - vu modules.VU - metrics kafkaMetrics - exports *goja.Object + vu modules.VU + metrics kafkaMetrics + exports *goja.Object + schemaCache map[string]*Schema } RootModule struct{} Module struct { @@ -97,9 +98,10 @@ func (*RootModule) NewModuleInstance(virtualUser modules.VU) modules.Instance { // Create a new Kafka module. moduleInstance := &Module{ Kafka: &Kafka{ - vu: virtualUser, - metrics: metrics, - exports: runtime.NewObject(), + vu: virtualUser, + metrics: metrics, + exports: runtime.NewObject(), + schemaCache: make(map[string]*Schema), }, } diff --git a/schema_registry.go b/schema_registry.go index cf58d5e..1e607a4 100644 --- a/schema_registry.go +++ b/schema_registry.go @@ -28,9 +28,10 @@ type BasicAuth struct { } type SchemaRegistryConfig struct { - URL string `json:"url"` - BasicAuth BasicAuth `json:"basicAuth"` - TLS TLSConfig `json:"tls"` + EnableCaching bool `json:"enableCaching"` + URL string `json:"url"` + BasicAuth BasicAuth `json:"basicAuth"` + TLS TLSConfig `json:"tls"` } const ( @@ -42,14 +43,15 @@ const ( // Schema is a wrapper around the schema registry schema. // The Codec() and JsonSchema() methods will return the respective codecs (duck-typing). type Schema struct { - ID int `json:"id"` - Schema string `json:"schema"` - SchemaType *srclient.SchemaType `json:"schemaType"` - Version int `json:"version"` - References []srclient.Reference `json:"references"` - Subject string `json:"subject"` - codec *goavro.Codec - jsonSchema *jsonschema.Schema + EnableCaching bool `json:"enableCaching"` + ID int `json:"id"` + Schema string `json:"schema"` + SchemaType *srclient.SchemaType `json:"schemaType"` + Version int `json:"version"` + References []srclient.Reference `json:"references"` + Subject string `json:"subject"` + codec *goavro.Codec + jsonSchema *jsonschema.Schema } type SubjectNameConfig struct { @@ -238,18 +240,17 @@ func (k *Kafka) schemaRegistryClientClass(call goja.ConstructorCall) *goja.Objec // schemaRegistryClient creates a schemaRegistryClient instance // with the given configuration. It will also configure auth and TLS credentials if exists. -func (k *Kafka) schemaRegistryClient( - configuration *SchemaRegistryConfig) *srclient.SchemaRegistryClient { +func (k *Kafka) schemaRegistryClient(config *SchemaRegistryConfig) *srclient.SchemaRegistryClient { runtime := k.vu.Runtime() var srClient *srclient.SchemaRegistryClient - tlsConfig, err := GetTLSConfig(configuration.TLS) + tlsConfig, err := GetTLSConfig(config.TLS) if err != nil { // Ignore the error if we're not using TLS if err.Code != noTLSConfig { common.Throw(runtime, err) } - srClient = srclient.CreateSchemaRegistryClient(configuration.URL) + srClient = srclient.CreateSchemaRegistryClient(config.URL) } if tlsConfig != nil { @@ -259,18 +260,29 @@ func (k *Kafka) schemaRegistryClient( }, } srClient = srclient.CreateSchemaRegistryClientWithOptions( - configuration.URL, httpClient, ConcurrentRequests) + config.URL, httpClient, ConcurrentRequests) } - if configuration.BasicAuth.Username != "" && configuration.BasicAuth.Password != "" { - srClient.SetCredentials(configuration.BasicAuth.Username, configuration.BasicAuth.Password) + if config.BasicAuth.Username != "" && config.BasicAuth.Password != "" { + srClient.SetCredentials(config.BasicAuth.Username, config.BasicAuth.Password) } + // The default value for a boolean is false, so the caching + // feature of the srclient package will be disabled. + srClient.CachingEnabled(config.EnableCaching) + return srClient } // getSchema returns the schema for the given subject and schema ID and version. func (k *Kafka) getSchema(client *srclient.SchemaRegistryClient, schema *Schema) *Schema { + // If EnableCache is set, check if the schema is in the cache. + if schema.EnableCaching { + if schema, ok := k.schemaCache[schema.Subject]; ok { + return schema + } + } + runtime := k.vu.Runtime() // The client always caches the schema. var schemaInfo *srclient.Schema @@ -282,20 +294,27 @@ func (k *Kafka) getSchema(client *srclient.SchemaRegistryClient, schema *Schema) schemaInfo, err = client.GetSchemaByVersion( schema.Subject, schema.Version) } - if err != nil { + + if err == nil { + wrappedSchema := &Schema{ + EnableCaching: schema.EnableCaching, + ID: schemaInfo.ID(), + Version: schemaInfo.Version(), + Schema: schemaInfo.Schema(), + SchemaType: schemaInfo.SchemaType(), + References: schemaInfo.References(), + Subject: schema.Subject, + } + // If the Cache is set, cache the schema. + if wrappedSchema.EnableCaching { + k.schemaCache[wrappedSchema.Subject] = wrappedSchema + } + return wrappedSchema + } else { err := NewXk6KafkaError(schemaNotFound, "Failed to get schema from schema registry", err) common.Throw(runtime, err) return nil } - - return &Schema{ - ID: schemaInfo.ID(), - Version: schemaInfo.Version(), - Schema: schemaInfo.Schema(), - SchemaType: schemaInfo.SchemaType(), - References: schemaInfo.References(), - Subject: schema.Subject, - } } // createSchema creates a new schema in the schema registry. @@ -312,14 +331,19 @@ func (k *Kafka) createSchema(client *srclient.SchemaRegistryClient, schema *Sche return nil } - return &Schema{ - ID: schemaInfo.ID(), - Version: schemaInfo.Version(), - Schema: schemaInfo.Schema(), - SchemaType: schemaInfo.SchemaType(), - References: schemaInfo.References(), - Subject: schema.Subject, + wrappedSchema := &Schema{ + EnableCaching: schema.EnableCaching, + ID: schemaInfo.ID(), + Version: schemaInfo.Version(), + Schema: schemaInfo.Schema(), + SchemaType: schemaInfo.SchemaType(), + References: schemaInfo.References(), + Subject: schema.Subject, + } + if schema.EnableCaching { + k.schemaCache[schema.Subject] = wrappedSchema } + return wrappedSchema } // getSubjectName returns the subject name for the given schema and topic.