From 3003a79bac4bce0cfa7452277c3e8ccfd7079dbf Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Thu, 21 Nov 2019 10:33:13 -0500 Subject: [PATCH 1/3] sql: stop observing the CommitTimestamp in TRUNCATE In #40581 we stopped observing the commit timestamp to write it into table descriptors. In this change I overlooked (rather forgot) about this additional place in the code where we observed the commit timestamp. As far as I can tell we don't read this field anywhere ever. Furthermore we know that the the table descriptor in question to which we are referring must be alive and equal to the provided value at the timestamp at which it was read due to serializability. In short, this minor change continues to populate the field with a sensible value and will permit TRUNCATE to be pushed. Fixes #41566. Release note (bug fix): Long running transactions which attempt to TRUNCATE can now be pushed and will commit in cases where they previously could fail or retry forever. --- pkg/sql/sqlbase/structured.pb.go | 80 +++++++++++++++++--------------- pkg/sql/sqlbase/structured.proto | 6 +++ pkg/sql/truncate.go | 5 +- 3 files changed, 53 insertions(+), 38 deletions(-) diff --git a/pkg/sql/sqlbase/structured.pb.go b/pkg/sql/sqlbase/structured.pb.go index b2de58d3460d..d2ae66d7d785 100644 --- a/pkg/sql/sqlbase/structured.pb.go +++ b/pkg/sql/sqlbase/structured.pb.go @@ -73,7 +73,7 @@ func (x *ConstraintValidity) UnmarshalJSON(data []byte) error { return nil } func (ConstraintValidity) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_structured_50daf820b4f615f7, []int{0} + return fileDescriptor_structured_d9b38487027f89a8, []int{0} } type ForeignKeyReference_Action int32 @@ -118,7 +118,7 @@ func (x *ForeignKeyReference_Action) UnmarshalJSON(data []byte) error { return nil } func (ForeignKeyReference_Action) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_structured_50daf820b4f615f7, []int{0, 0} + return fileDescriptor_structured_d9b38487027f89a8, []int{0, 0} } // Match is the algorithm used to compare composite keys. @@ -158,7 +158,7 @@ func (x *ForeignKeyReference_Match) UnmarshalJSON(data []byte) error { return nil } func (ForeignKeyReference_Match) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_structured_50daf820b4f615f7, []int{0, 1} + return fileDescriptor_structured_d9b38487027f89a8, []int{0, 1} } // The direction of a column in the index. @@ -195,7 +195,7 @@ func (x *IndexDescriptor_Direction) UnmarshalJSON(data []byte) error { return nil } func (IndexDescriptor_Direction) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_structured_50daf820b4f615f7, []int{6, 0} + return fileDescriptor_structured_d9b38487027f89a8, []int{6, 0} } // The type of the index. @@ -232,7 +232,7 @@ func (x *IndexDescriptor_Type) UnmarshalJSON(data []byte) error { return nil } func (IndexDescriptor_Type) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_structured_50daf820b4f615f7, []int{6, 1} + return fileDescriptor_structured_d9b38487027f89a8, []int{6, 1} } type ConstraintToUpdate_ConstraintType int32 @@ -275,7 +275,7 @@ func (x *ConstraintToUpdate_ConstraintType) UnmarshalJSON(data []byte) error { return nil } func (ConstraintToUpdate_ConstraintType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_structured_50daf820b4f615f7, []int{7, 0} + return fileDescriptor_structured_d9b38487027f89a8, []int{7, 0} } // A descriptor within a mutation is unavailable for reads, writes @@ -340,7 +340,7 @@ func (x *DescriptorMutation_State) UnmarshalJSON(data []byte) error { return nil } func (DescriptorMutation_State) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_structured_50daf820b4f615f7, []int{8, 0} + return fileDescriptor_structured_d9b38487027f89a8, []int{8, 0} } // Direction of mutation. @@ -383,7 +383,7 @@ func (x *DescriptorMutation_Direction) UnmarshalJSON(data []byte) error { return nil } func (DescriptorMutation_Direction) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_structured_50daf820b4f615f7, []int{8, 1} + return fileDescriptor_structured_d9b38487027f89a8, []int{8, 1} } // State is set if this TableDescriptor is in the process of being added or deleted. @@ -434,7 +434,7 @@ func (x *TableDescriptor_State) UnmarshalJSON(data []byte) error { return nil } func (TableDescriptor_State) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_structured_50daf820b4f615f7, []int{9, 0} + return fileDescriptor_structured_d9b38487027f89a8, []int{9, 0} } // AuditMode indicates which auditing actions to take when this table is used. @@ -471,7 +471,7 @@ func (x *TableDescriptor_AuditMode) UnmarshalJSON(data []byte) error { return nil } func (TableDescriptor_AuditMode) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_structured_50daf820b4f615f7, []int{9, 1} + return fileDescriptor_structured_d9b38487027f89a8, []int{9, 1} } type ForeignKeyReference struct { @@ -493,7 +493,7 @@ func (m *ForeignKeyReference) Reset() { *m = ForeignKeyReference{} } func (m *ForeignKeyReference) String() string { return proto.CompactTextString(m) } func (*ForeignKeyReference) ProtoMessage() {} func (*ForeignKeyReference) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_50daf820b4f615f7, []int{0} + return fileDescriptor_structured_d9b38487027f89a8, []int{0} } func (m *ForeignKeyReference) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -561,7 +561,7 @@ func (m *ForeignKeyConstraint) Reset() { *m = ForeignKeyConstraint{} } func (m *ForeignKeyConstraint) String() string { return proto.CompactTextString(m) } func (*ForeignKeyConstraint) ProtoMessage() {} func (*ForeignKeyConstraint) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_50daf820b4f615f7, []int{1} + return fileDescriptor_structured_d9b38487027f89a8, []int{1} } func (m *ForeignKeyConstraint) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -608,7 +608,7 @@ func (m *ColumnDescriptor) Reset() { *m = ColumnDescriptor{} } func (m *ColumnDescriptor) String() string { return proto.CompactTextString(m) } func (*ColumnDescriptor) ProtoMessage() {} func (*ColumnDescriptor) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_50daf820b4f615f7, []int{2} + return fileDescriptor_structured_d9b38487027f89a8, []int{2} } func (m *ColumnDescriptor) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -664,7 +664,7 @@ func (m *ColumnFamilyDescriptor) Reset() { *m = ColumnFamilyDescriptor{} func (m *ColumnFamilyDescriptor) String() string { return proto.CompactTextString(m) } func (*ColumnFamilyDescriptor) ProtoMessage() {} func (*ColumnFamilyDescriptor) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_50daf820b4f615f7, []int{3} + return fileDescriptor_structured_d9b38487027f89a8, []int{3} } func (m *ColumnFamilyDescriptor) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -710,7 +710,7 @@ func (m *InterleaveDescriptor) Reset() { *m = InterleaveDescriptor{} } func (m *InterleaveDescriptor) String() string { return proto.CompactTextString(m) } func (*InterleaveDescriptor) ProtoMessage() {} func (*InterleaveDescriptor) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_50daf820b4f615f7, []int{4} + return fileDescriptor_structured_d9b38487027f89a8, []int{4} } func (m *InterleaveDescriptor) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -754,7 +754,7 @@ func (m *InterleaveDescriptor_Ancestor) Reset() { *m = InterleaveDescrip func (m *InterleaveDescriptor_Ancestor) String() string { return proto.CompactTextString(m) } func (*InterleaveDescriptor_Ancestor) ProtoMessage() {} func (*InterleaveDescriptor_Ancestor) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_50daf820b4f615f7, []int{4, 0} + return fileDescriptor_structured_d9b38487027f89a8, []int{4, 0} } func (m *InterleaveDescriptor_Ancestor) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -799,7 +799,7 @@ func (m *PartitioningDescriptor) Reset() { *m = PartitioningDescriptor{} func (m *PartitioningDescriptor) String() string { return proto.CompactTextString(m) } func (*PartitioningDescriptor) ProtoMessage() {} func (*PartitioningDescriptor) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_50daf820b4f615f7, []int{5} + return fileDescriptor_structured_d9b38487027f89a8, []int{5} } func (m *PartitioningDescriptor) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -842,7 +842,7 @@ func (m *PartitioningDescriptor_List) Reset() { *m = PartitioningDescrip func (m *PartitioningDescriptor_List) String() string { return proto.CompactTextString(m) } func (*PartitioningDescriptor_List) ProtoMessage() {} func (*PartitioningDescriptor_List) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_50daf820b4f615f7, []int{5, 0} + return fileDescriptor_structured_d9b38487027f89a8, []int{5, 0} } func (m *PartitioningDescriptor_List) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -887,7 +887,7 @@ func (m *PartitioningDescriptor_Range) Reset() { *m = PartitioningDescri func (m *PartitioningDescriptor_Range) String() string { return proto.CompactTextString(m) } func (*PartitioningDescriptor_Range) ProtoMessage() {} func (*PartitioningDescriptor_Range) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_50daf820b4f615f7, []int{5, 1} + return fileDescriptor_structured_d9b38487027f89a8, []int{5, 1} } func (m *PartitioningDescriptor_Range) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1023,7 +1023,7 @@ func (m *IndexDescriptor) Reset() { *m = IndexDescriptor{} } func (m *IndexDescriptor) String() string { return proto.CompactTextString(m) } func (*IndexDescriptor) ProtoMessage() {} func (*IndexDescriptor) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_50daf820b4f615f7, []int{6} + return fileDescriptor_structured_d9b38487027f89a8, []int{6} } func (m *IndexDescriptor) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1074,7 +1074,7 @@ func (m *ConstraintToUpdate) Reset() { *m = ConstraintToUpdate{} } func (m *ConstraintToUpdate) String() string { return proto.CompactTextString(m) } func (*ConstraintToUpdate) ProtoMessage() {} func (*ConstraintToUpdate) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_50daf820b4f615f7, []int{7} + return fileDescriptor_structured_d9b38487027f89a8, []int{7} } func (m *ConstraintToUpdate) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1128,7 +1128,7 @@ func (m *DescriptorMutation) Reset() { *m = DescriptorMutation{} } func (m *DescriptorMutation) String() string { return proto.CompactTextString(m) } func (*DescriptorMutation) ProtoMessage() {} func (*DescriptorMutation) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_50daf820b4f615f7, []int{8} + return fileDescriptor_structured_d9b38487027f89a8, []int{8} } func (m *DescriptorMutation) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1440,7 +1440,7 @@ func (m *TableDescriptor) Reset() { *m = TableDescriptor{} } func (m *TableDescriptor) String() string { return proto.CompactTextString(m) } func (*TableDescriptor) ProtoMessage() {} func (*TableDescriptor) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_50daf820b4f615f7, []int{9} + return fileDescriptor_structured_d9b38487027f89a8, []int{9} } func (m *TableDescriptor) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1732,7 +1732,7 @@ func (m *TableDescriptor_SchemaChangeLease) Reset() { *m = TableDescript func (m *TableDescriptor_SchemaChangeLease) String() string { return proto.CompactTextString(m) } func (*TableDescriptor_SchemaChangeLease) ProtoMessage() {} func (*TableDescriptor_SchemaChangeLease) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_50daf820b4f615f7, []int{9, 0} + return fileDescriptor_structured_d9b38487027f89a8, []int{9, 0} } func (m *TableDescriptor_SchemaChangeLease) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1770,7 +1770,7 @@ func (m *TableDescriptor_CheckConstraint) Reset() { *m = TableDescriptor func (m *TableDescriptor_CheckConstraint) String() string { return proto.CompactTextString(m) } func (*TableDescriptor_CheckConstraint) ProtoMessage() {} func (*TableDescriptor_CheckConstraint) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_50daf820b4f615f7, []int{9, 1} + return fileDescriptor_structured_d9b38487027f89a8, []int{9, 1} } func (m *TableDescriptor_CheckConstraint) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1873,7 +1873,7 @@ func (m *TableDescriptor_NameInfo) Reset() { *m = TableDescriptor_NameIn func (m *TableDescriptor_NameInfo) String() string { return proto.CompactTextString(m) } func (*TableDescriptor_NameInfo) ProtoMessage() {} func (*TableDescriptor_NameInfo) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_50daf820b4f615f7, []int{9, 2} + return fileDescriptor_structured_d9b38487027f89a8, []int{9, 2} } func (m *TableDescriptor_NameInfo) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1913,7 +1913,7 @@ func (m *TableDescriptor_Reference) Reset() { *m = TableDescriptor_Refer func (m *TableDescriptor_Reference) String() string { return proto.CompactTextString(m) } func (*TableDescriptor_Reference) ProtoMessage() {} func (*TableDescriptor_Reference) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_50daf820b4f615f7, []int{9, 3} + return fileDescriptor_structured_d9b38487027f89a8, []int{9, 3} } func (m *TableDescriptor_Reference) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1950,7 +1950,7 @@ func (m *TableDescriptor_MutationJob) Reset() { *m = TableDescriptor_Mut func (m *TableDescriptor_MutationJob) String() string { return proto.CompactTextString(m) } func (*TableDescriptor_MutationJob) ProtoMessage() {} func (*TableDescriptor_MutationJob) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_50daf820b4f615f7, []int{9, 4} + return fileDescriptor_structured_d9b38487027f89a8, []int{9, 4} } func (m *TableDescriptor_MutationJob) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1993,7 +1993,7 @@ func (m *TableDescriptor_SequenceOpts) Reset() { *m = TableDescriptor_Se func (m *TableDescriptor_SequenceOpts) String() string { return proto.CompactTextString(m) } func (*TableDescriptor_SequenceOpts) ProtoMessage() {} func (*TableDescriptor_SequenceOpts) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_50daf820b4f615f7, []int{9, 5} + return fileDescriptor_structured_d9b38487027f89a8, []int{9, 5} } func (m *TableDescriptor_SequenceOpts) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -2033,7 +2033,7 @@ func (m *TableDescriptor_SequenceOpts_SequenceOwner) String() string { } func (*TableDescriptor_SequenceOpts_SequenceOwner) ProtoMessage() {} func (*TableDescriptor_SequenceOpts_SequenceOwner) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_50daf820b4f615f7, []int{9, 5, 0} + return fileDescriptor_structured_d9b38487027f89a8, []int{9, 5, 0} } func (m *TableDescriptor_SequenceOpts_SequenceOwner) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -2059,7 +2059,13 @@ func (m *TableDescriptor_SequenceOpts_SequenceOwner) XXX_DiscardUnknown() { var xxx_messageInfo_TableDescriptor_SequenceOpts_SequenceOwner proto.InternalMessageInfo type TableDescriptor_Replacement struct { - ID ID `protobuf:"varint,1,opt,name=id,casttype=ID" json:"id"` + ID ID `protobuf:"varint,1,opt,name=id,casttype=ID" json:"id"` + // Time is just used for debugging purposes. It is not used in business + // logic. It is an HLC rather than just wall time only for historical + // reasons. Prior to 20.1 it was populated with the commit timestamp of the + // transaction which created this replacement. In 20.1 and after it is + // populated with the read timestamp at which the descriptor being + // replaced was read. Time hlc.Timestamp `protobuf:"bytes,2,opt,name=time" json:"time"` } @@ -2067,7 +2073,7 @@ func (m *TableDescriptor_Replacement) Reset() { *m = TableDescriptor_Rep func (m *TableDescriptor_Replacement) String() string { return proto.CompactTextString(m) } func (*TableDescriptor_Replacement) ProtoMessage() {} func (*TableDescriptor_Replacement) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_50daf820b4f615f7, []int{9, 6} + return fileDescriptor_structured_d9b38487027f89a8, []int{9, 6} } func (m *TableDescriptor_Replacement) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -2104,7 +2110,7 @@ func (m *TableDescriptor_GCDescriptorMutation) Reset() { *m = TableDescr func (m *TableDescriptor_GCDescriptorMutation) String() string { return proto.CompactTextString(m) } func (*TableDescriptor_GCDescriptorMutation) ProtoMessage() {} func (*TableDescriptor_GCDescriptorMutation) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_50daf820b4f615f7, []int{9, 7} + return fileDescriptor_structured_d9b38487027f89a8, []int{9, 7} } func (m *TableDescriptor_GCDescriptorMutation) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -2143,7 +2149,7 @@ func (m *DatabaseDescriptor) Reset() { *m = DatabaseDescriptor{} } func (m *DatabaseDescriptor) String() string { return proto.CompactTextString(m) } func (*DatabaseDescriptor) ProtoMessage() {} func (*DatabaseDescriptor) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_50daf820b4f615f7, []int{10} + return fileDescriptor_structured_d9b38487027f89a8, []int{10} } func (m *DatabaseDescriptor) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -2201,7 +2207,7 @@ func (m *Descriptor) Reset() { *m = Descriptor{} } func (m *Descriptor) String() string { return proto.CompactTextString(m) } func (*Descriptor) ProtoMessage() {} func (*Descriptor) Descriptor() ([]byte, []int) { - return fileDescriptor_structured_50daf820b4f615f7, []int{11} + return fileDescriptor_structured_d9b38487027f89a8, []int{11} } func (m *Descriptor) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -11156,10 +11162,10 @@ var ( ) func init() { - proto.RegisterFile("sql/sqlbase/structured.proto", fileDescriptor_structured_50daf820b4f615f7) + proto.RegisterFile("sql/sqlbase/structured.proto", fileDescriptor_structured_d9b38487027f89a8) } -var fileDescriptor_structured_50daf820b4f615f7 = []byte{ +var fileDescriptor_structured_d9b38487027f89a8 = []byte{ // 3473 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x5a, 0xcb, 0x6f, 0x23, 0x47, 0x7a, 0x57, 0xf3, 0xcd, 0x8f, 0xaf, 0x66, 0x69, 0x66, 0x4c, 0xcb, 0x5e, 0x91, 0xa2, 0x3d, 0xb6, diff --git a/pkg/sql/sqlbase/structured.proto b/pkg/sql/sqlbase/structured.proto index beb3e9a27ed7..3dca04a2b4e0 100644 --- a/pkg/sql/sqlbase/structured.proto +++ b/pkg/sql/sqlbase/structured.proto @@ -799,6 +799,12 @@ message TableDescriptor { message Replacement { option (gogoproto.equal) = true; optional uint32 id = 1 [(gogoproto.nullable) = false, (gogoproto.customname) = "ID", (gogoproto.casttype) = "ID"]; + // Time is just used for debugging purposes. It is not used in business + // logic. It is an HLC rather than just wall time only for historical + // reasons. Prior to 20.1 it was populated with the commit timestamp of the + // transaction which created this replacement. In 20.1 and after it is + // populated with the read timestamp at which the descriptor being + // replaced was read. optional util.hlc.Timestamp time = 2 [(gogoproto.nullable) = false]; } diff --git a/pkg/sql/truncate.go b/pkg/sql/truncate.go index 0f8fa49b1488..6f720d02f2c3 100644 --- a/pkg/sql/truncate.go +++ b/pkg/sql/truncate.go @@ -190,7 +190,10 @@ func (p *planner) truncateTable( tableDesc.DropJobID = dropJobID newTableDesc := sqlbase.NewMutableCreatedTableDescriptor(tableDesc.TableDescriptor) newTableDesc.ReplacementOf = sqlbase.TableDescriptor_Replacement{ - ID: id, Time: p.txn.CommitTimestamp(), + ID: id, + // NB: Time is just used for debugging purposes. See the comment on the + // field for more details. + Time: p.txn.ReadTimestamp(), } newTableDesc.SetID(0) newTableDesc.Version = 1 From 93a806292f07cac2e52df2bcc58bf226dd3e27bd Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Mon, 25 Nov 2019 13:12:28 -0500 Subject: [PATCH 2/3] roachtest/cdc: fix cdc/schemareg expected output Partial fix for #42690. I don't think this ever worked. The schema change backfill for the dropped column `b` results in a record for each row that was previously inserted. --- pkg/cmd/roachtest/cdc.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/cmd/roachtest/cdc.go b/pkg/cmd/roachtest/cdc.go index 7c67c87b76c1..b0eef6e1de3f 100644 --- a/pkg/cmd/roachtest/cdc.go +++ b/pkg/cmd/roachtest/cdc.go @@ -453,8 +453,11 @@ func runCDCSchemaRegistry(ctx context.Context, t *test, c *cluster) { `{"before":null,"after":{"foo":{"a":{"long":2},"b":{"string":"2"}}},"updated":{"string":""}}`, `{"before":null,"after":{"foo":{"a":{"long":3},"b":{"string":"3"},"c":{"long":3}}},"updated":{"string":""}}`, `{"before":null,"after":{"foo":{"a":{"long":4},"c":{"long":4}}},"updated":{"string":""}}`, + `{"before":{"foo_before":{"a":{"long":1},"b":null,"c":null}},"after":{"foo":{"a":{"long":1},"c":null}},"updated":{"string":""}}`, `{"before":{"foo_before":{"a":{"long":1},"c":null}},"after":{"foo":{"a":{"long":1},"c":null}},"updated":{"string":""}}`, + `{"before":{"foo_before":{"a":{"long":2},"b":{"string":"2"},"c":null}},"after":{"foo":{"a":{"long":2},"c":null}},"updated":{"string":""}}`, `{"before":{"foo_before":{"a":{"long":2},"c":null}},"after":{"foo":{"a":{"long":2},"c":null}},"updated":{"string":""}}`, + `{"before":{"foo_before":{"a":{"long":3},"b":{"string":"3"},"c":{"long":3}}},"after":{"foo":{"a":{"long":3},"c":{"long":3}}},"updated":{"string":""}}`, `{"before":{"foo_before":{"a":{"long":3},"c":{"long":3}}},"after":{"foo":{"a":{"long":3},"c":{"long":3}}},"updated":{"string":""}}`, } if strings.Join(expected, "\n") != strings.Join(updated, "\n") { From 70443483ec46b70fc5c1e8bfaed3bdaf63bc8e77 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Mon, 25 Nov 2019 13:13:09 -0500 Subject: [PATCH 3/3] roachtest/cdc: fix < 19.2 compat for cdc/bank and cdc/schemareg Fixes #42690. Fixes #41177. This was broken by #41793. --- pkg/cmd/roachtest/cdc.go | 71 ++++++++++++++++++++++++++++------------ 1 file changed, 50 insertions(+), 21 deletions(-) diff --git a/pkg/cmd/roachtest/cdc.go b/pkg/cmd/roachtest/cdc.go index b0eef6e1de3f..8904b012de24 100644 --- a/pkg/cmd/roachtest/cdc.go +++ b/pkg/cmd/roachtest/cdc.go @@ -275,9 +275,16 @@ func runCDCBank(ctx context.Context, t *test, c *cluster) { ); err != nil { t.Fatal(err) } + + // NB: the WITH diff option was not supported until v20.1. + withDiff := t.IsBuildVersion("v20.1.0") + var opts = []string{`updated`, `resolved`} + if withDiff { + opts = append(opts, `diff`) + } var jobID string if err := db.QueryRow( - `CREATE CHANGEFEED FOR bank.bank INTO $1 WITH updated, resolved, diff`, kafka.sinkURL(ctx), + `CREATE CHANGEFEED FOR bank.bank INTO $1 WITH `+strings.Join(opts, `, `), kafka.sinkURL(ctx), ).Scan(&jobID); err != nil { t.Fatal(err) } @@ -314,19 +321,22 @@ func runCDCBank(ctx context.Context, t *test, c *cluster) { } const requestedResolved = 100 - baV, err := cdctest.NewBeforeAfterValidator(db, `bank.bank`) - if err != nil { - return err - } fprintV, err := cdctest.NewFingerprintValidator(db, `bank.bank`, `fprint`, tc.partitions, 0) if err != nil { return err } - v := cdctest.MakeCountValidator(cdctest.Validators{ + validators := cdctest.Validators{ cdctest.NewOrderValidator(`bank`), - baV, fprintV, - }) + } + if withDiff { + baV, err := cdctest.NewBeforeAfterValidator(db, `bank.bank`) + if err != nil { + return err + } + validators = append(validators, baV) + } + v := cdctest.MakeCountValidator(validators) for { m := tc.Next(ctx) @@ -387,10 +397,16 @@ func runCDCSchemaRegistry(ctx context.Context, t *test, c *cluster) { if _, err := db.Exec(`CREATE TABLE foo (a INT PRIMARY KEY)`); err != nil { t.Fatal(err) } + + // NB: the WITH diff option was not supported until v20.1. + withDiff := t.IsBuildVersion("v20.1.0") + var opts = []string{`updated`, `resolved`, `format=experimental_avro`, `confluent_schema_registry=$2`} + if withDiff { + opts = append(opts, `diff`) + } var jobID string if err := db.QueryRow( - `CREATE CHANGEFEED FOR foo INTO $1`+ - `WITH updated, resolved, diff, format=experimental_avro, confluent_schema_registry=$2`, + `CREATE CHANGEFEED FOR foo INTO $1 WITH `+strings.Join(opts, `, `), kafka.sinkURL(ctx), kafka.schemaRegistryURL(ctx), ).Scan(&jobID); err != nil { t.Fatal(err) @@ -448,17 +464,30 @@ func runCDCSchemaRegistry(ctx context.Context, t *test, c *cluster) { } sort.Strings(updated) - expected := []string{ - `{"before":null,"after":{"foo":{"a":{"long":1}}},"updated":{"string":""}}`, - `{"before":null,"after":{"foo":{"a":{"long":2},"b":{"string":"2"}}},"updated":{"string":""}}`, - `{"before":null,"after":{"foo":{"a":{"long":3},"b":{"string":"3"},"c":{"long":3}}},"updated":{"string":""}}`, - `{"before":null,"after":{"foo":{"a":{"long":4},"c":{"long":4}}},"updated":{"string":""}}`, - `{"before":{"foo_before":{"a":{"long":1},"b":null,"c":null}},"after":{"foo":{"a":{"long":1},"c":null}},"updated":{"string":""}}`, - `{"before":{"foo_before":{"a":{"long":1},"c":null}},"after":{"foo":{"a":{"long":1},"c":null}},"updated":{"string":""}}`, - `{"before":{"foo_before":{"a":{"long":2},"b":{"string":"2"},"c":null}},"after":{"foo":{"a":{"long":2},"c":null}},"updated":{"string":""}}`, - `{"before":{"foo_before":{"a":{"long":2},"c":null}},"after":{"foo":{"a":{"long":2},"c":null}},"updated":{"string":""}}`, - `{"before":{"foo_before":{"a":{"long":3},"b":{"string":"3"},"c":{"long":3}}},"after":{"foo":{"a":{"long":3},"c":{"long":3}}},"updated":{"string":""}}`, - `{"before":{"foo_before":{"a":{"long":3},"c":{"long":3}}},"after":{"foo":{"a":{"long":3},"c":{"long":3}}},"updated":{"string":""}}`, + var expected []string + if withDiff { + expected = []string{ + `{"before":null,"after":{"foo":{"a":{"long":1}}},"updated":{"string":""}}`, + `{"before":null,"after":{"foo":{"a":{"long":2},"b":{"string":"2"}}},"updated":{"string":""}}`, + `{"before":null,"after":{"foo":{"a":{"long":3},"b":{"string":"3"},"c":{"long":3}}},"updated":{"string":""}}`, + `{"before":null,"after":{"foo":{"a":{"long":4},"c":{"long":4}}},"updated":{"string":""}}`, + `{"before":{"foo_before":{"a":{"long":1},"b":null,"c":null}},"after":{"foo":{"a":{"long":1},"c":null}},"updated":{"string":""}}`, + `{"before":{"foo_before":{"a":{"long":1},"c":null}},"after":{"foo":{"a":{"long":1},"c":null}},"updated":{"string":""}}`, + `{"before":{"foo_before":{"a":{"long":2},"b":{"string":"2"},"c":null}},"after":{"foo":{"a":{"long":2},"c":null}},"updated":{"string":""}}`, + `{"before":{"foo_before":{"a":{"long":2},"c":null}},"after":{"foo":{"a":{"long":2},"c":null}},"updated":{"string":""}}`, + `{"before":{"foo_before":{"a":{"long":3},"b":{"string":"3"},"c":{"long":3}}},"after":{"foo":{"a":{"long":3},"c":{"long":3}}},"updated":{"string":""}}`, + `{"before":{"foo_before":{"a":{"long":3},"c":{"long":3}}},"after":{"foo":{"a":{"long":3},"c":{"long":3}}},"updated":{"string":""}}`, + } + } else { + expected = []string{ + `{"updated":{"string":""},"after":{"foo":{"a":{"long":1},"c":null}}}`, + `{"updated":{"string":""},"after":{"foo":{"a":{"long":1}}}}`, + `{"updated":{"string":""},"after":{"foo":{"a":{"long":2},"b":{"string":"2"}}}}`, + `{"updated":{"string":""},"after":{"foo":{"a":{"long":2},"c":null}}}`, + `{"updated":{"string":""},"after":{"foo":{"a":{"long":3},"b":{"string":"3"},"c":{"long":3}}}}`, + `{"updated":{"string":""},"after":{"foo":{"a":{"long":3},"c":{"long":3}}}}`, + `{"updated":{"string":""},"after":{"foo":{"a":{"long":4},"c":{"long":4}}}}`, + } } if strings.Join(expected, "\n") != strings.Join(updated, "\n") { t.Fatalf("expected\n%s\n\ngot\n%s\n\n",