[fix] crash on concurrent producer operations
shabicheng committed Feb 28, 2021
1 parent 7996359 commit 70cf6db
Showing 4 changed files with 62 additions and 60 deletions.
11 changes: 8 additions & 3 deletions producer/io_worker.go
@@ -25,9 +25,10 @@ type IoWorker struct {
logger log.Logger
maxIoWorker chan int64
noRetryStatusCodeMap map[int]*string
producer *Producer
}

func initIoWorker(client sls.ClientInterface, retryQueue *RetryQueue, logger log.Logger, maxIoWorkerCount int64, errorStatusMap map[int]*string) *IoWorker {
func initIoWorker(client sls.ClientInterface, retryQueue *RetryQueue, logger log.Logger, maxIoWorkerCount int64, errorStatusMap map[int]*string, producer *Producer) *IoWorker {
return &IoWorker{
client: client,
retryQueue: retryQueue,
@@ -36,10 +37,14 @@ func initIoWorker(client sls.ClientInterface, retryQueue *RetryQueue, logger log
logger: logger,
maxIoWorker: make(chan int64, maxIoWorkerCount),
noRetryStatusCodeMap: errorStatusMap,
producer: producer,
}
}

func (ioWorker *IoWorker) sendToServer(producerBatch *ProducerBatch, ioWorkerWaitGroup *sync.WaitGroup) {
if producerBatch == nil || ioWorkerWaitGroup == nil {
return
}
level.Debug(ioWorker.logger).Log("msg", "ioworker send data to server")
defer ioWorker.closeSendTask(ioWorkerWaitGroup)
var err error
@@ -57,7 +62,7 @@ }
}
producerBatch.result.successful = true
// After successful delivery, producer removes the batch size sent out
atomic.AddInt64(&producerLogGroupSize, -producerBatch.totalDataSize)
atomic.AddInt64(&ioWorker.producer.producerLogGroupSize, -producerBatch.totalDataSize)
if len(producerBatch.callBackList) > 0 {
for _, callBack := range producerBatch.callBackList {
callBack.Success(producerBatch.result)
@@ -118,7 +123,7 @@ func (ioWorker *IoWorker) closeSendTask(ioWorkerWaitGroup *sync.WaitGroup) {

func (ioWorker *IoWorker) excuteFailedCallback(producerBatch *ProducerBatch) {
level.Info(ioWorker.logger).Log("msg", "sendToServer failed,Execute failed callback function")
atomic.AddInt64(&producerLogGroupSize, -producerBatch.totalDataSize)
atomic.AddInt64(&ioWorker.producer.producerLogGroupSize, -producerBatch.totalDataSize)
if len(producerBatch.callBackList) > 0 {
for _, callBack := range producerBatch.callBackList {
callBack.Fail(producerBatch.result)
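The io_worker.go change does two things: sendToServer now returns early on a nil batch or wait group, and the worker updates the byte counter through the new ioWorker.producer back-reference instead of the removed package-level producerLogGroupSize global. A minimal sketch of that ownership pattern, with simplified stand-in names rather than the SDK's types:

```go
// Sketch only: an atomic counter owned by the producer struct and updated by a
// worker through a back-reference, instead of a package-level global shared by
// every producer in the process. Names are illustrative, not the SDK's.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type producer struct {
	logGroupSize int64 // total bytes of batches in flight; atomic access only
}

type worker struct {
	owner *producer // back-reference injected at construction time
}

func (w *worker) send(batchSize int64) {
	// ... deliver the batch, then release the bytes it had reserved ...
	atomic.AddInt64(&w.owner.logGroupSize, -batchSize)
}

func main() {
	p := &producer{}
	w := &worker{owner: p}

	atomic.AddInt64(&p.logGroupSize, 100) // the accumulator reserved 100 bytes

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			w.send(25)
		}()
	}
	wg.Wait()
	fmt.Println(atomic.LoadInt64(&p.logGroupSize)) // 0
}
```

With the counter owned by the instance, several producers in one process each track their own in-flight bytes instead of sharing one global tally.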
36 changes: 17 additions & 19 deletions producer/log_accumulator.go
@@ -14,21 +14,24 @@ import (

type LogAccumulator struct {
lock sync.RWMutex
logGroupData sync.Map //map[string]*ProducerBatch,
logGroupData map[string]*ProducerBatch
producerConfig *ProducerConfig
ioWorker *IoWorker
shutDownFlag *uberatomic.Bool
logger log.Logger
threadPool *IoThreadPool
producer *Producer
}

func initLogAccumulator(config *ProducerConfig, ioWorker *IoWorker, logger log.Logger, threadPool *IoThreadPool) *LogAccumulator {
func initLogAccumulator(config *ProducerConfig, ioWorker *IoWorker, logger log.Logger, threadPool *IoThreadPool, producer *Producer) *LogAccumulator {
return &LogAccumulator{
logGroupData: make(map[string]*ProducerBatch),
producerConfig: config,
ioWorker: ioWorker,
shutDownFlag: uberatomic.NewBool(false),
logger: logger,
threadPool: threadPool,
producer: producer,
}
}

@@ -39,45 +42,43 @@ func (logAccumulator *LogAccumulator) addOrSendProducerBatch(key, project, logst
if callback != nil {
producerBatch.addProducerBatchCallBack(callback)
}
logAccumulator.sendToServer(key, producerBatch)
logAccumulator.innerSendToServer(key, producerBatch)
} else if int64(producerBatch.totalDataSize) <= logAccumulator.producerConfig.MaxBatchSize && totalDataCount <= logAccumulator.producerConfig.MaxBatchCount {
producerBatch.addLogToLogGroup(log)
if callback != nil {
producerBatch.addProducerBatchCallBack(callback)
}
} else {
logAccumulator.sendToServer(key, producerBatch)
logAccumulator.innerSendToServer(key, producerBatch)
logAccumulator.createNewProducerBatch(log, callback, key, project, logstore, logTopic, logSource, shardHash)
}
}

// In this function, naming the variable mlog avoids a conflict with the imported kit/log package name.
func (logAccumulator *LogAccumulator) addLogToProducerBatch(project, logstore, shardHash, logTopic, logSource string,
logData interface{}, callback CallBack) error {
defer logAccumulator.lock.Unlock()
logAccumulator.lock.Lock()
if logAccumulator.shutDownFlag.Load() {
level.Warn(logAccumulator.logger).Log("msg", "Producer has started and shut down and cannot write to new logs")
return errors.New("Producer has started and shut down and cannot write to new logs")
}

key := logAccumulator.getKeyString(project, logstore, logTopic, shardHash, logSource)
defer logAccumulator.lock.Unlock()
logAccumulator.lock.Lock()
if mlog, ok := logData.(*sls.Log); ok {
if data, ok := logAccumulator.logGroupData.Load(key); ok == true {
producerBatch := data.(*ProducerBatch)
if producerBatch, ok := logAccumulator.logGroupData[key]; ok == true {
logSize := int64(GetLogSizeCalculate(mlog))
atomic.AddInt64(&producerBatch.totalDataSize, logSize)
atomic.AddInt64(&producerLogGroupSize, logSize)
atomic.AddInt64(&logAccumulator.producer.producerLogGroupSize, logSize)
logAccumulator.addOrSendProducerBatch(key, project, logstore, logTopic, logSource, shardHash, producerBatch, mlog, callback)
} else {
logAccumulator.createNewProducerBatch(mlog, callback, key, project, logstore, logTopic, logSource, shardHash)
}
} else if logList, ok := logData.([]*sls.Log); ok {
if data, ok := logAccumulator.logGroupData.Load(key); ok == true {
producerBatch := data.(*ProducerBatch)
if producerBatch, ok := logAccumulator.logGroupData[key]; ok == true {
logListSize := int64(GetLogListSize(logList))
atomic.AddInt64(&producerBatch.totalDataSize, logListSize)
atomic.AddInt64(&producerLogGroupSize, logListSize)
atomic.AddInt64(&logAccumulator.producer.producerLogGroupSize, logListSize)
logAccumulator.addOrSendProducerBatch(key, project, logstore, logTopic, logSource, shardHash, producerBatch, logList, callback)

} else {
@@ -96,20 +97,17 @@ func (logAccumulator *LogAccumulator) createNewProducerBatch(logType interface{}

if mlog, ok := logType.(*sls.Log); ok {
newProducerBatch := initProducerBatch(mlog, callback, project, logstore, logTopic, logSource, shardHash, logAccumulator.producerConfig)
logAccumulator.logGroupData.Store(key, newProducerBatch)
logAccumulator.logGroupData[key] = newProducerBatch
} else if logList, ok := logType.([]*sls.Log); ok {
newProducerBatch := initProducerBatch(logList, callback, project, logstore, logTopic, logSource, shardHash, logAccumulator.producerConfig)
logAccumulator.logGroupData.Store(key, newProducerBatch)
logAccumulator.logGroupData[key] = newProducerBatch
}
}

func (logAccumulator *LogAccumulator) sendToServer(key string, producerBatch *ProducerBatch) {
defer ioLock.Unlock()
ioLock.Lock()
func (logAccumulator *LogAccumulator) innerSendToServer(key string, producerBatch *ProducerBatch) {
level.Debug(logAccumulator.logger).Log("msg", "Send producerBatch to IoWorker from logAccumulator")
logAccumulator.threadPool.addTask(producerBatch)
logAccumulator.logGroupData.Delete(key)

delete(logAccumulator.logGroupData, key)
}

func (logAccumulator *LogAccumulator) getKeyString(project, logstore, logTopic, shardHash, logSource string) string {
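log_accumulator.go replaces the sync.Map with a plain map[string]*ProducerBatch protected by the accumulator's existing lock, and innerSendToServer drops the old global ioLock. The point is that the lookup, the mutation, and the eventual deletion of a batch form one check-then-act sequence that has to sit inside a single critical section; a concurrent map only makes the individual operations safe. A rough sketch of that pattern, using hypothetical names rather than the SDK's API:

```go
// Sketch only (hypothetical names): a plain map guarded by one mutex so that
// "find the batch, grow it, maybe remove it" is a single critical section.
package main

import (
	"fmt"
	"sync"
)

type batch struct{ size int }

type accumulator struct {
	mu   sync.Mutex
	data map[string]*batch
}

// add grows the batch for key, creating it on first use, and hands the batch
// back for shipping once it reaches limit; lookup, growth, and removal all
// happen under the same lock.
func (a *accumulator) add(key string, size, limit int) *batch {
	a.mu.Lock()
	defer a.mu.Unlock()

	b, ok := a.data[key]
	if !ok {
		b = &batch{}
		a.data[key] = b
	}
	b.size += size
	if b.size >= limit {
		delete(a.data, key)
		return b
	}
	return nil
}

func main() {
	acc := &accumulator{data: make(map[string]*batch)}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if full := acc.add("project|logstore", 10, 200); full != nil {
				fmt.Println("ship batch of", full.size, "bytes")
			}
		}()
	}
	wg.Wait()
}
```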
50 changes: 25 additions & 25 deletions producer/mover.go
@@ -31,38 +31,37 @@ func initMover(logAccumulator *LogAccumulator, retryQueue *RetryQueue, ioWorker

}

func (mover *Mover) sendToServer(key interface{}, batch *ProducerBatch, config *ProducerConfig) {
defer ioLock.Unlock()
ioLock.Lock()
if value, ok := mover.logAccumulator.logGroupData.Load(key); !ok {
func (mover *Mover) sendToServer(key string, batch *ProducerBatch, config *ProducerConfig) {
if value, ok := mover.logAccumulator.logGroupData[key]; !ok {
return
} else if GetTimeMs(time.Now().UnixNano())-value.(*ProducerBatch).createTimeMs < config.LingerMs {
} else if GetTimeMs(time.Now().UnixNano())-value.createTimeMs < config.LingerMs {
return
}
mover.threadPool.addTask(batch)
mover.logAccumulator.logGroupData.Delete(key)
delete(mover.logAccumulator.logGroupData, key)
}

func (mover *Mover) run(moverWaitGroup *sync.WaitGroup, config *ProducerConfig) {
defer moverWaitGroup.Done()
for !mover.moverShutDownFlag.Load() {
sleepMs := config.LingerMs
mapCount := 0
mover.logAccumulator.logGroupData.Range(func(key, value interface{}) bool {
mapCount = mapCount + 1
if batch, ok := value.(*ProducerBatch); ok {
timeInterval := batch.createTimeMs + config.LingerMs - GetTimeMs(time.Now().UnixNano())
if timeInterval <= 0 {
level.Debug(mover.logger).Log("msg", "mover groutine execute sent producerBatch to IoWorker")
mover.sendToServer(key, batch, config)
} else {
if sleepMs > timeInterval {
sleepMs = timeInterval
}

nowTimeMs := GetTimeMs(time.Now().UnixNano())
mover.logAccumulator.lock.Lock()
mapCount := len(mover.logAccumulator.logGroupData)
for key, batch := range mover.logAccumulator.logGroupData {
timeInterval := batch.createTimeMs + config.LingerMs - nowTimeMs
if timeInterval <= 0 {
level.Debug(mover.logger).Log("msg", "mover groutine execute sent producerBatch to IoWorker")
mover.sendToServer(key, batch, config)
} else {
if sleepMs > timeInterval {
sleepMs = timeInterval
}
}
return true
})
}
mover.logAccumulator.lock.Unlock()

if mapCount == 0 {
level.Debug(mover.logger).Log("msg", "No data time in map waiting for user configured RemainMs parameter values")
sleepMs = config.LingerMs
@@ -80,11 +79,12 @@ func (mover *Mover) run(moverWaitGroup *sync.WaitGroup, config *ProducerConfig)
}

}
mover.logAccumulator.logGroupData.Range(func(key, batch interface{}) bool {
mover.threadPool.addTask(batch.(*ProducerBatch))
mover.logAccumulator.logGroupData.Delete(key)
return true
})
mover.logAccumulator.lock.Lock()
for _, batch := range mover.logAccumulator.logGroupData {
mover.threadPool.addTask(batch)
}
mover.logAccumulator.logGroupData = make(map[string]*ProducerBatch)
mover.logAccumulator.lock.Unlock()

producerBatchList := mover.retryQueue.getRetryBatch(mover.moverShutDownFlag.Load())
count := len(producerBatchList)
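mover.go follows the same approach: rather than sync.Map.Range under the removed ioLock, the mover takes logAccumulator.lock, walks the plain map to ship batches whose LingerMs has elapsed, and on shutdown drains everything and reassigns a fresh map. A small sketch of such a lock-guarded sweep, with hypothetical names:

```go
// Sketch only (hypothetical names): the periodic sweep evicts expired batches
// under the same mutex the writers use, so nothing is added or removed behind
// its back while it ranges over the map.
package main

import (
	"fmt"
	"sync"
	"time"
)

type batch struct{ createdAt time.Time }

type accumulator struct {
	mu   sync.Mutex
	data map[string]*batch
}

// sweep ships every batch older than linger and deletes it from the map.
func (a *accumulator) sweep(linger time.Duration, ship func(*batch)) {
	a.mu.Lock()
	defer a.mu.Unlock()
	now := time.Now()
	for key, b := range a.data {
		if now.Sub(b.createdAt) >= linger {
			ship(b)
			delete(a.data, key) // deleting during range is well defined in Go
		}
	}
}

func main() {
	acc := &accumulator{data: map[string]*batch{
		"expired": {createdAt: time.Now().Add(-3 * time.Second)},
		"fresh":   {createdAt: time.Now()},
	}}
	acc.sweep(2*time.Second, func(b *batch) { fmt.Println("shipping batch created at", b.createdAt) })
	fmt.Println("batches still accumulating:", len(acc.data)) // 1
}
```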
25 changes: 12 additions & 13 deletions producer/producer.go
@@ -16,9 +16,6 @@ const (
IllegalStateException = "IllegalStateException"
)

var producerLogGroupSize int64
var ioLock sync.RWMutex

type Producer struct {
producerConfig *ProducerConfig
logAccumulator *LogAccumulator
@@ -29,6 +26,7 @@ type Producer struct {
ioThreadPoolWaitGroup *sync.WaitGroup
buckets int
logger log.Logger
producerLogGroupSize int64
}

func InitProducer(producerConfig *ProducerConfig) *Producer {
@@ -52,17 +50,18 @@ func InitProducer(producerConfig *ProducerConfig) *Producer {
}
return errorCodeMap
}()
ioWorker := initIoWorker(client, retryQueue, logger, finalProducerConfig.MaxIoWorkerCount, errorStatusMap)
threadPool := initIoThreadPool(ioWorker, logger)
logAccumulator := initLogAccumulator(finalProducerConfig, ioWorker, logger, threadPool)
mover := initMover(logAccumulator, retryQueue, ioWorker, logger, threadPool)
producer := &Producer{
producerConfig: finalProducerConfig,
logAccumulator: logAccumulator,
mover: mover,
threadPool: threadPool,
buckets: finalProducerConfig.Buckets,
}
ioWorker := initIoWorker(client, retryQueue, logger, finalProducerConfig.MaxIoWorkerCount, errorStatusMap, producer)
threadPool := initIoThreadPool(ioWorker, logger)
logAccumulator := initLogAccumulator(finalProducerConfig, ioWorker, logger, threadPool, producer)
mover := initMover(logAccumulator, retryQueue, ioWorker, logger, threadPool)

producer.logAccumulator = logAccumulator
producer.mover = mover
producer.threadPool = threadPool
producer.moverWaitGroup = &sync.WaitGroup{}
producer.ioWorkerWaitGroup = &sync.WaitGroup{}
producer.ioThreadPoolWaitGroup = &sync.WaitGroup{}
@@ -205,7 +204,7 @@ func (producer *Producer) waitTime() error {
if producer.producerConfig.MaxBlockSec > 0 {
for i := 0; i < producer.producerConfig.MaxBlockSec; i++ {

if atomic.LoadInt64(&producerLogGroupSize) > producer.producerConfig.TotalSizeLnBytes {
if atomic.LoadInt64(&producer.producerLogGroupSize) > producer.producerConfig.TotalSizeLnBytes {
time.Sleep(time.Second)
} else {
return nil
Expand All @@ -214,13 +213,13 @@ func (producer *Producer) waitTime() error {
level.Error(producer.logger).Log("msg", "Over producer set maximum blocking time")
return errors.New(TimeoutExecption)
} else if producer.producerConfig.MaxBlockSec == 0 {
if atomic.LoadInt64(&producerLogGroupSize) > producer.producerConfig.TotalSizeLnBytes {
if atomic.LoadInt64(&producer.producerLogGroupSize) > producer.producerConfig.TotalSizeLnBytes {
level.Error(producer.logger).Log("msg", "Over producer set maximum blocking time")
return errors.New(TimeoutExecption)
}
} else if producer.producerConfig.MaxBlockSec < 0 {
for {
if atomic.LoadInt64(&producerLogGroupSize) > producer.producerConfig.TotalSizeLnBytes {
if atomic.LoadInt64(&producer.producerLogGroupSize) > producer.producerConfig.TotalSizeLnBytes {
time.Sleep(time.Second)
} else {
return nil
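producer.go drops the producerLogGroupSize and ioLock globals, turns the counter into a Producer field, and reorders InitProducer so the Producer value is allocated before initIoWorker and initLogAccumulator receive it as a back-reference. A toy sketch of that construction order, with hypothetical names:

```go
// Sketch only (hypothetical names): allocate the owning struct first so its
// address exists, pass that pointer into each component's constructor, then
// attach the components to the owner.
package main

import "fmt"

type engine struct {
	counter int64
	worker  *worker
}

type worker struct {
	owner *engine // back-reference, set by the constructor
}

func newWorker(owner *engine) *worker { return &worker{owner: owner} }

func newEngine() *engine {
	e := &engine{}          // 1. owner exists first
	e.worker = newWorker(e) // 2. component gets the back-reference
	return e
}

func main() {
	e := newEngine()
	fmt.Println(e.worker.owner == e) // true
}
```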
