From 63d832cf6d4b3c696c0d5ab44a443f7bc7e27363 Mon Sep 17 00:00:00 2001 From: Aditya Maru Date: Fri, 21 Jun 2019 16:57:39 -0400 Subject: [PATCH 1/2] sql: Add a BulkRowWriter processor to be initially used by CTAS. This change introduces a BulkRowWriter processor which uses the AddSSTable method to write rows from a RowSource to the target table. The processor is required to make the CTAS statement scalable. Release note: None --- pkg/sql/distsql_plan_ctas.go | 57 ++++ pkg/sql/distsqlpb/flow_diagram.go | 5 + pkg/sql/distsqlpb/processors.pb.go | 208 +++++++++------ pkg/sql/distsqlpb/processors.proto | 1 + pkg/sql/distsqlpb/processors_bulk_io.pb.go | 291 ++++++++++++++++----- pkg/sql/distsqlpb/processors_bulk_io.proto | 6 + pkg/sql/distsqlrun/bulk_row_writer.go | 227 ++++++++++++++++ pkg/sql/distsqlrun/processors.go | 6 + 8 files changed, 657 insertions(+), 144 deletions(-) create mode 100644 pkg/sql/distsql_plan_ctas.go create mode 100644 pkg/sql/distsqlrun/bulk_row_writer.go diff --git a/pkg/sql/distsql_plan_ctas.go b/pkg/sql/distsql_plan_ctas.go new file mode 100644 index 000000000000..98869212d830 --- /dev/null +++ b/pkg/sql/distsql_plan_ctas.go @@ -0,0 +1,57 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package sql + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/internal/client" + "github.com/cockroachdb/cockroach/pkg/sql/distsqlpb" + "github.com/cockroachdb/cockroach/pkg/sql/distsqlrun" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/pkg/errors" +) + +// PlanAndRunCTAS plans and runs the CREATE TABLE AS command. +func PlanAndRunCTAS( + ctx context.Context, + dsp *DistSQLPlanner, + planner *planner, + txn *client.Txn, + isLocal bool, + in planNode, + out distsqlpb.ProcessorCoreUnion, + recv *DistSQLReceiver, +) { + planCtx := dsp.NewPlanningCtx(ctx, planner.ExtendedEvalContext(), txn) + planCtx.isLocal = isLocal + planCtx.planner = planner + planCtx.stmtType = tree.Rows + + p, err := dsp.createPlanForNode(planCtx, in) + if err != nil { + recv.SetError(errors.Wrapf(err, "constructing distSQL plan")) + return + } + + p.AddNoGroupingStage( + out, distsqlpb.PostProcessSpec{}, distsqlrun.CTASPlanResultTypes, distsqlpb.Ordering{}, + ) + + // The bulk row writers will emit a binary encoded BulkOpSummary. + p.PlanToStreamColMap = []int{0} + p.ResultTypes = distsqlrun.CTASPlanResultTypes + + // Make copy of evalCtx as Run might modify it. + evalCtxCopy := planner.ExtendedEvalContextCopy() + dsp.FinalizePlan(planCtx, &p) + dsp.Run(planCtx, txn, &p, recv, evalCtxCopy, nil /* finishedSetupFn */) +} diff --git a/pkg/sql/distsqlpb/flow_diagram.go b/pkg/sql/distsqlpb/flow_diagram.go index d337d47bd674..9d9522ed11c5 100644 --- a/pkg/sql/distsqlpb/flow_diagram.go +++ b/pkg/sql/distsqlpb/flow_diagram.go @@ -458,6 +458,11 @@ func (s *CSVWriterSpec) summary() (string, []string) { return "CSVWriter", []string{s.Destination} } +// summary implements the diagramCellType interface. +func (s *BulkRowWriterSpec) summary() (string, []string) { + return "BulkRowWriterSpec", []string{} +} + // summary implements the diagramCellType interface. func (w *WindowerSpec) summary() (string, []string) { details := make([]string, 0, len(w.WindowFns)) diff --git a/pkg/sql/distsqlpb/processors.pb.go b/pkg/sql/distsqlpb/processors.pb.go index 2cfe7bbffe5b..f803935f1389 100644 --- a/pkg/sql/distsqlpb/processors.pb.go +++ b/pkg/sql/distsqlpb/processors.pb.go @@ -70,7 +70,7 @@ func (m *ProcessorSpec) Reset() { *m = ProcessorSpec{} } func (m *ProcessorSpec) String() string { return proto.CompactTextString(m) } func (*ProcessorSpec) ProtoMessage() {} func (*ProcessorSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_c9f8ec7ee260e7db, []int{0} + return fileDescriptor_processors_a58be0f1c61be3a0, []int{0} } func (m *ProcessorSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -121,13 +121,14 @@ type ProcessorCoreUnion struct { ChangeAggregator *ChangeAggregatorSpec `protobuf:"bytes,25,opt,name=changeAggregator" json:"changeAggregator,omitempty"` ChangeFrontier *ChangeFrontierSpec `protobuf:"bytes,26,opt,name=changeFrontier" json:"changeFrontier,omitempty"` Ordinality *OrdinalitySpec `protobuf:"bytes,27,opt,name=ordinality" json:"ordinality,omitempty"` + BulkRowWriter *BulkRowWriterSpec `protobuf:"bytes,28,opt,name=bulkRowWriter" json:"bulkRowWriter,omitempty"` } func (m *ProcessorCoreUnion) Reset() { *m = ProcessorCoreUnion{} } func (m *ProcessorCoreUnion) String() string { return proto.CompactTextString(m) } func (*ProcessorCoreUnion) ProtoMessage() {} func (*ProcessorCoreUnion) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_c9f8ec7ee260e7db, []int{1} + return fileDescriptor_processors_a58be0f1c61be3a0, []int{1} } func (m *ProcessorCoreUnion) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -162,7 +163,7 @@ func (m *NoopCoreSpec) Reset() { *m = NoopCoreSpec{} } func (m *NoopCoreSpec) String() string { return proto.CompactTextString(m) } func (*NoopCoreSpec) ProtoMessage() {} func (*NoopCoreSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_c9f8ec7ee260e7db, []int{2} + return fileDescriptor_processors_a58be0f1c61be3a0, []int{2} } func (m *NoopCoreSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -204,7 +205,7 @@ func (m *LocalPlanNodeSpec) Reset() { *m = LocalPlanNodeSpec{} } func (m *LocalPlanNodeSpec) String() string { return proto.CompactTextString(m) } func (*LocalPlanNodeSpec) ProtoMessage() {} func (*LocalPlanNodeSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_c9f8ec7ee260e7db, []int{3} + return fileDescriptor_processors_a58be0f1c61be3a0, []int{3} } func (m *LocalPlanNodeSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -237,7 +238,7 @@ func (m *MetadataTestSenderSpec) Reset() { *m = MetadataTestSenderSpec{} func (m *MetadataTestSenderSpec) String() string { return proto.CompactTextString(m) } func (*MetadataTestSenderSpec) ProtoMessage() {} func (*MetadataTestSenderSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_c9f8ec7ee260e7db, []int{4} + return fileDescriptor_processors_a58be0f1c61be3a0, []int{4} } func (m *MetadataTestSenderSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -270,7 +271,7 @@ func (m *MetadataTestReceiverSpec) Reset() { *m = MetadataTestReceiverSp func (m *MetadataTestReceiverSpec) String() string { return proto.CompactTextString(m) } func (*MetadataTestReceiverSpec) ProtoMessage() {} func (*MetadataTestReceiverSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_c9f8ec7ee260e7db, []int{5} + return fileDescriptor_processors_a58be0f1c61be3a0, []int{5} } func (m *MetadataTestReceiverSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -656,6 +657,18 @@ func (m *ProcessorCoreUnion) MarshalTo(dAtA []byte) (int, error) { } i += n27 } + if m.BulkRowWriter != nil { + dAtA[i] = 0xe2 + i++ + dAtA[i] = 0x1 + i++ + i = encodeVarintProcessors(dAtA, i, uint64(m.BulkRowWriter.Size())) + n28, err := m.BulkRowWriter.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n28 + } return i, nil } @@ -908,6 +921,10 @@ func (m *ProcessorCoreUnion) Size() (n int) { l = m.Ordinality.Size() n += 2 + l + sovProcessors(uint64(l)) } + if m.BulkRowWriter != nil { + l = m.BulkRowWriter.Size() + n += 2 + l + sovProcessors(uint64(l)) + } return n } @@ -1054,6 +1071,9 @@ func (this *ProcessorCoreUnion) GetValue() interface{} { if this.Ordinality != nil { return this.Ordinality } + if this.BulkRowWriter != nil { + return this.BulkRowWriter + } return nil } @@ -1109,6 +1129,8 @@ func (this *ProcessorCoreUnion) SetValue(value interface{}) bool { this.ChangeFrontier = vt case *OrdinalitySpec: this.Ordinality = vt + case *BulkRowWriterSpec: + this.BulkRowWriter = vt default: return false } @@ -2178,6 +2200,39 @@ func (m *ProcessorCoreUnion) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 28: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field BulkRowWriter", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessors + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthProcessors + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.BulkRowWriter == nil { + m.BulkRowWriter = &BulkRowWriterSpec{} + } + if err := m.BulkRowWriter.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipProcessors(dAtA[iNdEx:]) @@ -2633,76 +2688,77 @@ var ( ) func init() { - proto.RegisterFile("sql/distsqlpb/processors.proto", fileDescriptor_processors_c9f8ec7ee260e7db) + proto.RegisterFile("sql/distsqlpb/processors.proto", fileDescriptor_processors_a58be0f1c61be3a0) } -var fileDescriptor_processors_c9f8ec7ee260e7db = []byte{ - // 1059 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x96, 0xdd, 0x52, 0x1b, 0x37, - 0x14, 0xc7, 0x31, 0x98, 0x0f, 0xcb, 0x18, 0x88, 0x42, 0x13, 0x95, 0x76, 0x0c, 0xe3, 0xb6, 0x29, - 0x25, 0x19, 0xd3, 0x61, 0xda, 0x5e, 0x64, 0x3a, 0xd3, 0xd6, 0x76, 0x33, 0x98, 0xa6, 0x84, 0xee, - 0xd2, 0x64, 0x86, 0x1b, 0x2a, 0x76, 0x85, 0x51, 0x58, 0xaf, 0x16, 0x49, 0x86, 0x26, 0x2f, 0xd1, - 0x3e, 0x42, 0x1f, 0x87, 0xde, 0xe5, 0x32, 0x57, 0x4c, 0x6b, 0x5e, 0xa4, 0xa3, 0xa3, 0x65, 0x77, - 0xf9, 0x58, 0x6f, 0xef, 0xd6, 0xf2, 0xff, 0xff, 0x3b, 0x47, 0xd2, 0x39, 0x92, 0x50, 0x5d, 0x9d, - 0x04, 0xeb, 0x3e, 0x57, 0x5a, 0x9d, 0x04, 0xd1, 0xc1, 0x7a, 0x24, 0x85, 0xc7, 0x94, 0x12, 0x52, - 0x35, 0x23, 0x29, 0xb4, 0xc0, 0xc4, 0x13, 0xde, 0xb1, 0x14, 0xd4, 0x3b, 0x6a, 0xaa, 0x93, 0xa0, - 0x19, 0x2b, 0xe5, 0x20, 0x5c, 0x22, 0xd7, 0x9d, 0x3e, 0xd5, 0xd4, 0x7a, 0x96, 0x3e, 0xc9, 0x63, - 0xee, 0x1f, 0x50, 0xc5, 0x62, 0x51, 0x23, 0x57, 0x64, 0xe2, 0x58, 0xcd, 0xa3, 0x7c, 0xd0, 0x20, - 0x38, 0xde, 0xe7, 0x22, 0xd6, 0xad, 0xe5, 0xea, 0xbc, 0x23, 0x1a, 0xf6, 0xd8, 0x21, 0x63, 0xbe, - 0x2a, 0xd4, 0x6a, 0x7a, 0x10, 0xb0, 0x7d, 0xa5, 0xa9, 0xbe, 0xd2, 0x2e, 0xf6, 0x44, 0x4f, 0xc0, - 0xe7, 0xba, 0xf9, 0xb2, 0xa3, 0x8d, 0x3f, 0x26, 0x50, 0x6d, 0xe7, 0xca, 0xe6, 0x46, 0xcc, 0xc3, - 0x6d, 0x34, 0xc9, 0xc3, 0x68, 0xa0, 0x49, 0x69, 0x65, 0x62, 0xb5, 0xba, 0xf1, 0x79, 0x33, 0x6f, - 0xd1, 0x9a, 0x5d, 0x23, 0x73, 0xdf, 0x84, 0x9e, 0xf1, 0xb5, 0xca, 0xe7, 0x17, 0xcb, 0x63, 0x8e, - 0xf5, 0xe2, 0x67, 0xa8, 0xec, 0x09, 0xc9, 0xc8, 0xf8, 0x4a, 0x69, 0xb5, 0xba, 0xf1, 0x24, 0x9f, - 0x91, 0xc4, 0x6e, 0x0b, 0xc9, 0x7e, 0x0d, 0xb9, 0x08, 0x63, 0x10, 0xf8, 0xf1, 0x26, 0x9a, 0x12, - 0x03, 0x6d, 0xb2, 0x99, 0x80, 0x6c, 0xd6, 0xf2, 0x49, 0x2f, 0x40, 0xe7, 0x88, 0x81, 0x66, 0x32, - 0x93, 0x50, 0xec, 0xc7, 0x6d, 0x54, 0x8e, 0x84, 0xd2, 0xa4, 0x0c, 0x19, 0x7d, 0x31, 0x22, 0x23, - 0xa1, 0x74, 0x9c, 0x55, 0x06, 0x03, 0x66, 0xbc, 0x86, 0x66, 0x94, 0xa6, 0x3d, 0xb6, 0xcf, 0x7d, - 0x32, 0xb9, 0x52, 0x5a, 0x9d, 0x6c, 0xcd, 0x9b, 0x7f, 0x87, 0x17, 0xcb, 0xd3, 0xae, 0x19, 0xef, - 0x76, 0x9c, 0x69, 0x10, 0x74, 0x7d, 0xfc, 0x0d, 0x9a, 0x4d, 0xf6, 0xc3, 0xe8, 0xa7, 0x40, 0x7f, - 0x3f, 0xd6, 0x57, 0x93, 0x89, 0x77, 0x3b, 0x4e, 0x35, 0x11, 0x76, 0xfd, 0xc6, 0xdf, 0xf3, 0x08, - 0xdf, 0x5e, 0x15, 0xfc, 0x14, 0x95, 0x43, 0x21, 0x22, 0x52, 0x82, 0xfc, 0x1f, 0xe5, 0xe7, 0xbf, - 0x2d, 0x44, 0x64, 0x6c, 0x26, 0x79, 0x07, 0x3c, 0xf8, 0x27, 0x54, 0x85, 0x7a, 0x70, 0x18, 0xf5, - 0x99, 0x8c, 0x37, 0x65, 0xc4, 0x12, 0xec, 0xa6, 0x62, 0xa0, 0x64, 0xdd, 0x78, 0x13, 0xa1, 0xd7, - 0x82, 0x87, 0x31, 0x6b, 0x02, 0x58, 0xab, 0xf9, 0xac, 0xad, 0x44, 0x0b, 0xa8, 0x8c, 0x17, 0x7f, - 0x8b, 0xa6, 0x94, 0x90, 0x9a, 0xc9, 0x78, 0x53, 0x3e, 0xcd, 0xa7, 0xb8, 0xa0, 0x03, 0x42, 0xec, - 0x31, 0x79, 0xd0, 0x5e, 0x4f, 0xb2, 0x1e, 0xd5, 0x42, 0xc2, 0x6e, 0x8c, 0xcc, 0xe3, 0x87, 0x44, - 0x6b, 0xf3, 0x48, 0xbd, 0xb8, 0x85, 0x66, 0x8c, 0x90, 0x87, 0x9e, 0x26, 0xd3, 0x45, 0xcb, 0xdb, - 0x89, 0x95, 0x40, 0x49, 0x7c, 0x66, 0x89, 0xfb, 0x4c, 0xf6, 0x98, 0x99, 0x2e, 0x93, 0x64, 0xa6, - 0x68, 0x89, 0x7f, 0x4e, 0xc5, 0x76, 0x89, 0x33, 0x6e, 0x33, 0xb5, 0x23, 0xaa, 0x8e, 0x62, 0x56, - 0xa5, 0x68, 0x6a, 0x9b, 0x89, 0xd6, 0x4e, 0x2d, 0xf5, 0xe2, 0xef, 0xd1, 0xd4, 0x29, 0x0d, 0x06, - 0x4c, 0x11, 0x54, 0x44, 0x79, 0x09, 0xba, 0xa4, 0x72, 0x62, 0x9f, 0xc9, 0xe5, 0x80, 0x7a, 0xc7, - 0x87, 0x3c, 0x08, 0x98, 0x24, 0xd5, 0x22, 0x4a, 0x2b, 0xd1, 0xda, 0x5c, 0x52, 0x2f, 0x7e, 0x8e, - 0x90, 0x64, 0xd4, 0xef, 0xf6, 0x23, 0x21, 0x35, 0xa9, 0x15, 0x9d, 0x0c, 0x4e, 0xa2, 0xed, 0x50, - 0x4d, 0x2d, 0x2d, 0xf5, 0xe3, 0x1f, 0x51, 0xc5, 0x75, 0x77, 0x5f, 0x49, 0x6e, 0xea, 0x67, 0x0e, - 0x60, 0x23, 0x8e, 0xaa, 0x44, 0x0a, 0x9c, 0xd4, 0x89, 0xbf, 0x43, 0xd3, 0x2e, 0xed, 0x47, 0x66, - 0x6e, 0xf3, 0x00, 0xf9, 0x6c, 0x04, 0xc4, 0x0a, 0x01, 0x71, 0xe5, 0xc2, 0x7b, 0x68, 0xc1, 0x7e, - 0xa6, 0x05, 0x46, 0x16, 0x80, 0xd4, 0x2c, 0x22, 0xdd, 0x28, 0xc9, 0x5b, 0x1c, 0x2c, 0xd0, 0x43, - 0x1e, 0x6a, 0x26, 0x03, 0x46, 0x4f, 0x99, 0x6f, 0xbb, 0x26, 0x2e, 0x8a, 0x7b, 0x10, 0xe2, 0xeb, - 0x51, 0x87, 0xf3, 0x9d, 0x46, 0x88, 0x94, 0x47, 0xc5, 0xbf, 0x21, 0xdc, 0x67, 0x9a, 0x9a, 0xeb, - 0x6f, 0x97, 0x29, 0xed, 0xb2, 0xd0, 0xf4, 0x38, 0x86, 0x58, 0x5f, 0x8e, 0x2a, 0xe6, 0x9b, 0x1e, - 0x08, 0x73, 0x07, 0x0b, 0x1f, 0xa2, 0xc5, 0xec, 0xa8, 0xc3, 0x3c, 0xc6, 0x4f, 0x99, 0x24, 0xf7, - 0x21, 0xc6, 0xc6, 0xff, 0x8b, 0x71, 0xe5, 0x82, 0x28, 0x77, 0xf2, 0x4c, 0x79, 0xb4, 0xdd, 0x97, - 0x71, 0x79, 0x2c, 0x16, 0x95, 0x47, 0x22, 0xb5, 0xe5, 0x91, 0xfc, 0xc4, 0xdb, 0x68, 0xf6, 0x2d, - 0xef, 0xbd, 0xa5, 0xbd, 0x78, 0xd9, 0x3f, 0x00, 0xd2, 0x88, 0x5b, 0x68, 0x2f, 0xa3, 0x06, 0xd8, - 0x35, 0xbf, 0xe9, 0xa6, 0x48, 0x8a, 0xd7, 0xcc, 0xd3, 0x2e, 0xd3, 0xe4, 0x41, 0x51, 0x37, 0xed, - 0x24, 0x5a, 0x5b, 0xff, 0xa9, 0xd7, 0x1c, 0x5a, 0x67, 0x3c, 0xf4, 0xc5, 0x19, 0x93, 0xe4, 0x61, - 0xd1, 0xa1, 0xf5, 0x2a, 0x56, 0xda, 0x43, 0xeb, 0xca, 0x87, 0x7f, 0x41, 0xb5, 0x40, 0x78, 0x34, - 0xd8, 0x09, 0x68, 0xb8, 0x2d, 0x7c, 0x46, 0x08, 0x80, 0x1e, 0xe7, 0x83, 0x9e, 0x67, 0xe5, 0x40, - 0xbb, 0x4e, 0x30, 0xed, 0x60, 0x9f, 0x29, 0x99, 0x76, 0xf8, 0xb0, 0xa8, 0x1d, 0xda, 0x37, 0x1c, - 0xb6, 0x1d, 0x6e, 0x72, 0xf0, 0x2e, 0x9a, 0xb3, 0x63, 0xcf, 0xa4, 0x08, 0x35, 0x67, 0x92, 0x2c, - 0x15, 0x1d, 0x22, 0xed, 0x6b, 0x7a, 0xe0, 0xde, 0x60, 0x98, 0x2d, 0x11, 0xd2, 0xe7, 0x21, 0x0d, - 0xb8, 0x7e, 0x43, 0x3e, 0x2a, 0xda, 0x92, 0x17, 0x89, 0xd6, 0x6e, 0x49, 0xea, 0x7d, 0x5a, 0x3e, - 0xff, 0x6b, 0xb9, 0xb4, 0x55, 0x9e, 0x99, 0x5a, 0x98, 0xde, 0x2a, 0xcf, 0xcc, 0x2e, 0xd4, 0x1a, - 0x73, 0x68, 0x36, 0x7b, 0x1d, 0x37, 0x38, 0xba, 0x77, 0x6b, 0x05, 0x71, 0x03, 0xcd, 0x3a, 0xe2, - 0xcc, 0x15, 0x03, 0xe9, 0xb1, 0xae, 0xff, 0x3b, 0xdc, 0xf0, 0x35, 0xe7, 0xda, 0x18, 0xfe, 0x18, - 0x55, 0xb6, 0x07, 0x7d, 0x78, 0x70, 0x29, 0xb8, 0xbf, 0x6b, 0x4e, 0x3a, 0x80, 0x31, 0x2a, 0x6f, - 0xd3, 0x3e, 0x83, 0xcb, 0xb8, 0xe2, 0xc0, 0x77, 0xe3, 0x2b, 0xf4, 0xe0, 0xee, 0xb6, 0xc4, 0x4b, - 0x68, 0x9c, 0xfb, 0x10, 0xa5, 0xd2, 0x42, 0xf1, 0x73, 0x64, 0xbc, 0xdb, 0x71, 0xc6, 0xb9, 0xdf, - 0xd8, 0x44, 0x24, 0xaf, 0xd1, 0xf0, 0x13, 0x84, 0x14, 0x50, 0xf6, 0xb9, 0xaf, 0xe0, 0x75, 0x58, - 0x69, 0xd5, 0x86, 0x17, 0xcb, 0x15, 0xcb, 0xee, 0x76, 0x94, 0x53, 0xb1, 0x82, 0xae, 0xaf, 0x5a, - 0x8f, 0xcf, 0xff, 0xad, 0x8f, 0x9d, 0x0f, 0xeb, 0xa5, 0x77, 0xc3, 0x7a, 0xe9, 0xfd, 0xb0, 0x5e, - 0xfa, 0x67, 0x58, 0x2f, 0xfd, 0x79, 0x59, 0x1f, 0x7b, 0x77, 0x59, 0x1f, 0x7b, 0x7f, 0x59, 0x1f, - 0xdb, 0xab, 0x24, 0x0f, 0xd7, 0xff, 0x02, 0x00, 0x00, 0xff, 0xff, 0x96, 0xf0, 0x0c, 0x48, 0xb9, - 0x0b, 0x00, 0x00, +var fileDescriptor_processors_a58be0f1c61be3a0 = []byte{ + // 1080 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x96, 0xdf, 0x52, 0xdb, 0x46, + 0x14, 0xc6, 0x31, 0x98, 0x3f, 0x5e, 0x63, 0x42, 0x36, 0x34, 0xd9, 0xd2, 0x8c, 0x61, 0xdc, 0x36, + 0xa5, 0x24, 0x63, 0x3a, 0x4c, 0xdb, 0x8b, 0x4c, 0x67, 0xda, 0xda, 0x6e, 0x06, 0xd3, 0x94, 0x50, + 0x89, 0x26, 0x33, 0xdc, 0xd0, 0x45, 0x5a, 0x84, 0x82, 0xac, 0x15, 0xbb, 0x6b, 0x68, 0xf2, 0x12, + 0xed, 0x03, 0xf4, 0xa2, 0x8f, 0xc3, 0x65, 0x2e, 0x73, 0xc5, 0xb4, 0xe6, 0x45, 0x3a, 0x7b, 0x56, + 0x48, 0xb2, 0x41, 0x56, 0xee, 0xcc, 0xf2, 0x7d, 0xbf, 0x73, 0xf6, 0xec, 0x39, 0xab, 0x45, 0x75, + 0x79, 0x1a, 0x6c, 0xb8, 0xbe, 0x54, 0xf2, 0x34, 0x88, 0x0e, 0x37, 0x22, 0xc1, 0x1d, 0x26, 0x25, + 0x17, 0xb2, 0x19, 0x09, 0xae, 0x38, 0x26, 0x0e, 0x77, 0x4e, 0x04, 0xa7, 0xce, 0x71, 0x53, 0x9e, + 0x06, 0xcd, 0x58, 0x29, 0xfa, 0xe1, 0x32, 0x19, 0x76, 0xba, 0x54, 0x51, 0xe3, 0x59, 0xfe, 0x34, + 0x8f, 0x79, 0x70, 0x48, 0x25, 0x8b, 0x45, 0x8d, 0x5c, 0x91, 0x8e, 0x63, 0x34, 0x8f, 0xf2, 0x41, + 0xfd, 0xe0, 0xe4, 0xc0, 0xe7, 0xb1, 0x6e, 0x3d, 0x57, 0xe7, 0x1c, 0xd3, 0xd0, 0x63, 0x47, 0x8c, + 0xb9, 0xb2, 0x50, 0xab, 0xe8, 0x61, 0xc0, 0x0e, 0xa4, 0xa2, 0xea, 0x5a, 0xbb, 0xe4, 0x71, 0x8f, + 0xc3, 0xcf, 0x0d, 0xfd, 0xcb, 0xac, 0x36, 0xfe, 0x9c, 0x42, 0xb5, 0xdd, 0x6b, 0x9b, 0x1d, 0x31, + 0x07, 0xb7, 0xd1, 0xb4, 0x1f, 0x46, 0x7d, 0x45, 0x4a, 0xab, 0x53, 0x6b, 0xd5, 0xcd, 0x2f, 0x9a, + 0x79, 0x45, 0x6b, 0x76, 0xb5, 0xcc, 0x7e, 0x13, 0x3a, 0xda, 0xd7, 0x2a, 0x5f, 0x5c, 0xae, 0x4c, + 0x58, 0xc6, 0x8b, 0x9f, 0xa1, 0xb2, 0xc3, 0x05, 0x23, 0x93, 0xab, 0xa5, 0xb5, 0xea, 0xe6, 0x93, + 0x7c, 0x46, 0x12, 0xbb, 0xcd, 0x05, 0xfb, 0x2d, 0xf4, 0x79, 0x18, 0x83, 0xc0, 0x8f, 0xb7, 0xd0, + 0x0c, 0xef, 0x2b, 0x9d, 0xcd, 0x14, 0x64, 0xb3, 0x9e, 0x4f, 0x7a, 0x01, 0x3a, 0x8b, 0xf7, 0x15, + 0x13, 0x99, 0x84, 0x62, 0x3f, 0x6e, 0xa3, 0x72, 0xc4, 0xa5, 0x22, 0x65, 0xc8, 0xe8, 0xcb, 0x31, + 0x19, 0x71, 0xa9, 0xe2, 0xac, 0x32, 0x18, 0x30, 0xe3, 0x75, 0x34, 0x27, 0x15, 0xf5, 0xd8, 0x81, + 0xef, 0x92, 0xe9, 0xd5, 0xd2, 0xda, 0x74, 0xeb, 0x8e, 0xfe, 0xef, 0xe0, 0x72, 0x65, 0xd6, 0xd6, + 0xeb, 0xdd, 0x8e, 0x35, 0x0b, 0x82, 0xae, 0x8b, 0xbf, 0x45, 0xf3, 0xc9, 0x79, 0x68, 0xfd, 0x0c, + 0xe8, 0xef, 0xc5, 0xfa, 0x6a, 0xb2, 0xf1, 0x6e, 0xc7, 0xaa, 0x26, 0xc2, 0xae, 0xdb, 0xf8, 0x7b, + 0x11, 0xe1, 0x9b, 0x55, 0xc1, 0x4f, 0x51, 0x39, 0xe4, 0x3c, 0x22, 0x25, 0xc8, 0xff, 0x51, 0x7e, + 0xfe, 0x3b, 0x9c, 0x47, 0xda, 0xa6, 0x93, 0xb7, 0xc0, 0x83, 0x7f, 0x46, 0x55, 0xe8, 0x07, 0x8b, + 0x51, 0x97, 0x89, 0xf8, 0x50, 0xc6, 0x94, 0x60, 0x2f, 0x15, 0x03, 0x25, 0xeb, 0xc6, 0x5b, 0x08, + 0xbd, 0xe6, 0x7e, 0x18, 0xb3, 0xa6, 0x80, 0xb5, 0x96, 0xcf, 0xda, 0x4e, 0xb4, 0x80, 0xca, 0x78, + 0xf1, 0x77, 0x68, 0x46, 0x72, 0xa1, 0x98, 0x88, 0x0f, 0xe5, 0xb3, 0x7c, 0x8a, 0x0d, 0x3a, 0x20, + 0xc4, 0x1e, 0x9d, 0x07, 0xf5, 0x3c, 0xc1, 0x3c, 0xaa, 0xb8, 0x80, 0xd3, 0x18, 0x9b, 0xc7, 0x8f, + 0x89, 0xd6, 0xe4, 0x91, 0x7a, 0x71, 0x0b, 0xcd, 0x69, 0xa1, 0x1f, 0x3a, 0x8a, 0xcc, 0x16, 0x95, + 0xb7, 0x13, 0x2b, 0x81, 0x92, 0xf8, 0x74, 0x89, 0x7b, 0x4c, 0x78, 0x4c, 0x6f, 0x97, 0x09, 0x32, + 0x57, 0x54, 0xe2, 0x5f, 0x52, 0xb1, 0x29, 0x71, 0xc6, 0xad, 0xb7, 0x76, 0x4c, 0xe5, 0x71, 0xcc, + 0xaa, 0x14, 0x6d, 0x6d, 0x2b, 0xd1, 0x9a, 0xad, 0xa5, 0x5e, 0xfc, 0x03, 0x9a, 0x39, 0xa3, 0x41, + 0x9f, 0x49, 0x82, 0x8a, 0x28, 0x2f, 0x41, 0x97, 0x74, 0x4e, 0xec, 0xd3, 0xb9, 0x1c, 0x52, 0xe7, + 0xe4, 0xc8, 0x0f, 0x02, 0x26, 0x48, 0xb5, 0x88, 0xd2, 0x4a, 0xb4, 0x26, 0x97, 0xd4, 0x8b, 0x9f, + 0x23, 0x24, 0x18, 0x75, 0xbb, 0xbd, 0x88, 0x0b, 0x45, 0x6a, 0x45, 0x37, 0x83, 0x95, 0x68, 0x3b, + 0x54, 0x51, 0x43, 0x4b, 0xfd, 0xf8, 0x27, 0x54, 0xb1, 0xed, 0xbd, 0x57, 0xc2, 0xd7, 0xfd, 0xb3, + 0x00, 0xb0, 0x31, 0x57, 0x55, 0x22, 0x05, 0x4e, 0xea, 0xc4, 0xdf, 0xa3, 0x59, 0x9b, 0xf6, 0x22, + 0xbd, 0xb7, 0x3b, 0x00, 0xf9, 0x7c, 0x0c, 0xc4, 0x08, 0x01, 0x71, 0xed, 0xc2, 0xfb, 0x68, 0xd1, + 0xfc, 0x4c, 0x1b, 0x8c, 0x2c, 0x02, 0xa9, 0x59, 0x44, 0x1a, 0x69, 0xc9, 0x1b, 0x1c, 0xcc, 0xd1, + 0x03, 0x3f, 0x54, 0x4c, 0x04, 0x8c, 0x9e, 0x31, 0xd7, 0x4c, 0x4d, 0xdc, 0x14, 0x77, 0x21, 0xc4, + 0x37, 0xe3, 0x2e, 0xe7, 0x5b, 0x8d, 0x10, 0x29, 0x8f, 0x8a, 0x7f, 0x47, 0xb8, 0xc7, 0x14, 0xd5, + 0x9f, 0xbf, 0x3d, 0x26, 0x95, 0xcd, 0x42, 0x3d, 0xe3, 0x18, 0x62, 0x7d, 0x35, 0xae, 0x99, 0x47, + 0x3d, 0x10, 0xe6, 0x16, 0x16, 0x3e, 0x42, 0x4b, 0xd9, 0x55, 0x8b, 0x39, 0xcc, 0x3f, 0x63, 0x82, + 0xdc, 0x83, 0x18, 0x9b, 0x1f, 0x16, 0xe3, 0xda, 0x05, 0x51, 0x6e, 0xe5, 0xe9, 0xf6, 0x68, 0xdb, + 0x2f, 0xe3, 0xf6, 0x58, 0x2a, 0x6a, 0x8f, 0x44, 0x6a, 0xda, 0x23, 0xf9, 0x13, 0xef, 0xa0, 0xf9, + 0xb7, 0xbe, 0xf7, 0x96, 0x7a, 0x71, 0xd9, 0x3f, 0x02, 0xd2, 0x98, 0xaf, 0xd0, 0x7e, 0x46, 0x0d, + 0xb0, 0x21, 0xbf, 0x9e, 0xa6, 0x48, 0xf0, 0xd7, 0xcc, 0x51, 0x36, 0x53, 0xe4, 0x7e, 0xd1, 0x34, + 0xed, 0x26, 0x5a, 0xd3, 0xff, 0xa9, 0x57, 0x5f, 0x5a, 0xe7, 0x7e, 0xe8, 0xf2, 0x73, 0x26, 0xc8, + 0x83, 0xa2, 0x4b, 0xeb, 0x55, 0xac, 0x34, 0x97, 0xd6, 0xb5, 0x0f, 0xff, 0x8a, 0x6a, 0x01, 0x77, + 0x68, 0xb0, 0x1b, 0xd0, 0x70, 0x87, 0xbb, 0x8c, 0x10, 0x00, 0x3d, 0xce, 0x07, 0x3d, 0xcf, 0xca, + 0x81, 0x36, 0x4c, 0xd0, 0xe3, 0x60, 0x9e, 0x29, 0x99, 0x71, 0xf8, 0xb8, 0x68, 0x1c, 0xda, 0x23, + 0x0e, 0x33, 0x0e, 0xa3, 0x1c, 0xbc, 0x87, 0x16, 0xcc, 0xda, 0x33, 0xc1, 0x43, 0xe5, 0x33, 0x41, + 0x96, 0x8b, 0x2e, 0x91, 0xf6, 0x90, 0x1e, 0xb8, 0x23, 0x0c, 0x7d, 0x24, 0x5c, 0xb8, 0x7e, 0x48, + 0x03, 0x5f, 0xbd, 0x21, 0x9f, 0x14, 0x1d, 0xc9, 0x8b, 0x44, 0x6b, 0x8e, 0x24, 0xf5, 0xea, 0x72, + 0xea, 0xa7, 0x9c, 0xc5, 0xcf, 0xe3, 0xbe, 0x7b, 0x58, 0x54, 0xce, 0x56, 0x56, 0x6e, 0xca, 0x39, + 0x44, 0x78, 0x5a, 0xbe, 0xf8, 0x67, 0xa5, 0xb4, 0x5d, 0x9e, 0x9b, 0x59, 0x9c, 0xdd, 0x2e, 0xcf, + 0xcd, 0x2f, 0xd6, 0x1a, 0x0b, 0x68, 0x3e, 0xfb, 0x85, 0x6f, 0xf8, 0xe8, 0xee, 0x8d, 0x43, 0xc1, + 0x0d, 0x34, 0x6f, 0xf1, 0x73, 0x9b, 0xf7, 0x85, 0xc3, 0xba, 0xee, 0x1f, 0xf0, 0x68, 0xa8, 0x59, + 0x43, 0x6b, 0xf8, 0x21, 0xaa, 0xec, 0xf4, 0x7b, 0xf0, 0x86, 0x93, 0xf0, 0x24, 0xa8, 0x59, 0xe9, + 0x02, 0xc6, 0xa8, 0xbc, 0x43, 0x7b, 0x0c, 0xbe, 0xef, 0x15, 0x0b, 0x7e, 0x37, 0xbe, 0x46, 0xf7, + 0x6f, 0x9f, 0x74, 0xbc, 0x8c, 0x26, 0x7d, 0x17, 0xa2, 0x54, 0x5a, 0x28, 0x7e, 0xe1, 0x4c, 0x76, + 0x3b, 0xd6, 0xa4, 0xef, 0x36, 0xb6, 0x10, 0xc9, 0x9b, 0x5d, 0xfc, 0x04, 0x21, 0x09, 0x94, 0x03, + 0xdf, 0x95, 0xf0, 0xe0, 0xac, 0xb4, 0x6a, 0x83, 0xcb, 0x95, 0x8a, 0x61, 0x77, 0x3b, 0xd2, 0xaa, + 0x18, 0x41, 0xd7, 0x95, 0xad, 0xc7, 0x17, 0xff, 0xd5, 0x27, 0x2e, 0x06, 0xf5, 0xd2, 0xbb, 0x41, + 0xbd, 0xf4, 0x7e, 0x50, 0x2f, 0xfd, 0x3b, 0xa8, 0x97, 0xfe, 0xba, 0xaa, 0x4f, 0xbc, 0xbb, 0xaa, + 0x4f, 0xbc, 0xbf, 0xaa, 0x4f, 0xec, 0x57, 0x92, 0xb7, 0xf0, 0xff, 0x01, 0x00, 0x00, 0xff, 0xff, + 0x19, 0x27, 0xa8, 0x77, 0x0c, 0x0c, 0x00, 0x00, } diff --git a/pkg/sql/distsqlpb/processors.proto b/pkg/sql/distsqlpb/processors.proto index 080f5d0e7731..8bf9ea4d566e 100644 --- a/pkg/sql/distsqlpb/processors.proto +++ b/pkg/sql/distsqlpb/processors.proto @@ -102,6 +102,7 @@ message ProcessorCoreUnion { optional ChangeAggregatorSpec changeAggregator = 25; optional ChangeFrontierSpec changeFrontier = 26; optional OrdinalitySpec ordinality = 27; + optional BulkRowWriterSpec bulkRowWriter = 28; reserved 6, 12; } diff --git a/pkg/sql/distsqlpb/processors_bulk_io.pb.go b/pkg/sql/distsqlpb/processors_bulk_io.pb.go index 9c9ead0f8842..f5188cda4c01 100644 --- a/pkg/sql/distsqlpb/processors_bulk_io.pb.go +++ b/pkg/sql/distsqlpb/processors_bulk_io.pb.go @@ -71,7 +71,7 @@ func (x *BackfillerSpec_Type) UnmarshalJSON(data []byte) error { return nil } func (BackfillerSpec_Type) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_processors_bulk_io_3600efcb340068e6, []int{0, 0} + return fileDescriptor_processors_bulk_io_a862d1ed78e8528f, []int{0, 0} } // BackfillerSpec is the specification for a "schema change backfiller". @@ -107,7 +107,7 @@ func (m *BackfillerSpec) Reset() { *m = BackfillerSpec{} } func (m *BackfillerSpec) String() string { return proto.CompactTextString(m) } func (*BackfillerSpec) ProtoMessage() {} func (*BackfillerSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_bulk_io_3600efcb340068e6, []int{0} + return fileDescriptor_processors_bulk_io_a862d1ed78e8528f, []int{0} } func (m *BackfillerSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -147,7 +147,7 @@ func (m *JobProgress) Reset() { *m = JobProgress{} } func (m *JobProgress) String() string { return proto.CompactTextString(m) } func (*JobProgress) ProtoMessage() {} func (*JobProgress) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_bulk_io_3600efcb340068e6, []int{1} + return fileDescriptor_processors_bulk_io_a862d1ed78e8528f, []int{1} } func (m *JobProgress) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -205,7 +205,7 @@ func (m *ReadImportDataSpec) Reset() { *m = ReadImportDataSpec{} } func (m *ReadImportDataSpec) String() string { return proto.CompactTextString(m) } func (*ReadImportDataSpec) ProtoMessage() {} func (*ReadImportDataSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_bulk_io_3600efcb340068e6, []int{2} + return fileDescriptor_processors_bulk_io_a862d1ed78e8528f, []int{2} } func (m *ReadImportDataSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -250,7 +250,7 @@ func (m *SSTWriterSpec) Reset() { *m = SSTWriterSpec{} } func (m *SSTWriterSpec) String() string { return proto.CompactTextString(m) } func (*SSTWriterSpec) ProtoMessage() {} func (*SSTWriterSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_bulk_io_3600efcb340068e6, []int{3} + return fileDescriptor_processors_bulk_io_a862d1ed78e8528f, []int{3} } func (m *SSTWriterSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -286,7 +286,7 @@ func (m *SSTWriterSpec_SpanName) Reset() { *m = SSTWriterSpec_SpanName{} func (m *SSTWriterSpec_SpanName) String() string { return proto.CompactTextString(m) } func (*SSTWriterSpec_SpanName) ProtoMessage() {} func (*SSTWriterSpec_SpanName) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_bulk_io_3600efcb340068e6, []int{3, 0} + return fileDescriptor_processors_bulk_io_a862d1ed78e8528f, []int{3, 0} } func (m *SSTWriterSpec_SpanName) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -328,7 +328,7 @@ func (m *CSVWriterSpec) Reset() { *m = CSVWriterSpec{} } func (m *CSVWriterSpec) String() string { return proto.CompactTextString(m) } func (*CSVWriterSpec) ProtoMessage() {} func (*CSVWriterSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_bulk_io_3600efcb340068e6, []int{4} + return fileDescriptor_processors_bulk_io_a862d1ed78e8528f, []int{4} } func (m *CSVWriterSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -353,6 +353,41 @@ func (m *CSVWriterSpec) XXX_DiscardUnknown() { var xxx_messageInfo_CSVWriterSpec proto.InternalMessageInfo +// BulkRowWriterSpec is the specification for a processor that consumes rows and +// writes them to a target table using AddSSTable. It outputs a BulkOpSummary. +type BulkRowWriterSpec struct { + Table sqlbase.TableDescriptor `protobuf:"bytes,1,opt,name=table" json:"table"` +} + +func (m *BulkRowWriterSpec) Reset() { *m = BulkRowWriterSpec{} } +func (m *BulkRowWriterSpec) String() string { return proto.CompactTextString(m) } +func (*BulkRowWriterSpec) ProtoMessage() {} +func (*BulkRowWriterSpec) Descriptor() ([]byte, []int) { + return fileDescriptor_processors_bulk_io_a862d1ed78e8528f, []int{5} +} +func (m *BulkRowWriterSpec) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *BulkRowWriterSpec) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil +} +func (dst *BulkRowWriterSpec) XXX_Merge(src proto.Message) { + xxx_messageInfo_BulkRowWriterSpec.Merge(dst, src) +} +func (m *BulkRowWriterSpec) XXX_Size() int { + return m.Size() +} +func (m *BulkRowWriterSpec) XXX_DiscardUnknown() { + xxx_messageInfo_BulkRowWriterSpec.DiscardUnknown(m) +} + +var xxx_messageInfo_BulkRowWriterSpec proto.InternalMessageInfo + func init() { proto.RegisterType((*BackfillerSpec)(nil), "cockroach.sql.distsqlrun.BackfillerSpec") proto.RegisterType((*JobProgress)(nil), "cockroach.sql.distsqlrun.JobProgress") @@ -362,6 +397,7 @@ func init() { proto.RegisterType((*SSTWriterSpec)(nil), "cockroach.sql.distsqlrun.SSTWriterSpec") proto.RegisterType((*SSTWriterSpec_SpanName)(nil), "cockroach.sql.distsqlrun.SSTWriterSpec.SpanName") proto.RegisterType((*CSVWriterSpec)(nil), "cockroach.sql.distsqlrun.CSVWriterSpec") + proto.RegisterType((*BulkRowWriterSpec)(nil), "cockroach.sql.distsqlrun.BulkRowWriterSpec") proto.RegisterEnum("cockroach.sql.distsqlrun.BackfillerSpec_Type", BackfillerSpec_Type_name, BackfillerSpec_Type_value) } func (m *BackfillerSpec) Marshal() (dAtA []byte, err error) { @@ -679,6 +715,32 @@ func (m *CSVWriterSpec) MarshalTo(dAtA []byte) (int, error) { return i, nil } +func (m *BulkRowWriterSpec) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *BulkRowWriterSpec) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + dAtA[i] = 0xa + i++ + i = encodeVarintProcessorsBulkIo(dAtA, i, uint64(m.Table.Size())) + n8, err := m.Table.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n8 + return i, nil +} + func encodeVarintProcessorsBulkIo(dAtA []byte, offset int, v uint64) int { for v >= 1<<7 { dAtA[offset] = uint8(v&0x7f | 0x80) @@ -817,6 +879,17 @@ func (m *CSVWriterSpec) Size() (n int) { return n } +func (m *BulkRowWriterSpec) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = m.Table.Size() + n += 1 + l + sovProcessorsBulkIo(uint64(l)) + return n +} + func sovProcessorsBulkIo(x uint64) (n int) { for { n++ @@ -2002,6 +2075,86 @@ func (m *CSVWriterSpec) Unmarshal(dAtA []byte) error { } return nil } +func (m *BulkRowWriterSpec) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessorsBulkIo + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: BulkRowWriterSpec: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: BulkRowWriterSpec: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Table", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProcessorsBulkIo + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthProcessorsBulkIo + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := m.Table.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipProcessorsBulkIo(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthProcessorsBulkIo + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipProcessorsBulkIo(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 @@ -2108,71 +2261,73 @@ var ( ) func init() { - proto.RegisterFile("sql/distsqlpb/processors_bulk_io.proto", fileDescriptor_processors_bulk_io_3600efcb340068e6) + proto.RegisterFile("sql/distsqlpb/processors_bulk_io.proto", fileDescriptor_processors_bulk_io_a862d1ed78e8528f) } -var fileDescriptor_processors_bulk_io_3600efcb340068e6 = []byte{ - // 992 bytes of a gzipped FileDescriptorProto +var fileDescriptor_processors_bulk_io_a862d1ed78e8528f = []byte{ + // 1009 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x55, 0xcf, 0x6e, 0xdb, 0x36, - 0x1c, 0xb6, 0x2c, 0xc9, 0x91, 0xe9, 0x24, 0x30, 0x88, 0xae, 0xd0, 0x82, 0xcd, 0x31, 0xdc, 0x35, + 0x1c, 0xb6, 0x2c, 0xc9, 0x91, 0xe9, 0xa4, 0xf0, 0x88, 0xae, 0xd0, 0x82, 0xcd, 0x31, 0xdc, 0x35, 0xf3, 0xba, 0x55, 0xde, 0x02, 0x6c, 0x28, 0x86, 0x15, 0xc5, 0x9c, 0x34, 0x81, 0xbd, 0xad, 0x29, - 0xe4, 0x2c, 0x03, 0x76, 0x31, 0x28, 0x89, 0x71, 0x18, 0xd3, 0xa2, 0x42, 0x52, 0xcd, 0xdc, 0xa7, + 0xe4, 0x2c, 0x05, 0x76, 0x31, 0x28, 0x89, 0x71, 0x18, 0xd3, 0xa2, 0x42, 0x52, 0xcd, 0xdc, 0xa7, 0xd8, 0xb3, 0xec, 0xb4, 0x07, 0xd8, 0x21, 0xc7, 0x1e, 0x7b, 0x0a, 0xb6, 0xe4, 0x2d, 0x7a, 0x1a, - 0x48, 0x49, 0x99, 0xdc, 0xc2, 0x87, 0xac, 0x97, 0x84, 0xe0, 0xef, 0xf7, 0x7d, 0xfa, 0xf8, 0xfd, - 0xfe, 0x18, 0x6c, 0x89, 0x33, 0xda, 0x8b, 0x88, 0x90, 0xe2, 0x8c, 0x26, 0x41, 0x2f, 0xe1, 0x2c, + 0x48, 0x49, 0x99, 0xdc, 0xc2, 0x87, 0x74, 0x17, 0x9b, 0xd0, 0xef, 0xf7, 0x7d, 0xfa, 0xf8, 0xfd, + 0xfe, 0x08, 0x6c, 0x8a, 0x53, 0xda, 0x8b, 0x88, 0x90, 0xe2, 0x94, 0x26, 0x41, 0x2f, 0xe1, 0x2c, 0xc4, 0x42, 0x30, 0x2e, 0xc6, 0x41, 0x4a, 0xa7, 0x63, 0xc2, 0xbc, 0x84, 0x33, 0xc9, 0xa0, 0x1b, - 0xb2, 0x70, 0xca, 0x19, 0x0a, 0x4f, 0x3c, 0x71, 0x46, 0xbd, 0x1c, 0xc1, 0xd3, 0x78, 0xe3, 0xee, - 0x29, 0x0b, 0x44, 0x4f, 0xfd, 0x49, 0x02, 0xfd, 0x2f, 0x43, 0x6c, 0xb8, 0x3a, 0x3b, 0x09, 0x7a, - 0x84, 0x3d, 0x3c, 0x66, 0x7c, 0x86, 0x64, 0x11, 0xf9, 0x48, 0x7d, 0x53, 0x9c, 0xd1, 0x00, 0x09, - 0xdc, 0x13, 0x92, 0xa7, 0xa1, 0x4c, 0x39, 0x8e, 0xf2, 0xe8, 0xbd, 0xe5, 0x8a, 0x90, 0xc0, 0x05, - 0x79, 0x2a, 0x09, 0xed, 0x9d, 0xd0, 0xb0, 0x27, 0xc9, 0x0c, 0x0b, 0x89, 0x66, 0x49, 0x1e, 0xb9, - 0x33, 0x61, 0x13, 0xa6, 0x8f, 0x3d, 0x75, 0xca, 0x6e, 0x3b, 0x6f, 0x4c, 0xb0, 0xde, 0x47, 0xe1, - 0xf4, 0x98, 0x50, 0x8a, 0xf9, 0x28, 0xc1, 0x21, 0xdc, 0x07, 0x96, 0x9c, 0x27, 0xd8, 0x35, 0xda, - 0x46, 0x77, 0x7d, 0xfb, 0xa1, 0xb7, 0xec, 0x81, 0xde, 0x22, 0xce, 0x3b, 0x9c, 0x27, 0xb8, 0x6f, - 0x5d, 0x5c, 0x6e, 0x56, 0x7c, 0x4d, 0x00, 0xfb, 0xc0, 0x96, 0x28, 0xa0, 0xd8, 0xad, 0xb6, 0x8d, - 0x6e, 0x63, 0x7b, 0xeb, 0x2d, 0xa6, 0xfc, 0xa1, 0xde, 0xa1, 0xca, 0xd9, 0xc5, 0x22, 0xe4, 0x24, - 0x91, 0x8c, 0xe7, 0x14, 0x19, 0x14, 0x3e, 0x05, 0xb6, 0x48, 0x50, 0x2c, 0x5c, 0xb3, 0x6d, 0x76, - 0x1b, 0xdb, 0x9f, 0x2d, 0x57, 0xa3, 0x69, 0x7c, 0x8c, 0x22, 0x25, 0x07, 0xc5, 0x05, 0x8d, 0x46, - 0xc3, 0xaf, 0x80, 0x13, 0xa5, 0x1c, 0x49, 0xc2, 0x62, 0xd7, 0x6a, 0x1b, 0x5d, 0xb3, 0xff, 0x81, - 0x0a, 0xbf, 0xb9, 0xdc, 0x5c, 0x53, 0x3e, 0x79, 0xbb, 0x79, 0xd0, 0xbf, 0x49, 0x83, 0xf7, 0x00, - 0x08, 0x4f, 0xd2, 0x78, 0x3a, 0x16, 0xe4, 0x25, 0x76, 0x6d, 0x0d, 0xca, 0x38, 0xeb, 0xfa, 0x7e, - 0x44, 0x5e, 0x62, 0x78, 0x00, 0x56, 0x99, 0x3c, 0xc1, 0x7c, 0xac, 0xd5, 0x0a, 0xb7, 0xa6, 0x55, - 0xde, 0xee, 0xa5, 0x0d, 0xcd, 0xa0, 0x63, 0x02, 0x3e, 0x01, 0x0e, 0xc7, 0x28, 0xfa, 0x5e, 0x1c, - 0x1c, 0xbb, 0x2b, 0xda, 0xb6, 0x8f, 0x4b, 0x64, 0xaa, 0xb8, 0xde, 0x09, 0x0d, 0xbd, 0xc3, 0xa2, - 0xb8, 0x39, 0xc7, 0x0d, 0xa8, 0xf3, 0x00, 0x58, 0xaa, 0x10, 0xb0, 0x01, 0x56, 0x06, 0xf1, 0x0b, - 0x44, 0x49, 0xd4, 0xac, 0x40, 0x00, 0x6a, 0x3b, 0x8c, 0xa6, 0xb3, 0xb8, 0x69, 0xc0, 0x3a, 0xb0, - 0x07, 0x71, 0x84, 0x7f, 0x6b, 0x56, 0x3b, 0xe7, 0xa0, 0x31, 0x64, 0xc1, 0x73, 0xce, 0x26, 0x1c, - 0x0b, 0x01, 0x3f, 0x01, 0xb5, 0x53, 0x16, 0x8c, 0x49, 0xa4, 0x4b, 0x6f, 0xf6, 0xd7, 0x14, 0xf5, - 0xd5, 0xe5, 0xa6, 0x3d, 0x64, 0xc1, 0x60, 0xd7, 0xb7, 0x4f, 0x59, 0x30, 0x88, 0x60, 0x17, 0xac, - 0x86, 0x2c, 0x96, 0x9c, 0x04, 0xa9, 0xb6, 0x53, 0x15, 0xb7, 0x9a, 0xcb, 0x58, 0x88, 0x40, 0x17, - 0x58, 0x82, 0x32, 0xe9, 0x9a, 0x6d, 0xa3, 0x6b, 0x17, 0x9d, 0xa1, 0x6e, 0x3a, 0x7f, 0xd8, 0x00, - 0xaa, 0x52, 0x0d, 0x66, 0x09, 0xe3, 0x72, 0x17, 0x49, 0xa4, 0x3b, 0xef, 0x3e, 0x68, 0x08, 0x34, - 0x4b, 0x28, 0xce, 0x3c, 0xaf, 0x96, 0x70, 0x20, 0x0b, 0x68, 0xd3, 0xf7, 0x81, 0x93, 0xe4, 0x9a, - 0xdd, 0x9a, 0xf6, 0xe8, 0xfe, 0xf2, 0xb6, 0x28, 0x3d, 0xb0, 0xf0, 0xaa, 0x00, 0xc3, 0x7d, 0x60, - 0xa6, 0x9c, 0xb8, 0x2b, 0xba, 0x68, 0x5f, 0x2f, 0xe7, 0x78, 0x57, 0xaa, 0xf7, 0x33, 0x27, 0x4f, - 0x63, 0xc9, 0xe7, 0xbe, 0x62, 0x80, 0x8f, 0x41, 0x2d, 0x9b, 0x64, 0xd7, 0xd1, 0x7a, 0x36, 0x4b, - 0x5c, 0xf9, 0xb4, 0x7b, 0x83, 0x83, 0x3d, 0x42, 0xf1, 0x9e, 0x4e, 0xcb, 0x95, 0xe4, 0x20, 0x78, - 0x04, 0x6a, 0x79, 0xff, 0xd4, 0xb5, 0x94, 0x47, 0xb7, 0x92, 0x92, 0x75, 0x8e, 0x56, 0xa3, 0x79, - 0x0d, 0x3f, 0x67, 0x83, 0x4f, 0xc0, 0x87, 0x62, 0x4a, 0x92, 0xf1, 0x8c, 0x08, 0x41, 0xe2, 0xc9, - 0xf8, 0x98, 0x71, 0x4c, 0x26, 0xf1, 0x78, 0x8a, 0xe7, 0xc2, 0x05, 0x6d, 0xa3, 0xeb, 0xe4, 0x42, - 0xee, 0xaa, 0xb4, 0x9f, 0xb2, 0xac, 0xbd, 0x2c, 0xe9, 0x07, 0x3c, 0x17, 0xf0, 0x01, 0x58, 0x3b, - 0x47, 0x94, 0xaa, 0x11, 0x79, 0x86, 0x62, 0x26, 0xdc, 0x46, 0x69, 0x0c, 0x16, 0x43, 0xf0, 0x0b, - 0xb0, 0x4e, 0xe2, 0x09, 0x16, 0x72, 0x97, 0x70, 0x1c, 0x4a, 0x3a, 0x77, 0x57, 0x4b, 0x5f, 0x78, - 0x2b, 0xb6, 0x81, 0x40, 0xa3, 0xa4, 0x1b, 0x36, 0x81, 0x39, 0xc5, 0x73, 0xdd, 0x77, 0x75, 0x5f, - 0x1d, 0xe1, 0x77, 0xc0, 0x7e, 0x81, 0x68, 0x7a, 0xcb, 0xe5, 0xe1, 0x67, 0xa0, 0x6f, 0xab, 0x8f, - 0x8c, 0x8d, 0x6f, 0x80, 0x53, 0x54, 0xa9, 0xcc, 0x6f, 0x67, 0xfc, 0x77, 0xca, 0xfc, 0xf5, 0x12, - 0x6e, 0x68, 0x39, 0x46, 0xb3, 0x3a, 0xb4, 0x1c, 0xb3, 0x69, 0x0d, 0x2d, 0xc7, 0x6a, 0xda, 0x43, - 0xcb, 0xb1, 0x9b, 0xb5, 0xce, 0x9f, 0x55, 0xb0, 0x36, 0x1a, 0x1d, 0xfe, 0xc2, 0x89, 0xcc, 0x37, - 0xe5, 0x16, 0x68, 0x44, 0x58, 0x48, 0x12, 0x67, 0x8b, 0x45, 0xab, 0x2f, 0x86, 0xba, 0x14, 0x78, - 0xd7, 0x46, 0x73, 0xb9, 0x8d, 0x3f, 0x16, 0x0b, 0xcf, 0xd2, 0xad, 0xf0, 0xe5, 0xf2, 0x56, 0x58, - 0xd0, 0xe2, 0xa9, 0x9d, 0xf7, 0x0c, 0xcd, 0xf0, 0xe2, 0xde, 0x2b, 0x8f, 0x8a, 0xfd, 0x1e, 0xa3, - 0xa2, 0xcc, 0x2c, 0xbe, 0xa0, 0xe6, 0x3a, 0x46, 0x33, 0xbc, 0xf0, 0x5e, 0x7d, 0xa3, 0x6c, 0xc6, - 0x71, 0xa4, 0x2d, 0x5d, 0xf5, 0xd5, 0x71, 0x68, 0x39, 0xd5, 0xa6, 0xd9, 0xf9, 0xcb, 0x00, 0x6b, - 0x3b, 0xa3, 0xa3, 0xff, 0x61, 0xdd, 0xa7, 0x60, 0x55, 0x31, 0x8f, 0x13, 0x24, 0x25, 0xe6, 0xd9, - 0xb6, 0xb9, 0x49, 0x54, 0x91, 0xe7, 0x59, 0x00, 0x3e, 0x06, 0x2b, 0x2c, 0x51, 0x90, 0xcc, 0xdd, - 0xc5, 0xbd, 0x59, 0xcc, 0xe0, 0xce, 0xe8, 0xe8, 0x20, 0x4b, 0xca, 0x29, 0x0a, 0xcc, 0x7f, 0xdb, - 0x9e, 0xb3, 0x73, 0x91, 0xff, 0x44, 0x94, 0xb7, 0xbd, 0xcf, 0xce, 0x45, 0xff, 0xf3, 0x8b, 0x7f, - 0x5a, 0x95, 0x8b, 0xab, 0x96, 0xf1, 0xea, 0xaa, 0x65, 0xbc, 0xbe, 0x6a, 0x19, 0x7f, 0x5f, 0xb5, - 0x8c, 0xdf, 0xaf, 0x5b, 0x95, 0x57, 0xd7, 0xad, 0xca, 0xeb, 0xeb, 0x56, 0xe5, 0xd7, 0xfa, 0xcd, - 0xef, 0xf3, 0xbf, 0x01, 0x00, 0x00, 0xff, 0xff, 0x95, 0x32, 0xc5, 0xd2, 0x41, 0x08, 0x00, 0x00, + 0xb2, 0x70, 0xca, 0x19, 0x0a, 0x8f, 0x3d, 0x71, 0x4a, 0xbd, 0x1c, 0xc1, 0xd3, 0x78, 0xfd, 0xce, + 0x09, 0x0b, 0x44, 0x4f, 0xfd, 0x24, 0x81, 0xfe, 0xcb, 0x10, 0xeb, 0xae, 0xce, 0x4e, 0x82, 0x1e, + 0x61, 0x0f, 0x8e, 0x18, 0x9f, 0x21, 0x59, 0x44, 0x3e, 0x56, 0xef, 0x14, 0xa7, 0x34, 0x40, 0x02, + 0xf7, 0x84, 0xe4, 0x69, 0x28, 0x53, 0x8e, 0xa3, 0x3c, 0x7a, 0x77, 0xb9, 0x22, 0x24, 0x70, 0x41, + 0x9e, 0x4a, 0x42, 0x7b, 0xc7, 0x34, 0xec, 0x49, 0x32, 0xc3, 0x42, 0xa2, 0x59, 0x92, 0x47, 0x6e, + 0x4f, 0xd8, 0x84, 0xe9, 0x63, 0x4f, 0x9d, 0xb2, 0xa7, 0x9d, 0x37, 0x26, 0xb8, 0xd5, 0x47, 0xe1, + 0xf4, 0x88, 0x50, 0x8a, 0xf9, 0x28, 0xc1, 0x21, 0xdc, 0x03, 0x96, 0x9c, 0x27, 0xd8, 0x35, 0xda, + 0x46, 0xf7, 0xd6, 0xd6, 0x03, 0x6f, 0xd9, 0x05, 0xbd, 0x45, 0x9c, 0x77, 0x30, 0x4f, 0x70, 0xdf, + 0x3a, 0xbf, 0xd8, 0xa8, 0xf8, 0x9a, 0x00, 0xf6, 0x81, 0x2d, 0x51, 0x40, 0xb1, 0x5b, 0x6d, 0x1b, + 0xdd, 0xc6, 0xd6, 0xe6, 0x5b, 0x4c, 0xf9, 0x45, 0xbd, 0x03, 0x95, 0xb3, 0x83, 0x45, 0xc8, 0x49, + 0x22, 0x19, 0xcf, 0x29, 0x32, 0x28, 0x7c, 0x02, 0x6c, 0x91, 0xa0, 0x58, 0xb8, 0x66, 0xdb, 0xec, + 0x36, 0xb6, 0x3e, 0x5f, 0xae, 0x46, 0xd3, 0xf8, 0x18, 0x45, 0x4a, 0x0e, 0x8a, 0x0b, 0x1a, 0x8d, + 0x86, 0x5f, 0x03, 0x27, 0x4a, 0x39, 0x92, 0x84, 0xc5, 0xae, 0xd5, 0x36, 0xba, 0x66, 0xff, 0x43, + 0x15, 0x7e, 0x73, 0xb1, 0xb1, 0xa6, 0x7c, 0xf2, 0x76, 0xf2, 0xa0, 0x7f, 0x9d, 0x06, 0xef, 0x02, + 0x10, 0x1e, 0xa7, 0xf1, 0x74, 0x2c, 0xc8, 0x4b, 0xec, 0xda, 0x1a, 0x94, 0x71, 0xd6, 0xf5, 0xf3, + 0x11, 0x79, 0x89, 0xe1, 0x3e, 0x58, 0x65, 0xf2, 0x18, 0xf3, 0xb1, 0x56, 0x2b, 0xdc, 0x9a, 0x56, + 0x79, 0xb3, 0x9b, 0x36, 0x34, 0x83, 0x8e, 0x09, 0xf8, 0x18, 0x38, 0x1c, 0xa3, 0xe8, 0x07, 0xb1, + 0x7f, 0xe4, 0xae, 0x68, 0xdb, 0x3e, 0x29, 0x91, 0xa9, 0xe2, 0x7a, 0xc7, 0x34, 0xf4, 0x0e, 0x8a, + 0xe2, 0xe6, 0x1c, 0xd7, 0xa0, 0xce, 0x7d, 0x60, 0xa9, 0x42, 0xc0, 0x06, 0x58, 0x19, 0xc4, 0x2f, + 0x10, 0x25, 0x51, 0xb3, 0x02, 0x01, 0xa8, 0x6d, 0x33, 0x9a, 0xce, 0xe2, 0xa6, 0x01, 0xeb, 0xc0, + 0x1e, 0xc4, 0x11, 0xfe, 0xad, 0x59, 0xed, 0x9c, 0x81, 0xc6, 0x90, 0x05, 0xcf, 0x38, 0x9b, 0x70, + 0x2c, 0x04, 0xfc, 0x14, 0xd4, 0x4e, 0x58, 0x30, 0x26, 0x91, 0x2e, 0xbd, 0xd9, 0x5f, 0x53, 0xd4, + 0x97, 0x17, 0x1b, 0xf6, 0x90, 0x05, 0x83, 0x1d, 0xdf, 0x3e, 0x61, 0xc1, 0x20, 0x82, 0x5d, 0xb0, + 0x1a, 0xb2, 0x58, 0x72, 0x12, 0xa4, 0xda, 0x4e, 0x55, 0xdc, 0x6a, 0x2e, 0x63, 0x21, 0x02, 0x5d, + 0x60, 0x09, 0xca, 0xa4, 0x6b, 0xb6, 0x8d, 0xae, 0x5d, 0x74, 0x86, 0x7a, 0xd2, 0xf9, 0xc3, 0x06, + 0x50, 0x95, 0x6a, 0x30, 0x4b, 0x18, 0x97, 0x3b, 0x48, 0x22, 0xdd, 0x79, 0xf7, 0x40, 0x43, 0xa0, + 0x59, 0x42, 0x71, 0xe6, 0x79, 0xb5, 0x84, 0x03, 0x59, 0x40, 0x9b, 0xbe, 0x07, 0x9c, 0x24, 0xd7, + 0xec, 0xd6, 0xb4, 0x47, 0xf7, 0x96, 0xb7, 0x45, 0xe9, 0x82, 0x85, 0x57, 0x05, 0x18, 0xee, 0x01, + 0x33, 0xe5, 0xc4, 0x5d, 0xd1, 0x45, 0xfb, 0x66, 0x39, 0xc7, 0xbb, 0x52, 0xbd, 0x5f, 0x38, 0x79, + 0x12, 0x4b, 0x3e, 0xf7, 0x15, 0x03, 0x7c, 0x04, 0x6a, 0xd9, 0x24, 0xbb, 0x8e, 0xd6, 0xb3, 0x51, + 0xe2, 0xca, 0xa7, 0xdd, 0x1b, 0xec, 0xef, 0x12, 0x8a, 0x77, 0x75, 0x5a, 0xae, 0x24, 0x07, 0xc1, + 0x43, 0x50, 0xcb, 0xfb, 0xa7, 0xae, 0xa5, 0x3c, 0xbc, 0x91, 0x94, 0xac, 0x73, 0xb4, 0x1a, 0xcd, + 0x6b, 0xf8, 0x39, 0x1b, 0x7c, 0x0c, 0x3e, 0x12, 0x53, 0x92, 0x8c, 0x67, 0x44, 0x08, 0x12, 0x4f, + 0xc6, 0x47, 0x8c, 0x63, 0x32, 0x89, 0xc7, 0x53, 0x3c, 0x17, 0x2e, 0x68, 0x1b, 0x5d, 0x27, 0x17, + 0x72, 0x47, 0xa5, 0xfd, 0x9c, 0x65, 0xed, 0x66, 0x49, 0x3f, 0xe2, 0xb9, 0x80, 0xf7, 0xc1, 0xda, + 0x19, 0xa2, 0x54, 0x8d, 0xc8, 0x53, 0x14, 0x33, 0xe1, 0x36, 0x4a, 0x63, 0xb0, 0x18, 0x82, 0x5f, + 0x82, 0x5b, 0x24, 0x9e, 0x60, 0x21, 0x77, 0x08, 0xc7, 0xa1, 0xa4, 0x73, 0x77, 0xb5, 0xf4, 0x86, + 0xb7, 0x62, 0xeb, 0x08, 0x34, 0x4a, 0xba, 0x61, 0x13, 0x98, 0x53, 0x3c, 0xd7, 0x7d, 0x57, 0xf7, + 0xd5, 0x11, 0x7e, 0x0f, 0xec, 0x17, 0x88, 0xa6, 0x37, 0x5c, 0x1e, 0x7e, 0x06, 0xfa, 0xae, 0xfa, + 0xd0, 0x58, 0xff, 0x16, 0x38, 0x45, 0x95, 0xca, 0xfc, 0x76, 0xc6, 0x7f, 0xbb, 0xcc, 0x5f, 0x2f, + 0xe1, 0x86, 0x96, 0x63, 0x34, 0xab, 0x43, 0xcb, 0x31, 0x9b, 0xd6, 0xd0, 0x72, 0xac, 0xa6, 0x3d, + 0xb4, 0x1c, 0xbb, 0x59, 0xeb, 0xfc, 0x59, 0x05, 0x6b, 0xa3, 0xd1, 0xc1, 0x73, 0x4e, 0x64, 0xbe, + 0x29, 0x37, 0x41, 0x23, 0xc2, 0x42, 0x92, 0x38, 0x5b, 0x2c, 0x5a, 0x7d, 0x31, 0xd4, 0xa5, 0xc0, + 0xbb, 0x36, 0x9a, 0xcb, 0x6d, 0xfc, 0xa9, 0x58, 0x78, 0x96, 0x6e, 0x85, 0xaf, 0x96, 0xb7, 0xc2, + 0x82, 0x16, 0x4f, 0xed, 0xbc, 0xa7, 0x68, 0x86, 0x17, 0xf7, 0x5e, 0x79, 0x54, 0xec, 0xff, 0x31, + 0x2a, 0xca, 0xcc, 0xe2, 0x0d, 0x6a, 0xae, 0x63, 0x34, 0xc3, 0x0b, 0xf7, 0xd5, 0x4f, 0x94, 0xcd, + 0x38, 0x8e, 0xb4, 0xa5, 0xab, 0xbe, 0x3a, 0x0e, 0x2d, 0xa7, 0xda, 0x34, 0x3b, 0x7f, 0x19, 0x60, + 0x6d, 0x7b, 0x74, 0xf8, 0x1e, 0xd6, 0x7d, 0x06, 0x56, 0x15, 0xf3, 0x38, 0x41, 0x52, 0x62, 0x9e, + 0x6d, 0x9b, 0xeb, 0x44, 0x15, 0x79, 0x96, 0x05, 0xe0, 0x23, 0xb0, 0xc2, 0x12, 0x05, 0xc9, 0xdc, + 0x5d, 0xdc, 0x9b, 0xc5, 0x0c, 0x6e, 0x8f, 0x0e, 0xf7, 0xb3, 0xa4, 0x9c, 0xa2, 0xc0, 0xfc, 0xb7, + 0xed, 0x39, 0x3b, 0x13, 0xf9, 0x27, 0xa2, 0xbc, 0xed, 0x7d, 0x76, 0x26, 0x3a, 0xcf, 0xc1, 0x07, + 0xfd, 0x94, 0xaa, 0x73, 0xe9, 0x26, 0xd7, 0x5f, 0x39, 0xe3, 0xbd, 0xbf, 0x72, 0xfd, 0x2f, 0xce, + 0xff, 0x69, 0x55, 0xce, 0x2f, 0x5b, 0xc6, 0xab, 0xcb, 0x96, 0xf1, 0xfa, 0xb2, 0x65, 0xfc, 0x7d, + 0xd9, 0x32, 0x7e, 0xbf, 0x6a, 0x55, 0x5e, 0x5d, 0xb5, 0x2a, 0xaf, 0xaf, 0x5a, 0x95, 0x5f, 0xeb, + 0xd7, 0x1f, 0xfe, 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, 0x42, 0x60, 0xe5, 0xdd, 0x9a, 0x08, 0x00, + 0x00, } diff --git a/pkg/sql/distsqlpb/processors_bulk_io.proto b/pkg/sql/distsqlpb/processors_bulk_io.proto index 43c0eaf3c568..ff244cfc6bba 100644 --- a/pkg/sql/distsqlpb/processors_bulk_io.proto +++ b/pkg/sql/distsqlpb/processors_bulk_io.proto @@ -155,3 +155,9 @@ message CSVWriterSpec { // chunk_rows is num rows to write per file. 0 = no limit. optional int64 chunk_rows = 4 [(gogoproto.nullable) = false]; } + +// BulkRowWriterSpec is the specification for a processor that consumes rows and +// writes them to a target table using AddSSTable. It outputs a BulkOpSummary. +message BulkRowWriterSpec { + optional sqlbase.TableDescriptor table = 1 [(gogoproto.nullable) = false]; +} diff --git a/pkg/sql/distsqlrun/bulk_row_writer.go b/pkg/sql/distsqlrun/bulk_row_writer.go new file mode 100644 index 000000000000..8e5ea2b09020 --- /dev/null +++ b/pkg/sql/distsqlrun/bulk_row_writer.go @@ -0,0 +1,227 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package distsqlrun + +import ( + "context" + "sync/atomic" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/distsqlpb" + "github.com/cockroachdb/cockroach/pkg/sql/row" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/storage/storagebase" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/pkg/errors" +) + +// CTASPlanResultTypes is the result types for EXPORT plans. +var CTASPlanResultTypes = []types.T{ + *types.Bytes, // rows +} + +type bulkRowWriter struct { + flowCtx *FlowCtx + processorID int32 + batchIdxAtomic int64 + spec distsqlpb.BulkRowWriterSpec + input RowSource + out ProcOutputHelper + output RowReceiver + summary roachpb.BulkOpSummary +} + +var _ Processor = &bulkRowWriter{} + +func newBulkRowWriterProcessor( + flowCtx *FlowCtx, + processorID int32, + spec distsqlpb.BulkRowWriterSpec, + input RowSource, + output RowReceiver, +) (Processor, error) { + c := &bulkRowWriter{ + flowCtx: flowCtx, + processorID: processorID, + batchIdxAtomic: 0, + spec: spec, + input: input, + output: output, + } + if err := c.out.Init(&distsqlpb.PostProcessSpec{}, CTASPlanResultTypes, + flowCtx.NewEvalCtx(), output); err != nil { + return nil, err + } + return c, nil +} + +func (sp *bulkRowWriter) OutputTypes() []types.T { + return CTASPlanResultTypes +} + +func (sp *bulkRowWriter) ingestLoop(ctx context.Context, kvCh chan []roachpb.KeyValue) error { + writeTS := sp.spec.Table.CreateAsOfTime + const bufferSize, flushSize = 64 << 20, 16 << 20 + adder, err := sp.flowCtx.BulkAdder(ctx, sp.flowCtx.ClientDB, + bufferSize, flushSize, writeTS) + if err != nil { + return err + } + defer adder.Close(ctx) + + // ingestKvs drains kvs from the channel until it closes, ingesting them using + // the BulkAdder. It handles the required buffering/sorting/etc. + ingestKvs := func() error { + for kvBatch := range kvCh { + for _, kv := range kvBatch { + if err := adder.Add(ctx, kv.Key, kv.Value.RawBytes); err != nil { + if _, ok := err.(storagebase.DuplicateKeyError); ok { + return errors.WithStack(err) + } + return err + } + } + } + + if err := adder.Flush(ctx); err != nil { + if err, ok := err.(storagebase.DuplicateKeyError); ok { + return errors.WithStack(err) + } + return err + } + return nil + } + + // Drain the kvCh using the BulkAdder until it closes. + if err := ingestKvs(); err != nil { + return err + } + + sp.summary = adder.GetSummary() + return nil +} + +func (sp *bulkRowWriter) convertLoop( + ctx context.Context, kvCh chan []roachpb.KeyValue, conv *row.DatumRowConverter, +) error { + defer close(kvCh) + + done := false + alloc := &sqlbase.DatumAlloc{} + input := MakeNoMetadataRowSource(sp.input, sp.output) + typs := sp.input.OutputTypes() + + for { + var rows int64 + for { + row, err := input.NextRow() + if err != nil { + return err + } + if row == nil { + done = true + break + } + rows++ + + for i, ed := range row { + if ed.IsNull() { + conv.Datums[i] = tree.DNull + continue + } + if err := ed.EnsureDecoded(&typs[i], alloc); err != nil { + return err + } + conv.Datums[i] = ed.Datum + } + + // `conv.Row` uses these as arguments to GenerateUniqueID to generate + // hidden primary keys, when necessary. We want them to be ascending per + // to reduce overlap in the resulting kvs and non-conflicting (because + // of primary key uniqueness). The ids that come out of GenerateUniqueID + // are sorted by (fileIndex, rowIndex) and unique as long as the two + // inputs are a unique combo, so using the processor ID and a + // monotonically increasing batch index should do what we want. + if err := conv.Row(ctx, sp.processorID, sp.batchIdxAtomic); err != nil { + return err + } + atomic.AddInt64(&sp.batchIdxAtomic, 1) + } + if rows < 1 { + break + } + + if err := conv.SendBatch(ctx); err != nil { + return err + } + + if done { + break + } + } + + return nil +} + +func (sp *bulkRowWriter) Run(ctx context.Context) { + ctx, span := tracing.ChildSpan(ctx, "bulkRowWriter") + defer tracing.FinishSpan(span) + + var kvCh chan []roachpb.KeyValue + var g ctxgroup.Group + + // Create a new evalCtx per converter so each go routine gets its own + // collationenv, which can't be accessed in parallel. + evalCtx := sp.flowCtx.EvalCtx.Copy() + kvCh = make(chan []roachpb.KeyValue, 10) + + sp.input.Start(ctx) + + conv, err := row.NewDatumRowConverter(&sp.spec.Table, evalCtx, kvCh) + if err != nil { + DrainAndClose( + ctx, sp.output, err, func(context.Context) {} /* pushTrailingMeta */, sp.input) + return + } + if conv.EvalCtx.SessionData == nil { + panic("uninitialized session data") + } + + g = ctxgroup.WithContext(ctx) + g.GoCtx(func(ctx context.Context) error { + return sp.ingestLoop(ctx, kvCh) + }) + g.GoCtx(func(ctx context.Context) error { + return sp.convertLoop(ctx, kvCh, conv) + }) + err = g.Wait() + + // Emit a row with the BulkOpSummary from the processor after the rows in the + // new table have been written. + if err == nil { + if countsBytes, marshalErr := protoutil.Marshal(&sp.summary); marshalErr != nil { + err = marshalErr + } else if cs, emitErr := sp.out.EmitRow(ctx, sqlbase.EncDatumRow{ + sqlbase.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes(countsBytes))), + }); emitErr != nil { + err = emitErr + } else if cs != NeedMoreRows { + err = errors.New("unexpected closure of consumer") + } + } + + DrainAndClose( + ctx, sp.output, err, func(context.Context) {} /* pushTrailingMeta */, sp.input) +} diff --git a/pkg/sql/distsqlrun/processors.go b/pkg/sql/distsqlrun/processors.go index 7b25cc29ca47..fb3af4053f14 100644 --- a/pkg/sql/distsqlrun/processors.go +++ b/pkg/sql/distsqlrun/processors.go @@ -1125,6 +1125,12 @@ func newProcessor( } return NewCSVWriterProcessor(flowCtx, processorID, *core.CSVWriter, inputs[0], outputs[0]) } + if core.BulkRowWriter != nil { + if err := checkNumInOut(inputs, outputs, 1, 1); err != nil { + return nil, err + } + return newBulkRowWriterProcessor(flowCtx, processorID, *core.BulkRowWriter, inputs[0], outputs[0]) + } if core.MetadataTestSender != nil { if err := checkNumInOut(inputs, outputs, 1, 1); err != nil { return nil, err From 3e1f7f70ebf57b4edfebab9c1724e8e7950f2246 Mon Sep 17 00:00:00 2001 From: Aditya Maru Date: Fri, 21 Jun 2019 16:58:02 -0400 Subject: [PATCH 2/2] sql: Updated CTAS logic to utilize the BulkRowWriter processor. This change integrates the BulkRowWriter into the distsql PlanAndRun phase for CTAS statements. Release Note: None --- .../testdata/planner_test/show_trace | 6 +- .../opt/exec/execbuilder/testdata/show_trace | 6 +- pkg/sql/schema_changer.go | 153 ++++-------------- 3 files changed, 37 insertions(+), 128 deletions(-) diff --git a/pkg/sql/logictest/testdata/planner_test/show_trace b/pkg/sql/logictest/testdata/planner_test/show_trace index 9f386e4d72a6..d5444499241a 100644 --- a/pkg/sql/logictest/testdata/planner_test/show_trace +++ b/pkg/sql/logictest/testdata/planner_test/show_trace @@ -134,7 +134,7 @@ statement ok SET tracing = on,kv,results; UPDATE t.kv2 SET v = v + 2; SET tracing = off query TT -SELECT operation, regexp_replace(message, '(\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\d\.)?\d\d\d\d\d+', '...PK...') as message +SELECT operation, message FROM [SHOW KV TRACE FOR SESSION] WHERE message NOT LIKE '%Z/%' AND message NOT SIMILAR TO '%(PushTxn|ResolveIntent|SystemConfigSpan)%' @@ -145,8 +145,8 @@ WHERE message NOT LIKE '%Z/%' ---- [async] kv.DistSender: sending pre-commit query intents r7: sending batch 1 QueryIntent to (n1,s1):1 table reader Scan /Table/55/{1-2} -table reader fetched: /kv2/primary/...PK.../k/v -> /1/2 -flow Put /Table/55/1/...PK.../0 -> /TUPLE/1:1:Int/1/1:2:Int/4 +table reader fetched: /kv2/primary/1/k/v -> /1/2 +flow Put /Table/55/1/1/0 -> /TUPLE/1:1:Int/1/1:2:Int/4 flow fast path completed exec cmd: exec stmt rows affected: 1 diff --git a/pkg/sql/opt/exec/execbuilder/testdata/show_trace b/pkg/sql/opt/exec/execbuilder/testdata/show_trace index aadc7c90c27e..d7326868a465 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/show_trace +++ b/pkg/sql/opt/exec/execbuilder/testdata/show_trace @@ -130,7 +130,7 @@ statement ok SET tracing = on,kv,results; UPDATE t.kv2 SET v = v + 2; SET tracing = off query TT -SELECT operation, regexp_replace(message, '(\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\d\.)?\d\d\d\d\d+', '...PK...') as message +SELECT operation, message FROM [SHOW KV TRACE FOR SESSION] WHERE message NOT LIKE '%Z/%' AND message NOT SIMILAR TO '%(PushTxn|ResolveIntent|SystemConfigSpan)%' @@ -141,8 +141,8 @@ WHERE message NOT LIKE '%Z/%' ---- [async] kv.DistSender: sending pre-commit query intents r7: sending batch 1 QueryIntent to (n1,s1):1 table reader Scan /Table/55/{1-2} -table reader fetched: /kv2/primary/...PK.../k/v -> /1/2 -flow Put /Table/55/1/...PK.../0 -> /TUPLE/1:1:Int/1/1:2:Int/4 +table reader fetched: /kv2/primary/1/k/v -> /1/2 +flow Put /Table/55/1/1/0 -> /TUPLE/1:1:Int/1/1:2:Int/4 flow fast path completed exec cmd: exec stmt rows affected: 1 diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index 2c381cf120c6..bad74cd1406b 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -29,21 +29,20 @@ import ( "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/distsqlpb" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" - "github.com/cockroachdb/cockroach/pkg/sql/row" - "github.com/cockroachdb/cockroach/pkg/sql/rowcontainer" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" - "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/grpcutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -144,9 +143,7 @@ type SchemaChanger struct { clock *hlc.Clock settings *cluster.Settings execCfg *ExecutorConfig - // Placeholder information used by CTAS execution in the SchemaChanger. - placeholders *tree.PlaceholderInfo - ieFactory sqlutil.SessionBoundInternalExecutorFactory + ieFactory sqlutil.SessionBoundInternalExecutorFactory } // NewSchemaChangerForTesting only for tests. @@ -574,10 +571,7 @@ func (sc *SchemaChanger) maybeDropTable( // maybe backfill a created table by executing the AS query. Return nil if // successfully backfilled. func (sc *SchemaChanger) maybeBackfillCreateTableAs( - ctx context.Context, - table *sqlbase.TableDescriptor, - evalCtx *extendedEvalContext, - placeholders *tree.PlaceholderInfo, + ctx context.Context, table *sqlbase.TableDescriptor, evalCtx *extendedEvalContext, ) error { if table.Adding() && table.IsAs() { if err := sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { @@ -612,21 +606,21 @@ func (sc *SchemaChanger) maybeBackfillCreateTableAs( } defer localPlanner.curPlan.close(ctx) - colTypes := make([]types.T, len(table.VisibleColumns())) - for i, t := range table.VisibleColumns() { - colTypes[i] = t.Type - } - ci := sqlbase.ColTypeInfoFromColTypes(colTypes) - rows := rowcontainer.NewRowContainer( - localPlanner.EvalContext().Mon.MakeBoundAccount(), ci, 0, - ) - defer rows.Close(ctx) - - rw := NewRowResultWriter(rows) + res := roachpb.BulkOpSummary{} + rw := newCallbackResultWriter(func(ctx context.Context, row tree.Datums) error { + // TODO(adityamaru): Use the BulkOpSummary for either telemetry or to + // return to user. + var counts roachpb.BulkOpSummary + if err := protoutil.Unmarshal([]byte(*row[0].(*tree.DBytes)), &counts); err != nil { + return err + } + res.Add(counts) + return nil + }) recv := MakeDistSQLReceiver( ctx, rw, - stmt.AST.StatementType(), + tree.Rows, sc.execCfg.RangeDescriptorCache, sc.execCfg.LeaseHolderCache, txn, @@ -637,125 +631,40 @@ func (sc *SchemaChanger) maybeBackfillCreateTableAs( ) defer recv.Release() - planCtx := sc.distSQLPlanner.NewPlanningCtx(ctx, localPlanner.ExtendedEvalContext(), txn) rec, err := sc.distSQLPlanner.checkSupportForNode(localPlanner.curPlan.plan) - planCtx.isLocal = err != nil || rec == cannotDistribute - planCtx.planner = localPlanner - planCtx.stmtType = recv.stmtType - var planAndRunErr error localPlanner.runWithOptions(resolveFlags{skipCache: true}, func() { + // Resolve subqueries before running the queries' physical plan. if len(localPlanner.curPlan.subqueryPlans) != 0 { if !sc.distSQLPlanner.PlanAndRunSubqueries( - planCtx.ctx, localPlanner, localPlanner.ExtendedEvalContextCopy, + ctx, localPlanner, localPlanner.ExtendedEvalContextCopy, localPlanner.curPlan.subqueryPlans, recv, rec == canDistribute, ) { - if planAndRunErr = rw.Err(); err != nil { + if planAndRunErr = rw.Err(); planAndRunErr != nil { return } - if recv.commErr != nil { - planAndRunErr = recv.commErr + if planAndRunErr = recv.commErr; planAndRunErr != nil { return } } } - // Copy the evalCtx as it might be modified. - evalCtxCopy := localPlanner.ExtendedEvalContextCopy() - sc.distSQLPlanner.PlanAndRun(ctx, evalCtxCopy, planCtx, txn, localPlanner.curPlan.plan, recv) - if recv.commErr != nil { - planAndRunErr = recv.commErr + isLocal := err != nil || rec == cannotDistribute + out := distsqlpb.ProcessorCoreUnion{BulkRowWriter: &distsqlpb.BulkRowWriterSpec{ + Table: *table, + }} + + PlanAndRunCTAS(ctx, sc.distSQLPlanner, localPlanner, + txn, isLocal, localPlanner.curPlan.plan, out, recv) + if planAndRunErr = rw.Err(); planAndRunErr != nil { return } - if rw.Err() != nil { - planAndRunErr = rw.Err() + if planAndRunErr = recv.commErr; planAndRunErr != nil { return } }) - if planAndRunErr != nil { - return planAndRunErr - } - - // This is a very simplified version of the INSERT logic: no CHECK - // expressions, no FK checks, no arbitrary insertion order, no - // RETURNING, etc. - - // Instantiate a row inserter and table writer. - ri, err := row.MakeInserter( - txn, - sqlbase.NewImmutableTableDescriptor(*table), - nil, - table.Columns, - row.SkipFKs, - &localPlanner.alloc) - if err != nil { - return err - } - ti := tableInserterPool.Get().(*tableInserter) - *ti = tableInserter{ri: ri} - tw := tableWriter(ti) - defer func() { - tw.close(ctx) - *ti = tableInserter{} - tableInserterPool.Put(ti) - }() - if err := tw.init(txn, localPlanner.EvalContext()); err != nil { - return err - } - - // Prepare the buffer for row values. At this point, one more - // column has been added by ensurePrimaryKey() to the list of - // columns stored in table.Columns. - rowBuffer := make(tree.Datums, len(table.Columns)) - pkColIdx := len(table.Columns) - 1 - - // The optimizer includes the rowID expression as part of the input - // expression. But the heuristic planner does not do this, so construct a - // rowID expression to be evaluated separately. - // - // TODO(adityamaru): This could be redundant as it is only required when - // the heuristic planner is used, but currently there is no way of knowing - // this from the SchemaChanger. - var defTypedExpr tree.TypedExpr - // Prepare the rowID expression. - defExprSQL := *table.Columns[pkColIdx].DefaultExpr - defExpr, err := parser.ParseExpr(defExprSQL) - if err != nil { - return err - } - defTypedExpr, err = localPlanner.analyzeExpr( - ctx, - defExpr, - nil, /*sources*/ - tree.IndexedVarHelper{}, - types.Any, - false, /*requireType*/ - "CREATE TABLE AS") - if err != nil { - return err - } - - for i := 0; i < rows.Len(); i++ { - copy(rowBuffer, rows.At(i)) - - rowBuffer[pkColIdx], err = defTypedExpr.Eval(localPlanner.EvalContext()) - if err != nil { - return err - } - - err := tw.row(ctx, rowBuffer, evalCtx.Tracing.KVTracingEnabled()) - if err != nil { - return err - } - } - - _, err = tw.finalize( - ctx, evalCtx.Tracing.KVTracingEnabled()) - if err != nil { - return err - } - return rw.Err() + return planAndRunErr }); err != nil { return err } @@ -1018,7 +927,7 @@ func (sc *SchemaChanger) exec( return err } - if err := sc.maybeBackfillCreateTableAs(ctx, tableDesc, evalCtx, sc.placeholders); err != nil { + if err := sc.maybeBackfillCreateTableAs(ctx, tableDesc, evalCtx); err != nil { return err }