-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
restore_data_processor.go
193 lines (168 loc) · 5.9 KB
/
restore_data_processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
// Copyright 2020 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt
package backupccl
import (
"context"
"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/kv"
roachpb "github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
gogotypes "github.com/gogo/protobuf/types"
)
// restoreDataOutputTypes is the output schema of the restore data processor.
// It is empty because the processor never emits rows: Next always returns a
// nil row. Progress is streamed to the coordinator through metadata.
var restoreDataOutputTypes = []*types.T{}
// restoreDataProcessor reads rows carrying serialized RestoreSpanEntry values
// from its input, issues one Import request per entry, and reports the result
// of each import as bulk-processor progress metadata.
type restoreDataProcessor struct {
execinfra.ProcessorBase
// flowCtx is the flow context the processor was created with; Next uses it
// to reach the DB sender for Import requests.
flowCtx *execinfra.FlowCtx
// spec supplies the rekeys, restore time, encryption options and PK IDs
// used when building Import requests and counting rows.
spec execinfrapb.RestoreDataSpec
// input is the source of rows; each row is expected to have two columns,
// the second holding a marshaled RestoreSpanEntry.
input execinfra.RowSource
// output is the receiver handed to ProcessorBase.Init.
output execinfra.RowReceiver
// progCh is allocated in Start. NOTE(review): nothing in this file sends on
// or receives from it — confirm whether it is still needed.
progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
// lastErr is not referenced anywhere in this file.
// NOTE(review): looks like a leftover — confirm before removing.
lastErr error
// alloc is the datum allocator used when decoding input columns.
alloc *rowenc.DatumAlloc
// kr rewrites backup span keys; it is built in Start from spec.Rekeys.
kr *storageccl.KeyRewriter
}
// Compile-time checks that *restoreDataProcessor satisfies both the Processor
// and the RowSource interfaces.
var (
	_ execinfra.Processor = (*restoreDataProcessor)(nil)
	_ execinfra.RowSource = (*restoreDataProcessor)(nil)
)
// OutputTypes implements the execinfra.Processor interface. It returns the
// package-level restoreDataOutputTypes slice, which is empty.
func (rd *restoreDataProcessor) OutputTypes() []*types.T {
return restoreDataOutputTypes
}
// newRestoreDataProcessor builds a restoreDataProcessor for the given spec and
// initializes its embedded ProcessorBase. The input is registered as an input
// to drain, and the trailing-metadata callback closes the processor without
// contributing any metadata of its own. An error from Init is returned as-is.
func newRestoreDataProcessor(
	flowCtx *execinfra.FlowCtx,
	processorID int32,
	spec execinfrapb.RestoreDataSpec,
	post *execinfrapb.PostProcessSpec,
	input execinfra.RowSource,
	output execinfra.RowReceiver,
) (execinfra.Processor, error) {
	proc := &restoreDataProcessor{
		flowCtx: flowCtx,
		spec:    spec,
		input:   input,
		output:  output,
	}

	// The callback only releases resources; it produces no trailing metadata.
	onTrailingMeta := func(context.Context) []execinfrapb.ProducerMetadata {
		proc.close()
		return nil
	}
	opts := execinfra.ProcStateOpts{
		InputsToDrain:        []execinfra.RowSource{input},
		TrailingMetaCallback: onTrailingMeta,
	}

	err := proc.Init(
		proc, post, proc.OutputTypes(), flowCtx, processorID, output, nil /* memMonitor */, opts,
	)
	if err != nil {
		return nil, err
	}
	return proc, nil
}
// Start starts the processor and its input, and builds the key rewriter from
// the spec's rekeys.
//
// If the key rewriter cannot be built, the processor is moved to draining with
// that error, so the failure surfaces through Next instead of leaving the
// processor running with an unusable rewriter. The returned context is the one
// produced by StartInternal.
func (rd *restoreDataProcessor) Start(ctx context.Context) context.Context {
	ctx = rd.StartInternal(ctx, "restore-data")
	rd.input.Start(ctx)

	// The child span covers only the setup below and is finished when Start
	// returns. Its context must therefore not be handed to the input or
	// returned to the caller, both of which outlive this call.
	setupCtx, span := tracing.ChildSpan(ctx, "restoreDataProcessor")
	defer tracing.FinishSpan(span)

	// NOTE(review): nothing in this file sends on or receives from progCh;
	// the allocation is kept only to preserve the field's existing state.
	rd.progCh = make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress)
	rd.alloc = &rowenc.DatumAlloc{}

	kr, err := storageccl.MakeKeyRewriterFromRekeys(rd.spec.Rekeys)
	if err != nil {
		// Keep the log line that was emitted before, but also fail the
		// processor so no span is imported without a key rewriter.
		log.Error(setupCtx, err.Error())
		rd.MoveToDraining(errors.Wrap(err, "making key rewriter"))
		return ctx
	}
	rd.kr = kr
	return ctx
}
// Next implements the execinfra.RowSource interface.
//
// While the processor is running, each call reads one row from the input,
// decodes the RestoreSpanEntry in its second column, sends an Import request
// for that entry's span and returns the outcome as BulkProcessorProgress
// metadata. The returned row is always nil. Metadata received from the input
// is forwarded unchanged; if it carries an error the processor starts
// draining. Any failure while handling a row moves the processor to draining
// with that error, after which the result of DrainHelper is returned.
func (rd *restoreDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
	// Every path through the loop body either returns or breaks, so at most
	// one input row is consumed per call.
	for rd.State == execinfra.StateRunning {
		// We read rows from the SplitAndScatter processor. We expect each row to
		// contain 2 columns. The first is used to route the row to this processor,
		// and the second contains the RestoreSpanEntry that we're interested in.
		row, meta := rd.input.Next()
		if meta != nil {
			if meta.Err != nil {
				rd.MoveToDraining(nil /* err */)
			}
			return nil, meta
		}
		if row == nil {
			// Input exhausted.
			rd.MoveToDraining(nil /* err */)
			break
		}

		if len(row) != 2 {
			rd.MoveToDraining(errors.New("expected input rows to have exactly 2 columns"))
			break
		}
		if err := row[1].EnsureDecoded(types.Bytes, rd.alloc); err != nil {
			rd.MoveToDraining(err)
			break
		}
		datum := row[1].Datum
		entryDatumBytes, ok := datum.(*tree.DBytes)
		if !ok {
			rd.MoveToDraining(errors.AssertionFailedf(`unexpected datum type %T: %+v`, datum, row))
			break
		}

		var entry execinfrapb.RestoreSpanEntry
		if err := protoutil.Unmarshal([]byte(*entryDatumBytes), &entry); err != nil {
			rd.MoveToDraining(errors.Wrap(err, "un-marshaling restore span entry"))
			break
		}

		newSpanKey, err := rewriteBackupSpanKey(rd.kr, entry.Span.Key)
		if err != nil {
			rd.MoveToDraining(errors.Wrap(err, "re-writing span key to import"))
			break
		}

		// Use the processor's own context (the Ctx field of the embedded
		// ProcessorBase) rather than context.TODO() so that the log event and
		// the KV request observe the flow's cancellation and tracing.
		// NOTE(review): assumes StartInternal has populated rd.Ctx before Next
		// is called — confirm against ProcessorBase.
		log.VEventf(rd.Ctx, 1 /* level */, "importing span %v", entry.Span)
		importRequest := &roachpb.ImportRequest{
			// Import is a point request because we don't want DistSender to split
			// it. Assume (but don't require) the entire post-rewrite span is on the
			// same range.
			RequestHeader: roachpb.RequestHeader{Key: newSpanKey},
			DataSpan:      entry.Span,
			Files:         entry.Files,
			EndTime:       rd.spec.RestoreTime,
			Rekeys:        rd.spec.Rekeys,
			Encryption:    rd.spec.Encryption,
		}

		importRes, pErr := kv.SendWrapped(rd.Ctx, rd.flowCtx.Cfg.DB.NonTransactionalSender(), importRequest)
		if pErr != nil {
			rd.MoveToDraining(errors.Wrapf(pErr.GoError(), "importing span %v", importRequest.DataSpan))
			break
		}
		// Fail with an assertion error instead of panicking if the response is
		// not the expected type.
		importResp, ok := importRes.(*roachpb.ImportResponse)
		if !ok {
			rd.MoveToDraining(errors.AssertionFailedf(
				"unexpected response type %T for import of span %v", importRes, importRequest.DataSpan))
			break
		}

		var prog execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
		progDetails := RestoreProgress{}
		progDetails.Summary = countRows(importResp.Imported, rd.spec.PKIDs)
		progDetails.ProgressIdx = entry.ProgressIdx
		progDetails.DataSpan = entry.Span
		details, err := gogotypes.MarshalAny(&progDetails)
		if err != nil {
			rd.MoveToDraining(err)
			break
		}
		prog.ProgressDetails = *details
		return nil, &execinfrapb.ProducerMetadata{BulkProcessorProgress: &prog}
	}

	return nil, rd.DrainHelper()
}
// ConsumerClosed releases the processor's resources by delegating to close.
// NOTE(review): presumably part of the execinfra.RowSource interface that this
// type is asserted to implement — confirm against the interface definition.
func (rd *restoreDataProcessor) ConsumerClosed() {
rd.close()
}
// close shuts the processor down by calling InternalClose on the embedded
// ProcessorBase. It is invoked from ConsumerClosed and from the
// trailing-metadata callback installed by newRestoreDataProcessor.
func (rd *restoreDataProcessor) close() {
rd.InternalClose()
}
// init registers newRestoreDataProcessor as the rowexec package's
// NewRestoreDataProcessor constructor hook.
func init() {
rowexec.NewRestoreDataProcessor = newRestoreDataProcessor
}