importccl: add nextval support for IMPORT INTO CSV
#56473
Conversation
Big h/t to @Anzoteh96 for fleshing out a lot of the semantics in #52910.
@dt wanted to get your input on the decision to plumb the job running the import down to the processors. @miretskiy raised a valid concern that has been bugging me as well: this is the first of its kind (I think) in that the processors directly interact with the jobs table to write their progress. The usual pattern is to stream results back to the coordinator and let a single node do all the updating.
The other option is to have a two-way request/response interaction between the processors and the coordinator. The processor asks the coordinator to write to the job's progress via a channel, and the coord acks on completion.
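To make that second option concrete, here is a minimal single-process sketch of the request/ack handshake; every name in it is hypothetical, and in the real flow this would ride the DistSQL streams rather than in-process Go channels:

package sketch

import "context"

// chunkRequest and chunkResponse are hypothetical message types: a processor
// sends a request over reqCh and blocks until the coordinator acks it.
type chunkRequest struct {
	seqID  int32
	respCh chan chunkResponse
}

type chunkResponse struct {
	startVal, size int64
	err            error
}

// coordinator is the single writer to the job's progress record.
func coordinator(ctx context.Context, reqCh <-chan chunkRequest) {
	for {
		select {
		case <-ctx.Done():
			return
		case req := <-reqCh:
			// Reserve a chunk and persist it to job progress here, then
			// ack the requesting processor with the reserved range.
			req.respCh <- chunkResponse{startVal: 1, size: 10}
		}
	}
}

// requestChunk is what a processor would call when it needs sequence values.
func requestChunk(ctx context.Context, reqCh chan<- chunkRequest, seqID int32) (chunkResponse, error) {
	respCh := make(chan chunkResponse, 1)
	select {
	case reqCh <- chunkRequest{seqID: seqID, respCh: respCh}:
	case <-ctx.Done():
		return chunkResponse{}, ctx.Err()
	}
	select {
	case resp := <-respCh:
		return resp, resp.err
	case <-ctx.Done():
		return chunkResponse{}, ctx.Err()
	}
}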
I think we need to decide what to do w/ the direct jobs access in the processor implementation. I'm not a fan; but let's see what @dt says.
For now, I have some comments/suggestions on the existing code.
Reviewed 14 of 18 files at r1.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru, @miretskiy, and @pbardea)
pkg/ccl/importccl/import_stmt_test.go, line 3729 at r1 (raw file):
registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
    // Arrange for our special job resumer to be
    // returned the very first time we start the import.
Probably drop the "the very first time" part.
Why do you need this resumer at all if you're not overriding the actual resumer in any way?
pkg/ccl/importccl/import_stmt_test.go, line 3743 at r1 (raw file):
Quoted 7 lines of code…
var data string
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
    if r.Method == "GET" {
        _, _ = w.Write([]byte(data))
    }
}))
defer srv.Close()
this doesn't appear to be used?
pkg/ccl/importccl/import_stmt_test.go, line 3764 at r1 (raw file):
targetCols: []string{"a", "c"},
// 1000 rows means we will allocate 3 chunks of 10, 100, 1000.
// The 2 inserts will add 6 more nextval calls.
why 6? Is it the number of columns? Can't we determine how many nextval() calls we have?
pkg/ccl/importccl/import_stmt_test.go, line 3820 at r1 (raw file):
}
defer sqlDB.Exec(t, `DROP TABLE t`)
for seqName := range test.seqToNumNextval {
why have 2 separate loops?
pkg/sql/execinfrapb/processors_bulk_io.proto, line 136 at r1 (raw file):
map<int32, jobs.jobspb.SequenceDetails> sequence_details = 16;
Apologies for a long comment...
On one hand, the change in this proto (and the matching change in dist_plan_csv to initialize this field) is symmetric to the way we handle the resume position.
However, I'm not sure we actually need it. The resume position has to be communicated to the import resumer so that it actually skips processing rows. We have to know that position before we start the import. On the other hand, the nextval resume position is only needed when we actually call nextval -- which may be never (if the import data doesn't have nextval columns).
That's kinda strike one. The second issue is that the evaluation of nextval itself (the importNextVal function) already has to deal with the business of reserving the next chunk in a retry loop (db.Txn basically). This function already reads progress details for that.
I think the importNextVal logic can be modified (slightly) so that if the current chunk is not found (i.e. the sequence details weren't even initialized), then we load progress information, initialize sequence details, and reserve chunks if needed.
If we do that, then we can drop this field; we can maybe even drop the DefaultExprSequeceDetails.
Wdyt?
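If it helps to picture it, a minimal sketch of that lazy path; every name here (chunk, seqMeta, resolveNextVal, runTxn) is a placeholder rather than the PR's actual code:

package sketch

import "context"

type chunk struct{ startVal, nextChunkStartRow int64 }

type seqMeta struct{ curChunk *chunk }

// resolveNextVal touches job progress only when nextval() is actually
// evaluated and the current chunk is missing (details never initialized)
// or exhausted. runTxn stands in for db.Txn in the real code.
func resolveNextVal(
	ctx context.Context,
	runTxn func(context.Context, func(context.Context) error) error,
	meta *seqMeta,
	rowID int64,
) error {
	if meta.curChunk != nil && rowID != meta.curChunk.nextChunkStartRow {
		return nil // the active chunk still covers this row
	}
	return runTxn(ctx, func(ctx context.Context) error {
		// 1. Read progress details from the job record.
		// 2. Initialize the sequence details if they were never written.
		// 3. Reserve the next chunk and persist it back to progress.
		return nil
	})
}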
pkg/sql/row/expr_walker.go, line 214 at r1 (raw file):
}
func IncrementSequenceByVal(
does this method need to be exported?
pkg/sql/row/expr_walker.go, line 227 at r1 (raw file):
if seqOpts.Virtual {
    return 0, errors.New("virtual sequences are not supported by IMPORT INTO")
} else {
drop else{}/un-indent the code?
pkg/sql/row/expr_walker.go, line 238 at r1 (raw file):
Quoted 10 lines of code…
val, err = kv.IncrementValRetryable(ctx, db, seqValueKey, incrementBy)
if err != nil {
    if errors.HasType(err, (*roachpb.IntegerOverflowError)(nil)) {
        return 0, boundsExceededError(descriptor)
    }
    return 0, err
}
if val > seqOpts.MaxValue || val < seqOpts.MinValue {
    return 0, boundsExceededError(descriptor)
}
Looks like this code, as well as the boundsExceededError was copied from sequence.go.
Do you think it would make sense to simply introduce a helper function in sequence.go instead?
Something along the lines of
func IncrementPhysicalSequenceValue() ...
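A sketch of what that helper could look like, assembled from the quoted snippet; to keep it self-contained, the descriptor-based boundsExceededError is swapped for a plain error and explicit min/max bounds, which is an assumption, not the shape the PR landed on:

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/errors"
)

var errSequenceBoundsExceeded = errors.New("reached maximum/minimum value of sequence")

// IncrementPhysicalSequenceValue bumps the sequence's backing KV value and
// enforces its bounds, so both sequence.go and the import code could share it.
func IncrementPhysicalSequenceValue(
	ctx context.Context, db *kv.DB, seqValueKey roachpb.Key, incrementBy, minValue, maxValue int64,
) (int64, error) {
	val, err := kv.IncrementValRetryable(ctx, db, seqValueKey, incrementBy)
	if err != nil {
		if errors.HasType(err, (*roachpb.IntegerOverflowError)(nil)) {
			return 0, errSequenceBoundsExceeded
		}
		return 0, err
	}
	if val > maxValue || val < minValue {
		return 0, errSequenceBoundsExceeded
	}
	return val, nil
}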
pkg/sql/row/expr_walker.go, line 275 at r1 (raw file):
// seqName, or the row we are processing is outside the range of rows covered
// by the active chunk, we need to do some extra legwork as described below.
if curSeqChunk == nil || c.rowID == curSeqChunk.NextChunkStartRow {
let's extract the body of this if{} into a reserveNext() helper
pkg/sql/row/expr_walker.go, line 281 at r1 (raw file):
// nextval().
var foundChunk bool
if seqMetadata.allChunks != nil {
I think this check is superfluous: range works correctly over nil slices.
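For reference, the guard can go because ranging over a nil slice is a zero-iteration no-op in Go:

package main

import "fmt"

func main() {
	var chunks []int // nil, never assigned
	for range chunks {
		fmt.Println("never reached") // a range over a nil slice runs zero iterations
	}
	fmt.Println(len(chunks) == 0) // true: len of a nil slice is also 0
}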
pkg/sql/row/expr_walker.go, line 353 at r1 (raw file):
}
fileProgress.SeqIdToChunks[int32(seqID)].Chunks = append(
    fileProgress.SeqIdToChunks[int32(seqID)].Chunks, seqMetadata.curChunk)
4TB import (as one of the customers tried)... Let's assume 4KB rows on average: 1B rows.
Let's ignore the ramp up (10, 100, 1000, ...). We will be generating a new chunk every 100K rows: 10K chunks. Not horrible, but not great; particularly since we're going to be serializing/deserializing this rather large protocol message, possibly in a txn retry loop.
I think there is an easy fix though... Can't we wipe any previously allocated chunks whose end position is less than the previously checkpointed minimum ResumePos?
pkg/sql/row/expr_walker.go, line 368 at r1 (raw file):
// Now that the job progress has been written to, we can use the newly
// allocated chunk.
seqMetadata.curVal = seqMetadata.curChunk.ChunkStartVal
I did mention this above (re refactoring of this if branch). I think the code here is complex enough. If it is refactored (sufficiently), perhaps we can write some targeted tests around progress update logic.
pkg/sql/row/row_converter.go, line 301 at r1 (raw file):
Quoted 4 lines of code…
seqOpts := seqDesc.SequenceOpts
if seqOpts == nil {
    return errors.Newf("descriptor %s is not a sequence", seqDesc.Name)
}
Move this before the if statement above?
Reviewed 1 of 18 files at r1.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru, @miretskiy, and @pbardea)
I think the concern @miretskiy raised is a valid one, and it got me thinking whether it would be valuable to introduce a SeqChunkProvider and expose a RequestChunk(sourceID) -> (chunk, err) type interface. I think it would be nice for testing too, and it could be backed by either implementation. (The current approach would mean holding a job reference and plumbing down the chunk provider instead, but the two-way request/response could also be handled by this interface if the chunk provider internally held channels to manage the synchronization between the chunks.)
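i.e., roughly this shape, reusing the RequestChunk signature from the comment above; the ctx parameter and the returned chunk type are assumptions:

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
)

// SeqChunkProvider hands out reserved chunks of sequence values. The
// job-backed approach and a coordinator round trip could both satisfy it.
type SeqChunkProvider interface {
	RequestChunk(ctx context.Context, sourceID int32) (*jobspb.SequenceValChunk, error)
}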
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru, @miretskiy, and @pbardea)
pkg/sql/row/expr_walker.go, line 304 at r1 (raw file):
// to job progress.
if curSeqChunk != nil {
    if curSeqChunk.ChunkSize >= 100000 {
Let's bring this 100000 out to a const? (Maybe even the 10 below into something like chunkSizeRate?)
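i.e., something along these lines; the names match what later comments in the thread settle on:

const (
	// maxChunkSize caps how many sequence values a single reservation claims.
	maxChunkSize = 100000
	// chunkSizeIncrementRate is the growth factor between successive chunk
	// reservations (10 -> 100 -> 1000 -> ...).
	chunkSizeIncrementRate = 10
)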
pkg/sql/row/expr_walker.go, line 320 at r1 (raw file):
incrementValBy := newChunkSize * seqMetadata.increment
// IncrementSequenceByVal keeps retyring until it is able to find a slot
nit: retyring
pkg/sql/row/row_converter.go, line 258 at r1 (raw file):
// Walltime is the import timestamp.
Walltime int64
Do we have this info in the evalCtx itself? It seems like we rely on evalCtx's timestamp for the evaluation of other default expressions, so could we use that one?
Done, added an interface and a job-backed implementation of it.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy and @pbardea)
pkg/ccl/importccl/import_stmt_test.go, line 3729 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
Probably drop "the very first time" part .
Why do you need this resume at all if you're not overriding the actual resumer in any way?
Deleted, it was unused.
pkg/ccl/importccl/import_stmt_test.go, line 3743 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
var data string
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
    if r.Method == "GET" {
        _, _ = w.Write([]byte(data))
    }
}))
defer srv.Close()
this doesn't appear to be used?
Deleted, it was unused.
pkg/ccl/importccl/import_stmt_test.go, line 3764 at r1 (raw file):
Instead, we're inserting 3 rows per insert, each of which has one default nextval. So two inserts * 3 = 6 nextval() calls.
pkg/ccl/importccl/import_stmt_test.go, line 3820 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
why have 2 separate loops?
if you're talking about the drop and create sequence loops, I moved the drop statement exec into the create loop.
pkg/sql/execinfrapb/processors_bulk_io.proto, line 136 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
Apologies for a long comment...
On one hand, the change in this proto (and the matching change in dist_plan_csv to initialize this field) is symmetric to the way we handle resume position.
However, I'm not sure we actually need it. The resume position has to be communicated
to the import resumer so that it actually skips processing rows. We have to know that position before we start import. On the other hand, the nextval resume position is only needed when we actually call nextval -- which may be never (if import data doesn't have nextval columns). That's kinda strike one. The second issue is that the evaluation of nextval itself (importNextVal function) already has to deal with the business of reserving the next chunk in a retry loop (db.Txn basically). This function already reads progress details for that.
I think importNextVal logic can be modified (slightly) so that if the current chunk is not found (i.e. the sequence details weren't even initialized), then we load progress information, initialize sequence details and reserve chunks if needed. If we do that, then we can drop this field, we can maybe even drop the DefaultExprSequeceDetails.
Wdyt?
Good point, thanks for the detailed explanation.
I removed it from the spec, and just read the job.Progress() when checking for existing chunks.
pkg/sql/row/expr_walker.go, line 214 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
does this method need to be exported?
nope, unexported.
pkg/sql/row/expr_walker.go, line 227 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
drop else{}/un-indent the code?
done.
pkg/sql/row/expr_walker.go, line 238 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
val, err = kv.IncrementValRetryable(ctx, db, seqValueKey, incrementBy)
if err != nil {
    if errors.HasType(err, (*roachpb.IntegerOverflowError)(nil)) {
        return 0, boundsExceededError(descriptor)
    }
    return 0, err
}
if val > seqOpts.MaxValue || val < seqOpts.MinValue {
    return 0, boundsExceededError(descriptor)
}
Looks like this code, as well as the boundsExceededError, was copied from sequence.go.
Do you think it would make sense to simply introduce a helper function in sequence.go instead?
Something along the lines of func IncrementPhysicalSequenceValue() ...
Yeah, in my initial implementation I did try that, but there is a pkg cycle, and moving either sequence.go or :/
imports github.com/cockroachdb/cockroach/pkg/sql
imports github.com/cockroachdb/cockroach/pkg/sql/backfill
imports github.com/cockroachdb/cockroach/pkg/sql/row
imports github.com/cockroachdb/cockroach/pkg/sql
pkg/sql/row/expr_walker.go, line 275 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
let's extract the body of this if{} into a reserveNext() helper
done as part of the refactor.
pkg/sql/row/expr_walker.go, line 281 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
I think this check is superfluous: range works correctly over nil slices.
deleted because we read progress directly now.
pkg/sql/row/expr_walker.go, line 304 at r1 (raw file):
Previously, pbardea (Paul Bardea) wrote…
Let's bring this 100000 out to a const? (Maybe even the 10 below into something like chunkSizeRate?)
done.
pkg/sql/row/expr_walker.go, line 320 at r1 (raw file):
Previously, pbardea (Paul Bardea) wrote…
nit: retyring
done.
pkg/sql/row/expr_walker.go, line 353 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
4TB import (as one of the customers tried)... Let's assume 4KB rows on average: 1B rows.
Let's ignore the ramp up (10, 100, 1000, ...). We will be generating a new chunk every 100K rows: 10K chunks. Not horrible, but not great; particularly since we're going to be serializing/deserializing this rather large protocol message, possibly in a txn retry loop. I think there is an easy fix though... Can't we wipe any previously allocated chunks whose end position is less than the previously checkpointed minimum ResumePos?
good idea, done.
pkg/sql/row/expr_walker.go, line 368 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
I did mention this above (re refactoring of this if branch). I think the code here is complex enough. If it is refactored (sufficiently), perhaps we can write some targeted tests around progress update logic.
refactored and wrote a sort of unit test. It still involves a fair bit of setup overhead but I think it is better than before.
pkg/sql/row/row_converter.go, line 258 at r1 (raw file):
Previously, pbardea (Paul Bardea) wrote…
Do we have this info in the evalCtx itself? It seems like we rely on evalCtx's timestamp for the evaluation of other default expressions, so could we use that one?
good point, we inject the import time into the evalCtx so we can use it.
pkg/sql/row/row_converter.go, line 301 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
seqOpts := seqDesc.SequenceOpts
if seqOpts == nil {
    return errors.Newf("descriptor %s is not a sequence", seqDesc.Name)
}
Move this before the if statement above?
done.
@pbardea @miretskiy this should be RFAL. Addressed all your comments, refactored some of the chunk allocator code, and also tried to write a sort of unit test for the allocator.
friendly ping 🙂
Reviewed 14 of 19 files at r2, 1 of 1 files at r3.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru, @miretskiy, and @pbardea)
pkg/jobs/jobspb/jobs.proto, line 239 at r3 (raw file):
int64 chunk_start_row = 3;
int64 next_chunk_start_row = 4;
Probably should add comments for this pair of fields?
pkg/jobs/jobspb/jobs.proto, line 246 at r3 (raw file):
message SequenceChunks {
  repeated SequenceValChunk chunks = 1;
}
nit: Should we inline this message inside SequenceDetails?
pkg/sql/row/expr_walker.go, line 228 at r2 (raw file):
// chunk of sequence values corresponding to the row being imported.
type SeqChunkProvider interface {
    RequestChunk(
pretty sure you need to document exported interface methods.
pkg/sql/row/expr_walker.go, line 233 at r2 (raw file):
curSeqChunk *jobspb.SequenceValChunk,
seqMetadata *SequenceMetadata,
) error
I have a few issues w/ this interface.
I know that @pbardea asked to add this, but I'm not sure it's a good idea in this particular case.
Adding this interface is rather invasive (many places where it has to be passed around). Usually, you add interfaces if you have 2+ different implementations (right now, we don't have anything outside this one -- not even a test-based one). I would be in favor of dropping this interface.
If you feel strongly about keeping this interface, then I have to question the RequestChunk signature.
This method takes too many arguments that are very specific to this implementation, such as the cell info annotation. I would imagine an interface method that takes the eval ctx and probably the seq descriptor, and nothing else. Everything else (e.g. where the state is stored, what to update, etc.) should be moved to the implementation (the job-backed seq provider).
pkg/sql/row/expr_walker.go, line 286 at r2 (raw file):
Quoted 11 lines of code…
getJobProgressQuery := `SELECT progress FROM system.jobs J WHERE J.id = $1 FOR UPDATE`
ex := evalCtx.InternalExecutor.(sqlutil.InternalExecutor)
row, err := ex.QueryRow(ctx, "nextval-import-job-progress", txn, getJobProgressQuery, j.JobID)
if err != nil {
    return err
}
progress, err := jobs.UnmarshalProgress(row[0])
if err != nil {
    return err
}
I don't think we should be executing these queries directly; I think we should use jobs for that:
job := &jobs.Job{id: &j.id, registry: xxxx, txn: txn}
job.Update(....)
pkg/sql/row/expr_walker.go, line 111 at r3 (raw file):
curChunk  *jobspb.SequenceValChunk
curVal    int64
increment int64
since increment is available in seqDesc.SequenceOpts.Increment, perhaps drop this field?
I know it's more verbose, but it would also make this struct smaller and, more importantly, there'd be no chance of increment getting out of sync w/ the sequence configuration...
pkg/sql/row/expr_walker.go, line 323 at r3 (raw file):
updateJobProgressQuery := `UPDATE system.jobs SET progress = $1 WHERE id = $2`
_, err = ex.Exec(ctx, "nextval-import-job-progress-update", txn, updateJobProgressQuery, progressBytes, j.JobID)
use job.Update
pkg/sql/row/expr_walker.go, line 398 at r3 (raw file):
Quoted 7 lines of code…
getJobProgressQuery := `SELECT progress FROM system.jobs J WHERE J.id = $1`
ex := evalCtx.InternalExecutor.(sqlutil.InternalExecutor)
row, err := ex.QueryRow(evalCtx.Context, "nextval-import-job-progress", nil /* txn */, getJobProgressQuery, j.JobID)
if err != nil {
    return found, err
}
perhaps registry.LoadJob(...) instead of writing direct queries?
pkg/sql/row/expr_walker.go, line 504 at r3 (raw file):
// time). This will be handled by the visitorSideEffect field: it will be
// called with an annotation passed in that changes the counter. In the case of
can you update the comment to reflect the signature change?
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru, @miretskiy, and @pbardea)
pkg/sql/row/expr_walker.go, line 233 at r2 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
I have few issues w/ this interface.
I know that @pbardea asked to add this, but I'm not sure it's a good idea in this particular case.
Adding this interface is rather invasive (many places where it has to be passed around). Usually, you add interfaces if you have 2+ different implementations (right now, we don't have anything outside this one -- not even a test-based one). I would be in favor of dropping this interface. If you feel strongly about keeping this interface, then I have to question the RequestChunk signature.
This method takes too many arguments that are very specific to this implementation, such as the cell info annotation. I would imagine an interface method that takes the eval ctx and probably the seq descriptor, and nothing else. Everything else (e.g. where the state is stored, what to update, etc.) should be moved to the implementation (the job-backed seq provider).
I think my comment about this interface may have been too heavy handed -- apologies! My intention was that it seemed like a good sub-component to factor out and test on its own. Having the JobBasedSeqChunkProvider satisfies that, and I think it can be renamed to just SeqChunkProvider.
pkg/sql/row/expr_walker.go, line 335 at r3 (raw file):
}
func incrementSequenceByVal(
does it make sense for this to live in pkg/sql along with IncrementSequence rather than pkg/sql/row?
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru, @miretskiy, and @pbardea)
pkg/sql/row/expr_walker.go, line 335 at r3 (raw file):
Previously, pbardea (Paul Bardea) wrote…
does it make sense for this to live in pkg/sql along with IncrementSequence rather than pkg/sql/row?
I would second that -- if this resolves the circular dependency issue, that'd be great.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy and @pbardea)
pkg/jobs/jobspb/jobs.proto, line 239 at r3 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
Probably should add comments for these pair of fields?
done.
pkg/jobs/jobspb/jobs.proto, line 246 at r3 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
message SequenceChunks {
  repeated SequenceValChunk chunks = 1;
}
nit: Should we inline this message inside SequenceDetails?
done.
pkg/sql/row/expr_walker.go, line 228 at r2 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
pretty sure you need to document exported interface methods.
done, by virtue of the interface not existing anymore.
pkg/sql/row/expr_walker.go, line 233 at r2 (raw file):
Previously, pbardea (Paul Bardea) wrote…
I think my comment about this interface may have been too heavy handed -- apologies! My intention was that it seemed like a good sub-component to factor out and test on its own. Having the JobBasedSeqChunkProvider satisfies that, and I think it can be renamed to just SeqChunkProvider.
done, though I still think plumbing a SeqChunkProvider object is better than plumbing an unwrapped jobID and registry?
pkg/sql/row/expr_walker.go, line 286 at r2 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
getJobProgressQuery := `SELECT progress FROM system.jobs J WHERE J.id = $1 FOR UPDATE`
ex := evalCtx.InternalExecutor.(sqlutil.InternalExecutor)
row, err := ex.QueryRow(ctx, "nextval-import-job-progress", txn, getJobProgressQuery, j.JobID)
if err != nil {
    return err
}
progress, err := jobs.UnmarshalProgress(row[0])
if err != nil {
    return err
}
I don't think we should be executing these queries directly; I think we should use jobs for that:
job := &jobs.Job{id: &j.id, registry: xxxx, txn: txn}
job.Update(....)
done. I'm just waiting on #57742 to merge, then I'll rebase!
pkg/sql/row/expr_walker.go, line 111 at r3 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
since increment is available in seqDesc.SequenceOpts.Increment, perhaps drop this field?
I know it's more verbose, but it would also make this struct smaller and, more importantly, there'd be no chance of increment getting out of sync w/ the sequence configuration...
good point, done.
pkg/sql/row/expr_walker.go, line 323 at r3 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
use job.Update
done.
pkg/sql/row/expr_walker.go, line 335 at r3 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
I would second that -- if this resolves circular dependency issue, that'd be great.
Unless I'm misunderstanding, this causes the same cycle as I mentioned above. When I tried sharing the logic it led to:
imports github.com/cockroachdb/cockroach/pkg/sql
imports github.com/cockroachdb/cockroach/pkg/sql/backfill
imports github.com/cockroachdb/cockroach/pkg/sql/row
imports github.com/cockroachdb/cockroach/pkg/sql
row can't seem to rely on sql.
pkg/sql/row/expr_walker.go, line 398 at r3 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
getJobProgressQuery := `SELECT progress FROM system.jobs J WHERE J.id = $1`
ex := evalCtx.InternalExecutor.(sqlutil.InternalExecutor)
row, err := ex.QueryRow(evalCtx.Context, "nextval-import-job-progress", nil /* txn */, getJobProgressQuery, j.JobID)
if err != nil {
    return found, err
}
perhaps registry.LoadJob(...) instead of writing direct queries?
done.
pkg/sql/row/expr_walker.go, line 504 at r3 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
// time). This will be handled by the visitorSideEffect field: it will be
// called with an annotation passed in that changes the counter. In the case of
can you update comment to reflect signature change?
done.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru, @miretskiy, and @pbardea)
pkg/sql/row/expr_walker.go, line 233 at r2 (raw file):
Previously, adityamaru (Aditya Maru) wrote…
done, though I still think plumbing a SeqChunkProvider object is better than plumbing an unwrapped jobID and registry?
ack.
pkg/sql/row/expr_walker.go, line 237 at r4 (raw file):
curSeqChunk *jobspb.SequenceValChunk,
seqMetadata *SequenceMetadata,
do we need to pass both of these fields?
curSeqChunk always points inside seqMetadata, doesn't it?
pkg/sql/row/expr_walker.go, line 263 at r4 (raw file):
Quoted 4 lines of code…
job, err := j.Registry.LoadJob(evalCtx.Context, j.JobID)
if err != nil {
    return err
}
I don't think this is necessary, and I think you would be better off with something like:
job := &jobs.Job{id: &j.id, registry: j.registry, txn : txn}
job.Update...
Update loads the data for you.
pkg/sql/row/expr_walker.go, line 276 at r4 (raw file):
        Chunks: make([]*jobspb.SequenceValChunk, 0),
    }
} else {
nit: you could probably outdent this else statement (code clarity)?
pkg/sql/row/expr_walker.go, line 289 at r4 (raw file):
Quoted 9 lines of code…
chunksCopy := fileProgress.SeqIdToChunks[int32(seqID)].Chunks[:0]
for _, chunk := range fileProgress.SeqIdToChunks[int32(seqID)].Chunks {
    // If the resumePos is below the max bound of the current chunk we
    // need to keep this chunk in case the job is re-resumed.
    if chunk.NextChunkStartRow > resumePos {
        chunksCopy = append(chunksCopy, chunk)
    }
}
fileProgress.SeqIdToChunks[int32(seqID)].Chunks = chunksCopy
nit: A bit shorter, and avoids the append/copy:
trim, chunks := 0, fileProgress.SeqIdToChunks[int32(seqID)].Chunks
for ; trim < len(chunks) && chunks[trim].NextChunkStartRow <= resumePos; trim++ {
}
fileProgress.SeqIdToChunks[int32(seqID)].Chunks = chunks[trim:]
pkg/sql/row/expr_walker.go, line 412 at r4 (raw file):
Quoted 5 lines of code…
if curSeqChunk.ChunkSize >= maxChunkSize {
    newChunkSize = curSeqChunk.ChunkSize
} else {
    newChunkSize = chunkSizeIncrementRate * curSeqChunk.ChunkSize
}
we probably want to swap these if/else:
newChunkSize = chunkSizeIncrementRate * curSeqChunk.ChunkSize
if newChunkSize > maxChunkSize {
newChunkSize = maxChunkSize
}
The reason is that, with the check below (newChunkSize = seqMetadata.instancesPerRow when we have more than 10 instances per row), we could allocate chunks larger than 100k. Not a big deal, but still.
pkg/sql/row/expr_walker.go, line 435 at r4 (raw file):
ChunkSize:         newChunkSize,
ChunkStartRow:     c.rowID,
NextChunkStartRow: c.rowID + (newChunkSize / seqMetadata.instancesPerRow),
nit: maybe drop this field? It's fully computable, and used in only 2 places (which have access to seqMetadata and c.rowID)... You could consider adding a method to aid readability.
Smaller proto, plus no chance of this field getting out of sync w/ other values.
func (c *jobspb.SequenceValChunk) nextChunkRow() {
...
}
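One wrinkle with that sketch: Go only permits methods on types defined in the same package, so outside of jobspb's own (generated or hand-written) files this would end up as a free helper. A sketch, with instancesPerRow passed in since it lives on seqMetadata:

// nextChunkStartRow computes the first row not covered by this chunk, which
// is what the NextChunkStartRow proto field stores today.
func nextChunkStartRow(c *jobspb.SequenceValChunk, instancesPerRow int64) int64 {
	return c.ChunkStartRow + c.ChunkSize/instancesPerRow
}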
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru, @miretskiy, and @pbardea)
pkg/sql/row/expr_walker.go, line 248 at r4 (raw file):
Quoted 9 lines of code…
// Check if we have already reserved a chunk corresponding to this row in a
// previous run of the import job. If we have, we must reuse the value of
// the sequence which was designated on this particular invocation of
// nextval().
if found, err := j.checkForPreviouslyAllocatedChunks(evalCtx, seqMetadata, c); err != nil {
    return err
} else if found {
    return nil
}
Efficiency nit:
I think (but I may be wrong) that the extra "LoadJob" call in checkForPreviouslyAllocatedChunks is not needed. We know this method is called either when we resume, or when we need more chunks.
So, we know we have to load the job... But we also know that in the majority of cases we will also want to update the job (after reserving). The only time we won't do an update is if we previously allocated enough chunks...
Note however, that jobs.Update issues an update only if you changed anything in the job updater -- if nothing is changed, no foul: jobs.Update loads the job for you, you examine the data (checkForPreviouslyAllocatedChunks), and if needed you reserve and update the job.
The only thing to be careful about is that reserve chunks must be called at most once.
WDYT?
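Put differently, the check and the reservation could share one jobs.Update round trip. A sketch using the JobMetadata/JobUpdater callback shape that appears later in this thread; the two helpers are placeholders, not real functions:

err := job.Update(ctx, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
	// Update has already loaded the job record into md; first look for a
	// chunk reserved by a previous run of the import.
	if found := findPreviouslyAllocatedChunk(md.Progress, seqMetadata, c); found {
		return nil // nothing modified, so jobs.Update skips the write
	}
	// Otherwise reserve exactly one new chunk and persist it; this branch
	// must run at most once per exhausted chunk.
	reserveChunk(md.Progress, seqMetadata, c)
	ju.UpdateProgress(md.Progress)
	return nil
})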
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy and @pbardea)
pkg/sql/row/expr_walker.go, line 237 at r4 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
curSeqChunk *jobspb.SequenceValChunk,
seqMetadata *SequenceMetadata,
do we need to pass both of these fields?
curSeqChunk always points inside seqMetadata, doesn't it?
good point, done.
pkg/sql/row/expr_walker.go, line 248 at r4 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
// Check if we have already reserved a chunk corresponding to this row in a
// previous run of the import job. If we have, we must reuse the value of
// the sequence which was designated on this particular invocation of
// nextval().
if found, err := j.checkForPreviouslyAllocatedChunks(evalCtx, seqMetadata, c); err != nil {
    return err
} else if found {
    return nil
}
Efficiency nit:
I think (but I may be wrong) that the extra "LoadJob" call in checkForPreviouslyAllocatedChunks is not needed. We know this method is called either when we resume, or when we need more chunks.
So, we know we have to load the job... But we also know that in the majority of cases we will also want to update the job (after reserving). The only time we won't do an update is if we previously allocated enough chunks...
Note however, that jobs.Update issues an update only if you changed anything in the job updater -- if nothing is changed, no foul: jobs.Update loads the job for you, you examine the data (checkForPreviouslyAllocatedChunks), and if needed you reserve and update the job. The only thing to be careful about is that reserve chunks must be called at most once.
WDYT?
good idea, rejigged the code structure a little to ensure we only have to load once. How does it look?
pkg/sql/row/expr_walker.go, line 263 at r4 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
job, err := j.Registry.LoadJob(evalCtx.Context, j.JobID)
if err != nil {
    return err
}
I don't think this is necessary, and I think you would be better off with something like:
job := &jobs.Job{id: &j.id, registry: j.registry, txn: txn}
job.Update...
Update loads the data for you.
I'd have to export the struct fields since we're not in the jobs package :/
pkg/sql/row/expr_walker.go, line 276 at r4 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
nit: you could probably outdent this else statement (code clarity)?
done!
pkg/sql/row/expr_walker.go, line 289 at r4 (raw file):
trim, chunks := 0, fileProgress.SeqIdToChunks[int32(seqID)].Chunks
for ; trim < len(chunks) && chunks[trim].NextChunkStartRow <= resumePos; trim++ {
}
fileProgress.SeqIdToChunks[int32(seqID)].Chunks = chunks[trim:]
neat! done.
pkg/sql/row/expr_walker.go, line 412 at r4 (raw file):
newChunkSize = chunkSizeIncrementRate * curSeqChunk.ChunkSize
if newChunkSize > maxChunkSize {
newChunkSize = maxChunkSize
}
swapped, makes it clearer too.
pkg/sql/row/expr_walker.go, line 435 at r4 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
nit: maybe drop this field? It's fully computable, and used in only 2 places (which have access to seqMetadata and c.rowID)... You could consider adding a method to aid readability.
Smaller proto, plus no chance of this field getting out of sync w/ other values.
func (c *jobspb.SequenceValChunk) nextChunkRow() {
    ...
}
I kind of like the fact that the information is stored in the same struct which contains all other related metadata for this chunk. We are storing the lower bound of this chunk in ChunkStartRow, so maybe an argument can be made that the upper bound should be stored too for completeness. It is only being set in one place so maybe getting out of sync is unlikely? I don't feel too strongly, so if you suggest otherwise I'll make the swap 😄
Reviewed 7 of 12 files at r4, 3 of 4 files at r5.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru, @miretskiy, and @pbardea)
pkg/jobs/jobspb/jobs.proto, line 256 at r5 (raw file):
// Mapping from sequence ID to allocated sequence chunks.
map<int32, SequenceChunks> seq_id_to_chunks = 1;
nit: indentation's off?
pkg/sql/row/expr_walker.go, line 263 at r4 (raw file):
Previously, adityamaru (Aditya Maru) wrote…
i'd have to export the struct fields since we're not in the jobs package :/
I see..
pkg/sql/row/expr_walker.go, line 435 at r4 (raw file):
Previously, adityamaru (Aditya Maru) wrote…
I kind of like the fact that the information is stored in the same struct which contains all other related metadata for this chunk. We are storing the lower bound of this chunk in ChunkStartRow, so maybe an argument can be made that the upper bound should be stored too for completeness. It is only being set in one place so maybe getting out of sync is unlikely? I don't feel too strongly, so if you suggest otherwise I'll make the swap 😄
It's okay...
pkg/sql/row/expr_walker.go, line 238 at r5 (raw file):
var hasAllocatedChunk bool
return evalCtx.DB.Txn(evalCtx.Context, func(ctx context.Context, txn *kv.Txn) error {
    job, err := j.Registry.LoadJob(evalCtx.Context, j.JobID)
Pretty sure we want to LoadWithTxn...
Kinda related to the other comment re constructing job{registry: r} -- it sucks that we have to load here at all. Maybe we don't have to export anything from jobs.go -- perhaps just add an UpdateJobWithTxn method to the registry to create a *Job and call Update on it?
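Something like this, presumably; a sketch living inside pkg/jobs so the unexported fields stay unexported (the exact API that lands may differ):

// UpdateJobWithTxn constructs an in-memory *Job bound to the caller's txn and
// runs updateFn through Update, so callers never issue raw system.jobs queries.
func (r *Registry) UpdateJobWithTxn(
	ctx context.Context, jobID int64, txn *kv.Txn, updateFn UpdateFn,
) error {
	j := &Job{id: &jobID, registry: r, txn: txn}
	return j.Update(ctx, updateFn)
}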
Reviewed 2 of 12 files at r4, 2 of 5 files at r6.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru, @miretskiy, and @pbardea)
pkg/sql/row/expr_walker.go, line 239 at r6 (raw file):
return evalCtx.DB.Txn(evalCtx.Context, func(ctx context.Context, txn *kv.Txn) error {
    var foundFromPreviouslyAllocatedChunk bool
    resolveChunkFunc := func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
If I'm not mistaken, this can be moved outside the db.Txn() call?
Perhaps even made into a standalone function (it will have to take some additional arguments)...
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru and @miretskiy)
I moved it around, but I feel like keeping it inline makes the method's interaction with the stateful bools easier to follow.
TFTRs! bors r=miretskiy,pbardea
Build failed (retrying...):
Build succeeded:
Previously, nextval was not supported as a default expression for a
non-targeted import into column. This change adds that functionality for
a CSV import.
There is a lot of great discussion about the approach to this problem at
#48253 (comment).
At a high level, on encountering a nextval(seqname) for the first time,
IMPORT will reserve a chunk of values for this sequence, and tie those
values to the (fileIdx, rowNum) which is a unique reference to a
particular row in a distributed import. The size of this chunk grows
exponentially based on how many times a single processor encounters a
nextval call for that particular sequence. The reservation of the chunk
piggybacks on existing methods which provide atomic, non-transactional
guarantees when it comes to increasing the value of a sequence.
Information about the reserved chunks is stored in the import job
progress details so as to ensure the following property:
If the import job were to be paused and then resumed, assuming all the
rows imported were not checkpointed, we need to ensure that the nextval
value for a previously processed (fileIdx, rowNum) is identical to the
value computed in the first run of the import job. This property is
necessary to prevent duplicate entries with the same key but different
value. We use the jobs progress details to check if we have a previously
reserved chunk of sequence values which can be used for the current
(fileIdx, rowNum).
Informs: #54797
Release note (sql change): IMPORT INTO for CSV now supports nextval as a
default expression of a non-targeted column.
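To make the chunk-growth math above concrete, a toy calculation under this thread's assumptions (10x growth capped at 100K values per chunk, one nextval instance per row):

package main

import "fmt"

func main() {
	const growthRate, maxChunkSize = 10, 100000
	chunkSize, rowsCovered := int64(10), int64(0)
	for i := 1; rowsCovered < 1_000_000; i++ {
		rowsCovered += chunkSize // one instance per row: a chunk covers chunkSize rows
		fmt.Printf("chunk %d: size %d, rows covered so far %d\n", i, chunkSize, rowsCovered)
		if chunkSize < maxChunkSize {
			chunkSize *= growthRate
		}
	}
}

A 1M-row file settles at the 100K cap by the fifth reservation, which, together with pruning chunks below the checkpointed ResumePos, is what keeps the progress proto small.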