From 31627fec8116564c02a794ff353e19e1784ebbbd Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 16 Nov 2022 19:49:14 +0800 Subject: [PATCH] address comments --- ddl/backfilling.go | 2 +- ddl/ingest/engine_mgr.go | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/ddl/backfilling.go b/ddl/backfilling.go index 10d8d692c468e..886fcf71841bf 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -828,7 +828,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic if len(remains) == 0 { if ingestBeCtx != nil { - ingestBeCtx.EngMgr.ResetWorkers(job.ID, reorgInfo.currElement.ID) + ingestBeCtx.EngMgr.ResetWorkers(ingestBeCtx, job.ID, reorgInfo.currElement.ID) } break } diff --git a/ddl/ingest/engine_mgr.go b/ddl/ingest/engine_mgr.go index 703634b5ac62d..4cf1734747ee6 100644 --- a/ddl/ingest/engine_mgr.go +++ b/ddl/ingest/engine_mgr.go @@ -103,13 +103,15 @@ func (m *engineManager) Unregister(jobID, indexID int64) { // ResetWorkers reset the writer count of the engineInfo because // the goroutines of backfill workers have been terminated. -func (m *engineManager) ResetWorkers(jobID, indexID int64) { +func (m *engineManager) ResetWorkers(bc *BackendContext, jobID, indexID int64) { ei, exist := m.Load(indexID) if !exist { return } m.MemRoot.Release(StructSizeWriterCtx * int64(ei.writerCount)) m.MemRoot.ReleaseWithTag(encodeEngineTag(jobID, indexID)) + engineCacheSize := int64(bc.cfg.TikvImporter.EngineMemCacheSize) + m.MemRoot.ConsumeWithTag(encodeEngineTag(jobID, indexID), engineCacheSize) ei.writerCount = 0 }