Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix length of the variable names #136

Merged
merged 1 commit into from
Jul 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,67 +76,67 @@ type ConsumeConfig struct {
// XReader is a wrapper around kafkago.Reader and acts as a JS constructor
// for this extension, thus it must be called with new operator, e.g. new Reader(...).
func (k *Kafka) XReader(call goja.ConstructorCall) *goja.Object {
rt := k.vu.Runtime()
runtime := k.vu.Runtime()
var readerConfig *ReaderConfig
if len(call.Arguments) <= 0 {
common.Throw(rt, ErrorNotEnoughArguments)
common.Throw(runtime, ErrorNotEnoughArguments)
}

if params, ok := call.Argument(0).Export().(map[string]interface{}); ok {
if b, err := json.Marshal(params); err != nil {
common.Throw(rt, err)
common.Throw(runtime, err)
} else {
if err = json.Unmarshal(b, &readerConfig); err != nil {
common.Throw(rt, err)
common.Throw(runtime, err)
}
}
}

reader := k.reader(readerConfig)

readerObject := rt.NewObject()
readerObject := runtime.NewObject()
// This is the reader object itself
if err := readerObject.Set("This", reader); err != nil {
common.Throw(rt, err)
common.Throw(runtime, err)
}

err := readerObject.Set("consume", func(call goja.FunctionCall) goja.Value {
var consumeConfig *ConsumeConfig
if len(call.Arguments) <= 0 {
common.Throw(rt, ErrorNotEnoughArguments)
common.Throw(runtime, ErrorNotEnoughArguments)
}

if params, ok := call.Argument(0).Export().(map[string]interface{}); ok {
if b, err := json.Marshal(params); err != nil {
common.Throw(rt, err)
common.Throw(runtime, err)
} else {
if err = json.Unmarshal(b, &consumeConfig); err != nil {
common.Throw(rt, err)
common.Throw(runtime, err)
}
}
}

return rt.ToValue(k.consume(reader, consumeConfig))
return runtime.ToValue(k.consume(reader, consumeConfig))
})
if err != nil {
common.Throw(rt, err)
common.Throw(runtime, err)
}

// This is unnecessary, but it's here for reference purposes
err = readerObject.Set("close", func(call goja.FunctionCall) goja.Value {
if err := reader.Close(); err != nil {
common.Throw(rt, err)
common.Throw(runtime, err)
}

return goja.Undefined()
})
if err != nil {
common.Throw(rt, err)
common.Throw(runtime, err)
}

freeze(readerObject)

return rt.ToValue(readerObject).ToObject(rt)
return runtime.ToValue(readerObject).ToObject(runtime)
}

// reader creates a Kafka reader with the given configuration
Expand Down
14 changes: 7 additions & 7 deletions kafka_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,28 +26,28 @@ type kafkaTest struct {
// nolint: golint
func GetTestModuleInstance(tb testing.TB) *kafkaTest {
tb.Helper()
rt := goja.New()
rt.SetFieldNameMapper(common.FieldNameMapper{})
runtime := goja.New()
runtime.SetFieldNameMapper(common.FieldNameMapper{})

ctx, cancel := context.WithCancel(context.Background())
tb.Cleanup(cancel)

root := New()
mockVU := &modulestest.VU{
RuntimeField: rt,
RuntimeField: runtime,
InitEnvField: &common.InitEnvironment{
Registry: metrics.NewRegistry(),
},
CtxField: ctx,
}
mi, ok := root.NewModuleInstance(mockVU).(*Module)
moduleInstance, ok := root.NewModuleInstance(mockVU).(*Module)
require.True(tb, ok)

require.NoError(tb, rt.Set("kafka", mi.Exports().Default))
require.NoError(tb, runtime.Set("kafka", moduleInstance.Exports().Default))

return &kafkaTest{
rt: rt,
module: mi,
rt: runtime,
module: moduleInstance,
vu: mockVU,
cancelContext: cancel,
}
Expand Down
24 changes: 12 additions & 12 deletions module.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,22 +87,22 @@ func New() *RootModule {
}

// NewModuleInstance creates a new instance of the Kafka module.
func (*RootModule) NewModuleInstance(vu modules.VU) modules.Instance {
rt := vu.Runtime()
func (*RootModule) NewModuleInstance(virtualUser modules.VU) modules.Instance {
runtime := virtualUser.Runtime()

m, err := registerMetrics(vu)
metrics, err := registerMetrics(virtualUser)
if err != nil {
common.Throw(vu.Runtime(), err)
common.Throw(virtualUser.Runtime(), err)
}

// Create a new Kafka module.
moduleInstance := &Module{
Kafka: &Kafka{
vu: vu,
metrics: m,
vu: virtualUser,
metrics: metrics,
serializerRegistry: NewSerializersRegistry(),
deserializerRegistry: NewDeserializersRegistry(),
exports: rt.NewObject(),
exports: runtime.NewObject(),
},
}

Expand All @@ -111,7 +111,7 @@ func (*RootModule) NewModuleInstance(vu modules.VU) modules.Instance {

mustExport := func(name string, value interface{}) {
if err := moduleInstance.exports.Set(name, value); err != nil {
common.Throw(rt, err)
common.Throw(runtime, err)
}
}

Expand All @@ -124,7 +124,7 @@ func (*RootModule) NewModuleInstance(vu modules.VU) modules.Instance {
mustExport("Connection", moduleInstance.XConnection)

// This causes the struct fields to be exported to the native (camelCases) JS code.
vu.Runtime().SetFieldNameMapper(goja.TagFieldNameMapper("json", true))
virtualUser.Runtime().SetFieldNameMapper(goja.TagFieldNameMapper("json", true))

return moduleInstance
}
Expand All @@ -140,13 +140,13 @@ func (m *Module) Exports() modules.Exports {
// defineConstants defines the constants that can be used in the JS code.
// nolint: funlen
func (m *Module) defineConstants() {
rt := m.vu.Runtime()
runtime := m.vu.Runtime()
mustAddProp := func(name, val string) {
err := m.exports.DefineDataProperty(
name, rt.ToValue(val), goja.FLAG_FALSE, goja.FLAG_FALSE, goja.FLAG_TRUE,
name, runtime.ToValue(val), goja.FLAG_FALSE, goja.FLAG_FALSE, goja.FLAG_TRUE,
)
if err != nil {
common.Throw(rt, err)
common.Throw(runtime, err)
}
}

Expand Down
40 changes: 20 additions & 20 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,69 +93,69 @@ type ProduceConfig struct {
// XWriter is a wrapper around kafkago.Writer and acts as a JS constructor
// for this extension, thus it must be called with new operator, e.g. new Writer(...).
func (k *Kafka) XWriter(call goja.ConstructorCall) *goja.Object {
rt := k.vu.Runtime()
runtime := k.vu.Runtime()
var writerConfig *WriterConfig
if len(call.Arguments) <= 0 {
common.Throw(rt, ErrorNotEnoughArguments)
common.Throw(runtime, ErrorNotEnoughArguments)
}

if params, ok := call.Argument(0).Export().(map[string]interface{}); ok {
if b, err := json.Marshal(params); err != nil {
common.Throw(rt, err)
common.Throw(runtime, err)
} else {
if err = json.Unmarshal(b, &writerConfig); err != nil {
common.Throw(rt, err)
common.Throw(runtime, err)
}
}
}

writer := k.writer(writerConfig)

writerObject := rt.NewObject()
writerObject := runtime.NewObject()
// This is the writer object itself.
if err := writerObject.Set("This", writer); err != nil {
common.Throw(rt, err)
common.Throw(runtime, err)
}

err := writerObject.Set("produce", func(call goja.FunctionCall) goja.Value {
var producerConfig *ProduceConfig
if len(call.Arguments) <= 0 {
common.Throw(rt, ErrorNotEnoughArguments)
common.Throw(runtime, ErrorNotEnoughArguments)
}

if params, ok := call.Argument(0).Export().(map[string]interface{}); ok {
b, err := json.Marshal(params)
if err != nil {
common.Throw(rt, err)
common.Throw(runtime, err)
}
err = json.Unmarshal(b, &producerConfig)
if err != nil {
common.Throw(rt, err)
common.Throw(runtime, err)
}
}

k.produce(writer, producerConfig)
return goja.Undefined()
})
if err != nil {
common.Throw(rt, err)
common.Throw(runtime, err)
}

// This is unnecessary, but it's here for reference purposes.
err = writerObject.Set("close", func(call goja.FunctionCall) goja.Value {
if err := writer.Close(); err != nil {
common.Throw(rt, err)
common.Throw(runtime, err)
}

return goja.Undefined()
})
if err != nil {
common.Throw(rt, err)
common.Throw(runtime, err)
}

freeze(writerObject)

return rt.ToValue(writerObject).ToObject(rt)
return runtime.ToValue(writerObject).ToObject(runtime)
}

// writer creates a new Kafka writer.
Expand Down Expand Up @@ -242,21 +242,21 @@ func (k *Kafka) produce(writer *kafkago.Writer, produceConfig *ProduceConfig) {
valueSerializer := k.GetSerializer(produceConfig.Config.Producer.ValueSerializer)

kafkaMessages := make([]kafkago.Message, len(produceConfig.Messages))
for i, message := range produceConfig.Messages {
kafkaMessages[i] = kafkago.Message{
for index, message := range produceConfig.Messages {
kafkaMessages[index] = kafkago.Message{
Offset: message.Offset,
}

// Topic can be explicitly set on each individual message.
// Setting topic on the writer and the messages are mutually exclusive.
if message.Topic != "" {
kafkaMessages[i].Topic = message.Topic
kafkaMessages[index].Topic = message.Topic
}

// If time is set, use it to set the time on the message,
// otherwise use the current time.
if !message.Time.IsZero() {
kafkaMessages[i].Time = message.Time
kafkaMessages[index].Time = message.Time
}

// If a key was provided, add it to the message. Keys are optional.
Expand All @@ -268,7 +268,7 @@ func (k *Kafka) produce(writer *kafkago.Writer, produceConfig *ProduceConfig) {
logger.WithField("error", err).Error(err)
}

kafkaMessages[i].Key = keyData
kafkaMessages[index].Key = keyData
}

// Then add the value to the message.
Expand All @@ -279,12 +279,12 @@ func (k *Kafka) produce(writer *kafkago.Writer, produceConfig *ProduceConfig) {
logger.WithField("error", err).Error(err)
}

kafkaMessages[i].Value = valueData
kafkaMessages[index].Value = valueData

// If headers are provided, add them to the message.
if len(message.Headers) > 0 {
for key, value := range message.Headers {
kafkaMessages[i].Headers = append(kafkaMessages[i].Headers, kafkago.Header{
kafkaMessages[index].Headers = append(kafkaMessages[index].Headers, kafkago.Header{
Key: key,
Value: []byte(fmt.Sprint(value)),
})
Expand Down
Loading