importccl: [WIP] support nextval and currval as default expression #52910
Conversation
Force-pushed from b52df26 to 1e9aa59.
Tagging @cockroachdb/bulk-prs for an early review (the skeleton is ready, but the details might not be fully fleshed out).
The whole task is broken down into two parts (focusing on nextval):
- Ask for chunks and store each chunk as part of the evalCtx annotations. When asking for more chunks we'll need to update the current value of the sequence, which is covered by kv.IncrementRetryable. (A rough sketch of this chunked allocation follows this list.)
- These chunks have to be communicated back and forth during job progress saving. Therefore, I've stored this information (wrapped under DefaultValueMetaData, in case there is other metadata we need to store as we support more default expressions for import later on) as part of ImportProgress, ReadImportDataSpec, and RemoteProducerMetaData. Here's how I think it should work (bottom line: update these things at the same time as we update the resume position, or last row):
-> At the start of import, ImportProgress relays the chunk information (DefaultValueMetaData), along with the resume position, to ReadImportDataSpec.
-> The DefaultValueMetaData from ReadImportDataSpec is then relayed to importCtx, which in turn relays it to DatumRowConverter.
-> As we encounter more instances of nextval, this metadata is updated (with more chunks being written).
-> This metadata is periodically stored to KVBatches (together with the resume position).
-> As we update progress (which involves updating the resume position), we need to send these chunks from KVBatches to RemoteProducerMetaData.
-> Finally, RemoteProducerMetaData relays this information back to ImportProgress.
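For concreteness, here is a minimal, self-contained sketch of the chunked-allocation idea from the first bullet. Every name here (sequenceChunk, reserveChunk, nextVal) and the chunk size are made up for illustration; the actual PR reserves chunks through kv.IncrementRetryable and stores them in DefaultValueMetaData rather than a local variable.

package main

import "fmt"

// sequenceChunk is a hypothetical stand-in for the per-file chunk info: a
// contiguous block of sequence values reserved up front so that each
// nextval() call doesn't need its own KV write.
type sequenceChunk struct {
	next, end int64 // half-open interval [next, end)
}

// reserveChunk simulates bumping the sequence by chunkSize in one shot (the
// role kv.IncrementRetryable plays in the PR) and returns the reserved range.
func reserveChunk(seqVal *int64, chunkSize int64) sequenceChunk {
	start := *seqVal + 1
	*seqVal += chunkSize
	return sequenceChunk{next: start, end: start + chunkSize}
}

// nextVal hands out values from the current chunk, reserving a new chunk when
// the current one is exhausted.
func nextVal(seqVal *int64, chunk *sequenceChunk, chunkSize int64) int64 {
	if chunk.next >= chunk.end {
		*chunk = reserveChunk(seqVal, chunkSize)
	}
	v := chunk.next
	chunk.next++
	return v
}

func main() {
	var seqVal int64 // last value written to the (simulated) sequence
	var chunk sequenceChunk
	for i := 0; i < 5; i++ {
		fmt.Println(nextVal(&seqVal, &chunk, 3)) // prints 1..5, reserving in blocks of 3
	}
}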
More details in comments below.
Reviewable status: complete! 0 of 0 LGTMs obtained
pkg/ccl/importccl/import_processor.go, line 154 at r3 (raw file):
return newCSVInputReader(
	kvCh, spec.Format.Csv, spec.WalltimeNanos, int(spec.ReaderParallelism),
	singleTable, singleTableTargetCols, evalCtx, spec.DefaultExprMetaData), nil
Note: DefaultExprMetaData is used as an argument for creating input readers so that this information can be stored in importCtx. I haven't done that for the dump file formats (where the creation of the row converter happens in the newReader function), though.
pkg/ccl/importccl/import_processor.go, line 262 at r3 (raw file):
for i, emitted := range writtenRow {
	atomic.StoreInt64(&pkFlushedRow[i], emitted)
	pkDefaultExprMetaData[i] = tempDefaultExprMetaData[i]
Here's something that troubles me: atomic can be used to store the flushed rows, but defaultExprMetaData is a (rather complicated) struct and cannot be stored atomically the same way. How should we approach this?
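One possible answer, sketched below, is to keep the int64 resume position on sync/atomic and guard the metadata with a mutex next to it. The types here (exprMetaData, flushState) are simplified stand-ins for illustration, not the PR's actual ones.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// exprMetaData is a simplified stand-in for defaultExprMetaData.
type exprMetaData struct {
	chunks []int64
}

// flushState pairs the atomically stored flushed-row counter with
// mutex-guarded metadata, since a plain struct can't go through sync/atomic
// the way an int64 can.
type flushState struct {
	flushedRow int64 // updated with atomic.StoreInt64, as in the quoted code
	mu         sync.Mutex
	meta       exprMetaData
}

func (s *flushState) update(row int64, meta exprMetaData) {
	atomic.StoreInt64(&s.flushedRow, row)
	s.mu.Lock()
	defer s.mu.Unlock()
	s.meta = meta
}

func (s *flushState) snapshot() (int64, exprMetaData) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return atomic.LoadInt64(&s.flushedRow), s.meta
}

func main() {
	var s flushState
	s.update(42, exprMetaData{chunks: []int64{100, 200}})
	row, meta := s.snapshot()
	fmt.Println(row, meta.chunks)
}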
pkg/ccl/importccl/read_import_base.go, line 72 at r3 (raw file):
evalCtx := flowCtx.NewEvalCtx()
evalCtx.DB = flowCtx.Cfg.DB
Getting this DB populated here since we need it to create the ID-to-SequenceTable correspondence in rowConverter.
pkg/jobs/jobspb/jobs.proto, line 228 at r3 (raw file):
sequenceChunkMap sequenceMap = 1;
}
Here's another thing worth calling attention to: goproto doesn't seem to have a facility for map<int32, repeated chunkInfo> (or anything like that), so whenever I have to refer to a chunk in a defaultExprMetaData I have to write defMetaData.sequenceMap.Chunks[file_id].Chunks[chunk_index], which seems unnecessarily long.
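For illustration, this is roughly the Go shape that the proto wrapper message forces (a map value can only be a message, so the repeated field has to live one level down), plus a small accessor that hides the double lookup. The type and field names below are hypothetical, not the actual generated jobspb code.

package main

import "fmt"

// chunkInfo and chunkList mirror the nesting the proto compiler produces:
// the map value wraps the repeated field, hence the double .Chunks hop.
type chunkInfo struct {
	start, end int64
}

type chunkList struct {
	Chunks []chunkInfo
}

type defaultExprMetaData struct {
	// Conceptually map<int32, repeated chunkInfo>, expressed as map<int32, chunkList>.
	Chunks map[int32]*chunkList
}

// chunkAt hides the defMetaData.Chunks[fileID].Chunks[idx] double lookup.
func (m *defaultExprMetaData) chunkAt(fileID int32, idx int) chunkInfo {
	return m.Chunks[fileID].Chunks[idx]
}

func main() {
	m := defaultExprMetaData{Chunks: map[int32]*chunkList{
		7: {Chunks: []chunkInfo{{start: 1, end: 10}}},
	}}
	fmt.Println(m.chunkAt(7, 0))
}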
pkg/sql/distsql_plan_csv.go, line 291 at r3 (raw file):
}
for i, v := range meta.BulkProcessorProgress.DefaultExprMetaData {
	defaultExprMetaData[i] = v
Again, this is a place where I wish an atomic operation were available.
pkg/sql/rowexec/bulk_row_writer.go, line 104 at r3 (raw file):
var g ctxgroup.Group
// TODO (anzoteh96): try populating defaultValueMetaData.
Note: here I'm not too sure what the defaultValueMetaData would be.
Force-pushed from 5d38ab4 to a69329b.
Force-pushed from 8fae480 to faef143.
Here's an update on the state of this PR:
- The tests pass locally, but somehow on CI it fails on TestImportDefaultWithResume/nextval with 10 extra rows, so I think that's worth looking into.
- Something worth fixing is how we effectively communicate the defaultExprMetaData (which contains the sequence chunks) between the job progress struct and the actual import work itself (say, rowConverter). The same goes for the importJob itself. The current way is to populate them as arguments when calling rowConverter, but this isn't a great idea for the following reasons (a sketch of an alternative follows below):
-- (a) some other places call these functions, either newDatumRowConverter or rowConverter.Row();
-- (b) for the dump file reader/converter, I can't seem to find a good way to populate these arguments (though, arguably, for dump files these nextval values have already been produced within the dump files themselves).
More comments below.
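One alternative, hinted at by the evalCtx-annotations idea in the first comment, is to stash the metadata once in a slot that the converter already carries, instead of threading it through newDatumRowConverter and Row(). The sketch below is a self-contained mock of that pattern; the types and the slot index are made up, and this is not the PR's actual wiring.

package main

import "fmt"

// annotations mimics a slot-based side channel (in the spirit of
// tree.Annotations): a producer sets a value once, and anything holding the
// same annotations can read it without extra parameters.
type annotations []interface{}

func (a annotations) set(idx int, v interface{}) { a[idx] = v }
func (a annotations) get(idx int) interface{}    { return a[idx] }

type defaultExprMetaData struct{ chunks []int64 }

const defaultExprMetaDataIdx = 0 // made-up slot index

// rowConverter only carries the annotations; it doesn't need the metadata as
// an explicit argument at construction or on every Row() call.
type rowConverter struct{ ann annotations }

func (c *rowConverter) row() {
	if meta, ok := c.ann.get(defaultExprMetaDataIdx).(*defaultExprMetaData); ok {
		fmt.Println("chunks available to nextval:", meta.chunks)
	}
}

func main() {
	ann := make(annotations, 1)
	ann.set(defaultExprMetaDataIdx, &defaultExprMetaData{chunks: []int64{100, 200}})
	conv := rowConverter{ann: ann}
	conv.row()
}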
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @dt and @pbardea)
pkg/ccl/importccl/import_processor.go, line 171 at r4 (raw file):
return newAvroInputReader(
	kvCh, singleTable, spec.Format.Avro, spec.WalltimeNanos,
	int(spec.ReaderParallelism), evalCtx, spec.DefaultExprMetaData, job)
Here the DefaultExprMetaData and job are populated for non-dump files but not dump files.
pkg/ccl/importccl/read_import_base.go, line 418 at r4 (raw file):
kvCh chan row.KVBatch // Channel for sending KV batches.
job *jobs.Job // Job (of this import) to be populated into row converter.
defaultValueMetaData map[int32]*jobspb.DefaultExprMetaData // Metadata of default values to be communicated.
Here, we put both job and defaultValueMetaData as fields of parallelImportContext, which the non-dump file formats use.
pkg/ccl/importccl/read_import_workload.go, line 222 at r4 (raw file):
// This worker needs its own EvalContext and DatumAlloc.
func (w *WorkloadKVConverter) Worker(ctx context.Context, evalCtx *tree.EvalContext) error {
	conv, err := row.NewDatumRowConverter(ctx, w.tableDesc, nil /* targetColNames */, evalCtx, nil, w.kvCh)
This is another place where we have to populate nil for defaultExprMetaData, unless there's an efficient way to retrieve that information.
pkg/sql/row/expr_walker.go, line 238 at r4 (raw file):
return nil, errors.Newf(
	"job needs to be injected into evalCtx for allocation of chunks")
}
Just some thought: are we supposed to be able to process a non-targeted column with the default expression nextval without the presence of a job (say, during an import)?
pkg/sql/row/row_converter.go, line 436 at r4 (raw file):
// if necessary.
func (c *DatumRowConverter) Row(
	ctx context.Context, sourceID int32, rowIndex int64, job *jobs.Job,
Note: job information is populated for every row.
These changes went in with #56473. Thanks again!
This PR follows up from #50295 in supporting default expressions, but for the functions nextval() and currval(). (This is WIP and I'll think of the description later on.)