-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
replica_sst_snapshot_storage.go
263 lines (242 loc) · 7.19 KB
/
replica_sst_snapshot_storage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package kvserver
import (
"context"
"fmt"
"path/filepath"
"strconv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/fs"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"golang.org/x/time/rate"
)
// SSTSnapshotStorage hands out scratch spaces and owns the directory under
// which all of them live. Each scratch manages the SSTs created while
// receiving one specific snapshot.
type SSTSnapshotStorage struct {
	// engine performs every filesystem operation issued by the storage and
	// its scratches (creating and removing directories and files).
	engine storage.Engine
	// limiter is handed to limitBulkIOWrite before each write to a scratch
	// file.
	limiter *rate.Limiter
	// dir is the root directory under which all scratch directories are
	// placed.
	dir string
	mu  struct {
		syncutil.Mutex
		// ranges is keyed by range ID and holds that range's scratch ref
		// count. scratchClosed decrements the count and, once it reaches
		// zero, drops the entry and removes the range's directory. The map
		// is allocated by Init.
		ranges map[roachpb.RangeID]int
	}
}
// NewSSTSnapshotStorage builds an SST snapshot storage rooted at the
// "sstsnapshot" subdirectory of the engine's auxiliary directory. The returned
// value does not have its range map allocated yet; call Init before creating
// scratch spaces.
func NewSSTSnapshotStorage(engine storage.Engine, limiter *rate.Limiter) SSTSnapshotStorage {
	scratchRoot := filepath.Join(engine.GetAuxiliaryDir(), "sstsnapshot")
	return SSTSnapshotStorage{
		dir:     scratchRoot,
		limiter: limiter,
		engine:  engine,
	}
}
// Init allocates the per-range ref count map. It has to run before the storage
// hands out any scratch space, because NewScratchSpace writes to that map.
func (s *SSTSnapshotStorage) Init() {
	s.mu.ranges = map[roachpb.RangeID]int{}
}
// NewScratchSpace creates a new storage scratch space for SSTs for a specific
// snapshot, identified by the range it belongs to and the snapshot's UUID.
//
// Every scratch takes one reference on its range. The reference is released
// by scratchClosed when the scratch is closed, and the per-range directory is
// removed once the last reference is gone. The reference is therefore taken
// unconditionally: if the count were only ever set from zero to one, the
// first of several scratches on the same range to close would delete the
// shared range directory (and the other scratches' SSTs with it), and the
// next close would panic on an inconsistent ref count.
//
// No directory is created here; the scratch creates its snapshot directory
// lazily on the first file write. Init must have been called beforehand so
// that the ref count map is allocated.
func (s *SSTSnapshotStorage) NewScratchSpace(
	rangeID roachpb.RangeID, snapUUID uuid.UUID,
) *SSTSnapshotStorageScratch {
	s.mu.Lock()
	// A missing key reads as zero, so this covers both the first and any
	// subsequent scratch for the range.
	s.mu.ranges[rangeID]++
	s.mu.Unlock()
	snapDir := filepath.Join(s.dir, strconv.Itoa(int(rangeID)), snapUUID.String())
	return &SSTSnapshotStorageScratch{
		storage: s,
		rangeID: rangeID,
		snapDir: snapDir,
	}
}
// Clear deletes the storage's root directory, and with it every scratch
// directory and SST underneath. The error from the engine is returned as is.
func (s *SSTSnapshotStorage) Clear() error {
	root := s.dir
	return s.engine.RemoveAll(root)
}
// scratchClosed is called when an SSTSnapshotStorageScratch created by this
// SSTSnapshotStorage is closed. It releases that scratch's reference on the
// range and, when no references remain, forgets the range and removes its
// directory. It panics if the range has no outstanding reference.
func (s *SSTSnapshotStorage) scratchClosed(rangeID roachpb.RangeID) {
	s.mu.Lock()
	defer s.mu.Unlock()

	refs := s.mu.ranges[rangeID]
	if refs <= 0 {
		panic("inconsistent scratch ref count")
	}
	if refs > 1 {
		// Other scratches for this range are still open.
		s.mu.ranges[rangeID] = refs - 1
		return
	}

	// That was the last reference: drop the bookkeeping entry and clean up
	// the range's directory.
	delete(s.mu.ranges, rangeID)
	rangeDir := filepath.Join(s.dir, strconv.Itoa(int(rangeID)))
	// Suppressing an error here is okay, as orphaned directories are at worst
	// a performance issue when we later walk directories in pebble.Capacity()
	// but not a correctness issue.
	_ = s.engine.RemoveAll(rangeDir)
}
// SSTSnapshotStorageScratch tracks the SST files that are created one by one
// while a snapshot is being received. A scratch belongs to exactly one
// snapshot.
type SSTSnapshotStorageScratch struct {
	// storage is the SSTSnapshotStorage that created this scratch; it
	// supplies the engine and limiter and is notified when the scratch closes.
	storage *SSTSnapshotStorage
	// rangeID is the range the snapshot belongs to.
	rangeID roachpb.RangeID
	// ssts lists the paths handed out by NewFile, in creation order.
	ssts []string
	// snapDir is the directory all files of this scratch are placed in.
	snapDir string
	// dirCreated records that snapDir has been created successfully.
	dirCreated bool
	// closed is set by Close; NewFile and WriteSST panic afterwards.
	closed bool
}
// filename returns the path of the SST with the given index inside the
// scratch's snapshot directory, e.g. "<snapDir>/3.sst" for id 3.
func (s *SSTSnapshotStorageScratch) filename(id int) string {
	base := fmt.Sprintf("%d.sst", id)
	return filepath.Join(s.snapDir, base)
}
// createDir creates the scratch's snapshot directory (including parents) and
// remembers a successful creation in dirCreated. On failure the flag is left
// as it was and the engine's error is returned.
func (s *SSTSnapshotStorageScratch) createDir() error {
	if err := s.storage.engine.MkdirAll(s.snapDir); err != nil {
		return err
	}
	s.dirCreated = true
	return nil
}
// NewFile registers one more file with the scratch and returns a handle for
// it. Nothing is created on disk yet: the file comes into existence the first
// time it is written to. If bytesPerSync is nonzero, dirty data is synced
// periodically while writing; that only smooths out disk writes and gives no
// persistence guarantee, so Sync() must still be called to persist the data.
//
// NewFile panics if the scratch has already been closed. The returned error
// is always nil.
func (s *SSTSnapshotStorageScratch) NewFile(
	ctx context.Context, bytesPerSync int64,
) (*SSTSnapshotStorageFile, error) {
	if s.closed {
		panic("closed")
	}
	// Files are numbered by their position in the list of SSTs.
	name := s.filename(len(s.ssts))
	s.ssts = append(s.ssts, name)
	return &SSTSnapshotStorageFile{
		scratch:      s,
		filename:     name,
		ctx:          ctx,
		bytesPerSync: bytesPerSync,
	}, nil
}
// WriteSST writes the given SST data to a new file in the scratch, syncs it
// and closes it. If data is empty, no file is created and nothing is written.
// WriteSST panics if the scratch has already been closed.
func (s *SSTSnapshotStorageScratch) WriteSST(ctx context.Context, data []byte) error {
	if s.closed {
		panic("closed")
	}
	if len(data) == 0 {
		return nil
	}

	const bytesPerSync = 512 << 10 // 512 KB
	sstFile, err := s.NewFile(ctx, bytesPerSync)
	if err != nil {
		return err
	}
	// Make sure the file is closed on every error path. Closing an
	// SSTSnapshotStorageFile multiple times is idempotent, and there is
	// nothing actionable if this close fails.
	defer func() { _ = sstFile.Close() }()

	_, err = sstFile.Write(data)
	if err == nil {
		err = sstFile.Sync()
	}
	if err != nil {
		return err
	}
	return sstFile.Close()
}
// SSTs returns the paths of all files handed out by this scratch so far, in
// creation order. The returned slice is the scratch's own, not a copy.
func (s *SSTSnapshotStorageScratch) SSTs() []string {
	paths := s.ssts
	return paths
}
// Close removes the snapshot's directory together with the SSTs in it and
// tells the owning storage that this scratch is gone. Only the first call does
// any work; later calls return nil.
func (s *SSTSnapshotStorageScratch) Close() error {
	if s.closed {
		return nil
	}
	s.closed = true

	owner, id := s.storage, s.rangeID
	// Release the range reference after the directory removal, whatever its
	// outcome.
	defer owner.scratchClosed(id)
	return owner.engine.RemoveAll(s.snapDir)
}
// SSTSnapshotStorageFile is a single SST file that lives inside, and is
// managed by, an SSTSnapshotStorageScratch.
type SSTSnapshotStorageFile struct {
	// scratch is the scratch space this file belongs to.
	scratch *SSTSnapshotStorageScratch
	// created is set once ensureFile has created the file on disk. It stays
	// set after the file is closed.
	created bool
	// file is the open handle. It is nil until the file is created and is
	// reset to nil by a successful Close.
	file fs.File
	// filename is the path at which the file is created.
	filename string
	// ctx is passed to limitBulkIOWrite on every Write.
	ctx context.Context
	// bytesPerSync, when positive, makes ensureFile create the file via
	// CreateWithSync with this value; otherwise plain Create is used.
	bytesPerSync int64
}
// ensureFile makes sure the file exists on disk and is open for writing. On
// the first call it creates the scratch directory if needed and then the file
// itself. On later calls it only verifies that the file has not been closed
// in the meantime, returning an error if it has. It panics if the file is
// being created after its scratch was closed.
func (f *SSTSnapshotStorageFile) ensureFile() error {
	if f.created {
		if f.file != nil {
			return nil
		}
		return errors.Errorf("file has already been closed")
	}

	scratch := f.scratch
	if !scratch.dirCreated {
		if err := scratch.createDir(); err != nil {
			return err
		}
	}
	if scratch.closed {
		panic("scratch closed")
	}

	eng := scratch.storage.engine
	var err error
	if f.bytesPerSync > 0 {
		// Periodic syncing was requested for this file.
		f.file, err = eng.CreateWithSync(f.filename, int(f.bytesPerSync))
	} else {
		f.file, err = eng.Create(f.filename)
	}
	if err == nil {
		f.created = true
	}
	return err
}
// Write appends contents to the file, creating the file first if this is the
// initial write, and goes through the limiter of the owning
// SSTSnapshotStorage before touching the disk. Empty contents are a no-op that
// neither creates the file nor consults the limiter. Writing to a file that
// has already been closed returns an error.
func (f *SSTSnapshotStorageFile) Write(contents []byte) (int, error) {
	size := len(contents)
	if size == 0 {
		return 0, nil
	}
	if err := f.ensureFile(); err != nil {
		return 0, err
	}
	limiter := f.scratch.storage.limiter
	if err := limitBulkIOWrite(f.ctx, limiter, size); err != nil {
		return 0, err
	}
	return f.file.Write(contents)
}
// Close closes the file. Closing an already closed file is a no-op that
// returns nil. Closing a file that was never written to is an error: an empty
// SST could not be ingested later, so the problem is reported here instead.
// If the underlying close fails, the handle is kept so that Close can be
// attempted again.
func (f *SSTSnapshotStorageFile) Close() error {
	switch {
	case !f.created:
		return errors.New("file is empty")
	case f.file == nil:
		// Already closed.
		return nil
	}
	err := f.file.Close()
	if err == nil {
		f.file = nil
	}
	return err
}
// Sync syncs the file to disk. Implements writeCloseSyncer in engine.
//
// The underlying handle only exists between the first non-empty Write and a
// successful Close. Outside that window Sync returns an error instead of
// dereferencing the nil handle: "file is empty" if nothing has been written
// yet, "file has already been closed" otherwise. These match the messages
// used by Close and by writes to a closed file.
func (f *SSTSnapshotStorageFile) Sync() error {
	if f.file == nil {
		if !f.created {
			return errors.New("file is empty")
		}
		return errors.New("file has already been closed")
	}
	return f.file.Sync()
}