diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index 611aebb8c123..f7ea3d6bd81d 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -501,6 +501,8 @@ type IncomingSnapshot struct { SnapUUID uuid.UUID // The storage interface for the underlying SSTs. SSTStorageScratch *SSTSnapshotStorageScratch + // Non-nil for small snapshots. + Batch storage.Batch // The Raft log entries for this snapshot. LogEntries [][]byte // The replica state at the time the snapshot was generated (never nil). @@ -835,77 +837,128 @@ func (r *Replica) applySnapshot( stats.subsumedReplicas.Sub(start).Seconds()*1000, ) } - ingestionLog := fmt.Sprintf( - "ingestion=%d@%0.0fms ", - len(inSnap.SSTStorageScratch.SSTs()), - stats.ingestion.Sub(stats.subsumedReplicas).Seconds()*1000, - ) + var ingestionLog string + if inSnap.SSTStorageScratch != nil { + ingestionLog = fmt.Sprintf( + "ingestion=%d@%0.0fms ", + len(inSnap.SSTStorageScratch.SSTs()), + stats.ingestion.Sub(stats.subsumedReplicas).Seconds()*1000, + ) + } log.Infof( ctx, "applied snapshot of type %s [%s%s%sid=%s index=%d]", inSnap.snapType, totalLog, subsumedReplicasLog, ingestionLog, inSnap.SnapUUID.Short(), snap.Metadata.Index, ) }(timeutil.Now()) - unreplicatedSSTFile := &storage.MemFile{} - unreplicatedSST := storage.MakeIngestionSSTWriter(unreplicatedSSTFile) - defer unreplicatedSST.Close() - - // Clearing the unreplicated state. - unreplicatedPrefixKey := keys.MakeRangeIDUnreplicatedPrefix(r.RangeID) - unreplicatedStart := unreplicatedPrefixKey - unreplicatedEnd := unreplicatedPrefixKey.PrefixEnd() - if err = unreplicatedSST.ClearRawRange(unreplicatedStart, unreplicatedEnd); err != nil { - return errors.Wrapf(err, "error clearing range of unreplicated SST writer") - } - - // Update HardState. - if err := r.raftMu.stateLoader.SetHardState(ctx, &unreplicatedSST, hs); err != nil { - return errors.Wrapf(err, "unable to write HardState to unreplicated SST writer") - } - - // Update Raft entries. var lastTerm uint64 var raftLogSize int64 - if len(inSnap.LogEntries) > 0 { - logEntries := make([]raftpb.Entry, len(inSnap.LogEntries)) - for i, bytes := range inSnap.LogEntries { - if err := protoutil.Unmarshal(bytes, &logEntries[i]); err != nil { + if inSnap.SSTStorageScratch != nil { + unreplicatedSSTFile := &storage.MemFile{} + unreplicatedSST := storage.MakeIngestionSSTWriter(unreplicatedSSTFile) + defer unreplicatedSST.Close() + + // Clearing the unreplicated state. + unreplicatedPrefixKey := keys.MakeRangeIDUnreplicatedPrefix(r.RangeID) + unreplicatedStart := unreplicatedPrefixKey + unreplicatedEnd := unreplicatedPrefixKey.PrefixEnd() + if err = unreplicatedSST.ClearRawRange(unreplicatedStart, unreplicatedEnd); err != nil { + return errors.Wrapf(err, "error clearing range of unreplicated SST writer") + } + + // Update HardState. + if err := r.raftMu.stateLoader.SetHardState(ctx, &unreplicatedSST, hs); err != nil { + return errors.Wrapf(err, "unable to write HardState to unreplicated SST writer") + } + + // Update Raft entries. + if len(inSnap.LogEntries) > 0 { + logEntries := make([]raftpb.Entry, len(inSnap.LogEntries)) + for i, bytes := range inSnap.LogEntries { + if err := protoutil.Unmarshal(bytes, &logEntries[i]); err != nil { + return err + } + } + var sideloadedEntriesSize int64 + var err error + logEntries, sideloadedEntriesSize, err = r.maybeSideloadEntriesRaftMuLocked(ctx, logEntries) + if err != nil { return err } + raftLogSize += sideloadedEntriesSize + _, lastTerm, raftLogSize, err = r.append(ctx, &unreplicatedSST, 0, invalidLastTerm, raftLogSize, logEntries) + if err != nil { + return err + } + } else { + lastTerm = invalidLastTerm } - var sideloadedEntriesSize int64 - var err error - logEntries, sideloadedEntriesSize, err = r.maybeSideloadEntriesRaftMuLocked(ctx, logEntries) - if err != nil { - return err + r.store.raftEntryCache.Drop(r.RangeID) + + // Update TruncatedState if it is unreplicated. + if inSnap.UsesUnreplicatedTruncatedState { + if err := r.raftMu.stateLoader.SetRaftTruncatedState( + ctx, &unreplicatedSST, s.TruncatedState, + ); err != nil { + return errors.Wrapf(err, "unable to write UnreplicatedTruncatedState to unreplicated SST writer") + } } - raftLogSize += sideloadedEntriesSize - _, lastTerm, raftLogSize, err = r.append(ctx, &unreplicatedSST, 0, invalidLastTerm, raftLogSize, logEntries) - if err != nil { + + if err := unreplicatedSST.Finish(); err != nil { return err } + if unreplicatedSST.DataSize > 0 { + // TODO(itsbilal): Write to SST directly in unreplicatedSST rather than + // buffering in a MemFile first. + if err := inSnap.SSTStorageScratch.WriteSST(ctx, unreplicatedSSTFile.Data()); err != nil { + return err + } + } } else { - lastTerm = invalidLastTerm - } - r.store.raftEntryCache.Drop(r.RangeID) - - // Update TruncatedState if it is unreplicated. - if inSnap.UsesUnreplicatedTruncatedState { - if err := r.raftMu.stateLoader.SetRaftTruncatedState( - ctx, &unreplicatedSST, s.TruncatedState, - ); err != nil { - return errors.Wrapf(err, "unable to write UnreplicatedTruncatedState to unreplicated SST writer") + // Clearing the unreplicated state. + unreplicatedPrefixKey := keys.MakeRangeIDUnreplicatedPrefix(r.RangeID) + unreplicatedStart := unreplicatedPrefixKey + unreplicatedEnd := unreplicatedPrefixKey.PrefixEnd() + if err = inSnap.Batch.ClearRawRange(unreplicatedStart, unreplicatedEnd); err != nil { + return errors.Wrapf(err, "error clearing range of unreplicated SST writer") } - } - if err := unreplicatedSST.Finish(); err != nil { - return err - } - if unreplicatedSST.DataSize > 0 { - // TODO(itsbilal): Write to SST directly in unreplicatedSST rather than - // buffering in a MemFile first. - if err := inSnap.SSTStorageScratch.WriteSST(ctx, unreplicatedSSTFile.Data()); err != nil { - return err + // Update HardState. + if err := r.raftMu.stateLoader.SetHardState(ctx, inSnap.Batch, hs); err != nil { + return errors.Wrapf(err, "unable to write HardState to unreplicated SST writer") + } + + // Update Raft entries. + if len(inSnap.LogEntries) > 0 { + logEntries := make([]raftpb.Entry, len(inSnap.LogEntries)) + for i, bytes := range inSnap.LogEntries { + if err := protoutil.Unmarshal(bytes, &logEntries[i]); err != nil { + return err + } + } + var sideloadedEntriesSize int64 + var err error + logEntries, sideloadedEntriesSize, err = r.maybeSideloadEntriesRaftMuLocked(ctx, logEntries) + if err != nil { + return err + } + raftLogSize += sideloadedEntriesSize + _, lastTerm, raftLogSize, err = r.append(ctx, inSnap.Batch, 0, invalidLastTerm, raftLogSize, logEntries) + if err != nil { + return err + } + } else { + lastTerm = invalidLastTerm + } + r.store.raftEntryCache.Drop(r.RangeID) + + // Update TruncatedState if it is unreplicated. + if inSnap.UsesUnreplicatedTruncatedState { + if err := r.raftMu.stateLoader.SetRaftTruncatedState( + ctx, inSnap.Batch, s.TruncatedState, + ); err != nil { + return errors.Wrapf(err, "unable to write UnreplicatedTruncatedState to unreplicated SST writer") + } } } @@ -941,19 +994,30 @@ func (r *Replica) applySnapshot( // problematic, as it would prevent this store from ever having a new replica // of the removed range. In this case, however, it's copacetic, as subsumed // ranges _can't_ have new replicas. - if err := r.clearSubsumedReplicaDiskData(ctx, inSnap.SSTStorageScratch, s.Desc, subsumedRepls, mergedTombstoneReplicaID); err != nil { + if err := r.clearSubsumedReplicaDiskData(ctx, inSnap.SSTStorageScratch, inSnap.Batch, s.Desc, subsumedRepls, mergedTombstoneReplicaID); err != nil { return err } stats.subsumedReplicas = timeutil.Now() // Ingest all SSTs atomically. - if fn := r.store.cfg.TestingKnobs.BeforeSnapshotSSTIngestion; fn != nil { - if err := fn(inSnap, inSnap.snapType, inSnap.SSTStorageScratch.SSTs()); err != nil { - return err + if inSnap.SSTStorageScratch != nil { + if fn := r.store.cfg.TestingKnobs.BeforeSnapshotSSTIngestion; fn != nil { + if err := fn(inSnap, inSnap.snapType, inSnap.SSTStorageScratch.SSTs()); err != nil { + return err + } + } + if err := r.store.engine.IngestExternalFiles(ctx, inSnap.SSTStorageScratch.SSTs()); err != nil { + return errors.Wrapf(err, "while ingesting %s", inSnap.SSTStorageScratch.SSTs()) + } + } else { + if fn := r.store.cfg.TestingKnobs.BeforeSnapshotSSTIngestion; fn != nil { + if err := fn(inSnap, inSnap.snapType, nil); err != nil { + return err + } + } + if err := inSnap.Batch.Commit(true /* sync */); err != nil { + return errors.Wrapf(err, "while ingesting batch") } - } - if err := r.store.engine.IngestExternalFiles(ctx, inSnap.SSTStorageScratch.SSTs()); err != nil { - return errors.Wrapf(err, "while ingesting %s", inSnap.SSTStorageScratch.SSTs()) } stats.ingestion = timeutil.Now() @@ -1068,6 +1132,7 @@ func (r *Replica) applySnapshot( func (r *Replica) clearSubsumedReplicaDiskData( ctx context.Context, scratch *SSTSnapshotStorageScratch, + batch storage.Batch, desc *roachpb.RangeDescriptor, subsumedRepls []*Replica, subsumedNextReplicaID roachpb.ReplicaID, @@ -1087,31 +1152,44 @@ func (r *Replica) clearSubsumedReplicaDiskData( destroyReasonRemoved) sr.mu.Unlock() - // We have to create an SST for the subsumed replica's range-id local keys. - subsumedReplSSTFile := &storage.MemFile{} - subsumedReplSST := storage.MakeIngestionSSTWriter(subsumedReplSSTFile) - defer subsumedReplSST.Close() - // NOTE: We set mustClearRange to true because we are setting - // RangeTombstoneKey. Since Clears and Puts need to be done in increasing - // order of keys, it is not safe to use ClearRangeIter. - if err := sr.preDestroyRaftMuLocked( - ctx, - r.store.Engine(), - &subsumedReplSST, - subsumedNextReplicaID, - true, /* clearRangeIDLocalOnly */ - true, /* mustClearRange */ - ); err != nil { - subsumedReplSST.Close() - return err - } - if err := subsumedReplSST.Finish(); err != nil { - return err - } - if subsumedReplSST.DataSize > 0 { - // TODO(itsbilal): Write to SST directly in subsumedReplSST rather than - // buffering in a MemFile first. - if err := scratch.WriteSST(ctx, subsumedReplSSTFile.Data()); err != nil { + if scratch != nil { + // We have to create an SST for the subsumed replica's range-id local keys. + subsumedReplSSTFile := &storage.MemFile{} + subsumedReplSST := storage.MakeIngestionSSTWriter(subsumedReplSSTFile) + defer subsumedReplSST.Close() + // NOTE: We set mustClearRange to true because we are setting + // RangeTombstoneKey. Since Clears and Puts need to be done in increasing + // order of keys, it is not safe to use ClearRangeIter. + if err := sr.preDestroyRaftMuLocked( + ctx, + r.store.Engine(), + &subsumedReplSST, + subsumedNextReplicaID, + true, /* clearRangeIDLocalOnly */ + true, /* mustClearRange */ + ); err != nil { + subsumedReplSST.Close() + return err + } + if err := subsumedReplSST.Finish(); err != nil { + return err + } + if subsumedReplSST.DataSize > 0 { + // TODO(itsbilal): Write to SST directly in subsumedReplSST rather than + // buffering in a MemFile first. + if err := scratch.WriteSST(ctx, subsumedReplSSTFile.Data()); err != nil { + return err + } + } + } else { + if err := sr.preDestroyRaftMuLocked( + ctx, + r.store.Engine(), + batch, + subsumedNextReplicaID, + true, /* clearRangeIDLocalOnly */ + true, /* mustClearRange */ + ); err != nil { return err } } @@ -1144,25 +1222,36 @@ func (r *Replica) clearSubsumedReplicaDiskData( // subsume both r1 and r2 in S1. for i := range keyRanges { if totalKeyRanges[i].End.Key.Compare(keyRanges[i].End.Key) > 0 { - subsumedReplSSTFile := &storage.MemFile{} - subsumedReplSST := storage.MakeIngestionSSTWriter(subsumedReplSSTFile) - defer subsumedReplSST.Close() - if err := storage.ClearRangeWithHeuristic( - r.store.Engine(), - &subsumedReplSST, - keyRanges[i].End.Key, - totalKeyRanges[i].End.Key, - ); err != nil { - subsumedReplSST.Close() - return err - } - if err := subsumedReplSST.Finish(); err != nil { - return err - } - if subsumedReplSST.DataSize > 0 { - // TODO(itsbilal): Write to SST directly in subsumedReplSST rather than - // buffering in a MemFile first. - if err := scratch.WriteSST(ctx, subsumedReplSSTFile.Data()); err != nil { + if scratch != nil { + subsumedReplSSTFile := &storage.MemFile{} + subsumedReplSST := storage.MakeIngestionSSTWriter(subsumedReplSSTFile) + defer subsumedReplSST.Close() + if err := storage.ClearRangeWithHeuristic( + r.store.Engine(), + &subsumedReplSST, + keyRanges[i].End.Key, + totalKeyRanges[i].End.Key, + ); err != nil { + subsumedReplSST.Close() + return err + } + if err := subsumedReplSST.Finish(); err != nil { + return err + } + if subsumedReplSST.DataSize > 0 { + // TODO(itsbilal): Write to SST directly in subsumedReplSST rather than + // buffering in a MemFile first. + if err := scratch.WriteSST(ctx, subsumedReplSSTFile.Data()); err != nil { + return err + } + } + } else { + if err := storage.ClearRangeWithHeuristic( + r.store.Engine(), + batch, + keyRanges[i].End.Key, + totalKeyRanges[i].End.Key, + ); err != nil { return err } } diff --git a/pkg/kv/kvserver/replica_sst_snapshot_storage.go b/pkg/kv/kvserver/replica_sst_snapshot_storage.go index 72b36aee6c27..90fecc882845 100644 --- a/pkg/kv/kvserver/replica_sst_snapshot_storage.go +++ b/pkg/kv/kvserver/replica_sst_snapshot_storage.go @@ -67,6 +67,7 @@ type SSTSnapshotStorageScratch struct { ssts []string snapDir string dirCreated bool + bytes int64 } func (s *SSTSnapshotStorageScratch) filename(id int) string { @@ -124,6 +125,11 @@ func (s *SSTSnapshotStorageScratch) WriteSST(ctx context.Context, data []byte) e return f.Close() } +// Size returns the combined size of files created. +func (s *SSTSnapshotStorageScratch) Size() int64 { + return s.bytes +} + // SSTs returns the names of the files created. func (s *SSTSnapshotStorageScratch) SSTs() []string { return s.ssts @@ -142,6 +148,7 @@ type SSTSnapshotStorageFile struct { file fs.File filename string ctx context.Context + bytes int64 bytesPerSync int64 } @@ -180,6 +187,7 @@ func (f *SSTSnapshotStorageFile) Write(contents []byte) (int, error) { if err := f.ensureFile(); err != nil { return 0, err } + f.bytes += int64(len(contents)) limitBulkIOWrite(f.ctx, f.scratch.storage.limiter, len(contents)) return f.file.Write(contents) } @@ -199,6 +207,8 @@ func (f *SSTSnapshotStorageFile) Close() error { return err } f.file = nil + f.scratch.bytes += f.bytes + f.bytes = 0 return nil } diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index 5e8d4f427df1..9d4cc81dc3f5 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -125,6 +125,7 @@ type kvBatchSnapshotStrategy struct { sstChunkSize int64 // Only used on the receiver side. scratch *SSTSnapshotStorageScratch + batch storage.Batch } // multiSSTWriter is a wrapper around RocksDBSstFileWriter and @@ -240,11 +241,22 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( // At the moment we'll write at most five SSTs. // TODO(jeffreyxiao): Re-evaluate as the default range size grows. keyRanges := rditer.MakeReplicatedKeyRanges(header.State.Desc) - msstw, err := newMultiSSTWriter(ctx, kvSS.scratch, keyRanges, kvSS.sstChunkSize) - if err != nil { - return noSnap, err + + var msstw multiSSTWriter + if kvSS.scratch != nil { + var err error + msstw, err = newMultiSSTWriter(ctx, kvSS.scratch, keyRanges, kvSS.sstChunkSize) + if err != nil { + return noSnap, err + } + defer msstw.Close() + } else { + for _, keyRange := range keyRanges { + if err := kvSS.batch.ClearRawRange(keyRange.Start.Key, keyRange.End.Key); err != nil { + return noSnap, sendSnapshotError(stream, err) + } + } } - defer msstw.Close() var logEntries [][]byte for { @@ -258,20 +270,26 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( } if req.KVBatch != nil { - batchReader, err := storage.NewRocksDBBatchReader(req.KVBatch) - if err != nil { - return noSnap, errors.Wrap(err, "failed to decode batch") - } - // All operations in the batch are guaranteed to be puts. - for batchReader.Next() { - if batchReader.BatchType() != storage.BatchTypeValue { - return noSnap, errors.AssertionFailedf("expected type %d, found type %d", storage.BatchTypeValue, batchReader.BatchType()) - } - key, err := batchReader.EngineKey() + if kvSS.scratch != nil { + batchReader, err := storage.NewRocksDBBatchReader(req.KVBatch) if err != nil { - return noSnap, errors.Wrap(err, "failed to decode mvcc key") + return noSnap, errors.Wrap(err, "failed to decode batch") } - if err := msstw.Put(ctx, key, batchReader.Value()); err != nil { + // All operations in the batch are guaranteed to be puts. + for batchReader.Next() { + if batchReader.BatchType() != storage.BatchTypeValue { + return noSnap, errors.AssertionFailedf("expected type %d, found type %d", storage.BatchTypeValue, batchReader.BatchType()) + } + key, err := batchReader.EngineKey() + if err != nil { + return noSnap, errors.Wrap(err, "failed to decode mvcc key") + } + if err := msstw.Put(ctx, key, batchReader.Value()); err != nil { + return noSnap, err + } + } + } else { + if err := kvSS.batch.ApplyBatchRepr(req.KVBatch, false /* sync */); err != nil { return noSnap, err } } @@ -280,16 +298,17 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( logEntries = append(logEntries, req.LogEntries...) } if req.Final { - // We finished receiving all batches and log entries. It's possible that - // we did not receive any key-value pairs for some of the key ranges, but - // we must still construct SSTs with range deletion tombstones to remove - // the data. - if err := msstw.Finish(ctx); err != nil { - return noSnap, err + if kvSS.scratch != nil { + // We finished receiving all batches and log entries. It's possible that + // we did not receive any key-value pairs for some of the key ranges, but + // we must still construct SSTs with range deletion tombstones to remove + // the data. + if err := msstw.Finish(ctx); err != nil { + return noSnap, err + } + msstw.Close() } - msstw.Close() - snapUUID, err := uuid.FromBytes(header.RaftMessageRequest.Message.Snapshot.Data) if err != nil { err = errors.Wrap(err, "client error: invalid snapshot") @@ -300,6 +319,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( UsesUnreplicatedTruncatedState: header.UnreplicatedTruncatedState, SnapUUID: snapUUID, SSTStorageScratch: kvSS.scratch, + Batch: kvSS.batch, LogEntries: logEntries, State: &header.State, snapType: header.Type, @@ -315,7 +335,11 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( inSnap.String(), len(logEntries), expLen) } - kvSS.status = fmt.Sprintf("log entries: %d, ssts: %d", len(logEntries), len(kvSS.scratch.SSTs())) + ssts := 0 + if kvSS.scratch != nil { + ssts = len(kvSS.scratch.SSTs()) + } + kvSS.status = fmt.Sprintf("log entries: %d, ssts: %d", len(logEntries), ssts) return inSnap, nil } } @@ -515,6 +539,9 @@ func (kvSS *kvBatchSnapshotStrategy) Close(ctx context.Context) { log.Warningf(ctx, "error closing kvBatchSnapshotStrategy: %v", err) } } + if kvSS.batch != nil { + kvSS.batch.Close() + } } // reserveSnapshot throttles incoming snapshots. The returned closure is used @@ -794,14 +821,24 @@ func (s *Store) receiveSnapshot( var ss snapshotStrategy switch header.Strategy { case SnapshotRequest_KV_BATCH: - snapUUID, err := uuid.FromBytes(header.RaftMessageRequest.Message.Snapshot.Data) - if err != nil { - err = errors.Wrap(err, "invalid snapshot") - return sendSnapshotError(stream, err) + useSSTs := header.State.Stats.Total() > 4<<20 /* 4 MB */ + + var scratch *SSTSnapshotStorageScratch + var batch storage.Batch + if useSSTs { + snapUUID, err := uuid.FromBytes(header.RaftMessageRequest.Message.Snapshot.Data) + if err != nil { + err = errors.Wrap(err, "invalid snapshot") + return sendSnapshotError(stream, err) + } + scratch = s.sstSnapshotStorage.NewScratchSpace(header.State.Desc.RangeID, snapUUID) + } else { + batch = s.engine.NewUnindexedBatch(true /* writeOnly */) } ss = &kvBatchSnapshotStrategy{ - scratch: s.sstSnapshotStorage.NewScratchSpace(header.State.Desc.RangeID, snapUUID), + scratch: scratch, + batch: batch, sstChunkSize: snapshotSSTWriteSyncRate.Get(&s.cfg.Settings.SV), } defer ss.Close(ctx)