diff --git a/producer/io_worker.go b/producer/io_worker.go index 95400d01..f2e33d4d 100644 --- a/producer/io_worker.go +++ b/producer/io_worker.go @@ -25,9 +25,10 @@ type IoWorker struct { logger log.Logger maxIoWorker chan int64 noRetryStatusCodeMap map[int]*string + producer *Producer } -func initIoWorker(client sls.ClientInterface, retryQueue *RetryQueue, logger log.Logger, maxIoWorkerCount int64, errorStatusMap map[int]*string) *IoWorker { +func initIoWorker(client sls.ClientInterface, retryQueue *RetryQueue, logger log.Logger, maxIoWorkerCount int64, errorStatusMap map[int]*string, producer *Producer) *IoWorker { return &IoWorker{ client: client, retryQueue: retryQueue, @@ -36,10 +37,14 @@ func initIoWorker(client sls.ClientInterface, retryQueue *RetryQueue, logger log logger: logger, maxIoWorker: make(chan int64, maxIoWorkerCount), noRetryStatusCodeMap: errorStatusMap, + producer: producer, } } func (ioWorker *IoWorker) sendToServer(producerBatch *ProducerBatch, ioWorkerWaitGroup *sync.WaitGroup) { + if producerBatch == nil || ioWorkerWaitGroup == nil { + return + } level.Debug(ioWorker.logger).Log("msg", "ioworker send data to server") defer ioWorker.closeSendTask(ioWorkerWaitGroup) var err error @@ -57,7 +62,7 @@ func (ioWorker *IoWorker) sendToServer(producerBatch *ProducerBatch, ioWorkerWai } producerBatch.result.successful = true // After successful delivery, producer removes the batch size sent out - atomic.AddInt64(&producerLogGroupSize, -producerBatch.totalDataSize) + atomic.AddInt64(&ioWorker.producer.producerLogGroupSize, -producerBatch.totalDataSize) if len(producerBatch.callBackList) > 0 { for _, callBack := range producerBatch.callBackList { callBack.Success(producerBatch.result) @@ -118,7 +123,7 @@ func (ioWorker *IoWorker) closeSendTask(ioWorkerWaitGroup *sync.WaitGroup) { func (ioWorker *IoWorker) excuteFailedCallback(producerBatch *ProducerBatch) { level.Info(ioWorker.logger).Log("msg", "sendToServer failed,Execute failed callback function") - atomic.AddInt64(&producerLogGroupSize, -producerBatch.totalDataSize) + atomic.AddInt64(&ioWorker.producer.producerLogGroupSize, -producerBatch.totalDataSize) if len(producerBatch.callBackList) > 0 { for _, callBack := range producerBatch.callBackList { callBack.Fail(producerBatch.result) diff --git a/producer/log_accumulator.go b/producer/log_accumulator.go index 338e6968..8b19ee08 100644 --- a/producer/log_accumulator.go +++ b/producer/log_accumulator.go @@ -14,21 +14,24 @@ import ( type LogAccumulator struct { lock sync.RWMutex - logGroupData sync.Map //map[string]*ProducerBatch, + logGroupData map[string]*ProducerBatch producerConfig *ProducerConfig ioWorker *IoWorker shutDownFlag *uberatomic.Bool logger log.Logger threadPool *IoThreadPool + producer *Producer } -func initLogAccumulator(config *ProducerConfig, ioWorker *IoWorker, logger log.Logger, threadPool *IoThreadPool) *LogAccumulator { +func initLogAccumulator(config *ProducerConfig, ioWorker *IoWorker, logger log.Logger, threadPool *IoThreadPool, producer *Producer) *LogAccumulator { return &LogAccumulator{ + logGroupData: make(map[string]*ProducerBatch), producerConfig: config, ioWorker: ioWorker, shutDownFlag: uberatomic.NewBool(false), logger: logger, threadPool: threadPool, + producer: producer, } } @@ -39,14 +42,14 @@ func (logAccumulator *LogAccumulator) addOrSendProducerBatch(key, project, logst if callback != nil { producerBatch.addProducerBatchCallBack(callback) } - logAccumulator.sendToServer(key, producerBatch) + logAccumulator.innerSendToServer(key, producerBatch) } else if int64(producerBatch.totalDataSize) <= logAccumulator.producerConfig.MaxBatchSize && totalDataCount <= logAccumulator.producerConfig.MaxBatchCount { producerBatch.addLogToLogGroup(log) if callback != nil { producerBatch.addProducerBatchCallBack(callback) } } else { - logAccumulator.sendToServer(key, producerBatch) + logAccumulator.innerSendToServer(key, producerBatch) logAccumulator.createNewProducerBatch(log, callback, key, project, logstore, logTopic, logSource, shardHash) } } @@ -54,30 +57,28 @@ func (logAccumulator *LogAccumulator) addOrSendProducerBatch(key, project, logst // In this function,Naming with mlog is to avoid conflicts with the introduced kit/log package names. func (logAccumulator *LogAccumulator) addLogToProducerBatch(project, logstore, shardHash, logTopic, logSource string, logData interface{}, callback CallBack) error { - defer logAccumulator.lock.Unlock() - logAccumulator.lock.Lock() if logAccumulator.shutDownFlag.Load() { level.Warn(logAccumulator.logger).Log("msg", "Producer has started and shut down and cannot write to new logs") return errors.New("Producer has started and shut down and cannot write to new logs") } key := logAccumulator.getKeyString(project, logstore, logTopic, shardHash, logSource) + defer logAccumulator.lock.Unlock() + logAccumulator.lock.Lock() if mlog, ok := logData.(*sls.Log); ok { - if data, ok := logAccumulator.logGroupData.Load(key); ok == true { - producerBatch := data.(*ProducerBatch) + if producerBatch, ok := logAccumulator.logGroupData[key]; ok == true { logSize := int64(GetLogSizeCalculate(mlog)) atomic.AddInt64(&producerBatch.totalDataSize, logSize) - atomic.AddInt64(&producerLogGroupSize, logSize) + atomic.AddInt64(&logAccumulator.producer.producerLogGroupSize, logSize) logAccumulator.addOrSendProducerBatch(key, project, logstore, logTopic, logSource, shardHash, producerBatch, mlog, callback) } else { logAccumulator.createNewProducerBatch(mlog, callback, key, project, logstore, logTopic, logSource, shardHash) } } else if logList, ok := logData.([]*sls.Log); ok { - if data, ok := logAccumulator.logGroupData.Load(key); ok == true { - producerBatch := data.(*ProducerBatch) + if producerBatch, ok := logAccumulator.logGroupData[key]; ok == true { logListSize := int64(GetLogListSize(logList)) atomic.AddInt64(&producerBatch.totalDataSize, logListSize) - atomic.AddInt64(&producerLogGroupSize, logListSize) + atomic.AddInt64(&logAccumulator.producer.producerLogGroupSize, logListSize) logAccumulator.addOrSendProducerBatch(key, project, logstore, logTopic, logSource, shardHash, producerBatch, logList, callback) } else { @@ -96,20 +97,17 @@ func (logAccumulator *LogAccumulator) createNewProducerBatch(logType interface{} if mlog, ok := logType.(*sls.Log); ok { newProducerBatch := initProducerBatch(mlog, callback, project, logstore, logTopic, logSource, shardHash, logAccumulator.producerConfig) - logAccumulator.logGroupData.Store(key, newProducerBatch) + logAccumulator.logGroupData[key] = newProducerBatch } else if logList, ok := logType.([]*sls.Log); ok { newProducerBatch := initProducerBatch(logList, callback, project, logstore, logTopic, logSource, shardHash, logAccumulator.producerConfig) - logAccumulator.logGroupData.Store(key, newProducerBatch) + logAccumulator.logGroupData[key] = newProducerBatch } } -func (logAccumulator *LogAccumulator) sendToServer(key string, producerBatch *ProducerBatch) { - defer ioLock.Unlock() - ioLock.Lock() +func (logAccumulator *LogAccumulator) innerSendToServer(key string, producerBatch *ProducerBatch) { level.Debug(logAccumulator.logger).Log("msg", "Send producerBatch to IoWorker from logAccumulator") logAccumulator.threadPool.addTask(producerBatch) - logAccumulator.logGroupData.Delete(key) - + delete(logAccumulator.logGroupData, key) } func (logAccumulator *LogAccumulator) getKeyString(project, logstore, logTopic, shardHash, logSource string) string { diff --git a/producer/mover.go b/producer/mover.go index ef4ec597..6d85a5d4 100644 --- a/producer/mover.go +++ b/producer/mover.go @@ -31,38 +31,37 @@ func initMover(logAccumulator *LogAccumulator, retryQueue *RetryQueue, ioWorker } -func (mover *Mover) sendToServer(key interface{}, batch *ProducerBatch, config *ProducerConfig) { - defer ioLock.Unlock() - ioLock.Lock() - if value, ok := mover.logAccumulator.logGroupData.Load(key); !ok { +func (mover *Mover) sendToServer(key string, batch *ProducerBatch, config *ProducerConfig) { + if value, ok := mover.logAccumulator.logGroupData[key]; !ok { return - } else if GetTimeMs(time.Now().UnixNano())-value.(*ProducerBatch).createTimeMs < config.LingerMs { + } else if GetTimeMs(time.Now().UnixNano())-value.createTimeMs < config.LingerMs { return } mover.threadPool.addTask(batch) - mover.logAccumulator.logGroupData.Delete(key) + delete(mover.logAccumulator.logGroupData, key) } func (mover *Mover) run(moverWaitGroup *sync.WaitGroup, config *ProducerConfig) { defer moverWaitGroup.Done() for !mover.moverShutDownFlag.Load() { sleepMs := config.LingerMs - mapCount := 0 - mover.logAccumulator.logGroupData.Range(func(key, value interface{}) bool { - mapCount = mapCount + 1 - if batch, ok := value.(*ProducerBatch); ok { - timeInterval := batch.createTimeMs + config.LingerMs - GetTimeMs(time.Now().UnixNano()) - if timeInterval <= 0 { - level.Debug(mover.logger).Log("msg", "mover groutine execute sent producerBatch to IoWorker") - mover.sendToServer(key, batch, config) - } else { - if sleepMs > timeInterval { - sleepMs = timeInterval - } + + nowTimeMs := GetTimeMs(time.Now().UnixNano()) + mover.logAccumulator.lock.Lock() + mapCount := len(mover.logAccumulator.logGroupData) + for key, batch := range mover.logAccumulator.logGroupData { + timeInterval := batch.createTimeMs + config.LingerMs - nowTimeMs + if timeInterval <= 0 { + level.Debug(mover.logger).Log("msg", "mover groutine execute sent producerBatch to IoWorker") + mover.sendToServer(key, batch, config) + } else { + if sleepMs > timeInterval { + sleepMs = timeInterval } } - return true - }) + } + mover.logAccumulator.lock.Unlock() + if mapCount == 0 { level.Debug(mover.logger).Log("msg", "No data time in map waiting for user configured RemainMs parameter values") sleepMs = config.LingerMs @@ -80,11 +79,12 @@ func (mover *Mover) run(moverWaitGroup *sync.WaitGroup, config *ProducerConfig) } } - mover.logAccumulator.logGroupData.Range(func(key, batch interface{}) bool { - mover.threadPool.addTask(batch.(*ProducerBatch)) - mover.logAccumulator.logGroupData.Delete(key) - return true - }) + mover.logAccumulator.lock.Lock() + for _, batch := range mover.logAccumulator.logGroupData { + mover.threadPool.addTask(batch) + } + mover.logAccumulator.logGroupData = make(map[string]*ProducerBatch) + mover.logAccumulator.lock.Unlock() producerBatchList := mover.retryQueue.getRetryBatch(mover.moverShutDownFlag.Load()) count := len(producerBatchList) diff --git a/producer/producer.go b/producer/producer.go index 6d8e5963..a6f5f00b 100644 --- a/producer/producer.go +++ b/producer/producer.go @@ -16,9 +16,6 @@ const ( IllegalStateException = "IllegalStateException" ) -var producerLogGroupSize int64 -var ioLock sync.RWMutex - type Producer struct { producerConfig *ProducerConfig logAccumulator *LogAccumulator @@ -29,6 +26,7 @@ type Producer struct { ioThreadPoolWaitGroup *sync.WaitGroup buckets int logger log.Logger + producerLogGroupSize int64 } func InitProducer(producerConfig *ProducerConfig) *Producer { @@ -52,17 +50,18 @@ func InitProducer(producerConfig *ProducerConfig) *Producer { } return errorCodeMap }() - ioWorker := initIoWorker(client, retryQueue, logger, finalProducerConfig.MaxIoWorkerCount, errorStatusMap) - threadPool := initIoThreadPool(ioWorker, logger) - logAccumulator := initLogAccumulator(finalProducerConfig, ioWorker, logger, threadPool) - mover := initMover(logAccumulator, retryQueue, ioWorker, logger, threadPool) producer := &Producer{ producerConfig: finalProducerConfig, - logAccumulator: logAccumulator, - mover: mover, - threadPool: threadPool, buckets: finalProducerConfig.Buckets, } + ioWorker := initIoWorker(client, retryQueue, logger, finalProducerConfig.MaxIoWorkerCount, errorStatusMap, producer) + threadPool := initIoThreadPool(ioWorker, logger) + logAccumulator := initLogAccumulator(finalProducerConfig, ioWorker, logger, threadPool, producer) + mover := initMover(logAccumulator, retryQueue, ioWorker, logger, threadPool) + + producer.logAccumulator = logAccumulator + producer.mover = mover + producer.threadPool = threadPool producer.moverWaitGroup = &sync.WaitGroup{} producer.ioWorkerWaitGroup = &sync.WaitGroup{} producer.ioThreadPoolWaitGroup = &sync.WaitGroup{} @@ -205,7 +204,7 @@ func (producer *Producer) waitTime() error { if producer.producerConfig.MaxBlockSec > 0 { for i := 0; i < producer.producerConfig.MaxBlockSec; i++ { - if atomic.LoadInt64(&producerLogGroupSize) > producer.producerConfig.TotalSizeLnBytes { + if atomic.LoadInt64(&producer.producerLogGroupSize) > producer.producerConfig.TotalSizeLnBytes { time.Sleep(time.Second) } else { return nil @@ -214,13 +213,13 @@ func (producer *Producer) waitTime() error { level.Error(producer.logger).Log("msg", "Over producer set maximum blocking time") return errors.New(TimeoutExecption) } else if producer.producerConfig.MaxBlockSec == 0 { - if atomic.LoadInt64(&producerLogGroupSize) > producer.producerConfig.TotalSizeLnBytes { + if atomic.LoadInt64(&producer.producerLogGroupSize) > producer.producerConfig.TotalSizeLnBytes { level.Error(producer.logger).Log("msg", "Over producer set maximum blocking time") return errors.New(TimeoutExecption) } } else if producer.producerConfig.MaxBlockSec < 0 { for { - if atomic.LoadInt64(&producerLogGroupSize) > producer.producerConfig.TotalSizeLnBytes { + if atomic.LoadInt64(&producer.producerLogGroupSize) > producer.producerConfig.TotalSizeLnBytes { time.Sleep(time.Second) } else { return nil