diff --git a/CHANGELOG.md b/CHANGELOG.md index 924c7a96ad6..59ce0e0c4a0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -232,6 +232,8 @@ * Fix prometheus data cannot be labeled with universal tags,if slow-decoder is used. [#7100](https://github.com/deepflowio/deepflow/pull/7100) #### NEW FEATURE +* feat: Enable agent new rpc by default [#8449](https://github.com/deepflowio/deepflow/pull/8449) by [rvql](https://github.com/rvql) +* feat: agent - eBPF Add DPDK packet capture [#8415](https://github.com/deepflowio/deepflow/pull/8415) by [yinjiping](https://github.com/yinjiping) * feat: add new agent.proto func [#8112](https://github.com/deepflowio/deepflow/pull/8112) by [jin-xiaofeng](https://github.com/jin-xiaofeng) * feat: agent - eBPF Support for the ARM version of Kylin v10 SP2 [#8439](https://github.com/deepflowio/deepflow/pull/8439) by [yinjiping](https://github.com/yinjiping) * feat: Change bpf map feat to feat_flags to support multi-function [#8424](https://github.com/deepflowio/deepflow/pull/8424) by [rvql](https://github.com/rvql) diff --git a/server/ingester/pkg/ckwriter/ckwriter.go b/server/ingester/pkg/ckwriter/ckwriter.go index 3a2bcb6ad3e..9221953a4d3 100644 --- a/server/ingester/pkg/ckwriter/ckwriter.go +++ b/server/ingester/pkg/ckwriter/ckwriter.go @@ -506,13 +506,14 @@ func (c *Cache) Write() error { if c.batchSize == 0 { return nil } - if err := c.block.Send(); err != nil { - return fmt.Errorf("cache send write block failed: %s", err) - } + err := c.block.Send() c.writeCounter++ c.lastWriteTime = time.Now() c.resetBatch() c.batchSize = 0 + if err != nil { + return fmt.Errorf("cache send write block failed: %s", err) + } return nil } @@ -562,33 +563,9 @@ func (w *CKWriter) Write(queueID int, cache *Cache) { } if err := cache.Write(); err != nil { if logEnabled { - log.Warningf("write table (%s.%s) failed, will retry write (%d) items: %s", w.table.OrgDatabase(cache.orgID), w.table.LocalName, itemsLen, err) - } - if err := w.ResetConnection(queueID, cache.writeCounter%qc.connCount); err != nil { - log.Warningf("reconnect clickhouse failed: %s", err) - time.Sleep(time.Second * 10) - } else { - if logEnabled { - log.Infof("reconnect clickhouse success: %s %s", w.table.OrgDatabase(cache.orgID), w.table.LocalName) - } - } - - qc.counter.RetryCount++ - // 写失败重连后重试一次, 规避偶尔写失败问题 - err = cache.Write() - if logEnabled { - if err != nil { - qc.counter.RetryFailedCount++ - log.Warningf("retry write table (%s.%s) failed, drop (%d) items: %s", w.table.OrgDatabase(cache.orgID), w.table.LocalName, itemsLen, err) - } else { - log.Infof("retry write table (%s.%s) success, write (%d) items", w.table.OrgDatabase(cache.orgID), w.table.LocalName, itemsLen) - } - } - if err != nil { - qc.counter.WriteFailedCount += int64(itemsLen) - } else { - qc.counter.WriteSuccessCount += int64(itemsLen) + log.Warningf("write table (%s.%s) failed, drop (%d) items: %s", w.table.OrgDatabase(cache.orgID), w.table.LocalName, itemsLen, err) } + qc.counter.WriteFailedCount += int64(itemsLen) } else { qc.counter.WriteSuccessCount += int64(itemsLen) }