diff --git a/error_codes.go b/error_codes.go index 0582650..a02bfe6 100644 --- a/error_codes.go +++ b/error_codes.go @@ -11,13 +11,13 @@ const ( // non specific. kafkaForbiddenInInitContext errCode = 1000 noContextError errCode = 1001 - contextCancelled errCode = 1002 - cannotReportStats errCode = 1003 - fileNotFound errCode = 1004 - dialerError errCode = 1005 - noTLSConfig errCode = 1006 - failedTypeCast errCode = 1007 - unsupportedOperation errCode = 1008 + cannotReportStats errCode = 1002 + fileNotFound errCode = 1003 + dialerError errCode = 1004 + noTLSConfig errCode = 1005 + failedTypeCast errCode = 1006 + unsupportedOperation errCode = 1007 + writerError errCode = 1008 // serdes errors. invalidDataType errCode = 2000 diff --git a/reader.go b/reader.go index bf735a0..754c152 100644 --- a/reader.go +++ b/reader.go @@ -318,7 +318,7 @@ func (k *Kafka) reportReaderStats(currentStats kafkago.ReaderStats) { ctx := k.vu.Context() if ctx == nil { - err := NewXk6KafkaError(contextCancelled, "No context.", nil) + err := NewXk6KafkaError(noContextError, "No context.", nil) logger.WithField("error", err).Info(err) common.Throw(k.vu.Runtime(), err) } diff --git a/writer.go b/writer.go index 6eec803..bfd6e8a 100644 --- a/writer.go +++ b/writer.go @@ -3,7 +3,6 @@ package kafka import ( "context" "encoding/json" - "errors" "fmt" "time" @@ -250,10 +249,10 @@ func (k *Kafka) produce(writer *kafkago.Writer, produceConfig *ProduceConfig) { k.reportWriterStats(writer.Stats()) - if originalErr != nil && errors.Is(originalErr, k.vu.Context().Err()) { - logger.WithField("error", k.vu.Context().Err()).Error(k.vu.Context().Err()) - common.Throw(k.vu.Runtime(), - NewXk6KafkaError(contextCancelled, "Context cancelled.", originalErr)) + if originalErr != nil { + err := NewXk6KafkaError(writerError, "Error writing messages.", originalErr) + logger.WithField("error", err).Error(err) + common.Throw(k.vu.Runtime(), err) } }