Skip to content

Commit

Permalink
Merge branch 'main' into support-python
Browse files Browse the repository at this point in the history
  • Loading branch information
rvql authored Nov 6, 2024
2 parents e19b9a1 + 6458096 commit 793504a
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 29 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,8 @@
* Fix prometheus data cannot be labeled with universal tags,if slow-decoder is used. [#7100](https://github.com/deepflowio/deepflow/pull/7100)

#### NEW FEATURE
* feat: Enable agent new rpc by default [#8449](https://github.com/deepflowio/deepflow/pull/8449) by [rvql](https://github.com/rvql)
* feat: agent - eBPF Add DPDK packet capture [#8415](https://github.com/deepflowio/deepflow/pull/8415) by [yinjiping](https://github.com/yinjiping)
* feat: add new agent.proto func [#8112](https://github.com/deepflowio/deepflow/pull/8112) by [jin-xiaofeng](https://github.com/jin-xiaofeng)
* feat: agent - eBPF Support for the ARM version of Kylin v10 SP2 [#8439](https://github.com/deepflowio/deepflow/pull/8439) by [yinjiping](https://github.com/yinjiping)
* feat: Change bpf map feat to feat_flags to support multi-function [#8424](https://github.com/deepflowio/deepflow/pull/8424) by [rvql](https://github.com/rvql)
Expand Down
35 changes: 6 additions & 29 deletions server/ingester/pkg/ckwriter/ckwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,13 +506,14 @@ func (c *Cache) Write() error {
if c.batchSize == 0 {
return nil
}
if err := c.block.Send(); err != nil {
return fmt.Errorf("cache send write block failed: %s", err)
}
err := c.block.Send()
c.writeCounter++
c.lastWriteTime = time.Now()
c.resetBatch()
c.batchSize = 0
if err != nil {
return fmt.Errorf("cache send write block failed: %s", err)
}
return nil
}

Expand Down Expand Up @@ -562,33 +563,9 @@ func (w *CKWriter) Write(queueID int, cache *Cache) {
}
if err := cache.Write(); err != nil {
if logEnabled {
log.Warningf("write table (%s.%s) failed, will retry write (%d) items: %s", w.table.OrgDatabase(cache.orgID), w.table.LocalName, itemsLen, err)
}
if err := w.ResetConnection(queueID, cache.writeCounter%qc.connCount); err != nil {
log.Warningf("reconnect clickhouse failed: %s", err)
time.Sleep(time.Second * 10)
} else {
if logEnabled {
log.Infof("reconnect clickhouse success: %s %s", w.table.OrgDatabase(cache.orgID), w.table.LocalName)
}
}

qc.counter.RetryCount++
// 写失败重连后重试一次, 规避偶尔写失败问题
err = cache.Write()
if logEnabled {
if err != nil {
qc.counter.RetryFailedCount++
log.Warningf("retry write table (%s.%s) failed, drop (%d) items: %s", w.table.OrgDatabase(cache.orgID), w.table.LocalName, itemsLen, err)
} else {
log.Infof("retry write table (%s.%s) success, write (%d) items", w.table.OrgDatabase(cache.orgID), w.table.LocalName, itemsLen)
}
}
if err != nil {
qc.counter.WriteFailedCount += int64(itemsLen)
} else {
qc.counter.WriteSuccessCount += int64(itemsLen)
log.Warningf("write table (%s.%s) failed, drop (%d) items: %s", w.table.OrgDatabase(cache.orgID), w.table.LocalName, itemsLen, err)
}
qc.counter.WriteFailedCount += int64(itemsLen)
} else {
qc.counter.WriteSuccessCount += int64(itemsLen)
}
Expand Down

0 comments on commit 793504a

Please sign in to comment.