read_import_workload.go
// Copyright 2019 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package importccl

import (
"context"
"net/url"
"runtime"
"strings"
"sync/atomic"
"unsafe"
"github.com/cockroachdb/apd/v2"
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/cockroach/pkg/storage/cloudimpl"
"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/timeutil/pgdate"
"github.com/cockroachdb/cockroach/pkg/workload"
"github.com/cockroachdb/errors"
)

type workloadReader struct {
evalCtx *tree.EvalContext
table *sqlbase.ImmutableTableDescriptor
kvCh chan row.KVBatch
}

var _ inputConverter = &workloadReader{}

func newWorkloadReader(
kvCh chan row.KVBatch, table *sqlbase.ImmutableTableDescriptor, evalCtx *tree.EvalContext,
) *workloadReader {
return &workloadReader{evalCtx: evalCtx, table: table, kvCh: kvCh}
}

// start is a no-op for the workload reader; all of the work happens in
// readFiles.
func (w *workloadReader) start(ctx ctxgroup.Group) {
}

// makeDatumFromColOffset tries to fast-path a few workload-generated types
// directly into datums, to avoid making a string and then parsing it back.
func makeDatumFromColOffset(
alloc *sqlbase.DatumAlloc, hint *types.T, evalCtx *tree.EvalContext, col coldata.Vec, rowIdx int,
) (tree.Datum, error) {
if col.Nulls().NullAt(rowIdx) {
return tree.DNull, nil
}
switch t := col.Type(); col.CanonicalTypeFamily() {
case types.BoolFamily:
return tree.MakeDBool(tree.DBool(col.Bool()[rowIdx])), nil
case types.IntFamily:
switch t.Width() {
case 0, 64:
switch hint.Family() {
case types.IntFamily:
return alloc.NewDInt(tree.DInt(col.Int64()[rowIdx])), nil
case types.DecimalFamily:
d := *apd.New(col.Int64()[rowIdx], 0)
return alloc.NewDDecimal(tree.DDecimal{Decimal: d}), nil
case types.DateFamily:
date, err := pgdate.MakeDateFromUnixEpoch(col.Int64()[rowIdx])
if err != nil {
return nil, err
}
return alloc.NewDDate(tree.DDate{Date: date}), nil
}
case 16:
switch hint.Family() {
case types.IntFamily:
return alloc.NewDInt(tree.DInt(col.Int16()[rowIdx])), nil
}
}
case types.FloatFamily:
switch hint.Family() {
case types.FloatFamily:
return alloc.NewDFloat(tree.DFloat(col.Float64()[rowIdx])), nil
case types.DecimalFamily:
var d apd.Decimal
if _, err := d.SetFloat64(col.Float64()[rowIdx]); err != nil {
return nil, err
}
return alloc.NewDDecimal(tree.DDecimal{Decimal: d}), nil
}
case types.BytesFamily:
switch hint.Family() {
case types.BytesFamily:
return alloc.NewDBytes(tree.DBytes(col.Bytes().Get(rowIdx))), nil
case types.StringFamily:
data := col.Bytes().Get(rowIdx)
str := *(*string)(unsafe.Pointer(&data))
return alloc.NewDString(tree.DString(str)), nil
default:
data := col.Bytes().Get(rowIdx)
str := *(*string)(unsafe.Pointer(&data))
return sqlbase.ParseDatumStringAs(hint, str, evalCtx)
}
}
return nil, errors.Errorf(
`don't know how to interpret %s column as %s`, col.Type(), hint)
}
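
// As a rough illustration (not used anywhere in this file), the generic
// conversion that the fast paths above sidestep would render the raw value
// as a string and then parse it back into a datum, e.g. for an int64 column
// with an INT type hint:
//
//	s := strconv.FormatInt(col.Int64()[rowIdx], 10) // hypothetical slow path
//	return sqlbase.ParseDatumStringAs(hint, s, evalCtx)
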
func (w *workloadReader) readFiles(
ctx context.Context,
dataFiles map[int32]string,
_ map[int32]int64,
_ roachpb.IOFileFormat,
_ cloud.ExternalStorageFactory,
_ string,
) error {
wcs := make([]*WorkloadKVConverter, 0, len(dataFiles))
for fileID, fileName := range dataFiles {
file, err := url.Parse(fileName)
if err != nil {
return err
}
conf, err := cloudimpl.ParseWorkloadConfig(file)
if err != nil {
return err
}
meta, err := workload.Get(conf.Generator)
if err != nil {
return err
}
// Different versions of the workload could generate different data, so
// disallow any mismatch between the version recorded in the workload config
// and the version of the generator linked into this binary.
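// For example (hypothetical version numbers), a config recorded at generator
// version "1.0.0" is rejected if this binary's copy of that generator
// reports version "1.1.0", even if the schema happens to be unchanged.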
if meta.Version != conf.Version {
return errors.Errorf(
`expected %s version "%s" but got "%s"`, meta.Name, conf.Version, meta.Version)
}
gen := meta.New()
if f, ok := gen.(workload.Flagser); ok {
flags := f.Flags()
if err := flags.Parse(conf.Flags); err != nil {
return errors.Wrapf(err, `parsing parameters %s`, strings.Join(conf.Flags, ` `))
}
}
var t workload.Table
for _, tbl := range gen.Tables() {
if tbl.Name == conf.Table {
t = tbl
break
}
}
if t.Name == `` {
return errors.Errorf(`unknown table %s for generator %s`, conf.Table, meta.Name)
}
wc := NewWorkloadKVConverter(
fileID, w.table, t.InitialRows, int(conf.BatchBegin), int(conf.BatchEnd), w.kvCh)
wcs = append(wcs, wc)
}
for _, wc := range wcs {
if err := ctxgroup.GroupWorkers(ctx, runtime.NumCPU(), func(ctx context.Context, _ int) error {
evalCtx := w.evalCtx.Copy()
return wc.Worker(ctx, evalCtx)
}); err != nil {
return err
}
}
return nil
}

// WorkloadKVConverter converts workload.BatchedTuples to []roachpb.KeyValue.
type WorkloadKVConverter struct {
tableDesc *sqlbase.ImmutableTableDescriptor
rows workload.BatchedTuples
batchIdxAtomic int64
batchEnd int
kvCh chan row.KVBatch
// For progress reporting
fileID int32
totalBatches float32
finishedBatchesAtomic int64
}

// NewWorkloadKVConverter returns a WorkloadKVConverter for the given table and
// range of batches, emitting converted kvs to the given channel.
func NewWorkloadKVConverter(
fileID int32,
tableDesc *sqlbase.ImmutableTableDescriptor,
rows workload.BatchedTuples,
batchStart, batchEnd int,
kvCh chan row.KVBatch,
) *WorkloadKVConverter {
return &WorkloadKVConverter{
tableDesc: tableDesc,
rows: rows,
batchIdxAtomic: int64(batchStart) - 1,
batchEnd: batchEnd,
kvCh: kvCh,
totalBatches: float32(batchEnd - batchStart),
fileID: fileID,
}
}

// Worker can be called concurrently to create multiple workers that process
// batches in order. This keeps concurrently running workers on ~adjacent
// batches at any given moment (as opposed to handing large ranges of batches
// to each worker, e.g. 0-999 to worker 1, 1000-1999 to worker 2, etc). This
// property is relevant when ordered workload batches produce ordered PK data:
// the workers feed into a shared kvCh, so contiguous blocks of PK data will
// usually be buffered together and thus batched together in the SST builder,
// minimizing the amount of overlapping SSTs ingested.
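//
// For example, with four concurrent workers the batches in flight at any
// given moment tend to look like {412, 413, 414, 415} rather than
// {5, 1005, 2005, 3005} (illustrative batch numbers).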
//
// Each worker needs its own EvalContext and DatumAlloc.
func (w *WorkloadKVConverter) Worker(ctx context.Context, evalCtx *tree.EvalContext) error {
conv, err := row.NewDatumRowConverter(ctx, w.tableDesc, nil /* targetColNames */, evalCtx, nil, w.kvCh)
if err != nil {
return err
}
conv.KvBatch.Source = w.fileID
conv.FractionFn = func() float32 {
return float32(atomic.LoadInt64(&w.finishedBatchesAtomic)) / w.totalBatches
}
var alloc sqlbase.DatumAlloc
var a bufalloc.ByteAllocator
cb := coldata.NewMemBatchWithCapacity(nil /* typs */, 0 /* capacity */, coldata.StandardColumnFactory)
for {
batchIdx := int(atomic.AddInt64(&w.batchIdxAtomic, 1))
if batchIdx >= w.batchEnd {
break
}
a = a[:0]
w.rows.FillBatch(batchIdx, cb, &a)
for rowIdx, numRows := 0, cb.Length(); rowIdx < numRows; rowIdx++ {
for colIdx, col := range cb.ColVecs() {
// TODO(dan): This does a type switch once per-datum. Reduce this to
// a one-time switch per column.
converted, err := makeDatumFromColOffset(
&alloc, conv.VisibleColTypes[colIdx], evalCtx, col, rowIdx)
if err != nil {
return err
}
conv.Datums[colIdx] = converted
}
// `conv.Row` uses these as arguments to GenerateUniqueID to generate
// hidden primary keys, when necessary. We want them to be ascending per
// batch (to reduce overlap in the resulting kvs) and non-conflicting
// (because of primary key uniqueness). The ids that come out of
// GenerateUniqueID are sorted by (fileIdx, timestamp) and unique as long
// as the two inputs are a unique combo, so using the index of the batch
// within the table and the index of the row within the batch should do
// what we want.
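// For example, row 3 of batch 12 is passed below as (fileIdx=12, timestamp=3),
// which sorts after (12, 2) and before (13, 0) (illustrative values).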
fileIdx, timestamp := int32(batchIdx), int64(rowIdx)
// TODO (Anzoteh96): the job field might need to be populated here if a
// nextval() default expression is involved.
if err := conv.Row(ctx, fileIdx, timestamp, nil /* job */); err != nil {
return err
}
}
atomic.AddInt64(&w.finishedBatchesAtomic, 1)
}
return conv.SendBatch(ctx)
}
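
// A minimal sketch of how the pieces in this file fit together, assuming a
// hypothetical caller that already has a table descriptor, an eval context,
// and a map of workload config URLs (everything other than the identifiers
// defined above is hypothetical):
//
//	kvCh := make(chan row.KVBatch)
//	r := newWorkloadReader(kvCh, tableDesc, evalCtx)
//	// Some consumer must drain kvCh concurrently while readFiles runs.
//	err := r.readFiles(ctx, dataFiles, nil, roachpb.IOFileFormat{}, nil, "")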