diff --git a/br/pkg/lightning/backend/local/engine.go b/br/pkg/lightning/backend/local/engine.go
index 6004a282bf88b..2e0f274650140 100644
--- a/br/pkg/lightning/backend/local/engine.go
+++ b/br/pkg/lightning/backend/local/engine.go
@@ -1037,9 +1037,9 @@ func (w *Writer) appendRowsSorted(kvs []common.KvPair) (err error) {
 		totalKeySize += keySize
 	}
 	w.batchCount += len(kvs)
-	// NoopKeyAdapter doesn't really change the key,
+	// noopKeyAdapter doesn't really change the key,
 	// skipping the encoding to avoid unnecessary alloc and copy.
-	if _, ok := keyAdapter.(NoopKeyAdapter); !ok {
+	if _, ok := keyAdapter.(noopKeyAdapter); !ok {
 		if cap(w.sortedKeyBuf) < totalKeySize {
 			w.sortedKeyBuf = make([]byte, totalKeySize)
 		}
diff --git a/br/pkg/lightning/backend/local/engine_test.go b/br/pkg/lightning/backend/local/engine_test.go
index 434d0a1d100d9..93a007f867016 100644
--- a/br/pkg/lightning/backend/local/engine_test.go
+++ b/br/pkg/lightning/backend/local/engine_test.go
@@ -99,7 +99,7 @@ func TestIngestSSTWithClosedEngine(t *testing.T) {
 		ctx:          engineCtx,
 		cancel:       cancel,
 		sstMetasChan: make(chan metaOrFlush, 64),
-		keyAdapter:   NoopKeyAdapter{},
+		keyAdapter:   noopKeyAdapter{},
 		logger:       log.L(),
 	}
 	f.db.Store(db)
diff --git a/br/pkg/lightning/backend/local/key_adapter.go b/br/pkg/lightning/backend/local/key_adapter.go
index 9955c837d7d71..5d9d119b2c3ec 100644
--- a/br/pkg/lightning/backend/local/key_adapter.go
+++ b/br/pkg/lightning/backend/local/key_adapter.go
@@ -45,21 +45,21 @@ func reallocBytes(b []byte, n int) []byte {
 	return b
 }
 
-type NoopKeyAdapter struct{}
+type noopKeyAdapter struct{}
 
-func (NoopKeyAdapter) Encode(dst []byte, key []byte, _ []byte) []byte {
+func (noopKeyAdapter) Encode(dst []byte, key []byte, _ []byte) []byte {
 	return append(dst, key...)
 }
 
-func (NoopKeyAdapter) Decode(dst []byte, data []byte) ([]byte, error) {
+func (noopKeyAdapter) Decode(dst []byte, data []byte) ([]byte, error) {
 	return append(dst, data...), nil
 }
 
-func (NoopKeyAdapter) EncodedLen(key []byte, _ []byte) int {
+func (noopKeyAdapter) EncodedLen(key []byte, _ []byte) int {
 	return len(key)
 }
 
-var _ KeyAdapter = NoopKeyAdapter{}
+var _ KeyAdapter = noopKeyAdapter{}
 
 type dupDetectKeyAdapter struct{}
 
diff --git a/br/pkg/lightning/backend/local/key_adapter_test.go b/br/pkg/lightning/backend/local/key_adapter_test.go
index d8df65d7ff7bf..d80efa6de2af4 100644
--- a/br/pkg/lightning/backend/local/key_adapter_test.go
+++ b/br/pkg/lightning/backend/local/key_adapter_test.go
@@ -33,7 +33,7 @@ func randBytes(n int) []byte {
 }
 
 func TestNoopKeyAdapter(t *testing.T) {
-	keyAdapter := NoopKeyAdapter{}
+	keyAdapter := noopKeyAdapter{}
 	key := randBytes(32)
 	require.Len(t, key, keyAdapter.EncodedLen(key, ZeroRowID))
 	encodedKey := keyAdapter.Encode(nil, key, ZeroRowID)
@@ -112,7 +112,7 @@ func startWithSameMemory(x []byte, y []byte) bool {
 }
 
 func TestEncodeKeyToPreAllocatedBuf(t *testing.T) {
-	keyAdapters := []KeyAdapter{NoopKeyAdapter{}, dupDetectKeyAdapter{}}
+	keyAdapters := []KeyAdapter{noopKeyAdapter{}, dupDetectKeyAdapter{}}
 	for _, keyAdapter := range keyAdapters {
 		key := randBytes(32)
 		buf := make([]byte, 256)
@@ -130,7 +130,7 @@ func TestDecodeKeyToPreAllocatedBuf(t *testing.T) {
 		0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0xff, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf7,
 		0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf, 0x0, 0x8,
 	}
-	keyAdapters := []KeyAdapter{NoopKeyAdapter{}, dupDetectKeyAdapter{}}
+	keyAdapters := []KeyAdapter{noopKeyAdapter{}, dupDetectKeyAdapter{}}
 	for _, keyAdapter := range keyAdapters {
 		key, err := keyAdapter.Decode(nil, data)
 		require.NoError(t, err)
@@ -147,7 +147,7 @@ func TestDecodeKeyDstIsInsufficient(t *testing.T) {
 		0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0xff, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf7,
 		0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf, 0x0, 0x8,
 	}
-	keyAdapters := []KeyAdapter{NoopKeyAdapter{}, dupDetectKeyAdapter{}}
+	keyAdapters := []KeyAdapter{noopKeyAdapter{}, dupDetectKeyAdapter{}}
 	for _, keyAdapter := range keyAdapters {
 		key, err := keyAdapter.Decode(nil, data)
 		require.NoError(t, err)
diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go
index 95e4ea9caf174..978fbd8587545 100644
--- a/br/pkg/lightning/backend/local/local.go
+++ b/br/pkg/lightning/backend/local/local.go
@@ -560,7 +560,7 @@ func NewBackend(
 		return nil, common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs()
 	}
 	importClientFactory := NewImportClientFactoryImpl(splitCli, tls, config.MaxConnPerStore, config.ConnCompressType)
-	keyAdapter := KeyAdapter(NoopKeyAdapter{})
+	keyAdapter := KeyAdapter(noopKeyAdapter{})
 	if config.DupeDetectEnabled {
 		keyAdapter = dupDetectKeyAdapter{}
 	}
diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go
index 09807ec567c63..0129bbdb1dfba 100644
--- a/br/pkg/lightning/backend/local/local_test.go
+++ b/br/pkg/lightning/backend/local/local_test.go
@@ -180,7 +180,7 @@ func TestRangeProperties(t *testing.T) {
 	userProperties := make(map[string]string, 1)
 	_ = collector.Finish(userProperties)
 
-	props, err := decodeRangeProperties(hack.Slice(userProperties[propRangeIndex]), NoopKeyAdapter{})
+	props, err := decodeRangeProperties(hack.Slice(userProperties[propRangeIndex]), noopKeyAdapter{})
 	require.NoError(t, err)
 
 	// Smallest key in props.
@@ -333,7 +333,7 @@ func testLocalWriter(t *testing.T, needSort bool, partitialSort bool) {
 		ctx:          engineCtx,
 		cancel:       cancel,
 		sstMetasChan: make(chan metaOrFlush, 64),
-		keyAdapter:   NoopKeyAdapter{},
+		keyAdapter:   noopKeyAdapter{},
 		logger:       log.L(),
 	}
 	f.db.Store(db)
@@ -1157,7 +1157,7 @@ func TestCheckPeersBusy(t *testing.T) {
 		ctx:          engineCtx,
 		cancel:       cancel2,
 		sstMetasChan: make(chan metaOrFlush, 64),
-		keyAdapter:   NoopKeyAdapter{},
+		keyAdapter:   noopKeyAdapter{},
 		logger:       log.L(),
 	}
 	f.db.Store(db)
@@ -1293,7 +1293,7 @@ func TestNotLeaderErrorNeedUpdatePeers(t *testing.T) {
 		ctx:          engineCtx,
 		cancel:       cancel2,
 		sstMetasChan: make(chan metaOrFlush, 64),
-		keyAdapter:   NoopKeyAdapter{},
+		keyAdapter:   noopKeyAdapter{},
 		logger:       log.L(),
 	}
 	f.db.Store(db)
@@ -1400,7 +1400,7 @@ func TestPartialWriteIngestErrorWillPanic(t *testing.T) {
 		ctx:          engineCtx,
 		cancel:       cancel2,
 		sstMetasChan: make(chan metaOrFlush, 64),
-		keyAdapter:   NoopKeyAdapter{},
+		keyAdapter:   noopKeyAdapter{},
 		logger:       log.L(),
 	}
 	f.db.Store(db)
@@ -1507,7 +1507,7 @@ func TestPartialWriteIngestBusy(t *testing.T) {
 		ctx:          engineCtx,
 		cancel:       cancel2,
 		sstMetasChan: make(chan metaOrFlush, 64),
-		keyAdapter:   NoopKeyAdapter{},
+		keyAdapter:   noopKeyAdapter{},
 		logger:       log.L(),
 	}
 	f.db.Store(db)
@@ -1646,7 +1646,7 @@ func TestSplitRangeAgain4BigRegion(t *testing.T) {
 		ctx:          engineCtx,
 		cancel:       cancel,
 		sstMetasChan: make(chan metaOrFlush, 64),
-		keyAdapter:   NoopKeyAdapter{},
+		keyAdapter:   noopKeyAdapter{},
 		logger:       log.L(),
 	}
 	f.db.Store(db)
diff --git a/br/pkg/lightning/backend/remote/remote.go b/br/pkg/lightning/backend/remote/remote.go
index 9a6ef0c55658a..8835ac4ec7271 100644
--- a/br/pkg/lightning/backend/remote/remote.go
+++ b/br/pkg/lightning/backend/remote/remote.go
@@ -245,12 +245,7 @@ func (remote *Backend) SetRange(ctx context.Context, start, end kv.Key, dataFile
 func (remote *Backend) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error {
 	switch remote.phase {
 	case PhaseUpload:
-		for _, w := range remote.mu.writers {
-			_, err := w.Close(ctx)
-			if err != nil {
-				return err
-			}
-		}
+		// Nothing to do in the upload phase; writers are closed explicitly via CloseWriters.
 		return nil
 	case PhaseImport:
 		if len(remote.startKey) == 0 {
@@ -332,7 +327,9 @@ func (remote *Backend) LocalWriter(ctx context.Context, cfg *backend.LocalWriter
 			zap.Uint64("totalSize", s.TotalSize))
 	}
 	prefix := filepath.Join(strconv.Itoa(int(remote.jobID)), engineUUID.String())
-	writer := sharedisk.NewWriter(ctx, remote.externalStorage, prefix, remote.allocWriterID(), onClose)
+	writer := sharedisk.NewWriter(ctx, remote.externalStorage, prefix,
+		remote.allocWriterID(), remote.config.MemQuota, remote.config.StatSampleKeys,
+		remote.config.StatSampleSize, remote.config.WriteBatchSize, onClose)
 	remote.mu.Lock()
 	remote.mu.writers = append(remote.mu.writers, writer)
 	remote.mu.Unlock()
@@ -362,6 +359,16 @@ func (remote *Backend) handleWriterSummary(s *sharedisk.WriterSummary) {
 	remote.mu.totalSize += s.TotalSize
 }
 
+func (remote *Backend) CloseWriters(ctx context.Context) error {
+	for _, w := range remote.mu.writers {
+		_, err := w.Close(ctx)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 func (remote *Backend) GetSummary() (min, max kv.Key, totalSize uint64) {
 	remote.mu.Lock()
 	defer remote.mu.Unlock()
@@ -383,7 +390,13 @@ func (remote *Backend) GetRangeSplitter(ctx context.Context, totalKVSize uint64,
 		return nil, err
 	}
 	// TODO(tangenta): determine the max key and max ways.
-	maxSize := totalKVSize / uint64(instanceCnt)
+	var approxSubtaskCnt uint64
+	if remote.config.SubtaskCnt == -1 {
+		approxSubtaskCnt = uint64(instanceCnt)
+	} else {
+		approxSubtaskCnt = uint64(remote.config.SubtaskCnt)
+	}
+	maxSize := totalKVSize / approxSubtaskCnt
 	rs := sharedisk.NewRangeSplitter(maxSize, math.MaxUint64, math.MaxUint64, mergePropIter, dataFiles)
 	return rs, nil
 }
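
A worked example may help with the sizing above: GetRangeSplitter now derives the per-range target size from a configurable subtask count, with -1 meaning "fall back to one subtask per TiDB instance". A minimal sketch of the same arithmetic (function and parameter names are local to this sketch, not part of the patch):

	// rangeMaxSize mirrors the sizing logic in GetRangeSplitter.
	// With totalKVSize = 96 GiB and instanceCnt = 4: subtaskCnt == -1 yields
	// maxSize = 24 GiB per range; subtaskCnt == 8 yields 12 GiB per range.
	func rangeMaxSize(totalKVSize uint64, instanceCnt, subtaskCnt int) uint64 {
		approxSubtaskCnt := uint64(instanceCnt)
		if subtaskCnt != -1 {
			approxSubtaskCnt = uint64(subtaskCnt)
		}
		return totalKVSize / approxSubtaskCnt
	}
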
diff --git a/br/pkg/lightning/backend/sharedisk/sharedisk.go b/br/pkg/lightning/backend/sharedisk/sharedisk.go
index c5de3e670d255..ab63c14ab507a 100644
--- a/br/pkg/lightning/backend/sharedisk/sharedisk.go
+++ b/br/pkg/lightning/backend/sharedisk/sharedisk.go
@@ -28,7 +28,6 @@ import (
 	"github.com/pingcap/tidb/br/pkg/lightning/backend"
 	"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
 	"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
-	"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
 	"github.com/pingcap/tidb/br/pkg/lightning/common"
 	"github.com/pingcap/tidb/br/pkg/membuf"
 	"github.com/pingcap/tidb/br/pkg/storage"
@@ -135,11 +134,6 @@ type Engine struct {
 	rc *RangePropertiesCollector
 }
 
-var WriteBatchSize = 8 * 1024
-var MemQuota = 1024 * 1024 * 1024
-var SizeDist = 1024 * 1024
-var KeyDist = 8 * 1024
-
 type WriterSummary struct {
 	WriterID  int
 	Seq       int
@@ -153,27 +147,23 @@ type OnCloseFunc func(summary *WriterSummary)
 func DummyOnCloseFunc(*WriterSummary) {}
 
 func NewWriter(ctx context.Context, externalStorage storage.ExternalStorage,
-	prefix string, writerID int, onClose OnCloseFunc) *Writer {
-	// TODO(tangenta): make it configurable.
-	engine := NewEngine(uint64(SizeDist), uint64(KeyDist))
-	pool := membuf.NewPool()
+	prefix string, writerID int, memSizeLimit uint64, keyDist int64, sizeDist uint64, writeBatchSize int64,
+	onClose OnCloseFunc) *Writer {
+	engine := NewEngine(sizeDist, uint64(keyDist))
 	filePrefix := filepath.Join(prefix, strconv.Itoa(writerID))
 	return &Writer{
-		ctx:               ctx,
-		engine:            engine,
-		memtableSizeLimit: MemQuota,
-		keyAdapter:        &local.NoopKeyAdapter{},
-		exStorage:         externalStorage,
-		memBufPool:        pool,
-		kvBuffer:          pool.NewBuffer(),
-		writeBatch:        make([]common.KvPair, 0, WriteBatchSize),
-		currentSeq:        0,
-		tikvCodec:         keyspace.CodecV1,
-		filenamePrefix:    filePrefix,
-		writerID:          writerID,
-		kvStore:           nil,
-		onClose:           onClose,
-		closed:            false,
+		ctx:            ctx,
+		engine:         engine,
+		memSizeLimit:   memSizeLimit,
+		exStorage:      externalStorage,
+		writeBatch:     make([]common.KvPair, 0, writeBatchSize),
+		currentSeq:     0,
+		tikvCodec:      keyspace.CodecV1,
+		filenamePrefix: filePrefix,
+		writerID:       writerID,
+		kvStore:        nil,
+		onClose:        onClose,
+		closed:         false,
 	}
 }
 
@@ -181,16 +171,14 @@ func NewWriter(ctx context.Context, externalStorage storage.ExternalStorage,
 type Writer struct {
 	ctx context.Context
 	sync.Mutex
-	engine            *Engine
-	memtableSizeLimit int
-	keyAdapter        local.KeyAdapter
-	exStorage         storage.ExternalStorage
+	engine       *Engine
+	memSizeLimit uint64
+	exStorage    storage.ExternalStorage
 
 	// bytes buffer for writeBatch
 	memBufPool *membuf.Pool
-	kvBuffer   *membuf.Buffer
 	writeBatch []common.KvPair
-	batchSize  int
+	batchSize  uint64
 
 	currentSeq int
 	onClose    OnCloseFunc
@@ -224,14 +212,10 @@ func (w *Writer) AppendRows(ctx context.Context, columnNames []string, rows enco
 	w.Lock()
 	defer w.Unlock()
 
-	keyAdapter := w.keyAdapter
 	for _, pair := range kvs {
-		w.batchSize += len(pair.Key) + len(pair.Val)
-		buf := w.kvBuffer.AllocBytes(keyAdapter.EncodedLen(pair.Key, pair.RowID))
-		key := keyAdapter.Encode(buf[:0], pair.Key, pair.RowID)
-		val := w.kvBuffer.AddBytes(pair.Val)
-		w.writeBatch = append(w.writeBatch, common.KvPair{Key: key, Val: val})
-		if w.batchSize >= w.memtableSizeLimit {
+		w.batchSize += uint64(len(pair.Key) + len(pair.Val))
+		w.writeBatch = append(w.writeBatch, common.KvPair{Key: pair.Key, Val: pair.Val})
+		if w.batchSize >= w.memSizeLimit {
 			if err := w.flushKVs(ctx); err != nil {
 				return err
 			}
@@ -252,7 +236,6 @@ func (w *Writer) Close(ctx context.Context) (backend.ChunkFlushStatus, error) {
 	logutil.BgLogger().Info("close writer", zap.Int("writerID", w.writerID),
 		zap.String("minKey", hex.EncodeToString(w.minKey)), zap.String("maxKey", hex.EncodeToString(w.maxKey)))
 	w.closed = true
-	defer w.memBufPool.Destroy()
 	err := w.flushKVs(ctx)
 	if err != nil {
 		return status(false), err
@@ -363,7 +346,6 @@ func (w *Writer) flushKVs(ctx context.Context) error {
 	w.recordMinMax(w.writeBatch[0].Key, w.writeBatch[len(w.writeBatch)-1].Key, size)
 
 	w.writeBatch = w.writeBatch[:0]
-	w.kvBuffer.Reset()
 	w.batchSize = 0
 	//w.engine.rc.reset()
 	return nil
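
With the package-level knobs (MemQuota, KeyDist, SizeDist, WriteBatchSize) removed, NewWriter takes the four values explicitly, in the order memSizeLimit, keyDist, sizeDist, writeBatchSize. A construction sketch using the old defaults; extStore is assumed to be an existing storage.ExternalStorage, and the keyDist/sizeDist descriptions are inferred from the config fields (StatSampleKeys/StatSampleSize) that the remote backend passes in:

	w := NewWriter(ctx, extStore,
		"jobID/engineUUID", // file prefix under the external storage
		0,                  // writerID
		1024*1024*1024,     // memSizeLimit: flush once buffered KV bytes reach this (old MemQuota)
		8*1024,             // keyDist: per-key sampling distance for range properties (config.StatSampleKeys)
		1024*1024,          // sizeDist: per-byte sampling distance for range properties (config.StatSampleSize)
		8*1024,             // writeBatchSize: initial capacity of the KV write batch (old WriteBatchSize)
		DummyOnCloseFunc)
	defer w.Close(ctx)
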
diff --git a/br/pkg/lightning/backend/sharedisk/sharedisk_test.go b/br/pkg/lightning/backend/sharedisk/sharedisk_test.go
index f83c443ea791b..2ef4576afcb64 100644
--- a/br/pkg/lightning/backend/sharedisk/sharedisk_test.go
+++ b/br/pkg/lightning/backend/sharedisk/sharedisk_test.go
@@ -49,11 +49,14 @@ func TestWriter(t *testing.T) {
 	err = cleanupFiles(ctx, storage, "jobID/engineUUID")
 	require.NoError(t, err)
 
-	writer := NewWriter(context.Background(), storage, "jobID/engineUUID", 0, DummyOnCloseFunc)
-
-	pool := membuf.NewPool()
-	defer pool.Destroy()
-	writer.kvBuffer = pool.NewBuffer()
+	const (
+		memLimit       uint64 = 64 * 1024 * 1024
+		sizeDist       uint64 = 1024 * 1024
+		keyDist               = 8 * 1024
+		writeBatchSize        = 8 * 1024
+	)
+	writer := NewWriter(context.Background(), storage, "jobID/engineUUID", 0, memLimit,
+		keyDist, sizeDist, writeBatchSize, DummyOnCloseFunc)
 
 	var kvs []common.KvPair
 	value := make([]byte, 128)
@@ -157,8 +160,12 @@ func TestWriterPerf(t *testing.T) {
 	var valueSize = 1000
 	var rowCnt = 2000
 	var readBufferSize = 64 * 1024
-	//var memLimit = 64 * 1024 * 1024
-	//MemQuota = memLimit
+	const (
+		memLimit       uint64 = 64 * 1024 * 1024
+		keyDist               = 8 * 1024
+		sizeDist       uint64 = 1024 * 1024
+		writeBatchSize        = 8 * 1024
+	)
 
 	bucket := "globalsorttest"
 	prefix := "tools_test_data/sharedisk"
@@ -174,12 +181,11 @@ func TestWriterPerf(t *testing.T) {
 	storage, err := storage2.New(context.Background(), backend, &storage2.ExternalStorageOptions{})
 	require.NoError(t, err)
 
-	writer := NewWriter(context.Background(), storage, "test", 0, DummyOnCloseFunc)
+	writer := NewWriter(context.Background(), storage, "test", 0, memLimit, keyDist, sizeDist, writeBatchSize, DummyOnCloseFunc)
 	writer.filenamePrefix = "test"
 
 	pool := membuf.NewPool()
 	defer pool.Destroy()
-	writer.kvBuffer = pool.NewBuffer()
 	defer writer.Close(ctx)
 
 	var startMemory runtime.MemStats
diff --git a/ddl/scheduler.go b/ddl/scheduler.go
index 028fa94b7c4cd..8a4ac685e98c6 100644
--- a/ddl/scheduler.go
+++ b/ddl/scheduler.go
@@ -363,6 +363,10 @@ func (b *backfillSchedulerHandle) OnSubtaskFinished(ctx context.Context, meta []
 			if err != nil {
 				return nil, err
 			}
+			err = bc.CloseWriters(ctx)
+			if err != nil {
+				return nil, err
+			}
 			subtaskMeta.MinKey, subtaskMeta.MaxKey, subtaskMeta.TotalKVSize = bc.GetSummary()
 			log.FromContext(ctx).Info("get key boundary on subtask finished",
 				zap.String("min", hex.EncodeToString(subtaskMeta.MinKey)),