diff --git a/pkg/kv/dist_sender.go b/pkg/kv/dist_sender.go index e04c74ddddf4..d27b8376d06d 100644 --- a/pkg/kv/dist_sender.go +++ b/pkg/kv/dist_sender.go @@ -19,9 +19,6 @@ import ( "sync/atomic" "unsafe" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "golang.org/x/net/context" "github.com/cockroachdb/cockroach/pkg/base" @@ -33,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/netutil" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/shuffle" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -1176,14 +1174,7 @@ func (ds *DistSender) sendToReplicas( // leader). If the original attempt merely timed out or was // lost, then the batch will succeed and we can be assured the // commit was applied just once. - // - // The Unavailable code is used by GRPC to indicate that a - // request fails fast and is not sent, so we can be sure there - // is no ambiguity on these errors. Note that these are common - // if a node is down. - // See https://github.com/grpc/grpc-go/blob/52f6504dc290bd928a8139ba94e3ab32ed9a6273/call.go#L182 - // See https://github.com/grpc/grpc-go/blob/52f6504dc290bd928a8139ba94e3ab32ed9a6273/stream.go#L158 - if haveCommit && grpc.Code(err) != codes.Unavailable { + if haveCommit && !netutil.ErrIsGRPCUnavailable(err) { ambiguousError = err } log.ErrEventf(ctx, "RPC error: %s", err) diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index 756998ed3721..e5708c0ffa37 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -15,6 +15,9 @@ package sql import ( + "strings" + "time" + "golang.org/x/net/context" opentracing "github.com/opentracing/opentracing-go" @@ -29,9 +32,14 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/netutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" ) +// poisonedFlowDefaultTimeout is the amount of time that a poisoned flow (a +// flow that will not actually be scheduled) lives in the FlowRegistry. +const poisonedFlowDefaultTimeout time.Duration = time.Second + // To allow queries to send out flow RPCs in parallel, we use a pool of workers // that can issue the RPCs on behalf of the running code. The pool is shared by // multiple queries. @@ -128,6 +136,7 @@ func (dsp *distSQLPlanner) Run( recv.resultToStreamColMap = plan.planToStreamColMap thisNodeID := dsp.nodeDesc.NodeID + thisNodeAddr := planCtx.nodeAddresses[thisNodeID] // DistSQL needs to initialize the Transaction proto before we put it in the // FlowRequest's below. This is because we might not have used the txn do to @@ -151,6 +160,10 @@ func (dsp *distSQLPlanner) Run( // Skip this node. continue } + + addFallbackToOutputStreams(&flowSpec, thisNodeAddr) + flows[nodeID] = flowSpec + req := &distsqlrun.SetupFlowRequest{ Version: distsqlrun.Version, Txn: *txn.Proto(), @@ -175,25 +188,63 @@ func (dsp *distSQLPlanner) Run( } var firstErr error + var flowsToRunLocally []roachpb.NodeID + anyScheduled := false // Now wait for all the flows to be scheduled on remote nodes. Note that we // are not waiting for the flows themselves to complete. for i := 0; i < len(flows)-1; i++ { res := <-resultChan - if firstErr == nil { - firstErr = res.err + if res.err != nil { + // Flows for which we've gotten an error will be run locally iff we know for + // sure that the server where we attempted to schedule them did not, in + // fact, schedule them (we don't want to schedule the same flow on two + // nodes, as that might corrupt results in cases some producers connect to + // one node and other to the other). + // Note that admission control errors are not recovered in this way - we + // don't want to overload the local node in case another node is overloaded. + // TODO(andrei, radu): this should be reconsidered when we integrate the + // local flow with local admission control. + if isVersionMismatchErr(res.err) || netutil.ErrIsGRPCUnavailable(res.err) { + // Flows that failed to schedule because of version mismatch will be run + // locally. + flowsToRunLocally = append(flowsToRunLocally, res.nodeID) + continue + } + if firstErr == nil { + firstErr = res.err + } + continue } - // TODO(radu): accumulate the flows that we failed to set up and move them - // into the local flow. + anyScheduled = true } + localFlow := flows[thisNodeID] if firstErr != nil { + if anyScheduled { + // If any flows were scheduled, poison the local FlowRegistry so those + // remote flows are likely to catch an error soon. + dsp.distSQLSrv.PoisonFlow(localFlow.FlowID, poisonedFlowDefaultTimeout) + } return firstErr } + // Merge the flowsToRunLocally (if any) into the local flow. + if len(flowsToRunLocally) > 0 { + addressesToRewrite := make(map[string]struct{}) + for _, failedNodeID := range flowsToRunLocally { + log.VEventf(ctx, 2, "scheduling locally flow for node: %d", failedNodeID) + // Copy over the processors to the local flow. + localFlow.Processors = append(localFlow.Processors, flows[failedNodeID].Processors...) + addressesToRewrite[planCtx.nodeAddresses[failedNodeID]] = struct{}{} + } + // Rewrite the streams that are now local. + rewriteStreamsToLocal(&localFlow, addressesToRewrite) + } + // Set up the flow on this node. localReq := distsqlrun.SetupFlowRequest{ Version: distsqlrun.Version, Txn: *txn.Proto(), - Flow: flows[thisNodeID], + Flow: localFlow, EvalContext: evalCtxProto, } ctx, flow, err := dsp.distSQLSrv.SetupSyncFlow(ctx, &localReq, recv) @@ -208,6 +259,77 @@ func (dsp *distSQLPlanner) Run( return nil } +// rewriteStreamToLocal rewrites all the streams in flow that are either inbound +// from or outbound to any node in addressesToRewrite (identified by the node's +// address) to local streams. This is called after the respective processors +// have been moved to run locally. +func rewriteStreamsToLocal(flow *distsqlrun.FlowSpec, addressesToRewrite map[string]struct{}) { + // Two steps: + // 1. Rewrite the outgoing endpoints, identifying them by + // addresses of failed nodes. Also collect all the stream IDs. These need + // collecting because only outgoing endpoints have addresses in the spec, but + // we also need to rewrite the incoming endpoints. + // 2. Rewrite the incoming endpoints. + streamIDs := make(map[distsqlrun.StreamID]struct{}) + for i := range flow.Processors { + proc := &flow.Processors[i] + for j := range proc.Output { + output := &proc.Output[j] + for k := range output.Streams { + outputStream := &output.Streams[k] + if _, ok := addressesToRewrite[outputStream.TargetAddr]; ok { + outputStream.Type = distsqlrun.StreamEndpointSpec_LOCAL + outputStream.TargetAddr = "" + streamIDs[outputStream.StreamID] = struct{}{} + } + } + } + } + + for i := range flow.Processors { + proc := &flow.Processors[i] + for j := range proc.Input { + input := &proc.Input[j] + for k := range input.Streams { + inputStream := &input.Streams[k] + if _, ok := streamIDs[inputStream.StreamID]; ok { + inputStream.Type = distsqlrun.StreamEndpointSpec_LOCAL + } + } + } + } +} + +// addFallbackToOutputStreams fills in the FallbackAddr field of all the output +// streams in the flow to the specified gatewayAddr +func addFallbackToOutputStreams(flow *distsqlrun.FlowSpec, gatewayAddr string) { + for i := range flow.Processors { + proc := &flow.Processors[i] + for j := range proc.Output { + output := &proc.Output[j] + for k := range output.Streams { + outputStream := &output.Streams[k] + if outputStream.Type == distsqlrun.StreamEndpointSpec_REMOTE { + outputStream.FallbackAddr = &gatewayAddr + } + } + } + } +} + +// isVersionMismatchErr checks whether the error is a DistSQL version mismatch +// error, indicating that a SetupFlow request failed because a remote node is +// incompatible. +func isVersionMismatchErr(err error) bool { + // We check both the error string and the error type. We'd like to check just + // the type, but 1.0 didn't have a typed error. + if strings.HasPrefix(err.Error(), distsqlrun.VersionMismatchErrorPrefix) { + return true + } + _, ok := err.(*distsqlrun.VersionMismatchError) + return ok +} + // distSQLReceiver is a RowReceiver that stores incoming rows in a RowContainer. // This is where the DistSQL execution meets the SQL Session - the RowContainer // comes from a client Session. diff --git a/pkg/sql/distsqlrun/api.pb.go b/pkg/sql/distsqlrun/api.pb.go index 1a78dcf18a7f..53820643f501 100644 --- a/pkg/sql/distsqlrun/api.pb.go +++ b/pkg/sql/distsqlrun/api.pb.go @@ -17,6 +17,7 @@ ConsumerSignal DrainRequest Error + VersionMismatchError Expression Ordering StreamEndpointSpec diff --git a/pkg/sql/distsqlrun/base.go b/pkg/sql/distsqlrun/base.go index 456588bda32f..ac3edee86916 100644 --- a/pkg/sql/distsqlrun/base.go +++ b/pkg/sql/distsqlrun/base.go @@ -627,13 +627,17 @@ func NewError(err error) *Error { Detail: &Error_RetryableTxnError{ RetryableTxnError: retryErr, }} - } else { - // Anything unrecognized is an "internal error". + } else if versionMismatchError, ok := err.(*VersionMismatchError); ok { return &Error{ - Detail: &Error_PGError{ - PGError: pgerror.NewError( - pgerror.CodeInternalError, err.Error())}} + Detail: &Error_VersionMismatchError{ + VersionMismatchError: versionMismatchError, + }} } + // Anything unrecognized is an "internal error". + return &Error{ + Detail: &Error_PGError{ + PGError: pgerror.NewError( + pgerror.CodeInternalError, err.Error())}} } // ErrorDetail returns the payload as a Go error. @@ -646,6 +650,8 @@ func (e *Error) ErrorDetail() error { return t.PGError case *Error_RetryableTxnError: return t.RetryableTxnError + case *Error_VersionMismatchError: + return t.VersionMismatchError default: panic(fmt.Sprintf("bad error detail: %+v", t)) } diff --git a/pkg/sql/distsqlrun/data.go b/pkg/sql/distsqlrun/data.go index f56679f867b6..e8a6e9c07716 100644 --- a/pkg/sql/distsqlrun/data.go +++ b/pkg/sql/distsqlrun/data.go @@ -15,6 +15,8 @@ package distsqlrun import ( + "fmt" + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/util/encoding" ) @@ -49,3 +51,25 @@ func convertToSpecOrdering(columnOrdering sqlbase.ColumnOrdering) Ordering { } return specOrdering } + +// VersionMismatchErrorPrefix is a prefix of the VersionMismatchError's message. +// This can be used to check for this error even when it's coming from servers +// that were returning it as an untyped "internal error". +const VersionMismatchErrorPrefix = "version mismatch in flow request:" + +// NewVersionMismatchError creates a new VersionMismatchError. +func NewVersionMismatchError( + requestedVersion uint32, serverMinVersion uint32, serverVersion uint32, +) error { + return &VersionMismatchError{ + RequestedVersion: uint32(requestedVersion), + ServerMinVersion: uint32(serverMinVersion), + ServerVersion: uint32(serverVersion), + } +} + +// Error implements the error interface. +func (e *VersionMismatchError) Error() string { + return fmt.Sprintf("%s %d; this node accepts %d through %d", + VersionMismatchErrorPrefix, e.RequestedVersion, e.ServerMinVersion, e.ServerVersion) +} diff --git a/pkg/sql/distsqlrun/data.pb.go b/pkg/sql/distsqlrun/data.pb.go index 34b9383729eb..4603b927a3df 100644 --- a/pkg/sql/distsqlrun/data.pb.go +++ b/pkg/sql/distsqlrun/data.pb.go @@ -55,7 +55,7 @@ func (x *Ordering_Column_Direction) UnmarshalJSON(data []byte) error { return nil } func (Ordering_Column_Direction) EnumDescriptor() ([]byte, []int) { - return fileDescriptorData, []int{2, 0, 0} + return fileDescriptorData, []int{3, 0, 0} } type StreamEndpointSpec_Type int32 @@ -100,7 +100,7 @@ func (x *StreamEndpointSpec_Type) UnmarshalJSON(data []byte) error { return nil } func (StreamEndpointSpec_Type) EnumDescriptor() ([]byte, []int) { - return fileDescriptorData, []int{3, 0} + return fileDescriptorData, []int{4, 0} } type InputSyncSpec_Type int32 @@ -139,7 +139,7 @@ func (x *InputSyncSpec_Type) UnmarshalJSON(data []byte) error { *x = InputSyncSpec_Type(value) return nil } -func (InputSyncSpec_Type) EnumDescriptor() ([]byte, []int) { return fileDescriptorData, []int{4, 0} } +func (InputSyncSpec_Type) EnumDescriptor() ([]byte, []int) { return fileDescriptorData, []int{5, 0} } type OutputRouterSpec_Type int32 @@ -186,13 +186,14 @@ func (x *OutputRouterSpec_Type) UnmarshalJSON(data []byte) error { *x = OutputRouterSpec_Type(value) return nil } -func (OutputRouterSpec_Type) EnumDescriptor() ([]byte, []int) { return fileDescriptorData, []int{5, 0} } +func (OutputRouterSpec_Type) EnumDescriptor() ([]byte, []int) { return fileDescriptorData, []int{6, 0} } // Error is a generic representation including a string message. type Error struct { // Types that are valid to be assigned to Detail: // *Error_PGError // *Error_RetryableTxnError + // *Error_VersionMismatchError Detail isError_Detail `protobuf_oneof:"detail"` } @@ -212,9 +213,13 @@ type Error_PGError struct { type Error_RetryableTxnError struct { RetryableTxnError *cockroach_roachpb2.UnhandledRetryableError `protobuf:"bytes,2,opt,name=retryableTxnError,oneof"` } +type Error_VersionMismatchError struct { + VersionMismatchError *VersionMismatchError `protobuf:"bytes,3,opt,name=version_mismatch_error,json=versionMismatchError,oneof"` +} -func (*Error_PGError) isError_Detail() {} -func (*Error_RetryableTxnError) isError_Detail() {} +func (*Error_PGError) isError_Detail() {} +func (*Error_RetryableTxnError) isError_Detail() {} +func (*Error_VersionMismatchError) isError_Detail() {} func (m *Error) GetDetail() isError_Detail { if m != nil { @@ -237,11 +242,19 @@ func (m *Error) GetRetryableTxnError() *cockroach_roachpb2.UnhandledRetryableErr return nil } +func (m *Error) GetVersionMismatchError() *VersionMismatchError { + if x, ok := m.GetDetail().(*Error_VersionMismatchError); ok { + return x.VersionMismatchError + } + return nil +} + // XXX_OneofFuncs is for the internal use of the proto package. func (*Error) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) { return _Error_OneofMarshaler, _Error_OneofUnmarshaler, _Error_OneofSizer, []interface{}{ (*Error_PGError)(nil), (*Error_RetryableTxnError)(nil), + (*Error_VersionMismatchError)(nil), } } @@ -259,6 +272,11 @@ func _Error_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { if err := b.EncodeMessage(x.RetryableTxnError); err != nil { return err } + case *Error_VersionMismatchError: + _ = b.EncodeVarint(3<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.VersionMismatchError); err != nil { + return err + } case nil: default: return fmt.Errorf("Error.Detail has unexpected type %T", x) @@ -285,6 +303,14 @@ func _Error_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) err := b.DecodeMessage(msg) m.Detail = &Error_RetryableTxnError{msg} return true, err + case 3: // detail.version_mismatch_error + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(VersionMismatchError) + err := b.DecodeMessage(msg) + m.Detail = &Error_VersionMismatchError{msg} + return true, err default: return false, nil } @@ -304,6 +330,11 @@ func _Error_OneofSizer(msg proto.Message) (n int) { n += proto.SizeVarint(2<<3 | proto.WireBytes) n += proto.SizeVarint(uint64(s)) n += s + case *Error_VersionMismatchError: + s := proto.Size(x.VersionMismatchError) + n += proto.SizeVarint(3<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s case nil: default: panic(fmt.Sprintf("proto: unexpected type %T in oneof", x)) @@ -311,6 +342,19 @@ func _Error_OneofSizer(msg proto.Message) (n int) { return n } +// VersionMismatchError is returned by SetupFlowRequest when the requested +// version is outside of [server_min_version..server_version]. +type VersionMismatchError struct { + RequestedVersion uint32 `protobuf:"varint,1,opt,name=requested_version,json=requestedVersion" json:"requested_version"` + ServerVersion uint32 `protobuf:"varint,2,opt,name=server_version,json=serverVersion" json:"server_version"` + ServerMinVersion uint32 `protobuf:"varint,3,opt,name=server_min_version,json=serverMinVersion" json:"server_min_version"` +} + +func (m *VersionMismatchError) Reset() { *m = VersionMismatchError{} } +func (m *VersionMismatchError) String() string { return proto.CompactTextString(m) } +func (*VersionMismatchError) ProtoMessage() {} +func (*VersionMismatchError) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{1} } + type Expression struct { // TODO(radu): TBD how this will be used Version string `protobuf:"bytes,1,opt,name=version" json:"version"` @@ -322,7 +366,7 @@ type Expression struct { func (m *Expression) Reset() { *m = Expression{} } func (m *Expression) String() string { return proto.CompactTextString(m) } func (*Expression) ProtoMessage() {} -func (*Expression) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{1} } +func (*Expression) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{2} } // Ordering defines an order - specifically a list of column indices and // directions. See sqlbase.ColumnOrdering. @@ -333,7 +377,7 @@ type Ordering struct { func (m *Ordering) Reset() { *m = Ordering{} } func (m *Ordering) String() string { return proto.CompactTextString(m) } func (*Ordering) ProtoMessage() {} -func (*Ordering) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{2} } +func (*Ordering) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{3} } type Ordering_Column struct { ColIdx uint32 `protobuf:"varint,1,opt,name=col_idx,json=colIdx" json:"col_idx"` @@ -343,7 +387,7 @@ type Ordering_Column struct { func (m *Ordering_Column) Reset() { *m = Ordering_Column{} } func (m *Ordering_Column) String() string { return proto.CompactTextString(m) } func (*Ordering_Column) ProtoMessage() {} -func (*Ordering_Column) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{2, 0} } +func (*Ordering_Column) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{3, 0} } // StreamEndpointSpec describes one of the endpoints (input or output) of a physical // stream. @@ -361,12 +405,20 @@ type StreamEndpointSpec struct { StreamID StreamID `protobuf:"varint,2,opt,name=stream_id,json=streamId,casttype=StreamID" json:"stream_id"` // Serving address for the target host, only used for outgoing REMOTE streams. TargetAddr string `protobuf:"bytes,3,opt,name=target_addr,json=targetAddr" json:"target_addr"` + // Address to attempt if connecting to target_addr fails or is slow. Only used + // for outgoing remote streams. + // It's guaranteed that at most one of the attempts to connect to + // {target_addr, fallback_addr} will succeed (so it's safe to attempt them in + // parallel) - they will not both accept the incoming stream. + // + // If the field is left null, then no fallback will be used. + FallbackAddr *string `protobuf:"bytes,4,opt,name=fallback_addr,json=fallbackAddr" json:"fallback_addr,omitempty"` } func (m *StreamEndpointSpec) Reset() { *m = StreamEndpointSpec{} } func (m *StreamEndpointSpec) String() string { return proto.CompactTextString(m) } func (*StreamEndpointSpec) ProtoMessage() {} -func (*StreamEndpointSpec) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{3} } +func (*StreamEndpointSpec) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{4} } // InputSyncSpec is the specification for an input synchronizer; it decides how // to interleave rows from multiple input streams. @@ -381,7 +433,7 @@ type InputSyncSpec struct { func (m *InputSyncSpec) Reset() { *m = InputSyncSpec{} } func (m *InputSyncSpec) String() string { return proto.CompactTextString(m) } func (*InputSyncSpec) ProtoMessage() {} -func (*InputSyncSpec) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{4} } +func (*InputSyncSpec) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{5} } // OutputRouterSpec is the specification for the output router of a processor; // it decides how to send results to multiple output streams. @@ -396,7 +448,7 @@ type OutputRouterSpec struct { func (m *OutputRouterSpec) Reset() { *m = OutputRouterSpec{} } func (m *OutputRouterSpec) String() string { return proto.CompactTextString(m) } func (*OutputRouterSpec) ProtoMessage() {} -func (*OutputRouterSpec) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{5} } +func (*OutputRouterSpec) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{6} } type DatumInfo struct { Encoding cockroach_sql_sqlbase2.DatumEncoding `protobuf:"varint,1,opt,name=encoding,enum=cockroach.sql.sqlbase.DatumEncoding" json:"encoding"` @@ -406,7 +458,7 @@ type DatumInfo struct { func (m *DatumInfo) Reset() { *m = DatumInfo{} } func (m *DatumInfo) String() string { return proto.CompactTextString(m) } func (*DatumInfo) ProtoMessage() {} -func (*DatumInfo) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{6} } +func (*DatumInfo) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{7} } // ProducerHeader is a message that is sent once at the beginning of a stream. type ProducerHeader struct { @@ -417,7 +469,7 @@ type ProducerHeader struct { func (m *ProducerHeader) Reset() { *m = ProducerHeader{} } func (m *ProducerHeader) String() string { return proto.CompactTextString(m) } func (*ProducerHeader) ProtoMessage() {} -func (*ProducerHeader) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{7} } +func (*ProducerHeader) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{8} } // ProducerData is a message that can be sent multiple times as part of a stream // from a producer to a consumer. It contains 0 or more rows and/or 0 or more @@ -436,7 +488,7 @@ type ProducerData struct { func (m *ProducerData) Reset() { *m = ProducerData{} } func (m *ProducerData) String() string { return proto.CompactTextString(m) } func (*ProducerData) ProtoMessage() {} -func (*ProducerData) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{8} } +func (*ProducerData) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{9} } type ProducerMessage struct { Header *ProducerHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"` @@ -453,7 +505,7 @@ type ProducerMessage struct { func (m *ProducerMessage) Reset() { *m = ProducerMessage{} } func (m *ProducerMessage) String() string { return proto.CompactTextString(m) } func (*ProducerMessage) ProtoMessage() {} -func (*ProducerMessage) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{9} } +func (*ProducerMessage) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{10} } // RemoteProducerMetadata represents records that a producer wants to pass to // a consumer, other than data rows. It's named RemoteProducerMetadata to not @@ -470,7 +522,7 @@ type RemoteProducerMetadata struct { func (m *RemoteProducerMetadata) Reset() { *m = RemoteProducerMetadata{} } func (m *RemoteProducerMetadata) String() string { return proto.CompactTextString(m) } func (*RemoteProducerMetadata) ProtoMessage() {} -func (*RemoteProducerMetadata) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{10} } +func (*RemoteProducerMetadata) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{11} } type isRemoteProducerMetadata_Value interface { isRemoteProducerMetadata_Value() @@ -621,7 +673,7 @@ func (m *RemoteProducerMetadata_RangeInfos) Reset() { *m = RemoteProduce func (m *RemoteProducerMetadata_RangeInfos) String() string { return proto.CompactTextString(m) } func (*RemoteProducerMetadata_RangeInfos) ProtoMessage() {} func (*RemoteProducerMetadata_RangeInfos) Descriptor() ([]byte, []int) { - return fileDescriptorData, []int{10, 0} + return fileDescriptorData, []int{11, 0} } type RemoteProducerMetadata_TraceData struct { @@ -632,11 +684,12 @@ func (m *RemoteProducerMetadata_TraceData) Reset() { *m = RemoteProducer func (m *RemoteProducerMetadata_TraceData) String() string { return proto.CompactTextString(m) } func (*RemoteProducerMetadata_TraceData) ProtoMessage() {} func (*RemoteProducerMetadata_TraceData) Descriptor() ([]byte, []int) { - return fileDescriptorData, []int{10, 1} + return fileDescriptorData, []int{11, 1} } func init() { proto.RegisterType((*Error)(nil), "cockroach.sql.distsqlrun.Error") + proto.RegisterType((*VersionMismatchError)(nil), "cockroach.sql.distsqlrun.VersionMismatchError") proto.RegisterType((*Expression)(nil), "cockroach.sql.distsqlrun.Expression") proto.RegisterType((*Ordering)(nil), "cockroach.sql.distsqlrun.Ordering") proto.RegisterType((*Ordering_Column)(nil), "cockroach.sql.distsqlrun.Ordering.Column") @@ -708,6 +761,47 @@ func (m *Error_RetryableTxnError) MarshalTo(dAtA []byte) (int, error) { } return i, nil } +func (m *Error_VersionMismatchError) MarshalTo(dAtA []byte) (int, error) { + i := 0 + if m.VersionMismatchError != nil { + dAtA[i] = 0x1a + i++ + i = encodeVarintData(dAtA, i, uint64(m.VersionMismatchError.Size())) + n4, err := m.VersionMismatchError.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n4 + } + return i, nil +} +func (m *VersionMismatchError) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *VersionMismatchError) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + dAtA[i] = 0x8 + i++ + i = encodeVarintData(dAtA, i, uint64(m.RequestedVersion)) + dAtA[i] = 0x10 + i++ + i = encodeVarintData(dAtA, i, uint64(m.ServerVersion)) + dAtA[i] = 0x18 + i++ + i = encodeVarintData(dAtA, i, uint64(m.ServerMinVersion)) + return i, nil +} + func (m *Expression) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -813,6 +907,12 @@ func (m *StreamEndpointSpec) MarshalTo(dAtA []byte) (int, error) { i++ i = encodeVarintData(dAtA, i, uint64(len(m.TargetAddr))) i += copy(dAtA[i:], m.TargetAddr) + if m.FallbackAddr != nil { + dAtA[i] = 0x22 + i++ + i = encodeVarintData(dAtA, i, uint64(len(*m.FallbackAddr))) + i += copy(dAtA[i:], *m.FallbackAddr) + } return i, nil } @@ -837,11 +937,11 @@ func (m *InputSyncSpec) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x12 i++ i = encodeVarintData(dAtA, i, uint64(m.Ordering.Size())) - n4, err := m.Ordering.MarshalTo(dAtA[i:]) + n5, err := m.Ordering.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n4 + i += n5 if len(m.Streams) > 0 { for _, msg := range m.Streams { dAtA[i] = 0x1a @@ -930,11 +1030,11 @@ func (m *DatumInfo) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x12 i++ i = encodeVarintData(dAtA, i, uint64(m.Type.Size())) - n5, err := m.Type.MarshalTo(dAtA[i:]) + n6, err := m.Type.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n5 + i += n6 return i, nil } @@ -956,11 +1056,11 @@ func (m *ProducerHeader) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0xa i++ i = encodeVarintData(dAtA, i, uint64(m.FlowID.Size())) - n6, err := m.FlowID.MarshalTo(dAtA[i:]) + n7, err := m.FlowID.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n6 + i += n7 dAtA[i] = 0x10 i++ i = encodeVarintData(dAtA, i, uint64(m.StreamID)) @@ -1025,11 +1125,11 @@ func (m *ProducerMessage) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0xa i++ i = encodeVarintData(dAtA, i, uint64(m.Header.Size())) - n7, err := m.Header.MarshalTo(dAtA[i:]) + n8, err := m.Header.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n7 + i += n8 } if len(m.Typing) > 0 { for _, msg := range m.Typing { @@ -1046,11 +1146,11 @@ func (m *ProducerMessage) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x1a i++ i = encodeVarintData(dAtA, i, uint64(m.Data.Size())) - n8, err := m.Data.MarshalTo(dAtA[i:]) + n9, err := m.Data.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n8 + i += n9 return i, nil } @@ -1070,11 +1170,11 @@ func (m *RemoteProducerMetadata) MarshalTo(dAtA []byte) (int, error) { var l int _ = l if m.Value != nil { - nn9, err := m.Value.MarshalTo(dAtA[i:]) + nn10, err := m.Value.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += nn9 + i += nn10 } return i, nil } @@ -1085,11 +1185,11 @@ func (m *RemoteProducerMetadata_RangeInfo) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0xa i++ i = encodeVarintData(dAtA, i, uint64(m.RangeInfo.Size())) - n10, err := m.RangeInfo.MarshalTo(dAtA[i:]) + n11, err := m.RangeInfo.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n10 + i += n11 } return i, nil } @@ -1099,11 +1199,11 @@ func (m *RemoteProducerMetadata_Error) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x12 i++ i = encodeVarintData(dAtA, i, uint64(m.Error.Size())) - n11, err := m.Error.MarshalTo(dAtA[i:]) + n12, err := m.Error.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n11 + i += n12 } return i, nil } @@ -1113,11 +1213,11 @@ func (m *RemoteProducerMetadata_TraceData_) MarshalTo(dAtA []byte) (int, error) dAtA[i] = 0x1a i++ i = encodeVarintData(dAtA, i, uint64(m.TraceData.Size())) - n12, err := m.TraceData.MarshalTo(dAtA[i:]) + n13, err := m.TraceData.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n12 + i += n13 } return i, nil } @@ -1235,6 +1335,24 @@ func (m *Error_RetryableTxnError) Size() (n int) { } return n } +func (m *Error_VersionMismatchError) Size() (n int) { + var l int + _ = l + if m.VersionMismatchError != nil { + l = m.VersionMismatchError.Size() + n += 1 + l + sovData(uint64(l)) + } + return n +} +func (m *VersionMismatchError) Size() (n int) { + var l int + _ = l + n += 1 + sovData(uint64(m.RequestedVersion)) + n += 1 + sovData(uint64(m.ServerVersion)) + n += 1 + sovData(uint64(m.ServerMinVersion)) + return n +} + func (m *Expression) Size() (n int) { var l int _ = l @@ -1272,6 +1390,10 @@ func (m *StreamEndpointSpec) Size() (n int) { n += 1 + sovData(uint64(m.StreamID)) l = len(m.TargetAddr) n += 1 + l + sovData(uint64(l)) + if m.FallbackAddr != nil { + l = len(*m.FallbackAddr) + n += 1 + l + sovData(uint64(l)) + } return n } @@ -1533,6 +1655,145 @@ func (m *Error) Unmarshal(dAtA []byte) error { } m.Detail = &Error_RetryableTxnError{v} iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field VersionMismatchError", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowData + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthData + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + v := &VersionMismatchError{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + m.Detail = &Error_VersionMismatchError{v} + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipData(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthData + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *VersionMismatchError) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowData + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: VersionMismatchError: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: VersionMismatchError: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field RequestedVersion", wireType) + } + m.RequestedVersion = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowData + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.RequestedVersion |= (uint32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ServerVersion", wireType) + } + m.ServerVersion = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowData + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.ServerVersion |= (uint32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ServerMinVersion", wireType) + } + m.ServerMinVersion = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowData + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.ServerMinVersion |= (uint32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipData(dAtA[iNdEx:]) @@ -1927,6 +2188,36 @@ func (m *StreamEndpointSpec) Unmarshal(dAtA []byte) error { } m.TargetAddr = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field FallbackAddr", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowData + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthData + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + s := string(dAtA[iNdEx:postIndex]) + m.FallbackAddr = &s + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipData(dAtA[iNdEx:]) @@ -3160,82 +3451,89 @@ var ( func init() { proto.RegisterFile("cockroach/pkg/sql/distsqlrun/data.proto", fileDescriptorData) } var fileDescriptorData = []byte{ - // 1230 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x56, 0x4f, 0x8f, 0xdb, 0x44, - 0x1c, 0x8d, 0x37, 0xd9, 0xfc, 0xf9, 0x25, 0xbb, 0x4d, 0x47, 0x08, 0x45, 0x4b, 0x49, 0x5a, 0x53, - 0xa0, 0x54, 0xc5, 0x69, 0x97, 0x03, 0xa2, 0x3d, 0xd0, 0x64, 0xe3, 0x6d, 0x02, 0xed, 0x66, 0x65, - 0x6f, 0x85, 0x5a, 0x90, 0x8c, 0xd7, 0x9e, 0x66, 0xad, 0x3a, 0xb6, 0x77, 0x3c, 0x6e, 0x36, 0x17, - 0xbe, 0x02, 0x3d, 0x72, 0xec, 0x9d, 0x0f, 0xc0, 0x57, 0xd8, 0x23, 0xc7, 0x0a, 0xa4, 0x15, 0x84, - 0xcf, 0x80, 0x84, 0x38, 0xa1, 0x19, 0xcf, 0xe4, 0x4f, 0x77, 0x43, 0xbb, 0xea, 0x6d, 0x3c, 0xf3, - 0xde, 0xf3, 0xfb, 0xbd, 0xf9, 0x79, 0x3c, 0xf0, 0xb1, 0x13, 0x3a, 0x4f, 0x49, 0x68, 0x3b, 0x07, - 0xcd, 0xe8, 0xe9, 0xa0, 0x19, 0x1f, 0xfa, 0x4d, 0xd7, 0x8b, 0x69, 0x7c, 0xe8, 0x93, 0x24, 0x68, - 0xba, 0x36, 0xb5, 0xb5, 0x88, 0x84, 0x34, 0x44, 0xb5, 0x29, 0x50, 0x8b, 0x0f, 0x7d, 0x6d, 0x06, - 0xda, 0x68, 0x2c, 0x4a, 0xf0, 0x51, 0xb4, 0xdf, 0xb4, 0x23, 0x2f, 0xa5, 0x6e, 0x5c, 0x3e, 0x1b, - 0x30, 0x13, 0xdf, 0x50, 0xcf, 0x46, 0x60, 0x42, 0x42, 0x12, 0x0b, 0xcc, 0xa7, 0xa7, 0x9d, 0x46, - 0x83, 0x91, 0x47, 0x70, 0x33, 0x1a, 0x70, 0xe0, 0x22, 0xfc, 0xfa, 0x69, 0x78, 0x7c, 0xe8, 0xef, - 0xdb, 0x31, 0x6e, 0xc6, 0x94, 0x24, 0x0e, 0x4d, 0x08, 0x76, 0x97, 0x4b, 0x4b, 0x2c, 0x0e, 0x9c, - 0xd0, 0xc5, 0xae, 0xe5, 0xda, 0x34, 0x19, 0x0a, 0xb8, 0xb6, 0x08, 0x4f, 0xa8, 0xe7, 0x37, 0x29, - 0xb1, 0x1d, 0x2f, 0x18, 0x34, 0x09, 0x76, 0x42, 0xc2, 0x08, 0x71, 0x64, 0x07, 0x02, 0xff, 0xce, - 0x20, 0x1c, 0x84, 0x7c, 0xd8, 0x64, 0xa3, 0x74, 0x56, 0xfd, 0x45, 0x81, 0x55, 0x9d, 0x39, 0x46, - 0x6d, 0x28, 0x46, 0x03, 0x8b, 0xbb, 0xaf, 0x29, 0x97, 0x95, 0x6b, 0xe5, 0xcd, 0xda, 0xec, 0x15, - 0x9a, 0xa8, 0x4e, 0xe3, 0xd8, 0x76, 0x79, 0x72, 0xd2, 0x28, 0xec, 0xde, 0xe3, 0x0f, 0xdd, 0x8c, - 0x51, 0x88, 0x06, 0xa9, 0xc6, 0x63, 0xb8, 0x48, 0x30, 0x25, 0x63, 0x7b, 0xdf, 0xc7, 0x7b, 0x47, - 0x01, 0x9f, 0xac, 0xad, 0x70, 0xb1, 0xeb, 0x73, 0x62, 0x22, 0x59, 0xed, 0x61, 0x70, 0x60, 0x07, - 0xae, 0x8f, 0x5d, 0x43, 0x92, 0xa4, 0xe2, 0x69, 0x99, 0xdb, 0xb9, 0x9f, 0x5e, 0x34, 0x32, 0xed, - 0x22, 0xe4, 0x5d, 0x4c, 0x6d, 0xcf, 0x57, 0xb7, 0x01, 0xf4, 0xa3, 0x88, 0xe0, 0x38, 0xf6, 0xc2, - 0x00, 0xd5, 0xa1, 0xf0, 0x0c, 0x13, 0x36, 0xe4, 0xe6, 0x4b, 0xed, 0xdc, 0xf1, 0x49, 0x23, 0x63, - 0xc8, 0x49, 0x54, 0x83, 0x1c, 0x3e, 0x8a, 0x52, 0x33, 0x72, 0x91, 0xcf, 0xa8, 0xff, 0x28, 0x50, - 0xec, 0x13, 0x17, 0x13, 0x2f, 0x18, 0xa0, 0x1e, 0x14, 0x9c, 0xd0, 0x4f, 0x86, 0x41, 0x5c, 0x53, - 0x2e, 0x67, 0xaf, 0x95, 0x37, 0x3f, 0xd1, 0x96, 0x75, 0x9c, 0x26, 0x49, 0xda, 0x16, 0x67, 0xc8, - 0x37, 0x0a, 0xfe, 0xc6, 0x0b, 0x05, 0xf2, 0xe9, 0x0a, 0x7a, 0x9f, 0xab, 0x5a, 0x9e, 0x7b, 0xc4, - 0xcd, 0xad, 0x09, 0x68, 0xde, 0x09, 0xfd, 0x9e, 0x7b, 0x84, 0xbe, 0x81, 0x92, 0xeb, 0x11, 0xec, - 0x50, 0xe6, 0x9e, 0x19, 0x5c, 0xdf, 0xfc, 0xec, 0x8d, 0x5f, 0xab, 0x75, 0x24, 0x55, 0xa8, 0xce, - 0xb4, 0xd4, 0x3a, 0x94, 0xa6, 0xab, 0xa8, 0x00, 0xd9, 0x96, 0xb9, 0x55, 0xcd, 0xa0, 0x22, 0xe4, - 0x3a, 0xba, 0xb9, 0x55, 0x55, 0xd4, 0xbf, 0x15, 0x40, 0x26, 0x25, 0xd8, 0x1e, 0xea, 0x81, 0x1b, - 0x85, 0x5e, 0x40, 0xcd, 0x08, 0x3b, 0xe8, 0x6b, 0xc8, 0xd1, 0x71, 0x84, 0xb9, 0xd7, 0xf5, 0xcd, - 0x5b, 0xcb, 0xad, 0x9c, 0xe6, 0x6a, 0x7b, 0xe3, 0x08, 0xcb, 0x78, 0x99, 0x08, 0xfa, 0x02, 0x4a, - 0x31, 0x87, 0x59, 0x9e, 0xcb, 0x8b, 0x5b, 0x6d, 0x5f, 0x62, 0xcb, 0x93, 0x93, 0x46, 0x31, 0xe5, - 0xf7, 0x3a, 0xff, 0xce, 0x8d, 0x8d, 0x62, 0x0a, 0xef, 0xb9, 0xe8, 0x43, 0x28, 0x53, 0x9b, 0x0c, - 0x30, 0xb5, 0x6c, 0xd7, 0x25, 0xb5, 0xec, 0xdc, 0xd6, 0x41, 0xba, 0xd0, 0x72, 0x5d, 0xa2, 0xde, - 0x84, 0x1c, 0x7b, 0x2b, 0x2a, 0xc1, 0xea, 0xfd, 0xfe, 0x56, 0xeb, 0x7e, 0x35, 0x83, 0x00, 0xf2, - 0x86, 0xfe, 0xa0, 0xbf, 0xa7, 0x57, 0x15, 0x74, 0x11, 0xd6, 0xcc, 0x47, 0x3b, 0x5b, 0x96, 0xa1, - 0x9b, 0xbb, 0xfd, 0x1d, 0x53, 0xaf, 0xae, 0xa8, 0xbf, 0xaf, 0xc0, 0x5a, 0x2f, 0x88, 0x12, 0x6a, - 0x8e, 0x03, 0x87, 0x97, 0xbc, 0xbd, 0x50, 0xf2, 0x8d, 0xe5, 0x25, 0x2f, 0xd0, 0x4e, 0x57, 0xdb, - 0x81, 0x62, 0x28, 0xf6, 0x47, 0xf4, 0xbd, 0xfa, 0xfa, 0x9d, 0x14, 0x0a, 0x53, 0x26, 0xba, 0x0f, - 0x85, 0x34, 0x84, 0xb8, 0x96, 0xe5, 0x5d, 0x78, 0xe3, 0x3c, 0x7b, 0x20, 0x1b, 0x51, 0x48, 0xa0, - 0xaf, 0xa0, 0x92, 0xf6, 0xa4, 0xc5, 0x2c, 0xc6, 0xb5, 0x1c, 0x97, 0xbc, 0xf2, 0x8a, 0xa4, 0x38, - 0x6a, 0x44, 0x57, 0xcd, 0x15, 0x56, 0x76, 0xa6, 0x33, 0xb1, 0xaa, 0x8a, 0xac, 0xd7, 0xa0, 0xf4, - 0x70, 0xa7, 0x6f, 0x74, 0x74, 0x43, 0xef, 0x54, 0x33, 0xa8, 0x0c, 0x05, 0xf9, 0xa0, 0xa8, 0x3f, - 0xae, 0x40, 0xb5, 0x9f, 0xd0, 0x28, 0xa1, 0x46, 0x98, 0x50, 0x4c, 0x78, 0xc0, 0xbd, 0x85, 0x80, - 0x9b, 0xff, 0x13, 0xca, 0x2b, 0xcc, 0xd3, 0x19, 0xcf, 0xa5, 0xb3, 0xf2, 0xf6, 0xe9, 0x5c, 0x81, - 0xca, 0x81, 0x1d, 0x1f, 0x58, 0xf2, 0xb3, 0x67, 0x81, 0xaf, 0x19, 0x65, 0x36, 0x97, 0x46, 0x11, - 0xab, 0x5f, 0x8a, 0xa2, 0xab, 0x50, 0xd9, 0x6d, 0x99, 0xa6, 0xb5, 0xd7, 0x35, 0xfa, 0x0f, 0xef, - 0x75, 0xd3, 0x3e, 0x7b, 0xd0, 0x33, 0x8c, 0xbe, 0x51, 0x55, 0x58, 0x06, 0xed, 0x47, 0x56, 0xb7, - 0x65, 0x76, 0xab, 0x2b, 0xa8, 0x02, 0xc5, 0xf6, 0x23, 0xcb, 0x68, 0xed, 0xdc, 0xd3, 0xab, 0x59, - 0xf5, 0xb9, 0x02, 0xa5, 0x0e, 0x3b, 0xba, 0x7b, 0xc1, 0x93, 0x10, 0x6d, 0x43, 0x91, 0x9f, 0xe7, - 0xac, 0x47, 0xd2, 0x38, 0xae, 0x2e, 0xd9, 0x0b, 0xce, 0xd1, 0x05, 0x56, 0x76, 0x89, 0xe4, 0xa2, - 0x3b, 0x22, 0xd2, 0xb4, 0xcf, 0xde, 0x78, 0x3f, 0x39, 0x49, 0xfd, 0x01, 0xd6, 0x77, 0x49, 0xe8, - 0x26, 0x0e, 0x26, 0x5d, 0x6c, 0xbb, 0x98, 0xa0, 0x5b, 0x50, 0x78, 0xe2, 0x87, 0x23, 0xf6, 0x99, - 0x32, 0x57, 0x95, 0x76, 0x8d, 0xc1, 0x7f, 0x3b, 0x69, 0xe4, 0xb7, 0xfd, 0x70, 0xd4, 0xeb, 0x4c, - 0xa6, 0x23, 0x23, 0xcf, 0x80, 0x3d, 0xf7, 0x2d, 0xbe, 0x6d, 0xf5, 0x67, 0x05, 0x2a, 0xd2, 0x40, - 0xc7, 0xa6, 0x36, 0x7a, 0x0f, 0x4a, 0xc4, 0x1e, 0x59, 0xfb, 0x63, 0x8a, 0xe3, 0xd4, 0x80, 0x51, - 0x24, 0xf6, 0xa8, 0xcd, 0x9e, 0x91, 0x01, 0xc5, 0x21, 0xa6, 0x36, 0xfb, 0x57, 0x8b, 0x3d, 0xbf, - 0xb9, 0x7c, 0xcf, 0x0d, 0x3c, 0x0c, 0x29, 0x96, 0xe2, 0x0f, 0x04, 0x4f, 0xc6, 0x27, 0x75, 0xd0, - 0x75, 0x58, 0x0f, 0x92, 0xa1, 0x85, 0x87, 0x11, 0x1d, 0x5b, 0x24, 0x1c, 0xc5, 0xfc, 0x80, 0x59, - 0x15, 0xb8, 0x4a, 0x90, 0x0c, 0x75, 0xb6, 0x64, 0x84, 0xa3, 0x58, 0x7d, 0xa9, 0xc0, 0x85, 0x99, - 0x60, 0x1c, 0xdb, 0x03, 0x8c, 0xee, 0x42, 0xfe, 0x80, 0x27, 0x27, 0xfe, 0x96, 0xd7, 0x96, 0x3b, - 0x5a, 0x4c, 0xda, 0x10, 0x3c, 0xd4, 0x82, 0x3c, 0x1d, 0x47, 0xe9, 0x51, 0xc1, 0x6a, 0xfa, 0x60, - 0xb9, 0xc2, 0xb4, 0x7b, 0xe4, 0xaf, 0x23, 0x25, 0xa2, 0xbb, 0x90, 0xe3, 0xa1, 0x64, 0xb9, 0x85, - 0x8f, 0x5e, 0x6f, 0xa1, 0x33, 0x8b, 0x82, 0x33, 0xd5, 0xe3, 0x2c, 0xbc, 0x7b, 0x76, 0x62, 0xe8, - 0x3b, 0x00, 0x62, 0x07, 0x03, 0x6c, 0x79, 0xc1, 0x93, 0x50, 0x54, 0x79, 0xe7, 0xbc, 0xb9, 0x6b, - 0x06, 0x93, 0x60, 0xd6, 0xe3, 0x6e, 0xc6, 0x28, 0x11, 0xf9, 0x84, 0x3e, 0x87, 0x55, 0x3c, 0x77, - 0x3f, 0x68, 0x2c, 0x17, 0x96, 0x97, 0x82, 0x14, 0x8f, 0xbe, 0x05, 0x60, 0xf7, 0x1c, 0x6c, 0xcd, - 0x55, 0x7e, 0xfb, 0xdc, 0xb6, 0xf6, 0x98, 0x04, 0x4b, 0x83, 0xb9, 0xa2, 0xf2, 0x61, 0xa3, 0x0f, - 0x30, 0x33, 0x8c, 0x5a, 0xaf, 0x24, 0xc0, 0x76, 0xe9, 0xd2, 0x19, 0x17, 0x99, 0x29, 0x45, 0xfe, - 0x83, 0xa7, 0x65, 0x6e, 0x7c, 0x0f, 0xa5, 0xe9, 0xab, 0x90, 0x09, 0x17, 0x9c, 0xd0, 0xf7, 0xb1, - 0x43, 0xc5, 0xdd, 0x4c, 0x5e, 0x33, 0xe6, 0x4f, 0x00, 0x76, 0x93, 0xd3, 0xc4, 0x4d, 0x4e, 0x33, - 0xc4, 0x4d, 0xce, 0x8c, 0x6c, 0xf9, 0x83, 0x5f, 0x9f, 0x4a, 0xb0, 0xc9, 0xb8, 0x5d, 0x80, 0xd5, - 0x67, 0xb6, 0x9f, 0xe0, 0xf6, 0xd5, 0xe3, 0x3f, 0xeb, 0x99, 0xe3, 0x49, 0x5d, 0xf9, 0x75, 0x52, - 0x57, 0x5e, 0x4e, 0xea, 0xca, 0x1f, 0x93, 0xba, 0xf2, 0xfc, 0xaf, 0x7a, 0xe6, 0x31, 0xcc, 0xf2, - 0xf8, 0x2f, 0x00, 0x00, 0xff, 0xff, 0xfd, 0xc1, 0xb4, 0x27, 0x6c, 0x0b, 0x00, 0x00, + // 1336 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x56, 0x4d, 0x6f, 0x1b, 0xb7, + 0x16, 0xd5, 0xc8, 0xb2, 0x3e, 0xae, 0x65, 0x47, 0x21, 0x82, 0x40, 0xf0, 0xcb, 0x93, 0x93, 0x49, + 0xde, 0x6b, 0x9a, 0xa6, 0xa3, 0xc4, 0x5d, 0x14, 0x4d, 0x16, 0x8d, 0x64, 0xc9, 0xb1, 0xda, 0xd8, + 0x32, 0x46, 0x4e, 0x8b, 0xa4, 0x05, 0xa6, 0xf4, 0x0c, 0x2d, 0x0f, 0x32, 0x9a, 0x19, 0x93, 0x1c, + 0x7f, 0x6c, 0xfa, 0x17, 0x9a, 0x65, 0xd1, 0x4d, 0xb3, 0x6e, 0xff, 0x88, 0x97, 0x5d, 0x06, 0x2d, + 0x60, 0xb4, 0xee, 0x9f, 0x28, 0xba, 0x2a, 0xc8, 0x21, 0xf5, 0x11, 0x5b, 0x4d, 0x82, 0xec, 0x86, + 0xe4, 0x39, 0x87, 0x97, 0xe7, 0xde, 0x21, 0x2f, 0xbc, 0xe7, 0x46, 0xee, 0x33, 0x1a, 0x61, 0x77, + 0xb7, 0x1e, 0x3f, 0xeb, 0xd7, 0xd9, 0x5e, 0x50, 0xf7, 0x7c, 0xc6, 0xd9, 0x5e, 0x40, 0x93, 0xb0, + 0xee, 0x61, 0x8e, 0xad, 0x98, 0x46, 0x3c, 0x42, 0xd5, 0x21, 0xd0, 0x62, 0x7b, 0x81, 0x35, 0x02, + 0x2d, 0x2e, 0x4d, 0x4a, 0xc8, 0xaf, 0x78, 0xbb, 0x8e, 0x63, 0x3f, 0xa5, 0x2e, 0x5e, 0x3d, 0x1f, + 0x30, 0x12, 0x5f, 0x34, 0xcf, 0x47, 0x10, 0x4a, 0x23, 0xca, 0x14, 0xe6, 0xc3, 0xb3, 0x91, 0xc6, + 0xfd, 0x03, 0x9f, 0x92, 0x7a, 0xdc, 0x97, 0xc0, 0x49, 0xf8, 0xad, 0xb3, 0x70, 0xb6, 0x17, 0x6c, + 0x63, 0x46, 0xea, 0x8c, 0xd3, 0xc4, 0xe5, 0x09, 0x25, 0xde, 0x74, 0x69, 0x8d, 0x25, 0xa1, 0x1b, + 0x79, 0xc4, 0x73, 0x3c, 0xcc, 0x93, 0x81, 0x82, 0x5b, 0x93, 0xf0, 0x84, 0xfb, 0x41, 0x9d, 0x53, + 0xec, 0xfa, 0x61, 0xbf, 0x4e, 0x89, 0x1b, 0x51, 0x41, 0x60, 0x31, 0x0e, 0x15, 0xfe, 0x52, 0x3f, + 0xea, 0x47, 0xf2, 0xb3, 0x2e, 0xbe, 0xd2, 0x59, 0xf3, 0xc7, 0x2c, 0xcc, 0xb6, 0x45, 0xc4, 0xa8, + 0x09, 0xc5, 0xb8, 0xef, 0xc8, 0xe8, 0xab, 0xc6, 0x55, 0xe3, 0xe6, 0xdc, 0x72, 0x75, 0xb4, 0x85, + 0xa5, 0x4e, 0x67, 0x49, 0x6c, 0x73, 0xee, 0xf4, 0x64, 0xa9, 0xb0, 0xf9, 0x50, 0x0e, 0xd6, 0x32, + 0x76, 0x21, 0xee, 0xa7, 0x1a, 0x4f, 0xe1, 0x22, 0x25, 0x9c, 0x1e, 0xe1, 0xed, 0x80, 0x6c, 0x1d, + 0x86, 0x72, 0xb2, 0x9a, 0x95, 0x62, 0xb7, 0xc6, 0xc4, 0x94, 0xb3, 0xd6, 0xe3, 0x70, 0x17, 0x87, + 0x5e, 0x40, 0x3c, 0x5b, 0x93, 0xb4, 0xe2, 0x59, 0x19, 0xb4, 0x03, 0x97, 0xf7, 0x09, 0x65, 0x7e, + 0x14, 0x3a, 0x03, 0x9f, 0x0d, 0x30, 0x77, 0x77, 0x55, 0xb4, 0x33, 0x72, 0x03, 0xcb, 0x9a, 0x56, + 0x1b, 0xd6, 0x17, 0x29, 0x6f, 0x5d, 0xd1, 0xf4, 0x26, 0x97, 0xf6, 0xcf, 0x99, 0xbf, 0x97, 0xfb, + 0xfe, 0xc5, 0x52, 0xa6, 0x59, 0x84, 0xbc, 0x47, 0x38, 0xf6, 0x03, 0xf3, 0x27, 0x03, 0x2e, 0x9d, + 0x27, 0x80, 0xee, 0x8a, 0xc3, 0xee, 0x25, 0x84, 0x71, 0xe2, 0x39, 0x4a, 0x4a, 0x3a, 0x37, 0xdf, + 0xcc, 0x1d, 0x9f, 0x2c, 0x65, 0xec, 0xca, 0x70, 0x59, 0xf1, 0xd1, 0x07, 0xb0, 0xc0, 0x08, 0xdd, + 0x27, 0x74, 0x88, 0xcf, 0x8e, 0xe1, 0xe7, 0xd3, 0x35, 0x0d, 0x5e, 0x06, 0xa4, 0xc0, 0x03, 0x3f, + 0x1c, 0x12, 0x66, 0xc6, 0x37, 0x48, 0xd7, 0xd7, 0xfd, 0x50, 0x71, 0xcc, 0x55, 0x80, 0xf6, 0x61, + 0x4c, 0x09, 0x93, 0x0a, 0x35, 0x28, 0x8c, 0xc7, 0x55, 0x52, 0x34, 0x3d, 0x89, 0xaa, 0x90, 0x23, + 0x87, 0x71, 0x9a, 0x21, 0xbd, 0x28, 0x67, 0xcc, 0xbf, 0x0c, 0x28, 0x76, 0xa9, 0x47, 0xa8, 0x1f, + 0xf6, 0x51, 0x07, 0x0a, 0x6e, 0x14, 0x24, 0x83, 0x90, 0x55, 0x8d, 0xab, 0x33, 0x37, 0xe7, 0x96, + 0xdf, 0x9f, 0x6e, 0xb5, 0x26, 0x59, 0x2b, 0x92, 0xa1, 0x77, 0x54, 0xfc, 0xc5, 0x17, 0x06, 0xe4, + 0xd3, 0x15, 0xf4, 0x5f, 0xa9, 0xea, 0xf8, 0xde, 0xe1, 0x84, 0x69, 0x79, 0x37, 0x0a, 0x3a, 0xde, + 0x21, 0xfa, 0x12, 0x4a, 0x9e, 0x4f, 0x89, 0xcb, 0xb5, 0x4b, 0x0b, 0xcb, 0x1f, 0xbd, 0xf1, 0xb6, + 0x56, 0x4b, 0x53, 0x95, 0xea, 0x48, 0xcb, 0xac, 0x41, 0x69, 0xb8, 0x8a, 0x0a, 0x30, 0xd3, 0xe8, + 0xad, 0x54, 0x32, 0xa8, 0x08, 0xb9, 0x56, 0xbb, 0xb7, 0x52, 0x31, 0xcc, 0x1f, 0xb2, 0x80, 0x7a, + 0x9c, 0x12, 0x3c, 0x68, 0x87, 0x5e, 0x1c, 0xf9, 0x21, 0xef, 0xc5, 0xc4, 0x45, 0x9f, 0x43, 0x8e, + 0x1f, 0xc5, 0x44, 0xc6, 0xba, 0xb0, 0x7c, 0x77, 0x7a, 0x28, 0x67, 0xb9, 0xd6, 0xd6, 0x51, 0x4c, + 0xb4, 0xbd, 0x42, 0x04, 0x7d, 0x02, 0x25, 0x26, 0x61, 0x8e, 0xef, 0xc9, 0xc3, 0xcd, 0x36, 0xaf, + 0x88, 0xe5, 0xd3, 0x93, 0xa5, 0x62, 0xca, 0xef, 0xb4, 0xfe, 0x1e, 0xfb, 0xb6, 0x8b, 0x29, 0xbc, + 0xe3, 0xa1, 0xff, 0xc1, 0x1c, 0xc7, 0xb4, 0x4f, 0xb8, 0x83, 0x3d, 0x2f, 0xad, 0x7d, 0x9d, 0x3a, + 0x48, 0x17, 0x1a, 0x9e, 0x47, 0xd1, 0x75, 0x98, 0xdf, 0xc1, 0x41, 0xb0, 0x8d, 0xdd, 0x67, 0x29, + 0x30, 0x27, 0x80, 0x76, 0x59, 0x4f, 0x0a, 0x90, 0x79, 0x07, 0x72, 0x22, 0x34, 0x54, 0x82, 0xd9, + 0x47, 0xdd, 0x95, 0xc6, 0xa3, 0x4a, 0x06, 0x01, 0xe4, 0xed, 0xf6, 0x7a, 0x77, 0xab, 0x5d, 0x31, + 0xd0, 0x45, 0x98, 0xef, 0x3d, 0xd9, 0x58, 0x71, 0xec, 0x76, 0x6f, 0xb3, 0xbb, 0xd1, 0x6b, 0x57, + 0xb2, 0xe6, 0x6f, 0x59, 0x98, 0xef, 0x84, 0x71, 0xc2, 0x7b, 0x47, 0xa1, 0x2b, 0x7d, 0x59, 0x9d, + 0xf0, 0xe5, 0xf6, 0x74, 0x5f, 0x26, 0x68, 0x67, 0x2d, 0x69, 0x41, 0x31, 0x52, 0x49, 0x54, 0x37, + 0x86, 0xf9, 0xfa, 0x74, 0x2b, 0x85, 0x21, 0x13, 0x3d, 0x82, 0x42, 0xea, 0x14, 0xab, 0xce, 0xc8, + 0x52, 0xbd, 0xfd, 0x36, 0x89, 0xd2, 0xd5, 0xaa, 0x24, 0xd0, 0x67, 0x50, 0x4e, 0x0b, 0xd7, 0x11, + 0x21, 0xb2, 0x6a, 0x4e, 0x4a, 0x5e, 0x7b, 0x45, 0x52, 0x5d, 0xd2, 0xaa, 0xf4, 0xc6, 0x0e, 0x36, + 0xe7, 0x0e, 0x67, 0x98, 0x69, 0x2a, 0xaf, 0xe7, 0xa1, 0xf4, 0x78, 0xa3, 0x6b, 0xb7, 0xda, 0x76, + 0xbb, 0x55, 0xc9, 0xa0, 0x39, 0x28, 0xe8, 0x81, 0x61, 0x7e, 0x97, 0x85, 0x4a, 0x37, 0xe1, 0x71, + 0xc2, 0xed, 0x28, 0xe1, 0x84, 0x4a, 0x83, 0x3b, 0x13, 0x06, 0xd7, 0xff, 0xc5, 0x94, 0x57, 0x98, + 0x67, 0x3d, 0x1e, 0x73, 0x27, 0xfb, 0xee, 0xee, 0x5c, 0x83, 0xf2, 0x2e, 0x66, 0xbb, 0x8e, 0xbe, + 0x1b, 0x84, 0xe1, 0xf3, 0xf6, 0x9c, 0x98, 0x4b, 0xad, 0x60, 0xe6, 0xa7, 0xea, 0xd0, 0x15, 0x28, + 0x6f, 0x36, 0x7a, 0x3d, 0x67, 0x6b, 0xcd, 0xee, 0x3e, 0x7e, 0xb8, 0x96, 0xd6, 0xd9, 0x7a, 0xc7, + 0xb6, 0xbb, 0x76, 0xc5, 0x10, 0x1e, 0x34, 0x9f, 0x38, 0x6b, 0x8d, 0xde, 0x5a, 0x25, 0x8b, 0xca, + 0x50, 0x6c, 0x3e, 0x71, 0xec, 0xc6, 0xc6, 0xc3, 0x76, 0x65, 0xc6, 0x7c, 0x6e, 0x40, 0xa9, 0x25, + 0x1e, 0xbd, 0x4e, 0xb8, 0x13, 0xa1, 0x55, 0x28, 0xca, 0x97, 0x50, 0xd4, 0x48, 0x6a, 0xc7, 0x8d, + 0x29, 0xb9, 0x90, 0x9c, 0xb6, 0xc2, 0xea, 0x2a, 0xd1, 0x5c, 0x74, 0x5f, 0x59, 0x9a, 0xd6, 0xd9, + 0x1b, 0xe7, 0x53, 0x92, 0xcc, 0x6f, 0x61, 0x61, 0x93, 0x46, 0x5e, 0xe2, 0x12, 0xba, 0x46, 0xb0, + 0x47, 0xc4, 0x43, 0x50, 0xd8, 0x09, 0xa2, 0x03, 0xf1, 0x2f, 0x8b, 0xa8, 0xca, 0xcd, 0xaa, 0x80, + 0xff, 0x7a, 0xb2, 0x94, 0x5f, 0x0d, 0xa2, 0x83, 0x4e, 0xeb, 0x74, 0xf8, 0x65, 0xe7, 0x05, 0xb0, + 0xe3, 0xbd, 0xc3, 0x05, 0x60, 0xfe, 0x6c, 0x40, 0x59, 0x07, 0xd0, 0xc2, 0x1c, 0xa3, 0xff, 0x40, + 0x89, 0xe2, 0x03, 0x67, 0xfb, 0x88, 0x13, 0x96, 0x06, 0x60, 0x17, 0x29, 0x3e, 0x68, 0x8a, 0x31, + 0xb2, 0xa1, 0x38, 0x20, 0x1c, 0x8b, 0x2e, 0x47, 0xe5, 0xfc, 0xce, 0xf4, 0x9c, 0xdb, 0x64, 0x10, + 0x71, 0xa2, 0xc5, 0xd7, 0x15, 0x4f, 0xdb, 0xa7, 0x75, 0xd0, 0x2d, 0x58, 0x08, 0x93, 0x81, 0x43, + 0x06, 0x31, 0x3f, 0x72, 0x68, 0x74, 0xc0, 0xe4, 0x2d, 0x34, 0xab, 0x70, 0xe5, 0x30, 0x19, 0xb4, + 0xc5, 0x92, 0x1d, 0x1d, 0x30, 0xf3, 0xa5, 0x01, 0x17, 0x46, 0x82, 0x8c, 0xe1, 0x3e, 0x41, 0x0f, + 0x20, 0xbf, 0x2b, 0x9d, 0x53, 0x7d, 0xc6, 0xcd, 0xe9, 0x11, 0x4d, 0x3a, 0x6d, 0x2b, 0x1e, 0x6a, + 0x40, 0x9e, 0x1f, 0xc5, 0xe9, 0x55, 0x21, 0xce, 0x74, 0x7d, 0xba, 0xc2, 0xb0, 0x7a, 0xf4, 0xfb, + 0x92, 0x12, 0xd1, 0x03, 0xc8, 0x49, 0x53, 0xd2, 0xe6, 0xe1, 0xff, 0xaf, 0x0f, 0xa1, 0x35, 0xb2, + 0x42, 0x32, 0xcd, 0xe3, 0x19, 0xb8, 0x7c, 0xbe, 0x63, 0xe8, 0x6b, 0x00, 0x8a, 0xc3, 0x3e, 0x71, + 0xfc, 0x70, 0x27, 0x52, 0xa7, 0xbc, 0xff, 0xb6, 0xbe, 0x5b, 0xb6, 0x90, 0x10, 0xa1, 0xb3, 0xb5, + 0x8c, 0x5d, 0xa2, 0x7a, 0x84, 0x3e, 0x86, 0x59, 0x32, 0xd6, 0x59, 0x2d, 0x4d, 0x17, 0xd6, 0x9d, + 0x4e, 0x8a, 0x47, 0x5f, 0x01, 0x88, 0x0e, 0x91, 0x38, 0x63, 0x27, 0xbf, 0xf7, 0xd6, 0x61, 0x6d, + 0x09, 0x09, 0xe1, 0x86, 0x88, 0x8a, 0xeb, 0xc1, 0x62, 0x17, 0x60, 0x14, 0x30, 0x6a, 0xbc, 0xe2, + 0x80, 0xc8, 0xd2, 0x95, 0x73, 0x5a, 0xc0, 0x21, 0x45, 0x3f, 0xd4, 0xc3, 0x63, 0x2e, 0x7e, 0x03, + 0xa5, 0xe1, 0x56, 0xa8, 0x07, 0x17, 0xdc, 0x28, 0x08, 0x88, 0xcb, 0x55, 0x57, 0xab, 0x7b, 0x91, + 0xf1, 0x1b, 0x40, 0xf4, 0xc0, 0x96, 0xea, 0x81, 0x2d, 0x5b, 0xf5, 0xc0, 0xbd, 0x18, 0xeb, 0x2e, + 0x60, 0x61, 0x28, 0x21, 0x26, 0x59, 0xb3, 0x00, 0xb3, 0xfb, 0x38, 0x48, 0x48, 0xf3, 0xc6, 0xf1, + 0x1f, 0xb5, 0xcc, 0xf1, 0x69, 0xcd, 0xf8, 0xe5, 0xb4, 0x66, 0xbc, 0x3c, 0xad, 0x19, 0xbf, 0x9f, + 0xd6, 0x8c, 0xe7, 0x7f, 0xd6, 0x32, 0x4f, 0x61, 0xe4, 0xc7, 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, + 0x0a, 0x41, 0x4d, 0x79, 0xa6, 0x0c, 0x00, 0x00, } diff --git a/pkg/sql/distsqlrun/data.proto b/pkg/sql/distsqlrun/data.proto index 44759bbd7786..e03e02a21c6a 100644 --- a/pkg/sql/distsqlrun/data.proto +++ b/pkg/sql/distsqlrun/data.proto @@ -36,11 +36,21 @@ message Error { oneof detail { pgerror.Error pg_error = 1 [(gogoproto.customname) = "PGError"]; roachpb.UnhandledRetryableError retryableTxnError = 2; + // VersionMismatchError is only returned by SetupFlowRequests. + VersionMismatchError version_mismatch_error = 3; // TODO(andrei): Add AmbiguousResultError here once DistSQL starts executing // writes. } } +// VersionMismatchError is returned by SetupFlowRequest when the requested +// version is outside of [server_min_version..server_version]. +message VersionMismatchError { + optional uint32 requested_version = 1 [(gogoproto.nullable) = false]; + optional uint32 server_version = 2 [(gogoproto.nullable) = false]; + optional uint32 server_min_version = 3 [(gogoproto.nullable) = false]; +} + message Expression { // TODO(radu): TBD how this will be used optional string version = 1 [(gogoproto.nullable) = false]; @@ -95,6 +105,14 @@ message StreamEndpointSpec { (gogoproto.casttype) = "StreamID"]; // Serving address for the target host, only used for outgoing REMOTE streams. optional string target_addr = 3 [(gogoproto.nullable) = false]; + // Address to attempt if connecting to target_addr fails or is slow. Only used + // for outgoing remote streams. + // It's guaranteed that at most one of the attempts to connect to + // {target_addr, fallback_addr} will succeed (so it's safe to attempt them in + // parallel) - they will not both accept the incoming stream. + // + // If the field is left null, then no fallback will be used. + optional string fallback_addr = 4; } // InputSyncSpec is the specification for an input synchronizer; it decides how diff --git a/pkg/sql/distsqlrun/flow.go b/pkg/sql/distsqlrun/flow.go index 1c4c01a67fe9..52b79776516b 100644 --- a/pkg/sql/distsqlrun/flow.go +++ b/pkg/sql/distsqlrun/flow.go @@ -195,7 +195,11 @@ func (f *Flow) setupOutboundStream(spec StreamEndpointSpec) (RowReceiver, error) return f.syncFlowConsumer, nil case StreamEndpointSpec_REMOTE: - outbox := newOutbox(&f.FlowCtx, spec.TargetAddr, f.id, sid) + fallbackAddr := "" + if spec.FallbackAddr != nil { + fallbackAddr = *spec.FallbackAddr + } + outbox := newOutbox(&f.FlowCtx, spec.TargetAddr, fallbackAddr, f.id, sid) f.startables = append(f.startables, outbox) return outbox, nil diff --git a/pkg/sql/distsqlrun/flow_registry.go b/pkg/sql/distsqlrun/flow_registry.go index 3b40b893ece9..925a0a3df63b 100644 --- a/pkg/sql/distsqlrun/flow_registry.go +++ b/pkg/sql/distsqlrun/flow_registry.go @@ -65,6 +65,12 @@ type flowEntry struct { refCount int flow *Flow + // poisoned indicates that the entry corresponds to a flow that will never be + // registered. This is only used on gateways when some remote flows have been + // scheduled, but others failed and the query as a whole has been abandoned. + // Remote flows that have been scheduled already will notice this poison and + // fail quickly, instead of waiting for the connection timeout. + poisoned bool // inboundStreams are streams that receive data from other hosts, through the // FlowStream API. All fields in the inboundStreamInfos are protected by the @@ -190,6 +196,39 @@ func (fr *flowRegistry) RegisterFlow( } } +// PoisonFlow registers a flow that will never run, such that inbound streams +// attempting to connect to it find out quickly that they need to error out and +// don't wait for the connection timeout. +// The poisoned entry will leave in the FlowRegistry for timeout. Afterwards, +// late streams attempting to connect will wait for the regular connection +// timeout before timing out. +func (fr *flowRegistry) PoisonFlow(id FlowID, timeout time.Duration) { + fr.Lock() + defer fr.Unlock() + + entry, ok := fr.flows[id] + if entry.flow != nil { + panic("flow already registered") + } + if !ok { + entry = &flowEntry{} + fr.flows[id] = entry + } + entry.poisoned = true + // If there are any waiters, wake them up by closing waitCh. + if entry.waitCh != nil { + close(entry.waitCh) + } + // Take a reference that will be removed by UnregisterFlow, in the timer below. + entry.refCount++ + + // Set up a function to time out inbound streams after a while. + entry.streamTimer = time.AfterFunc(timeout, func() { + fr.UnregisterFlow(id) + }) + +} + // UnregisterFlow removes a flow from the registry. Any subsequent // ConnectInboundStream calls for the flow will fail to find it and time out. func (fr *flowRegistry) UnregisterFlow(id FlowID) { @@ -238,6 +277,9 @@ func (fr *flowRegistry) waitForFlowLocked( fr.Lock() fr.releaseEntryLocked(id) + if entry.poisoned { + return entry + } if entry.flow == nil { return nil } @@ -264,9 +306,17 @@ func (fr *flowRegistry) ConnectInboundStream( if entry == nil { return nil, nil, nil, errors.Errorf("flow %s not found", flowID) } + if entry.poisoned { + return nil, nil, nil, errors.Errorf("flow %s: poisoned", flowID) + } s, ok := entry.inboundStreams[streamID] if !ok { + // This error is expected when the client trying to connect this stream is + // using this server as the "fallback" but the registry hasn't been + // configured for this inbound stream (presumably because the primary server + // has been setup successfully, or at least it's not clear that it hasn't + // been setup successfully). return nil, nil, nil, errors.Errorf("flow %s: no inbound stream %d", flowID, streamID) } if s.connected { diff --git a/pkg/sql/distsqlrun/outbox.go b/pkg/sql/distsqlrun/outbox.go index 200d671364f9..c7c67535c2f5 100644 --- a/pkg/sql/distsqlrun/outbox.go +++ b/pkg/sql/distsqlrun/outbox.go @@ -19,14 +19,17 @@ import ( "sync" "time" + "github.com/pkg/errors" "golang.org/x/net/context" + "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/util/log" ) const outboxBufRows = 16 const outboxFlushPeriod = 100 * time.Microsecond +const outboxAttemptFallbackTimeout = 10 * time.Millisecond // preferredEncoding is the encoding used for EncDatums that don't already have // an encoding available. @@ -41,7 +44,21 @@ type outbox struct { RowChannel flowCtx *FlowCtx - addr string + + // addr and fallbackAddr are the host:port to which the outbox will connect. + // addr is tried first and, if that doesn't connect within + // outboxAttemptFallbackTimeout, the outbox also tries to connect to + // fallbackAddr in parallel (if fallbackAddr is not empty). From this moment + // on, the stream is established when either of the two succeed. + // This fallback mechanism is used so that we connect to the gateway when + // scheduling flows on the intended nodes fails: the gateway sometimes falls + // back to scheduling flows locally. + // + // It is guaranteed that at most one of the connection attempts will succeed - + // both targets will not accept the stream. + addr string + fallbackAddr string + // The rows received from the RowChannel will be forwarded on this stream once // it is established. stream DistSQL_FlowStreamClient @@ -60,8 +77,13 @@ type outbox struct { var _ RowReceiver = &outbox{} var _ startable = &outbox{} -func newOutbox(flowCtx *FlowCtx, addr string, flowID FlowID, streamID StreamID) *outbox { - m := &outbox{flowCtx: flowCtx, addr: addr} +// newOutbox creates an outbox. +// +// fallbackAddr can be empty, in which case no fallback is used. +func newOutbox( + flowCtx *FlowCtx, addr string, fallbackAddr string, flowID FlowID, streamID StreamID, +) *outbox { + m := &outbox{flowCtx: flowCtx, addr: addr, fallbackAddr: fallbackAddr} m.encoder.setHeaderFields(flowID, streamID) return m } @@ -172,7 +194,7 @@ func (m *outbox) mainLoop(ctx context.Context) error { if log.V(2) { log.Infof(ctx, "outbox: calling FlowStream") } - m.stream, err = client.FlowStream(context.TODO()) + m.stream, err = client.FlowStream(ctx) if err != nil { if log.V(1) { log.Infof(ctx, "FlowStream error: %s", err) @@ -256,6 +278,109 @@ func (m *outbox) mainLoop(ctx context.Context) error { } } +type connectResult struct { + addr string + stream DistSQL_FlowStreamClient + err error +} + +// connectOutboundStream connects the stream by calling FlowStream. It also +// handles fallback is m.fallbackAddr is configured. If this returns without +// error, m.stream will be set. +func (m *outbox) connectOutboundStream(ctx context.Context) error { + // We're starting the connection asynchronously, then setting up a timer to + // start the fallback connection (if any). We'll then wait until either of the + // two succeeds. + streamCh := make(chan connectResult, 2) + connectAttemptsRemaning := 1 + primaryStreamCtx, primaryStreamCancel := context.WithCancel(ctx) + go func() { + stream, err := connectOutboundStream(primaryStreamCtx, m.flowCtx.rpcCtx, m.addr) + streamCh <- connectResult{ + addr: m.addr, + stream: stream, + err: err, + } + }() + + var fallbackStreamCtx context.Context + var fallbackStreamCancel context.CancelFunc + startFallbackConnection := func() { + if m.fallbackAddr == "" { + log.Fatal(ctx, "connecting to empty fallback") + } + stream, err := connectOutboundStream(fallbackStreamCtx, m.flowCtx.rpcCtx, m.fallbackAddr) + streamCh <- connectResult{ + addr: m.addr, + stream: stream, + err: err, + } + } + var fallbackTimer *time.Timer + if m.fallbackAddr != "" { + connectAttemptsRemaning++ + fallbackStreamCtx, fallbackStreamCancel = context.WithCancel(ctx) + fallbackTimer = time.AfterFunc(outboxAttemptFallbackTimeout, func() { + if m.fallbackAddr != "" { + startFallbackConnection() + } + }) + } + + fallbackStarted := false + errs := make(map[string]error) + for connectAttemptsRemaning > 0 { + s := <-streamCh + connectAttemptsRemaning-- + if s.err != nil { + // Start the fallback unless we or the timer have already started it. + if m.fallbackAddr != "" && !fallbackStarted { + fallbackStarted = true + if fallbackTimer.Stop() { + startFallbackConnection() + } + } + errs[s.addr] = m.err + continue + } + // We succeeded in connecting to the primary addr or the fallback. Sayonara, + // no reason to wait for the result of the other attempt (if still pending) + // - it's guaranteed to fail. + m.stream = s.stream + + // Cancel whatever connection attempt is still in progress, if any. + primaryStreamCancel() + fallbackStreamCancel() + + return nil + } + if m.fallbackAddr != "" { + return errors.Errorf( + "failed to connect outbound stream. Primary: %s. Fallback: %s", + errs[m.addr], errs[m.fallbackAddr]) + } + return errors.Errorf("failed to connect outbound stream: %s", errs[m.addr]) +} + +func connectOutboundStream( + ctx context.Context, rpcCtx *rpc.Context, addr string, +) (DistSQL_FlowStreamClient, error) { + conn, err := rpcCtx.GRPCDial(addr) + if err != nil { + log.VEventf(ctx, 1, "outbox: dial to %s: error: %s", addr, err) + return nil, err + } + client := NewDistSQLClient(conn) + log.VEventf(ctx, 2, "outbox: calling FlowStream to %s", addr) + stream, err := client.FlowStream(ctx) + if err != nil { + log.VEventf(ctx, 1, "outbox: FlowStream to %s: error: %s", addr, err) + return nil, err + } + log.VEventf(ctx, 2, "outbox: FlowStream to %s succeeded", addr) + return stream, nil +} + // drainSignal is a signal received from the consumer telling the producer that // it doesn't need any more rows and optionally asking the producer to drain. type drainSignal struct { diff --git a/pkg/sql/distsqlrun/outbox_test.go b/pkg/sql/distsqlrun/outbox_test.go index 24855012a607..7346c8ef0d1d 100644 --- a/pkg/sql/distsqlrun/outbox_test.go +++ b/pkg/sql/distsqlrun/outbox_test.go @@ -58,7 +58,7 @@ func TestOutbox(t *testing.T) { } flowID := FlowID{uuid.MakeV4()} streamID := StreamID(42) - outbox := newOutbox(&flowCtx, addr.String(), flowID, streamID) + outbox := newOutbox(&flowCtx, addr.String(), "" /* fallbackAddr */, flowID, streamID) var outboxWG sync.WaitGroup // Start the outbox. This should cause the stream to connect, even though // we're not sending any rows. @@ -212,7 +212,7 @@ func TestOutboxInitializesStreamBeforeRecevingAnyRows(t *testing.T) { } flowID := FlowID{uuid.MakeV4()} streamID := StreamID(42) - outbox := newOutbox(&flowCtx, addr.String(), flowID, streamID) + outbox := newOutbox(&flowCtx, addr.String(), "" /* fallbackAddr */, flowID, streamID) var outboxWG sync.WaitGroup // Start the outbox. This should cause the stream to connect, even though @@ -281,7 +281,7 @@ func TestOutboxClosesWhenConsumerCloses(t *testing.T) { var expectedErr error consumerReceivedMsg := make(chan struct{}) if tc.outboxIsClient { - outbox = newOutbox(&flowCtx, addr.String(), flowID, streamID) + outbox = newOutbox(&flowCtx, addr.String(), "" /* fallbackAddr */, flowID, streamID) outbox.start(context.TODO(), &wg) // Wait for the outbox to connect the stream. diff --git a/pkg/sql/distsqlrun/server.go b/pkg/sql/distsqlrun/server.go index 9afb92607e80..f383210e59b2 100644 --- a/pkg/sql/distsqlrun/server.go +++ b/pkg/sql/distsqlrun/server.go @@ -39,7 +39,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/uuid" ) -// Version identifies the distsqlrun protocol version. +// Version identifies the distsqlrun protocol version. Flows sent by this node +// will be tagged with this version. // // This version is separate from the main CockroachDB version numbering; it is // only changed when the distsqlrun API changes. @@ -62,6 +63,9 @@ import ( // - at some later point, we can choose to deprecate version 1 and have // servers only accept versions >= 2 (by setting // MinAcceptedVersion to 2). +// +// TODO(andrei): provide guidance on how to use this versus the August 2017 +// cluster-wide versioning mechanism. const Version = 4 // MinAcceptedVersion is the oldest version that the server is @@ -189,10 +193,7 @@ func (ds *ServerImpl) setupFlow( ) (context.Context, *Flow, error) { if req.Version < MinAcceptedVersion || req.Version > Version { - err := errors.Errorf( - "version mismatch in flow request: %d; this node accepts %d through %d", - req.Version, MinAcceptedVersion, Version, - ) + err := NewVersionMismatchError(req.Version, MinAcceptedVersion, Version) log.Warning(ctx, err) return ctx, nil, err } @@ -378,6 +379,16 @@ func (ds *ServerImpl) FlowStream(stream DistSQL_FlowStreamServer) error { return err } +// PoisonFlow registers a flow that will never run, such that inbound streams +// attempting to connect to it find out quickly that they need to error out and +// don't wait for the connection timeout. +// The poisoned entry will leave in the FlowRegistry for timeout. Afterwards, +// late streams attempting to connect will wait for the regular connection +// timeout before timing out. +func (ds *ServerImpl) PoisonFlow(id FlowID, timeout time.Duration) { + ds.flowRegistry.PoisonFlow(id, timeout) +} + // TestingKnobs are the testing knobs. type TestingKnobs struct { // RunBeforeBackfillChunk is called before executing each chunk of a diff --git a/pkg/util/netutil/net.go b/pkg/util/netutil/net.go index 916200098655..255ddb5d9f1f 100644 --- a/pkg/util/netutil/net.go +++ b/pkg/util/netutil/net.go @@ -23,6 +23,7 @@ import ( "time" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "golang.org/x/net/context" "golang.org/x/net/http2" @@ -159,3 +160,21 @@ func FatalIfUnexpected(err error) { log.Fatal(context.TODO(), err) } } + +// ErrIsGRPCUnavailable checks whether an error is the GRPC error for +// "unavailable" nodes. This can be used to check if an RPC network error is +// guaranteed to mean that the server did not receive the request. +// All connection errors except for an unavailable node (this is GRPC's +// fail-fast error), may mean that the request succeeded on the remote server, +// but we were unable to receive the reply. +// +// The Unavailable code is used by GRPC to indicate that a request fails fast +// and is not sent, so we can be sure there is no ambiguity on these errors. +// Note that these are common if a node is down. +// See https://github.com/grpc/grpc-go/blob/52f6504dc290bd928a8139ba94e3ab32ed9a6273/call.go#L182 +// See https://github.com/grpc/grpc-go/blob/52f6504dc290bd928a8139ba94e3ab32ed9a6273/stream.go#L158 +// +// Returns false is err is nil. +func ErrIsGRPCUnavailable(err error) bool { + return grpc.Code(err) == codes.Unavailable +}