diff --git a/scripts/docker-integration-tests/simple/test.sh b/scripts/docker-integration-tests/simple/test.sh index d8776a171a..fa6d330e74 100755 --- a/scripts/docker-integration-tests/simple/test.sh +++ b/scripts/docker-integration-tests/simple/test.sh @@ -31,91 +31,91 @@ fi echo "Sleeping for a bit to ensure db up" sleep 15 # TODO Replace sleeps with logic to determine when to proceed - echo "Adding namespace" - curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/namespace -d '{ - "name": "agg", - "options": { - "bootstrapEnabled": true, - "flushEnabled": true, - "writesToCommitLog": true, - "cleanupEnabled": true, - "snapshotEnabled": true, - "repairEnabled": false, - "retentionOptions": { - "retentionPeriodDuration": "48h", - "blockSizeDuration": "2h", - "bufferFutureDuration": "10m", - "bufferPastDuration": "10m", - "blockDataExpiry": true, - "blockDataExpiryAfterNotAccessPeriodDuration": "5m" - }, - "indexOptions": { - "enabled": true, - "blockSizeDuration": "2h" - } +echo "Adding namespace" +curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/namespace -d '{ + "name": "agg", + "options": { + "bootstrapEnabled": true, + "flushEnabled": true, + "writesToCommitLog": true, + "cleanupEnabled": true, + "snapshotEnabled": true, + "repairEnabled": false, + "retentionOptions": { + "retentionPeriodDuration": "48h", + "blockSizeDuration": "2h", + "bufferFutureDuration": "10m", + "bufferPastDuration": "10m", + "blockDataExpiry": true, + "blockDataExpiryAfterNotAccessPeriodDuration": "5m" + }, + "indexOptions": { + "enabled": true, + "blockSizeDuration": "2h" } - }' - - echo "Sleep until namespace is init'd" - ATTEMPTS=4 TIMEOUT=1 retry_with_backoff \ - '[ "$(curl -sSf 0.0.0.0:7201/api/v1/namespace | jq .registry.namespaces.agg.indexOptions.enabled)" == true ]' - - curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/namespace -d '{ - "name": "unagg", - "options": { - "bootstrapEnabled": true, - "flushEnabled": true, - "writesToCommitLog": true, - "cleanupEnabled": true, - "snapshotEnabled": true, - "repairEnabled": false, - "retentionOptions": { - "retentionPeriodDuration": "48h", - "blockSizeDuration": "2h", - "bufferFutureDuration": "10m", - "bufferPastDuration": "10m", - "blockDataExpiry": true, - "blockDataExpiryAfterNotAccessPeriodDuration": "5m" - }, - "indexOptions": { - "enabled": true, - "blockSizeDuration": "2h" - } + } +}' + +echo "Sleep until namespace is init'd" +ATTEMPTS=4 TIMEOUT=1 retry_with_backoff \ + '[ "$(curl -sSf 0.0.0.0:7201/api/v1/namespace | jq .registry.namespaces.agg.indexOptions.enabled)" == true ]' + +curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/namespace -d '{ + "name": "unagg", + "options": { + "bootstrapEnabled": true, + "flushEnabled": true, + "writesToCommitLog": true, + "cleanupEnabled": true, + "snapshotEnabled": true, + "repairEnabled": false, + "retentionOptions": { + "retentionPeriodDuration": "48h", + "blockSizeDuration": "2h", + "bufferFutureDuration": "10m", + "bufferPastDuration": "10m", + "blockDataExpiry": true, + "blockDataExpiryAfterNotAccessPeriodDuration": "5m" + }, + "indexOptions": { + "enabled": true, + "blockSizeDuration": "2h" } - }' - - echo "Sleep until namespace is init'd" - ATTEMPTS=4 TIMEOUT=1 retry_with_backoff \ - '[ "$(curl -sSf 0.0.0.0:7201/api/v1/namespace | jq .registry.namespaces.unagg.indexOptions.enabled)" == true ]' - - echo "Placement initialization" - curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/placement/init -d '{ - "num_shards": 64, - "replication_factor": 1, - "instances": [ - { - "id": "m3db_local", - "isolation_group": "rack-a", - "zone": "embedded", - 
"weight": 1024, - "endpoint": "127.0.0.1::9000", - "hostname": "127.0.0.1:", - "port": 9000 - } - ] - }' - - echo "Sleep until placement is init'd" - ATTEMPTS=4 TIMEOUT=1 retry_with_backoff \ - '[ "$(curl -sSf 0.0.0.0:7201/api/v1/placement | jq .placement.instances.m3db_local.id)" == \"m3db_local\" ]' - - echo "Sleep until bootstrapped" - ATTEMPTS=10 TIMEOUT=2 retry_with_backoff \ - '[ "$(curl -sSf 0.0.0.0:9002/health | jq .bootstrapped)" == true ]' - - echo "Waiting until shards are marked as available" - ATTEMPTS=10 TIMEOUT=1 retry_with_backoff \ - '[ "$(curl -sSf 0.0.0.0:7201/api/v1/placement | grep -c INITIALIZING)" -eq 0 ]' + } +}' + +echo "Sleep until namespace is init'd" +ATTEMPTS=4 TIMEOUT=1 retry_with_backoff \ + '[ "$(curl -sSf 0.0.0.0:7201/api/v1/namespace | jq .registry.namespaces.unagg.indexOptions.enabled)" == true ]' + +echo "Placement initialization" +curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/placement/init -d '{ + "num_shards": 64, + "replication_factor": 1, + "instances": [ + { + "id": "m3db_local", + "isolation_group": "rack-a", + "zone": "embedded", + "weight": 1024, + "endpoint": "127.0.0.1::9000", + "hostname": "127.0.0.1:", + "port": 9000 + } + ] +}' + +echo "Sleep until placement is init'd" +ATTEMPTS=4 TIMEOUT=1 retry_with_backoff \ + '[ "$(curl -sSf 0.0.0.0:7201/api/v1/placement | jq .placement.instances.m3db_local.id)" == \"m3db_local\" ]' + +echo "Sleep until bootstrapped" +ATTEMPTS=10 TIMEOUT=2 retry_with_backoff \ + '[ "$(curl -sSf 0.0.0.0:9002/health | jq .bootstrapped)" == true ]' + +echo "Waiting until shards are marked as available" +ATTEMPTS=10 TIMEOUT=1 retry_with_backoff \ + '[ "$(curl -sSf 0.0.0.0:7201/api/v1/placement | grep -c INITIALIZING)" -eq 0 ]' echo "Write data" curl -vvvsS -X POST 0.0.0.0:9003/writetagged -d '{ diff --git a/src/dbnode/generated/thrift/rpc.thrift b/src/dbnode/generated/thrift/rpc.thrift index 799e757dda..68e34780b9 100644 --- a/src/dbnode/generated/thrift/rpc.thrift +++ b/src/dbnode/generated/thrift/rpc.thrift @@ -63,7 +63,10 @@ service Node { // Management endpoints NodeHealthResult health() throws (1: Error err) + // NB: bootstrapped is for use with cluster management tools like k8s. NodeBootstrappedResult bootstrapped() throws (1: Error err) + // NB: bootstrappedInPlacementOrNoPlacement is for use with cluster management tools like k8s. 
+ NodeBootstrappedInPlacementOrNoPlacementResult bootstrappedInPlacementOrNoPlacement() throws (1: Error err) NodePersistRateLimitResult getPersistRateLimit() throws (1: Error err) NodePersistRateLimitResult setPersistRateLimit(1: NodeSetPersistRateLimitRequest req) throws (1: Error err) NodeWriteNewSeriesAsyncResult getWriteNewSeriesAsync() throws (1: Error err) @@ -261,6 +264,8 @@ struct NodeHealthResult { struct NodeBootstrappedResult {} +struct NodeBootstrappedInPlacementOrNoPlacementResult {} + struct NodePersistRateLimitResult { 1: required bool limitEnabled 2: required double limitMbps diff --git a/src/dbnode/generated/thrift/rpc/rpc.go b/src/dbnode/generated/thrift/rpc/rpc.go index 4f6546d9e9..dc482b2a93 100644 --- a/src/dbnode/generated/thrift/rpc/rpc.go +++ b/src/dbnode/generated/thrift/rpc/rpc.go @@ -6607,6 +6607,61 @@ func (p *NodeBootstrappedResult_) String() string { return fmt.Sprintf("NodeBootstrappedResult_(%+v)", *p) } +type NodeBootstrappedInPlacementOrNoPlacementResult_ struct { +} + +func NewNodeBootstrappedInPlacementOrNoPlacementResult_() *NodeBootstrappedInPlacementOrNoPlacementResult_ { + return &NodeBootstrappedInPlacementOrNoPlacementResult_{} +} + +func (p *NodeBootstrappedInPlacementOrNoPlacementResult_) Read(iprot thrift.TProtocol) error { + if _, err := iprot.ReadStructBegin(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) + } + + for { + _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin() + if err != nil { + return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err) + } + if fieldTypeId == thrift.STOP { + break + } + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + if err := iprot.ReadFieldEnd(); err != nil { + return err + } + } + if err := iprot.ReadStructEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err) + } + return nil +} + +func (p *NodeBootstrappedInPlacementOrNoPlacementResult_) Write(oprot thrift.TProtocol) error { + if err := oprot.WriteStructBegin("NodeBootstrappedInPlacementOrNoPlacementResult"); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) + } + if p != nil { + } + if err := oprot.WriteFieldStop(); err != nil { + return thrift.PrependError("write field stop error: ", err) + } + if err := oprot.WriteStructEnd(); err != nil { + return thrift.PrependError("write struct stop error: ", err) + } + return nil +} + +func (p *NodeBootstrappedInPlacementOrNoPlacementResult_) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("NodeBootstrappedInPlacementOrNoPlacementResult_(%+v)", *p) +} + // Attributes: // - LimitEnabled // - LimitMbps @@ -11464,6 +11519,7 @@ type Node interface { Truncate(req *TruncateRequest) (r *TruncateResult_, err error) Health() (r *NodeHealthResult_, err error) Bootstrapped() (r *NodeBootstrappedResult_, err error) + BootstrappedInPlacementOrNoPlacement() (r *NodeBootstrappedInPlacementOrNoPlacementResult_, err error) GetPersistRateLimit() (r *NodePersistRateLimitResult_, err error) // Parameters: // - Req @@ -12787,6 +12843,83 @@ func (p *NodeClient) recvBootstrapped() (value *NodeBootstrappedResult_, err err return } +func (p *NodeClient) BootstrappedInPlacementOrNoPlacement() (r *NodeBootstrappedInPlacementOrNoPlacementResult_, err error) { + if err = p.sendBootstrappedInPlacementOrNoPlacement(); err != nil { + return + } + return p.recvBootstrappedInPlacementOrNoPlacement() +} + +func (p *NodeClient) 
sendBootstrappedInPlacementOrNoPlacement() (err error) { + oprot := p.OutputProtocol + if oprot == nil { + oprot = p.ProtocolFactory.GetProtocol(p.Transport) + p.OutputProtocol = oprot + } + p.SeqId++ + if err = oprot.WriteMessageBegin("bootstrappedInPlacementOrNoPlacement", thrift.CALL, p.SeqId); err != nil { + return + } + args := NodeBootstrappedInPlacementOrNoPlacementArgs{} + if err = args.Write(oprot); err != nil { + return + } + if err = oprot.WriteMessageEnd(); err != nil { + return + } + return oprot.Flush() +} + +func (p *NodeClient) recvBootstrappedInPlacementOrNoPlacement() (value *NodeBootstrappedInPlacementOrNoPlacementResult_, err error) { + iprot := p.InputProtocol + if iprot == nil { + iprot = p.ProtocolFactory.GetProtocol(p.Transport) + p.InputProtocol = iprot + } + method, mTypeId, seqId, err := iprot.ReadMessageBegin() + if err != nil { + return + } + if method != "bootstrappedInPlacementOrNoPlacement" { + err = thrift.NewTApplicationException(thrift.WRONG_METHOD_NAME, "bootstrappedInPlacementOrNoPlacement failed: wrong method name") + return + } + if p.SeqId != seqId { + err = thrift.NewTApplicationException(thrift.BAD_SEQUENCE_ID, "bootstrappedInPlacementOrNoPlacement failed: out of sequence response") + return + } + if mTypeId == thrift.EXCEPTION { + error59 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error60 error + error60, err = error59.Read(iprot) + if err != nil { + return + } + if err = iprot.ReadMessageEnd(); err != nil { + return + } + err = error60 + return + } + if mTypeId != thrift.REPLY { + err = thrift.NewTApplicationException(thrift.INVALID_MESSAGE_TYPE_EXCEPTION, "bootstrappedInPlacementOrNoPlacement failed: invalid message type") + return + } + result := NodeBootstrappedInPlacementOrNoPlacementResult{} + if err = result.Read(iprot); err != nil { + return + } + if err = iprot.ReadMessageEnd(); err != nil { + return + } + if result.Err != nil { + err = result.Err + return + } + value = result.GetSuccess() + return +} + func (p *NodeClient) GetPersistRateLimit() (r *NodePersistRateLimitResult_, err error) { if err = p.sendGetPersistRateLimit(); err != nil { return @@ -12833,16 +12966,16 @@ func (p *NodeClient) recvGetPersistRateLimit() (value *NodePersistRateLimitResul return } if mTypeId == thrift.EXCEPTION { - error59 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error60 error - error60, err = error59.Read(iprot) + error61 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error62 error + error62, err = error61.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error60 + err = error62 return } if mTypeId != thrift.REPLY { @@ -12914,16 +13047,16 @@ func (p *NodeClient) recvSetPersistRateLimit() (value *NodePersistRateLimitResul return } if mTypeId == thrift.EXCEPTION { - error61 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error62 error - error62, err = error61.Read(iprot) + error63 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error64 error + error64, err = error63.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error62 + err = error64 return } if mTypeId != thrift.REPLY { @@ -12991,16 +13124,16 @@ func (p *NodeClient) recvGetWriteNewSeriesAsync() (value *NodeWriteNewSeriesAsyn return } if 
mTypeId == thrift.EXCEPTION { - error63 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error64 error - error64, err = error63.Read(iprot) + error65 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error66 error + error66, err = error65.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error64 + err = error66 return } if mTypeId != thrift.REPLY { @@ -13072,16 +13205,16 @@ func (p *NodeClient) recvSetWriteNewSeriesAsync() (value *NodeWriteNewSeriesAsyn return } if mTypeId == thrift.EXCEPTION { - error65 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error66 error - error66, err = error65.Read(iprot) + error67 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error68 error + error68, err = error67.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error66 + err = error68 return } if mTypeId != thrift.REPLY { @@ -13149,16 +13282,16 @@ func (p *NodeClient) recvGetWriteNewSeriesBackoffDuration() (value *NodeWriteNew return } if mTypeId == thrift.EXCEPTION { - error67 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error68 error - error68, err = error67.Read(iprot) + error69 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error70 error + error70, err = error69.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error68 + err = error70 return } if mTypeId != thrift.REPLY { @@ -13230,16 +13363,16 @@ func (p *NodeClient) recvSetWriteNewSeriesBackoffDuration() (value *NodeWriteNew return } if mTypeId == thrift.EXCEPTION { - error69 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error70 error - error70, err = error69.Read(iprot) + error71 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error72 error + error72, err = error71.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error70 + err = error72 return } if mTypeId != thrift.REPLY { @@ -13307,16 +13440,16 @@ func (p *NodeClient) recvGetWriteNewSeriesLimitPerShardPerSecond() (value *NodeW return } if mTypeId == thrift.EXCEPTION { - error71 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error72 error - error72, err = error71.Read(iprot) + error73 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error74 error + error74, err = error73.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error72 + err = error74 return } if mTypeId != thrift.REPLY { @@ -13388,16 +13521,16 @@ func (p *NodeClient) recvSetWriteNewSeriesLimitPerShardPerSecond() (value *NodeW return } if mTypeId == thrift.EXCEPTION { - error73 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error74 error - error74, err = error73.Read(iprot) + error75 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error76 error + error76, err = error75.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error74 + 
err = error76 return } if mTypeId != thrift.REPLY { @@ -13439,32 +13572,33 @@ func (p *NodeProcessor) ProcessorMap() map[string]thrift.TProcessorFunction { func NewNodeProcessor(handler Node) *NodeProcessor { - self75 := &NodeProcessor{handler: handler, processorMap: make(map[string]thrift.TProcessorFunction)} - self75.processorMap["query"] = &nodeProcessorQuery{handler: handler} - self75.processorMap["aggregateRaw"] = &nodeProcessorAggregateRaw{handler: handler} - self75.processorMap["aggregate"] = &nodeProcessorAggregate{handler: handler} - self75.processorMap["fetch"] = &nodeProcessorFetch{handler: handler} - self75.processorMap["fetchTagged"] = &nodeProcessorFetchTagged{handler: handler} - self75.processorMap["write"] = &nodeProcessorWrite{handler: handler} - self75.processorMap["writeTagged"] = &nodeProcessorWriteTagged{handler: handler} - self75.processorMap["fetchBatchRaw"] = &nodeProcessorFetchBatchRaw{handler: handler} - self75.processorMap["fetchBlocksRaw"] = &nodeProcessorFetchBlocksRaw{handler: handler} - self75.processorMap["fetchBlocksMetadataRawV2"] = &nodeProcessorFetchBlocksMetadataRawV2{handler: handler} - self75.processorMap["writeBatchRaw"] = &nodeProcessorWriteBatchRaw{handler: handler} - self75.processorMap["writeTaggedBatchRaw"] = &nodeProcessorWriteTaggedBatchRaw{handler: handler} - self75.processorMap["repair"] = &nodeProcessorRepair{handler: handler} - self75.processorMap["truncate"] = &nodeProcessorTruncate{handler: handler} - self75.processorMap["health"] = &nodeProcessorHealth{handler: handler} - self75.processorMap["bootstrapped"] = &nodeProcessorBootstrapped{handler: handler} - self75.processorMap["getPersistRateLimit"] = &nodeProcessorGetPersistRateLimit{handler: handler} - self75.processorMap["setPersistRateLimit"] = &nodeProcessorSetPersistRateLimit{handler: handler} - self75.processorMap["getWriteNewSeriesAsync"] = &nodeProcessorGetWriteNewSeriesAsync{handler: handler} - self75.processorMap["setWriteNewSeriesAsync"] = &nodeProcessorSetWriteNewSeriesAsync{handler: handler} - self75.processorMap["getWriteNewSeriesBackoffDuration"] = &nodeProcessorGetWriteNewSeriesBackoffDuration{handler: handler} - self75.processorMap["setWriteNewSeriesBackoffDuration"] = &nodeProcessorSetWriteNewSeriesBackoffDuration{handler: handler} - self75.processorMap["getWriteNewSeriesLimitPerShardPerSecond"] = &nodeProcessorGetWriteNewSeriesLimitPerShardPerSecond{handler: handler} - self75.processorMap["setWriteNewSeriesLimitPerShardPerSecond"] = &nodeProcessorSetWriteNewSeriesLimitPerShardPerSecond{handler: handler} - return self75 + self77 := &NodeProcessor{handler: handler, processorMap: make(map[string]thrift.TProcessorFunction)} + self77.processorMap["query"] = &nodeProcessorQuery{handler: handler} + self77.processorMap["aggregateRaw"] = &nodeProcessorAggregateRaw{handler: handler} + self77.processorMap["aggregate"] = &nodeProcessorAggregate{handler: handler} + self77.processorMap["fetch"] = &nodeProcessorFetch{handler: handler} + self77.processorMap["fetchTagged"] = &nodeProcessorFetchTagged{handler: handler} + self77.processorMap["write"] = &nodeProcessorWrite{handler: handler} + self77.processorMap["writeTagged"] = &nodeProcessorWriteTagged{handler: handler} + self77.processorMap["fetchBatchRaw"] = &nodeProcessorFetchBatchRaw{handler: handler} + self77.processorMap["fetchBlocksRaw"] = &nodeProcessorFetchBlocksRaw{handler: handler} + self77.processorMap["fetchBlocksMetadataRawV2"] = &nodeProcessorFetchBlocksMetadataRawV2{handler: handler} + 
self77.processorMap["writeBatchRaw"] = &nodeProcessorWriteBatchRaw{handler: handler} + self77.processorMap["writeTaggedBatchRaw"] = &nodeProcessorWriteTaggedBatchRaw{handler: handler} + self77.processorMap["repair"] = &nodeProcessorRepair{handler: handler} + self77.processorMap["truncate"] = &nodeProcessorTruncate{handler: handler} + self77.processorMap["health"] = &nodeProcessorHealth{handler: handler} + self77.processorMap["bootstrapped"] = &nodeProcessorBootstrapped{handler: handler} + self77.processorMap["bootstrappedInPlacementOrNoPlacement"] = &nodeProcessorBootstrappedInPlacementOrNoPlacement{handler: handler} + self77.processorMap["getPersistRateLimit"] = &nodeProcessorGetPersistRateLimit{handler: handler} + self77.processorMap["setPersistRateLimit"] = &nodeProcessorSetPersistRateLimit{handler: handler} + self77.processorMap["getWriteNewSeriesAsync"] = &nodeProcessorGetWriteNewSeriesAsync{handler: handler} + self77.processorMap["setWriteNewSeriesAsync"] = &nodeProcessorSetWriteNewSeriesAsync{handler: handler} + self77.processorMap["getWriteNewSeriesBackoffDuration"] = &nodeProcessorGetWriteNewSeriesBackoffDuration{handler: handler} + self77.processorMap["setWriteNewSeriesBackoffDuration"] = &nodeProcessorSetWriteNewSeriesBackoffDuration{handler: handler} + self77.processorMap["getWriteNewSeriesLimitPerShardPerSecond"] = &nodeProcessorGetWriteNewSeriesLimitPerShardPerSecond{handler: handler} + self77.processorMap["setWriteNewSeriesLimitPerShardPerSecond"] = &nodeProcessorSetWriteNewSeriesLimitPerShardPerSecond{handler: handler} + return self77 } func (p *NodeProcessor) Process(iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) { @@ -13477,12 +13611,12 @@ func (p *NodeProcessor) Process(iprot, oprot thrift.TProtocol) (success bool, er } iprot.Skip(thrift.STRUCT) iprot.ReadMessageEnd() - x76 := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD, "Unknown function "+name) + x78 := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD, "Unknown function "+name) oprot.WriteMessageBegin(name, thrift.EXCEPTION, seqId) - x76.Write(oprot) + x78.Write(oprot) oprot.WriteMessageEnd() oprot.Flush() - return false, x76 + return false, x78 } @@ -14319,6 +14453,59 @@ func (p *nodeProcessorBootstrapped) Process(seqId int32, iprot, oprot thrift.TPr return true, err } +type nodeProcessorBootstrappedInPlacementOrNoPlacement struct { + handler Node +} + +func (p *nodeProcessorBootstrappedInPlacementOrNoPlacement) Process(seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) { + args := NodeBootstrappedInPlacementOrNoPlacementArgs{} + if err = args.Read(iprot); err != nil { + iprot.ReadMessageEnd() + x := thrift.NewTApplicationException(thrift.PROTOCOL_ERROR, err.Error()) + oprot.WriteMessageBegin("bootstrappedInPlacementOrNoPlacement", thrift.EXCEPTION, seqId) + x.Write(oprot) + oprot.WriteMessageEnd() + oprot.Flush() + return false, err + } + + iprot.ReadMessageEnd() + result := NodeBootstrappedInPlacementOrNoPlacementResult{} + var retval *NodeBootstrappedInPlacementOrNoPlacementResult_ + var err2 error + if retval, err2 = p.handler.BootstrappedInPlacementOrNoPlacement(); err2 != nil { + switch v := err2.(type) { + case *Error: + result.Err = v + default: + x := thrift.NewTApplicationException(thrift.INTERNAL_ERROR, "Internal error processing bootstrappedInPlacementOrNoPlacement: "+err2.Error()) + oprot.WriteMessageBegin("bootstrappedInPlacementOrNoPlacement", thrift.EXCEPTION, seqId) + x.Write(oprot) + oprot.WriteMessageEnd() + oprot.Flush() + return 
true, err2 + } + } else { + result.Success = retval + } + if err2 = oprot.WriteMessageBegin("bootstrappedInPlacementOrNoPlacement", thrift.REPLY, seqId); err2 != nil { + err = err2 + } + if err2 = result.Write(oprot); err == nil && err2 != nil { + err = err2 + } + if err2 = oprot.WriteMessageEnd(); err == nil && err2 != nil { + err = err2 + } + if err2 = oprot.Flush(); err == nil && err2 != nil { + err = err2 + } + if err != nil { + return + } + return true, err +} + type nodeProcessorGetPersistRateLimit struct { handler Node } @@ -18414,6 +18601,211 @@ func (p *NodeBootstrappedResult) String() string { return fmt.Sprintf("NodeBootstrappedResult(%+v)", *p) } +type NodeBootstrappedInPlacementOrNoPlacementArgs struct { +} + +func NewNodeBootstrappedInPlacementOrNoPlacementArgs() *NodeBootstrappedInPlacementOrNoPlacementArgs { + return &NodeBootstrappedInPlacementOrNoPlacementArgs{} +} + +func (p *NodeBootstrappedInPlacementOrNoPlacementArgs) Read(iprot thrift.TProtocol) error { + if _, err := iprot.ReadStructBegin(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) + } + + for { + _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin() + if err != nil { + return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err) + } + if fieldTypeId == thrift.STOP { + break + } + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + if err := iprot.ReadFieldEnd(); err != nil { + return err + } + } + if err := iprot.ReadStructEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err) + } + return nil +} + +func (p *NodeBootstrappedInPlacementOrNoPlacementArgs) Write(oprot thrift.TProtocol) error { + if err := oprot.WriteStructBegin("bootstrappedInPlacementOrNoPlacement_args"); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) + } + if p != nil { + } + if err := oprot.WriteFieldStop(); err != nil { + return thrift.PrependError("write field stop error: ", err) + } + if err := oprot.WriteStructEnd(); err != nil { + return thrift.PrependError("write struct stop error: ", err) + } + return nil +} + +func (p *NodeBootstrappedInPlacementOrNoPlacementArgs) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("NodeBootstrappedInPlacementOrNoPlacementArgs(%+v)", *p) +} + +// Attributes: +// - Success +// - Err +type NodeBootstrappedInPlacementOrNoPlacementResult struct { + Success *NodeBootstrappedInPlacementOrNoPlacementResult_ `thrift:"success,0" db:"success" json:"success,omitempty"` + Err *Error `thrift:"err,1" db:"err" json:"err,omitempty"` +} + +func NewNodeBootstrappedInPlacementOrNoPlacementResult() *NodeBootstrappedInPlacementOrNoPlacementResult { + return &NodeBootstrappedInPlacementOrNoPlacementResult{} +} + +var NodeBootstrappedInPlacementOrNoPlacementResult_Success_DEFAULT *NodeBootstrappedInPlacementOrNoPlacementResult_ + +func (p *NodeBootstrappedInPlacementOrNoPlacementResult) GetSuccess() *NodeBootstrappedInPlacementOrNoPlacementResult_ { + if !p.IsSetSuccess() { + return NodeBootstrappedInPlacementOrNoPlacementResult_Success_DEFAULT + } + return p.Success +} + +var NodeBootstrappedInPlacementOrNoPlacementResult_Err_DEFAULT *Error + +func (p *NodeBootstrappedInPlacementOrNoPlacementResult) GetErr() *Error { + if !p.IsSetErr() { + return NodeBootstrappedInPlacementOrNoPlacementResult_Err_DEFAULT + } + return p.Err +} +func (p *NodeBootstrappedInPlacementOrNoPlacementResult) IsSetSuccess() bool { + return p.Success != nil +} + 
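+// NB: at most one of Success or Err is expected to be set in this result
+// envelope. For illustration only (the real unwrapping lives in
+// recvBootstrappedInPlacementOrNoPlacement above), the generated client does:
+//
+//	if result.Err != nil {
+//		err = result.Err
+//		return
+//	}
+//	value = result.GetSuccess()
+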
+func (p *NodeBootstrappedInPlacementOrNoPlacementResult) IsSetErr() bool { + return p.Err != nil +} + +func (p *NodeBootstrappedInPlacementOrNoPlacementResult) Read(iprot thrift.TProtocol) error { + if _, err := iprot.ReadStructBegin(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) + } + + for { + _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin() + if err != nil { + return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err) + } + if fieldTypeId == thrift.STOP { + break + } + switch fieldId { + case 0: + if err := p.ReadField0(iprot); err != nil { + return err + } + case 1: + if err := p.ReadField1(iprot); err != nil { + return err + } + default: + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + } + if err := iprot.ReadFieldEnd(); err != nil { + return err + } + } + if err := iprot.ReadStructEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err) + } + return nil +} + +func (p *NodeBootstrappedInPlacementOrNoPlacementResult) ReadField0(iprot thrift.TProtocol) error { + p.Success = &NodeBootstrappedInPlacementOrNoPlacementResult_{} + if err := p.Success.Read(iprot); err != nil { + return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Success), err) + } + return nil +} + +func (p *NodeBootstrappedInPlacementOrNoPlacementResult) ReadField1(iprot thrift.TProtocol) error { + p.Err = &Error{ + Type: 0, + } + if err := p.Err.Read(iprot); err != nil { + return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) + } + return nil +} + +func (p *NodeBootstrappedInPlacementOrNoPlacementResult) Write(oprot thrift.TProtocol) error { + if err := oprot.WriteStructBegin("bootstrappedInPlacementOrNoPlacement_result"); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) + } + if p != nil { + if err := p.writeField0(oprot); err != nil { + return err + } + if err := p.writeField1(oprot); err != nil { + return err + } + } + if err := oprot.WriteFieldStop(); err != nil { + return thrift.PrependError("write field stop error: ", err) + } + if err := oprot.WriteStructEnd(); err != nil { + return thrift.PrependError("write struct stop error: ", err) + } + return nil +} + +func (p *NodeBootstrappedInPlacementOrNoPlacementResult) writeField0(oprot thrift.TProtocol) (err error) { + if p.IsSetSuccess() { + if err := oprot.WriteFieldBegin("success", thrift.STRUCT, 0); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 0:success: ", p), err) + } + if err := p.Success.Write(oprot); err != nil { + return thrift.PrependError(fmt.Sprintf("%T error writing struct: ", p.Success), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 0:success: ", p), err) + } + } + return err +} + +func (p *NodeBootstrappedInPlacementOrNoPlacementResult) writeField1(oprot thrift.TProtocol) (err error) { + if p.IsSetErr() { + if err := oprot.WriteFieldBegin("err", thrift.STRUCT, 1); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 1:err: ", p), err) + } + if err := p.Err.Write(oprot); err != nil { + return thrift.PrependError(fmt.Sprintf("%T error writing struct: ", p.Err), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 1:err: ", p), err) + } + } + return err +} + +func (p *NodeBootstrappedInPlacementOrNoPlacementResult) String() 
string { + if p == nil { + return "" + } + return fmt.Sprintf("NodeBootstrappedInPlacementOrNoPlacementResult(%+v)", *p) +} + type NodeGetPersistRateLimitArgs struct { } @@ -20334,16 +20726,16 @@ func (p *ClusterClient) recvHealth() (value *HealthResult_, err error) { return } if mTypeId == thrift.EXCEPTION { - error179 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error180 error - error180, err = error179.Read(iprot) + error181 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error182 error + error182, err = error181.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error180 + err = error182 return } if mTypeId != thrift.REPLY { @@ -20415,16 +20807,16 @@ func (p *ClusterClient) recvWrite() (err error) { return } if mTypeId == thrift.EXCEPTION { - error181 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error182 error - error182, err = error181.Read(iprot) + error183 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error184 error + error184, err = error183.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error182 + err = error184 return } if mTypeId != thrift.REPLY { @@ -20495,16 +20887,16 @@ func (p *ClusterClient) recvWriteTagged() (err error) { return } if mTypeId == thrift.EXCEPTION { - error183 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error184 error - error184, err = error183.Read(iprot) + error185 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error186 error + error186, err = error185.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error184 + err = error186 return } if mTypeId != thrift.REPLY { @@ -20575,16 +20967,16 @@ func (p *ClusterClient) recvQuery() (value *QueryResult_, err error) { return } if mTypeId == thrift.EXCEPTION { - error185 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error186 error - error186, err = error185.Read(iprot) + error187 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error188 error + error188, err = error187.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error186 + err = error188 return } if mTypeId != thrift.REPLY { @@ -20656,16 +21048,16 @@ func (p *ClusterClient) recvAggregate() (value *AggregateQueryResult_, err error return } if mTypeId == thrift.EXCEPTION { - error187 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error188 error - error188, err = error187.Read(iprot) + error189 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error190 error + error190, err = error189.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error188 + err = error190 return } if mTypeId != thrift.REPLY { @@ -20737,16 +21129,16 @@ func (p *ClusterClient) recvFetch() (value *FetchResult_, err error) { return } if mTypeId == thrift.EXCEPTION { - error189 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error190 error - error190, err = error189.Read(iprot) 
+ error191 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error192 error + error192, err = error191.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error190 + err = error192 return } if mTypeId != thrift.REPLY { @@ -20818,16 +21210,16 @@ func (p *ClusterClient) recvTruncate() (value *TruncateResult_, err error) { return } if mTypeId == thrift.EXCEPTION { - error191 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error192 error - error192, err = error191.Read(iprot) + error193 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error194 error + error194, err = error193.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error192 + err = error194 return } if mTypeId != thrift.REPLY { @@ -20869,15 +21261,15 @@ func (p *ClusterProcessor) ProcessorMap() map[string]thrift.TProcessorFunction { func NewClusterProcessor(handler Cluster) *ClusterProcessor { - self193 := &ClusterProcessor{handler: handler, processorMap: make(map[string]thrift.TProcessorFunction)} - self193.processorMap["health"] = &clusterProcessorHealth{handler: handler} - self193.processorMap["write"] = &clusterProcessorWrite{handler: handler} - self193.processorMap["writeTagged"] = &clusterProcessorWriteTagged{handler: handler} - self193.processorMap["query"] = &clusterProcessorQuery{handler: handler} - self193.processorMap["aggregate"] = &clusterProcessorAggregate{handler: handler} - self193.processorMap["fetch"] = &clusterProcessorFetch{handler: handler} - self193.processorMap["truncate"] = &clusterProcessorTruncate{handler: handler} - return self193 + self195 := &ClusterProcessor{handler: handler, processorMap: make(map[string]thrift.TProcessorFunction)} + self195.processorMap["health"] = &clusterProcessorHealth{handler: handler} + self195.processorMap["write"] = &clusterProcessorWrite{handler: handler} + self195.processorMap["writeTagged"] = &clusterProcessorWriteTagged{handler: handler} + self195.processorMap["query"] = &clusterProcessorQuery{handler: handler} + self195.processorMap["aggregate"] = &clusterProcessorAggregate{handler: handler} + self195.processorMap["fetch"] = &clusterProcessorFetch{handler: handler} + self195.processorMap["truncate"] = &clusterProcessorTruncate{handler: handler} + return self195 } func (p *ClusterProcessor) Process(iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) { @@ -20890,12 +21282,12 @@ func (p *ClusterProcessor) Process(iprot, oprot thrift.TProtocol) (success bool, } iprot.Skip(thrift.STRUCT) iprot.ReadMessageEnd() - x194 := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD, "Unknown function "+name) + x196 := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD, "Unknown function "+name) oprot.WriteMessageBegin(name, thrift.EXCEPTION, seqId) - x194.Write(oprot) + x196.Write(oprot) oprot.WriteMessageEnd() oprot.Flush() - return false, x194 + return false, x196 } diff --git a/src/dbnode/generated/thrift/rpc/rpc_mock.go b/src/dbnode/generated/thrift/rpc/rpc_mock.go index 99361ec1ad..ee93f4b89f 100644 --- a/src/dbnode/generated/thrift/rpc/rpc_mock.go +++ b/src/dbnode/generated/thrift/rpc/rpc_mock.go @@ -225,6 +225,21 @@ func (mr *MockTChanNodeMockRecorder) Bootstrapped(ctx interface{}) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Bootstrapped", reflect.TypeOf((*MockTChanNode)(nil).Bootstrapped), 
ctx) } +// BootstrappedInPlacementOrNoPlacement mocks base method +func (m *MockTChanNode) BootstrappedInPlacementOrNoPlacement(ctx thrift.Context) (*NodeBootstrappedInPlacementOrNoPlacementResult_, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "BootstrappedInPlacementOrNoPlacement", ctx) + ret0, _ := ret[0].(*NodeBootstrappedInPlacementOrNoPlacementResult_) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// BootstrappedInPlacementOrNoPlacement indicates an expected call of BootstrappedInPlacementOrNoPlacement +func (mr *MockTChanNodeMockRecorder) BootstrappedInPlacementOrNoPlacement(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BootstrappedInPlacementOrNoPlacement", reflect.TypeOf((*MockTChanNode)(nil).BootstrappedInPlacementOrNoPlacement), ctx) +} + // Fetch mocks base method func (m *MockTChanNode) Fetch(ctx thrift.Context, req *FetchRequest) (*FetchResult_, error) { m.ctrl.T.Helper() diff --git a/src/dbnode/generated/thrift/rpc/tchan-rpc.go b/src/dbnode/generated/thrift/rpc/tchan-rpc.go index 8796a7e422..3f2ca658b2 100644 --- a/src/dbnode/generated/thrift/rpc/tchan-rpc.go +++ b/src/dbnode/generated/thrift/rpc/tchan-rpc.go @@ -48,6 +48,7 @@ type TChanNode interface { Aggregate(ctx thrift.Context, req *AggregateQueryRequest) (*AggregateQueryResult_, error) AggregateRaw(ctx thrift.Context, req *AggregateQueryRawRequest) (*AggregateQueryRawResult_, error) Bootstrapped(ctx thrift.Context) (*NodeBootstrappedResult_, error) + BootstrappedInPlacementOrNoPlacement(ctx thrift.Context) (*NodeBootstrappedInPlacementOrNoPlacementResult_, error) Fetch(ctx thrift.Context, req *FetchRequest) (*FetchResult_, error) FetchBatchRaw(ctx thrift.Context, req *FetchBatchRawRequest) (*FetchBatchRawResult_, error) FetchBlocksMetadataRawV2(ctx thrift.Context, req *FetchBlocksMetadataRawV2Request) (*FetchBlocksMetadataRawV2Result_, error) @@ -527,6 +528,22 @@ func (c *tchanNodeClient) Bootstrapped(ctx thrift.Context) (*NodeBootstrappedRes return resp.GetSuccess(), err } +func (c *tchanNodeClient) BootstrappedInPlacementOrNoPlacement(ctx thrift.Context) (*NodeBootstrappedInPlacementOrNoPlacementResult_, error) { + var resp NodeBootstrappedInPlacementOrNoPlacementResult + args := NodeBootstrappedInPlacementOrNoPlacementArgs{} + success, err := c.client.Call(ctx, c.thriftService, "bootstrappedInPlacementOrNoPlacement", &args, &resp) + if err == nil && !success { + switch { + case resp.Err != nil: + err = resp.Err + default: + err = fmt.Errorf("received no result or unknown exception for bootstrappedInPlacementOrNoPlacement") + } + } + + return resp.GetSuccess(), err +} + func (c *tchanNodeClient) Fetch(ctx thrift.Context, req *FetchRequest) (*FetchResult_, error) { var resp NodeFetchResult args := NodeFetchArgs{ @@ -914,6 +931,7 @@ func (s *tchanNodeServer) Methods() []string { "aggregate", "aggregateRaw", "bootstrapped", + "bootstrappedInPlacementOrNoPlacement", "fetch", "fetchBatchRaw", "fetchBlocksMetadataRawV2", @@ -946,6 +964,8 @@ func (s *tchanNodeServer) Handle(ctx thrift.Context, methodName string, protocol return s.handleAggregateRaw(ctx, protocol) case "bootstrapped": return s.handleBootstrapped(ctx, protocol) + case "bootstrappedInPlacementOrNoPlacement": + return s.handleBootstrappedInPlacementOrNoPlacement(ctx, protocol) case "fetch": return s.handleFetch(ctx, protocol) case "fetchBatchRaw": @@ -1078,6 +1098,34 @@ func (s *tchanNodeServer) handleBootstrapped(ctx thrift.Context, protocol athrif return err == nil, &res, nil 
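+	// NB: tchannel-thrift handlers return (success, result, error). An
+	// application error of type *Error travels in the result struct's Err
+	// field with success=false; the generated client then surfaces it as
+	// the call's error (see the `if err == nil && !success` branch in the
+	// tchanNodeClient methods above).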
} +func (s *tchanNodeServer) handleBootstrappedInPlacementOrNoPlacement(ctx thrift.Context, protocol athrift.TProtocol) (bool, athrift.TStruct, error) { + var req NodeBootstrappedInPlacementOrNoPlacementArgs + var res NodeBootstrappedInPlacementOrNoPlacementResult + + if err := req.Read(protocol); err != nil { + return false, nil, err + } + + r, err := + s.handler.BootstrappedInPlacementOrNoPlacement(ctx) + + if err != nil { + switch v := err.(type) { + case *Error: + if v == nil { + return false, nil, fmt.Errorf("Handler for err returned non-nil error type *Error but nil value") + } + res.Err = v + default: + return false, nil, err + } + } else { + res.Success = r + } + + return err == nil, &res, nil +} + func (s *tchanNodeServer) handleFetch(ctx thrift.Context, protocol athrift.TProtocol) (bool, athrift.TStruct, error) { var req NodeFetchArgs var res NodeFetchResult diff --git a/src/dbnode/network/server/httpjson/handlers.go b/src/dbnode/network/server/httpjson/handlers.go index 59c0bbff35..c9b117cfae 100644 --- a/src/dbnode/network/server/httpjson/handlers.go +++ b/src/dbnode/network/server/httpjson/handlers.go @@ -41,6 +41,30 @@ var ( errInvalidRequestBody = xerrors.NewInvalidParamsError(errors.New("request contains an invalid request body")) ) +// Error is an HTTP JSON error that also sets a return status code. +type Error interface { + error + + StatusCode() int +} + +type errorType struct { + error + statusCode int +} + +// NewError creates a new HTTP JSON error which has a specified status code. +func NewError(err error, statusCode int) Error { + e := errorType{error: err} + e.statusCode = statusCode + return e +} + +// StatusCode returns the HTTP status code that matches the error. +func (e errorType) StatusCode() int { + return e.statusCode +} + type respSuccess struct { } @@ -169,9 +193,7 @@ func RegisterHandlers(mux *http.ServeMux, service interface{}, opts ServerOption if method.Type.NumOut() == 1 { // Ensure we always call the post response fn if set if postResponseFn != nil { - defer func() { - postResponseFn(callContext, method.Name, nil) - }() + defer postResponseFn(callContext, method.Name, nil) } // Deal with error case @@ -206,6 +228,7 @@ func RegisterHandlers(mux *http.ServeMux, service interface{}, opts ServerOption return } + w.WriteHeader(http.StatusOK) w.Write(buff.Bytes()) }) } @@ -231,10 +254,18 @@ func writeError(w http.ResponseWriter, errValue interface{}) { return } - if value, ok := errValue.(error); ok && xerrors.IsInvalidParams(value) { - w.WriteHeader(http.StatusBadRequest) - } else { + switch v := errValue.(type) { + case Error: + w.WriteHeader(v.StatusCode()) + case error: + if xerrors.IsInvalidParams(v) { + w.WriteHeader(http.StatusBadRequest) + } else { + w.WriteHeader(http.StatusInternalServerError) + } + default: w.WriteHeader(http.StatusInternalServerError) } + w.Write(buff.Bytes()) } diff --git a/src/dbnode/network/server/tchannelthrift/node/server.go b/src/dbnode/network/server/tchannelthrift/node/server.go index d115b5f5be..3327757bb2 100644 --- a/src/dbnode/network/server/tchannelthrift/node/server.go +++ b/src/dbnode/network/server/tchannelthrift/node/server.go @@ -31,7 +31,7 @@ import ( ) type server struct { - service rpc.TChanNode + service Service address string contextPool context.Pool opts *tchannel.ChannelOptions @@ -39,7 +39,7 @@ type server struct { // NewServer creates a new node TChannel Thrift network service func NewServer( - service rpc.TChanNode, + service Service, address string, contextPool context.Pool, opts 
*tchannel.ChannelOptions,
diff --git a/src/dbnode/network/server/tchannelthrift/node/service.go b/src/dbnode/network/server/tchannelthrift/node/service.go
index ebbaa435af..1bde7a194d 100644
--- a/src/dbnode/network/server/tchannelthrift/node/service.go
+++ b/src/dbnode/network/server/tchannelthrift/node/service.go
@@ -83,6 +83,19 @@ var (
 	// errNodeIsNotBootstrapped
 	errNodeIsNotBootstrapped = errors.New("node is not bootstrapped")
+
+	// errDatabaseIsNotInitializedYet is raised when an RPC attempt is made before the database
+	// has been set.
+	errDatabaseIsNotInitializedYet = errors.New("database is not yet initialized")
+
+	// errDatabaseHasAlreadyBeenSet is raised when SetDatabase() is called more than once.
+	errDatabaseHasAlreadyBeenSet = errors.New("database has already been set")
+
+	// errNotImplemented is raised when attempting to execute an unimplemented method.
+	errNotImplemented = errors.New("method is not implemented")
+
+	// errHealthNotSet is raised when the server health data structure is not set.
+	errHealthNotSet = errors.New("server health not set")
 )
 
 type serviceMetrics struct {
@@ -121,15 +134,34 @@ func newServiceMetrics(scope tally.Scope, samplingRate float64) serviceMetrics {
 
 // TODO(r): server side pooling for all return types from service methods
 type service struct {
-	sync.RWMutex
+	state serviceState
+
+	logger *zap.Logger
 
-	db      storage.Database
-	logger  *zap.Logger
 	opts    tchannelthrift.Options
 	nowFn   clock.NowFn
 	pools   pools
 	metrics serviceMetrics
-	health  *rpc.NodeHealthResult_
+}
+
+type serviceState struct {
+	sync.RWMutex
+	db     storage.Database
+	health *rpc.NodeHealthResult_
+}
+
+func (s *serviceState) DB() (storage.Database, bool) {
+	s.RLock()
+	v := s.db
+	s.RUnlock()
+	return v, v != nil
+}
+
+func (s *serviceState) Health() (*rpc.NodeHealthResult_, bool) {
+	s.RLock()
+	v := s.health
+	s.RUnlock()
+	return v, v != nil
 }
 
 type pools struct {
@@ -149,8 +181,16 @@ var _ convert.FetchTaggedConversionPools = pools{}
 func (p pools) ID() ident.Pool { return p.id }
 func (p pools) CheckedBytesWrapper() xpool.CheckedBytesWrapperPool { return p.checkedBytesWrapper }
 
+// Service is the interface for the node RPC service.
+type Service interface {
+	rpc.TChanNode
+
+	// SetDatabase is only safe to be called once, after the service has started.
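+	//
+	// A minimal sketch of the intended lifecycle (illustrative only; opts,
+	// addr, contextPool, db and logger are placeholders constructed
+	// elsewhere once the placement/topology is known):
+	//
+	//	svc := NewService(nil, opts) // database not available yet
+	//	srv := NewServer(svc, addr, contextPool, nil)
+	//	// ... start serving; RPCs that need the DB fail until it is set ...
+	//	if err := svc.SetDatabase(db); err != nil {
+	//		logger.Fatal("unable to set database", zap.Error(err))
+	//	}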
+ SetDatabase(db storage.Database) error +} + // NewService creates a new node TChannel Thrift service -func NewService(db storage.Database, opts tchannelthrift.Options) rpc.TChanNode { +func NewService(db storage.Database, opts tchannelthrift.Options) Service { if opts == nil { opts = tchannelthrift.NewOptions() } @@ -186,36 +226,44 @@ func NewService(db storage.Database, opts tchannelthrift.Options) rpc.TChanNode writeBatchPooledReqPool := newWriteBatchPooledReqPool(iopts) writeBatchPooledReqPool.Init(opts.TagDecoderPool()) - s := &service{ - db: db, + return &service{ + state: serviceState{ + db: db, + health: &rpc.NodeHealthResult_{ + Ok: true, + Status: "up", + Bootstrapped: false, + }, + }, logger: iopts.Logger(), opts: opts, - nowFn: db.Options().ClockOptions().NowFn(), + nowFn: opts.ClockOptions().NowFn(), metrics: newServiceMetrics(scope, iopts.MetricsSamplingRate()), pools: pools{ + id: opts.IdentifierPool(), checkedBytesWrapper: wrapperPool, tagEncoder: opts.TagEncoderPool(), tagDecoder: opts.TagDecoderPool(), - id: db.Options().IdentifierPool(), segmentsArray: segmentPool, writeBatchPooledReqPool: writeBatchPooledReqPool, blockMetadataV2: opts.BlockMetadataV2Pool(), blockMetadataV2Slice: opts.BlockMetadataV2SlicePool(), }, - health: &rpc.NodeHealthResult_{ - Ok: true, - Status: "up", - Bootstrapped: false, - }, } - - return s } func (s *service) Health(ctx thrift.Context) (*rpc.NodeHealthResult_, error) { - s.RLock() - health := s.health - s.RUnlock() + health, ok := s.state.Health() + if !ok { + // Health should always be set + return nil, convert.ToRPCError(errHealthNotSet) + } + + db, ok := s.state.DB() + if !ok { + // DB not yet set, just return existing health status + return health, nil + } // Update bootstrapped field if not up to date. Note that we use // IsBootstrappedAndDurable instead of IsBootstrapped to make sure @@ -223,16 +271,15 @@ func (s *service) Health(ctx thrift.Context) (*rpc.NodeHealthResult_, error) { // our automated tooling will assume a node is healthy until it has // marked all its shards as available and is able to bootstrap all the // shards it owns from its own local disk. - bootstrapped := s.db.IsBootstrappedAndDurable() - + bootstrapped := db.IsBootstrappedAndDurable() if health.Bootstrapped != bootstrapped { newHealth := &rpc.NodeHealthResult_{} *newHealth = *health newHealth.Bootstrapped = bootstrapped - s.Lock() - s.health = newHealth - s.Unlock() + s.state.Lock() + s.state.health = newHealth + s.state.Unlock() // Update response health = newHealth @@ -241,23 +288,68 @@ func (s *service) Health(ctx thrift.Context) (*rpc.NodeHealthResult_, error) { return health, nil } -// Bootstrapped is design to be used with cluster management tools like k8 that expect an endpoint -// that will return success if the node is healthy/bootstrapped and an error if not. We added this -// endpoint because while the Health endpoint provides the same information, this endpoint does not -// require parsing the response to determine if the node is bootstrapped or not. +// Bootstrapped is designed to be used with cluster management tools like k8s +// that expect an endpoint that will return success if the node is +// healthy/bootstrapped and an error if not. We added this endpoint because +// while the Health endpoint provides the same information, this endpoint does +// not require parsing the response to determine if the node is bootstrapped or +// not. 
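+//
+// A minimal readiness-check sketch against the generated TChannel client
+// (illustrative only; obtaining the rpc.TChanNode client is assumed):
+//
+//	func ready(ctx thrift.Context, node rpc.TChanNode) bool {
+//		// Any RPC error, e.g. the node is not yet bootstrapped and
+//		// durable, is treated as not ready.
+//		_, err := node.Bootstrapped(ctx)
+//		return err == nil
+//	}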
func (s *service) Bootstrapped(ctx thrift.Context) (*rpc.NodeBootstrappedResult_, error) {
-	// Note that we use IsBootstrappedAndDurable instead of IsBootstrapped to make sure
-	// that in the scenario where a topology change has occurred, none of our automated
-	// tooling will assume a node is healthy until it has marked all its shards as available
-	// and is able to bootstrap all the shards it owns from its own local disk.
-	if bootstrapped := s.db.IsBootstrappedAndDurable(); !bootstrapped {
+	db, ok := s.state.DB()
+	if !ok {
+		return nil, convert.ToRPCError(errDatabaseIsNotInitializedYet)
+	}
+
+	// Note that we use IsBootstrappedAndDurable instead of IsBootstrapped to
+	// make sure that in the scenario where a topology change has occurred, none
+	// of our automated tooling will assume a node is healthy until it has
+	// marked all its shards as available and is able to bootstrap all the
+	// shards it owns from its own local disk.
+	if bootstrapped := db.IsBootstrappedAndDurable(); !bootstrapped {
 		return nil, convert.ToRPCError(errNodeIsNotBootstrapped)
 	}
 
 	return &rpc.NodeBootstrappedResult_{}, nil
 }
 
+// BootstrappedInPlacementOrNoPlacement is designed to be used with cluster
+// management tools like k8s that expect an endpoint that will return
+// success if the node either:
+// 1) Has no cluster placement set yet.
+// 2) Is bootstrapped and durable, meaning it is bootstrapped and is able
+//    to bootstrap the shards it owns from its own local disk.
+// This is useful in addition to the Bootstrapped RPC method as it helps
+// progress node addition/removal/modifications when no placement is set
+// at all and therefore the node has not been able to bootstrap yet.
+func (s *service) BootstrappedInPlacementOrNoPlacement(ctx thrift.Context) (*rpc.NodeBootstrappedInPlacementOrNoPlacementResult_, error) {
+	hasPlacement, err := s.opts.TopologyInitializer().TopologyIsSet()
+	if err != nil {
+		return nil, convert.ToRPCError(err)
+	}
+
+	if !hasPlacement {
+		// No placement at all.
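+		// Summarizing the doc comment above as a decision table:
+		//   placement unset                          -> success (this branch)
+		//   placement set, bootstrapped and durable  -> success
+		//   placement set, otherwise                 -> errNodeIsNotBootstrapped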
+ return &rpc.NodeBootstrappedInPlacementOrNoPlacementResult_{}, nil + } + + db, ok := s.state.DB() + if !ok { + return nil, convert.ToRPCError(errDatabaseIsNotInitializedYet) + } + + if bootstrapped := db.IsBootstrappedAndDurable(); !bootstrapped { + return nil, convert.ToRPCError(errNodeIsNotBootstrapped) + } + + return &rpc.NodeBootstrappedInPlacementOrNoPlacementResult_{}, nil +} + func (s *service) Query(tctx thrift.Context, req *rpc.QueryRequest) (*rpc.QueryResult_, error) { + db, err := s.startRPCWithDB() + if err != nil { + return nil, err + } + ctx, sp := tchannelthrift.Context(tctx).StartTraceSpan(tracepoint.Query) sp.LogFields( opentracinglog.String("query", req.Query.String()), @@ -266,7 +358,7 @@ func (s *service) Query(tctx thrift.Context, req *rpc.QueryRequest) (*rpc.QueryR xopentracing.Time("end", time.Unix(0, req.RangeStart)), ) - result, err := s.query(ctx, req) + result, err := s.query(ctx, db, req) if err != nil { sp.LogFields(opentracinglog.Error(err)) } @@ -275,15 +367,9 @@ func (s *service) Query(tctx thrift.Context, req *rpc.QueryRequest) (*rpc.QueryR return result, err } -func (s *service) query(ctx context.Context, req *rpc.QueryRequest) (*rpc.QueryResult_, error) { - if s.isOverloaded() { - s.metrics.overloadRejected.Inc(1) - return nil, tterrors.NewInternalError(errServerIsOverloaded) - } - +func (s *service) query(ctx context.Context, db storage.Database, req *rpc.QueryRequest) (*rpc.QueryResult_, error) { start, rangeStartErr := convert.ToTime(req.RangeStart, req.RangeType) end, rangeEndErr := convert.ToTime(req.RangeEnd, req.RangeType) - if rangeStartErr != nil || rangeEndErr != nil { return nil, tterrors.NewBadRequestError(xerrors.FirstError(rangeStartErr, rangeEndErr)) } @@ -301,7 +387,7 @@ func (s *service) query(ctx context.Context, req *rpc.QueryRequest) (*rpc.QueryR if l := req.Limit; l != nil { opts.Limit = int(*l) } - queryResult, err := s.db.QueryIDs(ctx, nsID, index.Query{Query: q}, opts) + queryResult, err := db.QueryIDs(ctx, nsID, index.Query{Query: q}, opts) if err != nil { return nil, convert.ToRPCError(err) } @@ -330,7 +416,7 @@ func (s *service) query(ctx context.Context, req *rpc.QueryRequest) (*rpc.QueryR continue } tsID := entry.Key() - datapoints, err := s.readDatapoints(ctx, nsID, tsID, start, end, + datapoints, err := s.readDatapoints(ctx, db, nsID, tsID, start, end, req.ResultTimeType) if err != nil { return nil, convert.ToRPCError(err) @@ -342,17 +428,18 @@ func (s *service) query(ctx context.Context, req *rpc.QueryRequest) (*rpc.QueryR } func (s *service) Fetch(tctx thrift.Context, req *rpc.FetchRequest) (*rpc.FetchResult_, error) { - if s.isOverloaded() { - s.metrics.overloadRejected.Inc(1) - return nil, tterrors.NewInternalError(errServerIsOverloaded) + db, err := s.startRPCWithDB() + if err != nil { + return nil, err } - callStart := s.nowFn() - ctx := tchannelthrift.Context(tctx) - - start, rangeStartErr := convert.ToTime(req.RangeStart, req.RangeType) - end, rangeEndErr := convert.ToTime(req.RangeEnd, req.RangeType) + var ( + callStart = s.nowFn() + ctx = tchannelthrift.Context(tctx) + start, rangeStartErr = convert.ToTime(req.RangeStart, req.RangeType) + end, rangeEndErr = convert.ToTime(req.RangeEnd, req.RangeType) + ) if rangeStartErr != nil || rangeEndErr != nil { s.metrics.fetch.ReportError(s.nowFn().Sub(callStart)) return nil, tterrors.NewBadRequestError(xerrors.FirstError(rangeStartErr, rangeEndErr)) @@ -362,7 +449,7 @@ func (s *service) Fetch(tctx thrift.Context, req *rpc.FetchRequest) (*rpc.FetchR nsID := 
s.pools.id.GetStringID(ctx, req.NameSpace) // Make datapoints an initialized empty array for JSON serialization as empty array than null - datapoints, err := s.readDatapoints(ctx, nsID, tsID, start, end, + datapoints, err := s.readDatapoints(ctx, db, nsID, tsID, start, end, req.ResultTimeType) if err != nil { s.metrics.fetch.ReportError(s.nowFn().Sub(callStart)) @@ -375,11 +462,12 @@ func (s *service) Fetch(tctx thrift.Context, req *rpc.FetchRequest) (*rpc.FetchR func (s *service) readDatapoints( ctx context.Context, + db storage.Database, nsID, tsID ident.ID, start, end time.Time, timeType rpc.TimeType, ) ([]*rpc.Datapoint, error) { - encoded, err := s.db.ReadEncoded(ctx, nsID, tsID, start, end) + encoded, err := db.ReadEncoded(ctx, nsID, tsID, start, end) if err != nil { return nil, err } @@ -387,7 +475,7 @@ func (s *service) readDatapoints( // Make datapoints an initialized empty array for JSON serialization as empty array than null datapoints := make([]*rpc.Datapoint, 0) - multiIt := s.db.Options().MultiReaderIteratorPool().Get() + multiIt := db.Options().MultiReaderIteratorPool().Get() multiIt.ResetSliceOfSlices(xio.NewReaderSliceOfSlicesFromBlockReadersIterator(encoded)) defer multiIt.Close() @@ -415,6 +503,11 @@ func (s *service) readDatapoints( } func (s *service) FetchTagged(tctx thrift.Context, req *rpc.FetchTaggedRequest) (*rpc.FetchTaggedResult_, error) { + db, err := s.startRPCWithDB() + if err != nil { + return nil, err + } + ctx, sp := tchannelthrift.Context(tctx).StartTraceSpan(tracepoint.FetchTagged) sp.LogFields( opentracinglog.String("query", string(req.Query)), @@ -423,7 +516,7 @@ func (s *service) FetchTagged(tctx thrift.Context, req *rpc.FetchTaggedRequest) xopentracing.Time("end", time.Unix(0, req.RangeEnd)), ) - result, err := s.fetchTagged(ctx, req) + result, err := s.fetchTagged(ctx, db, req) if err != nil { sp.LogFields(opentracinglog.Error(err)) } @@ -432,12 +525,7 @@ func (s *service) FetchTagged(tctx thrift.Context, req *rpc.FetchTaggedRequest) return result, err } -func (s *service) fetchTagged(ctx context.Context, req *rpc.FetchTaggedRequest) (*rpc.FetchTaggedResult_, error) { - if s.isOverloaded() { - s.metrics.overloadRejected.Inc(1) - return nil, tterrors.NewInternalError(errServerIsOverloaded) - } - +func (s *service) fetchTagged(ctx context.Context, db storage.Database, req *rpc.FetchTaggedRequest) (*rpc.FetchTaggedResult_, error) { callStart := s.nowFn() ns, query, opts, fetchData, err := convert.FromRPCFetchTaggedRequest(req, s.pools) @@ -446,7 +534,7 @@ func (s *service) fetchTagged(ctx context.Context, req *rpc.FetchTaggedRequest) return nil, tterrors.NewBadRequestError(err) } - queryResult, err := s.db.QueryIDs(ctx, ns, query, opts) + queryResult, err := db.QueryIDs(ctx, ns, query, opts) if err != nil { s.metrics.fetchTagged.ReportError(s.nowFn().Sub(callStart)) return nil, convert.ToRPCError(err) @@ -479,7 +567,7 @@ func (s *service) fetchTagged(ctx context.Context, req *rpc.FetchTaggedRequest) if !fetchData { continue } - segments, rpcErr := s.readEncoded(ctx, nsID, tsID, opts.StartInclusive, opts.EndExclusive) + segments, rpcErr := s.readEncoded(ctx, db, nsID, tsID, opts.StartInclusive, opts.EndExclusive) if rpcErr != nil { elem.Err = rpcErr continue @@ -492,9 +580,9 @@ func (s *service) fetchTagged(ctx context.Context, req *rpc.FetchTaggedRequest) } func (s *service) Aggregate(tctx thrift.Context, req *rpc.AggregateQueryRequest) (*rpc.AggregateQueryResult_, error) { - if s.isOverloaded() { - s.metrics.overloadRejected.Inc(1) - return nil, 
tterrors.NewInternalError(errServerIsOverloaded)
+	db, err := s.startRPCWithDB()
+	if err != nil {
+		return nil, err
 	}
 
 	callStart := s.nowFn()
@@ -506,7 +594,7 @@ func (s *service) Aggregate(tctx thrift.Context, req *rpc.AggregateQueryRequest)
 		return nil, tterrors.NewBadRequestError(err)
 	}
 
-	queryResult, err := s.db.AggregateQuery(ctx, ns, query, opts)
+	queryResult, err := db.AggregateQuery(ctx, ns, query, opts)
 	if err != nil {
 		s.metrics.aggregate.ReportError(s.nowFn().Sub(callStart))
 		return nil, convert.ToRPCError(err)
@@ -535,9 +623,9 @@ func (s *service) Aggregate(tctx thrift.Context, req *rpc.AggregateQueryRequest)
 }
 
 func (s *service) AggregateRaw(tctx thrift.Context, req *rpc.AggregateQueryRawRequest) (*rpc.AggregateQueryRawResult_, error) {
-	if s.isOverloaded() {
-		s.metrics.overloadRejected.Inc(1)
-		return nil, tterrors.NewInternalError(errServerIsOverloaded)
+	db, err := s.startRPCWithDB()
+	if err != nil {
+		return nil, err
 	}
 
 	callStart := s.nowFn()
@@ -549,7 +637,7 @@ func (s *service) AggregateRaw(tctx thrift.Context, req *rpc.AggregateQueryRawRe
 		return nil, tterrors.NewBadRequestError(err)
 	}
 
-	queryResult, err := s.db.AggregateQuery(ctx, ns, query, opts)
+	queryResult, err := db.AggregateQuery(ctx, ns, query, opts)
 	if err != nil {
 		s.metrics.aggregate.ReportError(s.nowFn().Sub(callStart))
 		return nil, convert.ToRPCError(err)
@@ -602,9 +690,9 @@ func (s *service) encodeTags(
 }
 
 func (s *service) FetchBatchRaw(tctx thrift.Context, req *rpc.FetchBatchRawRequest) (*rpc.FetchBatchRawResult_, error) {
-	if s.isOverloaded() {
-		s.metrics.overloadRejected.Inc(1)
-		return nil, tterrors.NewInternalError(errServerIsOverloaded)
+	db, err := s.startRPCWithDB()
+	if err != nil {
+		return nil, err
 	}
 
 	callStart := s.nowFn()
@@ -634,7 +722,7 @@ func (s *service) FetchBatchRaw(tctx thrift.Context, req *rpc.FetchBatchRawReque
 		result.Elements = append(result.Elements, rawResult)
 
 		tsID := s.newID(ctx, req.Ids[i])
-		segments, rpcErr := s.readEncoded(ctx, nsID, tsID, start, end)
+		segments, rpcErr := s.readEncoded(ctx, db, nsID, tsID, start, end)
 		if rpcErr != nil {
 			rawResult.Err = rpcErr
 			if tterrors.IsBadRequestError(rawResult.Err) {
@@ -658,17 +746,18 @@ func (s *service) FetchBatchRaw(tctx thrift.Context, req *rpc.FetchBatchRawReque
 }
 
 func (s *service) FetchBlocksRaw(tctx thrift.Context, req *rpc.FetchBlocksRawRequest) (*rpc.FetchBlocksRawResult_, error) {
-	if s.isOverloaded() {
-		s.metrics.overloadRejected.Inc(1)
-		return nil, tterrors.NewInternalError(errServerIsOverloaded)
+	db, err := s.startRPCWithDB()
+	if err != nil {
+		return nil, err
 	}
 
-	callStart := s.nowFn()
-	ctx := tchannelthrift.Context(tctx)
-
-	nsID := s.newID(ctx, req.NameSpace)
-	// check if the namespace if known
-	nsMetadata, ok := s.db.Namespace(nsID)
+	var (
+		callStart = s.nowFn()
+		ctx       = tchannelthrift.Context(tctx)
+		nsID      = s.newID(ctx, req.NameSpace)
+		// Check whether the namespace is known.
+		nsMetadata, ok = db.Namespace(nsID)
+	)
 	if !ok {
 		return nil, tterrors.NewBadRequestError(fmt.Errorf("unable to find specified namespace: %v", nsID.String()))
 	}
@@ -690,7 +779,7 @@ func (s *service) FetchBlocksRaw(tctx thrift.Context, req *rpc.FetchBlocksRawReq
 		}
 
 		tsID := s.newID(ctx, request.ID)
-		fetched, err := s.db.FetchBlocks(
+		fetched, err := db.FetchBlocks(
 			ctx, nsID, uint32(req.Shard), tsID, blockStarts)
 		if err != nil {
 			s.metrics.fetchBlocks.ReportError(s.nowFn().Sub(callStart))
@@ -732,12 +821,11 @@ func (s *service) FetchBlocksRaw(tctx thrift.Context, req *rpc.FetchBlocksRawReq
 }
 
 func (s *service) FetchBlocksMetadataRawV2(tctx
thrift.Context, req *rpc.FetchBlocksMetadataRawV2Request) (*rpc.FetchBlocksMetadataRawV2Result_, error) { - if s.db.IsOverloaded() { - s.metrics.overloadRejected.Inc(1) - return nil, tterrors.NewInternalError(errServerIsOverloaded) + db, err := s.startRPCWithDB() + if err != nil { + return nil, err } - var err error callStart := s.nowFn() defer func() { // No need to report metric anywhere else as we capture all cases here @@ -765,7 +853,7 @@ func (s *service) FetchBlocksMetadataRawV2(tctx thrift.Context, req *rpc.FetchBl start = time.Unix(0, req.RangeStart) end = time.Unix(0, req.RangeEnd) ) - fetchedMetadata, nextPageToken, err := s.db.FetchBlocksMetadataV2( + fetchedMetadata, nextPageToken, err := db.FetchBlocksMetadataV2( ctx, nsID, uint32(req.Shard), start, end, req.Limit, req.PageToken, opts) if err != nil { return nil, convert.ToRPCError(err) @@ -867,9 +955,9 @@ func (s *service) getBlocksMetadataV2FromResult( } func (s *service) Write(tctx thrift.Context, req *rpc.WriteRequest) error { - if s.db.IsOverloaded() { - s.metrics.overloadRejected.Inc(1) - return tterrors.NewInternalError(errServerIsOverloaded) + db, err := s.startRPCWithDB() + if err != nil { + return err } callStart := s.nowFn() @@ -894,7 +982,7 @@ func (s *service) Write(tctx thrift.Context, req *rpc.WriteRequest) error { return tterrors.NewBadRequestError(err) } - if err = s.db.Write( + if err = db.Write( ctx, s.pools.id.GetStringID(ctx, req.NameSpace), s.pools.id.GetStringID(ctx, req.ID), @@ -913,9 +1001,9 @@ func (s *service) Write(tctx thrift.Context, req *rpc.WriteRequest) error { } func (s *service) WriteTagged(tctx thrift.Context, req *rpc.WriteTaggedRequest) error { - if s.db.IsOverloaded() { - s.metrics.overloadRejected.Inc(1) - return tterrors.NewInternalError(errServerIsOverloaded) + db, err := s.startRPCWithDB() + if err != nil { + return err } callStart := s.nowFn() @@ -951,7 +1039,7 @@ func (s *service) WriteTagged(tctx thrift.Context, req *rpc.WriteTaggedRequest) return tterrors.NewBadRequestError(err) } - if err = s.db.WriteTagged(ctx, + if err = db.WriteTagged(ctx, s.pools.id.GetStringID(ctx, req.NameSpace), s.pools.id.GetStringID(ctx, req.ID), iter, xtime.FromNormalizedTime(dp.Timestamp, d), @@ -966,9 +1054,9 @@ func (s *service) WriteTagged(tctx thrift.Context, req *rpc.WriteTaggedRequest) } func (s *service) WriteBatchRaw(tctx thrift.Context, req *rpc.WriteBatchRawRequest) error { - if s.db.IsOverloaded() { - s.metrics.overloadRejected.Inc(1) - return tterrors.NewInternalError(errServerIsOverloaded) + db, err := s.startRPCWithDB() + if err != nil { + return err } callStart := s.nowFn() @@ -988,7 +1076,7 @@ func (s *service) WriteBatchRaw(tctx thrift.Context, req *rpc.WriteBatchRawReque nonRetryableErrors int ) - batchWriter, err := s.db.BatchWriter(nsID, len(req.Elements)) + batchWriter, err := db.BatchWriter(nsID, len(req.Elements)) if err != nil { return convert.ToRPCError(err) } @@ -1023,7 +1111,7 @@ func (s *service) WriteBatchRaw(tctx thrift.Context, req *rpc.WriteBatchRawReque ) } - err = s.db.WriteBatch(ctx, nsID, batchWriter.(ts.WriteBatch), + err = db.WriteBatch(ctx, nsID, batchWriter.(ts.WriteBatch), pooledReq) if err != nil { return convert.ToRPCError(err) @@ -1049,9 +1137,9 @@ func (s *service) WriteBatchRaw(tctx thrift.Context, req *rpc.WriteBatchRawReque } func (s *service) WriteTaggedBatchRaw(tctx thrift.Context, req *rpc.WriteTaggedBatchRawRequest) error { - if s.db.IsOverloaded() { - s.metrics.overloadRejected.Inc(1) - return tterrors.NewInternalError(errServerIsOverloaded) + db, 
err := s.startRPCWithDB() + if err != nil { + return err } callStart := s.nowFn() @@ -1071,7 +1159,7 @@ func (s *service) WriteTaggedBatchRaw(tctx thrift.Context, req *rpc.WriteTaggedB nonRetryableErrors int ) - batchWriter, err := s.db.BatchWriter(nsID, len(req.Elements)) + batchWriter, err := db.BatchWriter(nsID, len(req.Elements)) if err != nil { return convert.ToRPCError(err) } @@ -1113,7 +1201,7 @@ func (s *service) WriteTaggedBatchRaw(tctx thrift.Context, req *rpc.WriteTaggedB elem.Datapoint.Annotation) } - err = s.db.WriteTaggedBatch(ctx, nsID, batchWriter, pooledReq) + err = db.WriteTaggedBatch(ctx, nsID, batchWriter, pooledReq) if err != nil { return convert.ToRPCError(err) } @@ -1138,9 +1226,14 @@ func (s *service) WriteTaggedBatchRaw(tctx thrift.Context, req *rpc.WriteTaggedB } func (s *service) Repair(tctx thrift.Context) error { + db, err := s.startRPCWithDB() + if err != nil { + return err + } + callStart := s.nowFn() - if err := s.db.Repair(); err != nil { + if err := db.Repair(); err != nil { s.metrics.repair.ReportError(s.nowFn().Sub(callStart)) return convert.ToRPCError(err) } @@ -1151,9 +1244,14 @@ func (s *service) Repair(tctx thrift.Context) error { } func (s *service) Truncate(tctx thrift.Context, req *rpc.TruncateRequest) (r *rpc.TruncateResult_, err error) { + db, err := s.startRPCWithDB() + if err != nil { + return nil, err + } + callStart := s.nowFn() ctx := tchannelthrift.Context(tctx) - truncated, err := s.db.Truncate(s.newID(ctx, req.NameSpace)) + truncated, err := db.Truncate(s.newID(ctx, req.NameSpace)) if err != nil { s.metrics.truncate.ReportError(s.nowFn().Sub(callStart)) return nil, convert.ToRPCError(err) @@ -1170,7 +1268,12 @@ func (s *service) Truncate(tctx thrift.Context, req *rpc.TruncateRequest) (r *rp func (s *service) GetPersistRateLimit( ctx thrift.Context, ) (*rpc.NodePersistRateLimitResult_, error) { - runtimeOptsMgr := s.db.Options().RuntimeOptionsManager() + db, err := s.startRPCWithDB() + if err != nil { + return nil, err + } + + runtimeOptsMgr := db.Options().RuntimeOptionsManager() opts := runtimeOptsMgr.Get().PersistRateLimitOptions() limitEnabled := opts.LimitEnabled() limitMbps := opts.LimitMbps() @@ -1187,7 +1290,12 @@ func (s *service) SetPersistRateLimit( ctx thrift.Context, req *rpc.NodeSetPersistRateLimitRequest, ) (*rpc.NodePersistRateLimitResult_, error) { - runtimeOptsMgr := s.db.Options().RuntimeOptionsManager() + db, err := s.startRPCWithDB() + if err != nil { + return nil, err + } + + runtimeOptsMgr := db.Options().RuntimeOptionsManager() runopts := runtimeOptsMgr.Get() opts := runopts.PersistRateLimitOptions() if req.LimitEnabled != nil { @@ -1208,7 +1316,12 @@ func (s *service) SetPersistRateLimit( func (s *service) GetWriteNewSeriesAsync( ctx thrift.Context, ) (*rpc.NodeWriteNewSeriesAsyncResult_, error) { - runtimeOptsMgr := s.db.Options().RuntimeOptionsManager() + db, err := s.startRPCWithDB() + if err != nil { + return nil, err + } + + runtimeOptsMgr := db.Options().RuntimeOptionsManager() value := runtimeOptsMgr.Get().WriteNewSeriesAsync() return &rpc.NodeWriteNewSeriesAsyncResult_{ WriteNewSeriesAsync: value, @@ -1219,7 +1332,12 @@ func (s *service) SetWriteNewSeriesAsync( ctx thrift.Context, req *rpc.NodeSetWriteNewSeriesAsyncRequest, ) (*rpc.NodeWriteNewSeriesAsyncResult_, error) { - runtimeOptsMgr := s.db.Options().RuntimeOptionsManager() + db, err := s.startRPCWithDB() + if err != nil { + return nil, err + } + + runtimeOptsMgr := db.Options().RuntimeOptionsManager() set := 
runtimeOptsMgr.Get().SetWriteNewSeriesAsync(req.WriteNewSeriesAsync) if err := runtimeOptsMgr.Update(set); err != nil { return nil, tterrors.NewBadRequestError(err) @@ -1233,7 +1351,12 @@ func (s *service) GetWriteNewSeriesBackoffDuration( *rpc.NodeWriteNewSeriesBackoffDurationResult_, error, ) { - runtimeOptsMgr := s.db.Options().RuntimeOptionsManager() + db, err := s.startRPCWithDB() + if err != nil { + return nil, err + } + + runtimeOptsMgr := db.Options().RuntimeOptionsManager() value := runtimeOptsMgr.Get().WriteNewSeriesBackoffDuration() return &rpc.NodeWriteNewSeriesBackoffDurationResult_{ WriteNewSeriesBackoffDuration: int64(value / time.Millisecond), @@ -1248,11 +1371,16 @@ func (s *service) SetWriteNewSeriesBackoffDuration( *rpc.NodeWriteNewSeriesBackoffDurationResult_, error, ) { + db, err := s.startRPCWithDB() + if err != nil { + return nil, err + } + unit, err := convert.ToDuration(req.DurationType) if err != nil { return nil, tterrors.NewBadRequestError(xerrors.NewInvalidParamsError(err)) } - runtimeOptsMgr := s.db.Options().RuntimeOptionsManager() + runtimeOptsMgr := db.Options().RuntimeOptionsManager() value := time.Duration(req.WriteNewSeriesBackoffDuration) * unit set := runtimeOptsMgr.Get().SetWriteNewSeriesBackoffDuration(value) if err := runtimeOptsMgr.Update(set); err != nil { @@ -1267,7 +1395,12 @@ func (s *service) GetWriteNewSeriesLimitPerShardPerSecond( *rpc.NodeWriteNewSeriesLimitPerShardPerSecondResult_, error, ) { - runtimeOptsMgr := s.db.Options().RuntimeOptionsManager() + db, err := s.startRPCWithDB() + if err != nil { + return nil, err + } + + runtimeOptsMgr := db.Options().RuntimeOptionsManager() value := runtimeOptsMgr.Get().WriteNewSeriesLimitPerShardPerSecond() return &rpc.NodeWriteNewSeriesLimitPerShardPerSecondResult_{ WriteNewSeriesLimitPerShardPerSecond: int64(value), @@ -1281,7 +1414,12 @@ func (s *service) SetWriteNewSeriesLimitPerShardPerSecond( *rpc.NodeWriteNewSeriesLimitPerShardPerSecondResult_, error, ) { - runtimeOptsMgr := s.db.Options().RuntimeOptionsManager() + db, err := s.startRPCWithDB() + if err != nil { + return nil, err + } + + runtimeOptsMgr := db.Options().RuntimeOptionsManager() value := int(req.WriteNewSeriesLimitPerShardPerSecond) set := runtimeOptsMgr.Get().SetWriteNewSeriesLimitPerShardPerSecond(value) if err := runtimeOptsMgr.Update(set); err != nil { @@ -1290,11 +1428,30 @@ func (s *service) SetWriteNewSeriesLimitPerShardPerSecond( return s.GetWriteNewSeriesLimitPerShardPerSecond(ctx) } -func (s *service) isOverloaded() bool { - // NB(xichen): for now we only use the database load to determine - // whether the server is overloaded. In the future we may also take - // into account other metrics such as CPU load, disk I/O rate, etc. 
- return s.db.IsOverloaded() +func (s *service) SetDatabase(db storage.Database) error { + s.state.Lock() + defer s.state.Unlock() + + if s.state.db != nil { + return errDatabaseHasAlreadyBeenSet + } + + s.state.db = db + return nil +} + +func (s *service) startRPCWithDB() (storage.Database, error) { + db, ok := s.state.DB() + if !ok { + return nil, convert.ToRPCError(errDatabaseIsNotInitializedYet) + } + + if db.IsOverloaded() { + s.metrics.overloadRejected.Inc(1) + return nil, convert.ToRPCError(errServerIsOverloaded) + } + + return db, nil } func (s *service) newID(ctx context.Context, id []byte) ident.ID { @@ -1315,10 +1472,11 @@ func (s *service) newPooledID( func (s *service) readEncoded( ctx context.Context, + db storage.Database, nsID, tsID ident.ID, start, end time.Time, ) ([]*rpc.Segments, *rpc.Error) { - encoded, err := s.db.ReadEncoded(ctx, nsID, tsID, start, end) + encoded, err := db.ReadEncoded(ctx, nsID, tsID, start, end) if err != nil { return nil, convert.ToRPCError(err) } diff --git a/src/dbnode/network/server/tchannelthrift/node/service_test.go b/src/dbnode/network/server/tchannelthrift/node/service_test.go index 769e0f39bf..6f745cba97 100644 --- a/src/dbnode/network/server/tchannelthrift/node/service_test.go +++ b/src/dbnode/network/server/tchannelthrift/node/service_test.go @@ -23,11 +23,14 @@ package node import ( "bytes" "context" + "errors" "fmt" "sort" "testing" "time" + "github.com/m3db/m3/src/dbnode/topology" + "github.com/m3db/m3/src/dbnode/digest" "github.com/m3db/m3/src/dbnode/generated/thrift/rpc" "github.com/m3db/m3/src/dbnode/network/server/tchannelthrift" @@ -125,6 +128,93 @@ func TestServiceBootstrapped(t *testing.T) { require.NoError(t, err) } +func TestServiceBootstrappedInPlacementOrNoPlacement(t *testing.T) { + type TopologyIsSetResult struct { + result bool + err error + } + + type bootstrappedAndDurableResult struct { + result bool + } + + tests := []struct { + name string + dbSet bool + TopologyIsSet *TopologyIsSetResult + bootstrappedAndDurable *bootstrappedAndDurableResult + expectErr bool + }{ + { + name: "bootstrapped in placement", + dbSet: true, + TopologyIsSet: &TopologyIsSetResult{result: true, err: nil}, + bootstrappedAndDurable: &bootstrappedAndDurableResult{result: true}, + }, + { + name: "not in placement", + dbSet: true, + TopologyIsSet: &TopologyIsSetResult{result: false, err: nil}, + }, + { + name: "topology check error", + dbSet: true, + TopologyIsSet: &TopologyIsSetResult{result: false, err: errors.New("an error")}, + expectErr: true, + }, + { + name: "db not set in placement", + dbSet: false, + TopologyIsSet: &TopologyIsSetResult{result: true, err: nil}, + expectErr: true, + }, + { + name: "not bootstrapped in placement", + dbSet: true, + TopologyIsSet: &TopologyIsSetResult{result: true, err: nil}, + bootstrappedAndDurable: &bootstrappedAndDurableResult{result: false}, + expectErr: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + // Simulate placement + mockTopoInit := topology.NewMockInitializer(ctrl) + if r := test.TopologyIsSet; r != nil { + mockTopoInit.EXPECT().TopologyIsSet().Return(r.result, r.err) + } + + var db storage.Database + if test.dbSet { + mockDB := storage.NewMockDatabase(ctrl) + mockDB.EXPECT().Options().Return(testStorageOpts).AnyTimes() + // Simulate bootstrapped and durable + if r := test.bootstrappedAndDurable; r != nil { + mockDB.EXPECT().IsBootstrappedAndDurable().Return(r.result) + } + db = mockDB + } + + 
testOpts := testTChannelThriftOptions. + SetTopologyInitializer(mockTopoInit) + service := NewService(db, testOpts).(*service) + + // Call BootstrappedInPlacementOrNoPlacement + tctx, _ := thrift.NewContext(time.Minute) + _, err := service.BootstrappedInPlacementOrNoPlacement(tctx) + if test.expectErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} + func TestServiceQuery(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -299,6 +389,42 @@ func TestServiceQueryOverloaded(t *testing.T) { require.Equal(t, tterrors.NewInternalError(errServerIsOverloaded), err) } +func TestServiceQueryDatabaseNotSet(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + var ( + service = NewService(nil, testTChannelThriftOptions).(*service) + tctx, _ = tchannelthrift.NewContext(time.Minute) + ctx = tchannelthrift.Context(tctx) + start = time.Now().Add(-2 * time.Hour) + end = start.Add(2 * time.Hour) + enc = testStorageOpts.EncoderPool().Get() + nsID = "metrics" + limit = int64(100) + ) + + defer ctx.Close() + start, end = start.Truncate(time.Second), end.Truncate(time.Second) + enc.Reset(start, 0) + + _, err := service.Query(tctx, &rpc.QueryRequest{ + Query: &rpc.Query{ + Regexp: &rpc.RegexpQuery{ + Field: "foo", + Regexp: "b.*", + }, + }, + RangeStart: start.Unix(), + RangeEnd: end.Unix(), + RangeType: rpc.TimeType_UNIX_SECONDS, + NameSpace: nsID, + Limit: &limit, + ResultTimeType: rpc.TimeType_UNIX_SECONDS, + }) + require.Equal(t, tterrors.NewInternalError(errDatabaseIsNotInitializedYet), err) +} + func TestServiceQueryUnknownErr(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -453,6 +579,35 @@ func TestServiceFetchIsOverloaded(t *testing.T) { require.Equal(t, tterrors.NewInternalError(errServerIsOverloaded), err) } +func TestServiceFetchDatabaseNotSet(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + var ( + service = NewService(nil, testTChannelThriftOptions).(*service) + tctx, _ = tchannelthrift.NewContext(time.Minute) + ctx = tchannelthrift.Context(tctx) + start = time.Now().Add(-2 * time.Hour) + end = start.Add(2 * time.Hour) + enc = testStorageOpts.EncoderPool().Get() + nsID = "metrics" + ) + + defer ctx.Close() + start, end = start.Truncate(time.Second), end.Truncate(time.Second) + enc.Reset(start, 0) + + _, err := service.Fetch(tctx, &rpc.FetchRequest{ + RangeStart: start.Unix(), + RangeEnd: end.Unix(), + RangeType: rpc.TimeType_UNIX_SECONDS, + NameSpace: nsID, + ID: "foo", + ResultTimeType: rpc.TimeType_UNIX_SECONDS, + }) + require.Equal(t, tterrors.NewInternalError(errDatabaseIsNotInitializedYet), err) +} + func TestServiceFetchUnknownErr(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -667,6 +822,35 @@ func TestServiceFetchBatchRawIsOverloaded(t *testing.T) { require.Equal(t, tterrors.NewInternalError(errServerIsOverloaded), err) } +func TestServiceFetchBatchRawDatabaseNotSet(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + var ( + service = NewService(nil, testTChannelThriftOptions).(*service) + tctx, _ = tchannelthrift.NewContext(time.Minute) + ctx = tchannelthrift.Context(tctx) + start = time.Now().Add(-2 * time.Hour) + end = start.Add(2 * time.Hour) + enc = testStorageOpts.EncoderPool().Get() + nsID = "metrics" + ids = [][]byte{[]byte("foo"), []byte("bar")} + ) + + defer ctx.Close() + start, end = start.Truncate(time.Second), end.Truncate(time.Second) + enc.Reset(start, 0) + + _, err := service.FetchBatchRaw(tctx, 
&rpc.FetchBatchRawRequest{ + RangeStart: start.Unix(), + RangeEnd: end.Unix(), + RangeTimeType: rpc.TimeType_UNIX_SECONDS, + NameSpace: []byte(nsID), + Ids: ids, + }) + require.Equal(t, tterrors.NewInternalError(errDatabaseIsNotInitializedYet), err) +} + func TestServiceFetchBlocksRaw(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -826,6 +1010,42 @@ func TestServiceFetchBlocksRawIsOverloaded(t *testing.T) { require.Equal(t, tterrors.NewInternalError(errServerIsOverloaded), err) } +func TestServiceFetchBlocksRawDatabaseNotSet(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + var ( + nsID = "metrics" + service = NewService(nil, testTChannelThriftOptions).(*service) + tctx, _ = tchannelthrift.NewContext(time.Minute) + ctx = tchannelthrift.Context(tctx) + start = time.Now().Add(-2 * time.Hour) + end = start.Add(2 * time.Hour) + enc = testStorageOpts.EncoderPool().Get() + ids = [][]byte{[]byte("foo"), []byte("bar")} + ) + + defer ctx.Close() + start, end = start.Truncate(time.Second), end.Truncate(time.Second) + enc.Reset(start, 0) + + _, err := service.FetchBlocksRaw(tctx, &rpc.FetchBlocksRawRequest{ + NameSpace: []byte(nsID), + Shard: 0, + Elements: []*rpc.FetchBlocksRawRequestElement{ + &rpc.FetchBlocksRawRequestElement{ + ID: ids[0], + Starts: []int64{start.UnixNano()}, + }, + &rpc.FetchBlocksRawRequestElement{ + ID: ids[1], + Starts: []int64{start.UnixNano()}, + }, + }, + }) + require.Equal(t, tterrors.NewInternalError(errDatabaseIsNotInitializedYet), err) +} + func TestServiceFetchBlocksMetadataEndpointV2Raw(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -1008,6 +1228,42 @@ func TestServiceFetchBlocksMetadataEndpointV2RawIsOverloaded(t *testing.T) { require.Equal(t, tterrors.NewInternalError(errServerIsOverloaded), err) } +func TestServiceFetchBlocksMetadataEndpointV2RawDatabaseNotSet(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + // Configure constants / options + var ( + service = NewService(nil, testTChannelThriftOptions).(*service) + tctx, _ = tchannelthrift.NewContext(time.Minute) + ctx = tchannelthrift.Context(tctx) + now = time.Now() + start = now.Truncate(time.Hour) + end = now.Add(4 * time.Hour).Truncate(time.Hour) + limit = int64(2) + includeSizes = true + includeChecksums = true + includeLastRead = true + nsID = "metrics" + ) + + defer ctx.Close() + + // Run RPC method + _, err := service.FetchBlocksMetadataRawV2(tctx, &rpc.FetchBlocksMetadataRawV2Request{ + NameSpace: []byte(nsID), + Shard: 0, + RangeStart: start.UnixNano(), + RangeEnd: end.UnixNano(), + Limit: limit, + PageToken: nil, + IncludeSizes: &includeSizes, + IncludeChecksums: &includeChecksums, + IncludeLastRead: &includeLastRead, + }) + require.Equal(t, tterrors.NewInternalError(errDatabaseIsNotInitializedYet), err) +} + func TestServiceFetchTagged(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -1204,6 +1460,47 @@ func TestServiceFetchTaggedIsOverloaded(t *testing.T) { require.Equal(t, tterrors.NewInternalError(errServerIsOverloaded), err) } +func TestServiceFetchTaggedDatabaseNotSet(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + var ( + service = NewService(nil, testTChannelThriftOptions).(*service) + + tctx, _ = tchannelthrift.NewContext(time.Minute) + ctx = tchannelthrift.Context(tctx) + + start = time.Now().Add(-2 * time.Hour) + end = start.Add(2 * time.Hour) + + nsID = "metrics" + ) + + defer ctx.Close() + start, end = start.Truncate(time.Second), 
end.Truncate(time.Second) + + req, err := idx.NewRegexpQuery([]byte("foo"), []byte("b.*")) + require.NoError(t, err) + + startNanos, err := convert.ToValue(start, rpc.TimeType_UNIX_NANOSECONDS) + require.NoError(t, err) + endNanos, err := convert.ToValue(end, rpc.TimeType_UNIX_NANOSECONDS) + require.NoError(t, err) + var limit int64 = 10 + data, err := idx.Marshal(req) + require.NoError(t, err) + + _, err = service.FetchTagged(tctx, &rpc.FetchTaggedRequest{ + NameSpace: []byte(nsID), + Query: data, + RangeStart: startNanos, + RangeEnd: endNanos, + FetchData: true, + Limit: &limit, + }) + require.Equal(t, tterrors.NewInternalError(errDatabaseIsNotInitializedYet), err) +} + func TestServiceFetchTaggedNoData(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -1473,6 +1770,30 @@ func TestServiceWriteOverloaded(t *testing.T) { require.Equal(t, tterrors.NewInternalError(errServerIsOverloaded), err) } +func TestServiceWriteDatabaseNotSet(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + var ( + service = NewService(nil, testTChannelThriftOptions).(*service) + tctx, _ = tchannelthrift.NewContext(time.Minute) + ctx = tchannelthrift.Context(tctx) + ) + + defer ctx.Close() + + err := service.Write(tctx, &rpc.WriteRequest{ + NameSpace: "metrics", + ID: "foo", + Datapoint: &rpc.Datapoint{ + Timestamp: time.Now().Unix(), + TimestampTimeType: rpc.TimeType_UNIX_SECONDS, + Value: 42.42, + }, + }) + require.Equal(t, tterrors.NewInternalError(errDatabaseIsNotInitializedYet), err) +} + func TestServiceWriteTagged(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -1550,6 +1871,29 @@ func TestServiceWriteTaggedOverloaded(t *testing.T) { require.Equal(t, tterrors.NewInternalError(errServerIsOverloaded), err) } +func TestServiceWriteTaggedDatabaseNotSet(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + var ( + service = NewService(nil, testTChannelThriftOptions).(*service) + tctx, _ = tchannelthrift.NewContext(time.Minute) + ctx = tchannelthrift.Context(tctx) + ) + defer ctx.Close() + + err := service.WriteTagged(tctx, &rpc.WriteTaggedRequest{ + NameSpace: "metrics", + ID: "foo", + Datapoint: &rpc.Datapoint{ + Timestamp: time.Now().Unix(), + TimestampTimeType: rpc.TimeType_UNIX_SECONDS, + Value: 42.42, + }, + }) + require.Equal(t, tterrors.NewInternalError(errDatabaseIsNotInitializedYet), err) +} + func TestServiceWriteBatchRaw(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -1624,6 +1968,23 @@ func TestServiceWriteBatchRawOverloaded(t *testing.T) { require.Equal(t, tterrors.NewInternalError(errServerIsOverloaded), err) } +func TestServiceWriteBatchRawDatabaseNotSet(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + var ( + service = NewService(nil, testTChannelThriftOptions).(*service) + tctx, _ = tchannelthrift.NewContext(time.Minute) + ctx = tchannelthrift.Context(tctx) + ) + defer ctx.Close() + + err := service.WriteBatchRaw(tctx, &rpc.WriteBatchRawRequest{ + NameSpace: []byte("metrics"), + }) + require.Equal(t, tterrors.NewInternalError(errDatabaseIsNotInitializedYet), err) +} + func TestServiceWriteTaggedBatchRaw(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -1710,6 +2071,23 @@ func TestServiceWriteTaggedBatchRawOverloaded(t *testing.T) { require.Equal(t, tterrors.NewInternalError(errServerIsOverloaded), err) } +func TestServiceWriteTaggedBatchRawDatabaseNotSet(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + var ( + 
service = NewService(nil, testTChannelThriftOptions).(*service) + tctx, _ = tchannelthrift.NewContext(time.Minute) + ctx = tchannelthrift.Context(tctx) + ) + defer ctx.Close() + + err := service.WriteTaggedBatchRaw(tctx, &rpc.WriteTaggedBatchRawRequest{ + NameSpace: []byte("metrics"), + }) + require.Equal(t, tterrors.NewInternalError(errDatabaseIsNotInitializedYet), err) +} + func TestServiceWriteTaggedBatchRawUnknownError(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -1778,6 +2156,7 @@ func TestServiceRepair(t *testing.T) { mockDB := storage.NewMockDatabase(ctrl) mockDB.EXPECT().Options().Return(testStorageOpts).AnyTimes() + mockDB.EXPECT().IsOverloaded().Return(false) service := NewService(mockDB, testTChannelThriftOptions).(*service) @@ -1797,6 +2176,7 @@ func TestServiceTruncate(t *testing.T) { mockDB := storage.NewMockDatabase(ctrl) mockDB.EXPECT().Options().Return(testStorageOpts).AnyTimes() + mockDB.EXPECT().IsOverloaded().Return(false) service := NewService(mockDB, testTChannelThriftOptions).(*service) @@ -1829,6 +2209,7 @@ func TestServiceSetPersistRateLimit(t *testing.T) { mockDB := storage.NewMockDatabase(ctrl) mockDB.EXPECT().Options().Return(opts).AnyTimes() + mockDB.EXPECT().IsOverloaded().Return(false).AnyTimes() service := NewService(mockDB, testTChannelThriftOptions).(*service) @@ -1861,6 +2242,7 @@ func TestServiceSetWriteNewSeriesAsync(t *testing.T) { mockDB := storage.NewMockDatabase(ctrl) mockDB.EXPECT().Options().Return(opts).AnyTimes() + mockDB.EXPECT().IsOverloaded().Return(false).AnyTimes() service := NewService(mockDB, testTChannelThriftOptions).(*service) @@ -1892,6 +2274,7 @@ func TestServiceSetWriteNewSeriesBackoffDuration(t *testing.T) { mockDB := storage.NewMockDatabase(ctrl) mockDB.EXPECT().Options().Return(opts).AnyTimes() + mockDB.EXPECT().IsOverloaded().Return(false).AnyTimes() service := NewService(mockDB, testTChannelThriftOptions).(*service) @@ -1926,6 +2309,7 @@ func TestServiceSetWriteNewSeriesLimitPerShardPerSecond(t *testing.T) { mockDB := storage.NewMockDatabase(ctrl) mockDB.EXPECT().Options().Return(opts).AnyTimes() + mockDB.EXPECT().IsOverloaded().Return(false).AnyTimes() service := NewService(mockDB, testTChannelThriftOptions).(*service) diff --git a/src/dbnode/network/server/tchannelthrift/options.go b/src/dbnode/network/server/tchannelthrift/options.go index d624ded613..f162bda490 100644 --- a/src/dbnode/network/server/tchannelthrift/options.go +++ b/src/dbnode/network/server/tchannelthrift/options.go @@ -21,13 +21,19 @@ package tchannelthrift import ( - "github.com/m3db/m3/src/x/serialize" + "github.com/m3db/m3/src/dbnode/clock" + "github.com/m3db/m3/src/dbnode/topology" + "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/pool" + "github.com/m3db/m3/src/x/serialize" ) type options struct { + clockOpts clock.Options instrumentOpts instrument.Options + topologyInitializer topology.Initializer + idPool ident.Pool blockMetadataV2Pool BlockMetadataV2Pool blockMetadataV2SlicePool BlockMetadataV2SlicePool tagEncoderPool serialize.TagEncoderPool @@ -36,6 +42,13 @@ type options struct { // NewOptions creates new options func NewOptions() Options { + bytesPool := pool.NewCheckedBytesPool(nil, nil, func(s []pool.Bucket) pool.BytesPool { + return pool.NewBytesPool(s, nil) + }) + bytesPool.Init() + + idPool := ident.NewPool(bytesPool, ident.PoolOptions{}) + tagEncoderPool := serialize.NewTagEncoderPool( serialize.NewTagEncoderOptions(), pool.NewObjectPoolOptions(), ) @@ -47,7 +60,9 
@@ func NewOptions() Options { tagDecoderPool.Init() return &options{ + clockOpts: clock.NewOptions(), instrumentOpts: instrument.NewOptions(), + idPool: idPool, blockMetadataV2Pool: NewBlockMetadataV2Pool(nil), blockMetadataV2SlicePool: NewBlockMetadataV2SlicePool(nil, 0), tagEncoderPool: tagEncoderPool, @@ -55,6 +70,16 @@ func NewOptions() Options { } } +func (o *options) SetClockOptions(value clock.Options) Options { + opts := *o + opts.clockOpts = value + return &opts +} + +func (o *options) ClockOptions() clock.Options { + return o.clockOpts +} + func (o *options) SetInstrumentOptions(value instrument.Options) Options { opts := *o opts.instrumentOpts = value @@ -65,6 +90,26 @@ func (o *options) InstrumentOptions() instrument.Options { return o.instrumentOpts } +func (o *options) SetTopologyInitializer(value topology.Initializer) Options { + opts := *o + opts.topologyInitializer = value + return &opts +} + +func (o *options) TopologyInitializer() topology.Initializer { + return o.topologyInitializer +} + +func (o *options) SetIdentifierPool(value ident.Pool) Options { + opts := *o + opts.idPool = value + return &opts +} + +func (o *options) IdentifierPool() ident.Pool { + return o.idPool +} + func (o *options) SetBlockMetadataV2Pool(value BlockMetadataV2Pool) Options { opts := *o opts.blockMetadataV2Pool = value diff --git a/src/dbnode/network/server/tchannelthrift/types.go b/src/dbnode/network/server/tchannelthrift/types.go index 3fdc50b94d..8d5fff0bc4 100644 --- a/src/dbnode/network/server/tchannelthrift/types.go +++ b/src/dbnode/network/server/tchannelthrift/types.go @@ -21,39 +21,60 @@ package tchannelthrift import ( - "github.com/m3db/m3/src/x/serialize" + "github.com/m3db/m3/src/dbnode/clock" + "github.com/m3db/m3/src/dbnode/topology" + "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" + "github.com/m3db/m3/src/x/serialize" ) // Options controls server behavior type Options interface { - // SetInstrumentOptions sets the instrumentation options + // SetClockOptions sets the clock options. + SetClockOptions(value clock.Options) Options + + // ClockOptions returns the clock options. + ClockOptions() clock.Options + + // SetInstrumentOptions sets the instrumentation options. SetInstrumentOptions(value instrument.Options) Options - // InstrumentOptions returns the instrumentation options + // InstrumentOptions returns the instrumentation options. InstrumentOptions() instrument.Options - // SetBlockMetadataV2Pool sets the block metadata pool + // SetTopologyInitializer sets the TopologyInitializer. + SetTopologyInitializer(value topology.Initializer) Options + + // TopologyInitializer returns the TopologyInitializer. + TopologyInitializer() topology.Initializer + + // SetIdentifierPool sets the identifier pool. + SetIdentifierPool(value ident.Pool) Options + + // IdentifierPool returns the identifier pool. + IdentifierPool() ident.Pool + + // SetBlockMetadataV2Pool sets the block metadata pool. SetBlockMetadataV2Pool(value BlockMetadataV2Pool) Options - // BlockMetadataV2Pool returns the block metadata pool + // BlockMetadataV2Pool returns the block metadata pool. BlockMetadataV2Pool() BlockMetadataV2Pool - // SetBlockMetadataV2SlicePool sets the block metadata slice pool + // SetBlockMetadataV2SlicePool sets the block metadata slice pool. SetBlockMetadataV2SlicePool(value BlockMetadataV2SlicePool) Options - // BlockMetadataV2SlicePool returns the block metadata slice pool + // BlockMetadataV2SlicePool returns the block metadata slice pool. 
BlockMetadataV2SlicePool() BlockMetadataV2SlicePool // SetTagEncoderPool sets the tag encoder pool. SetTagEncoderPool(value serialize.TagEncoderPool) Options - // TagEncoderPool returns the tag encoder pool + // TagEncoderPool returns the tag encoder pool. TagEncoderPool() serialize.TagEncoderPool // SetTagDecoderPool sets the tag encoder pool. SetTagDecoderPool(value serialize.TagDecoderPool) Options - // TagDecoderPool returns the tag encoder pool + // TagDecoderPool returns the tag encoder pool. TagDecoderPool() serialize.TagDecoderPool } diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index 4e696f1f8f..040455fc9e 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -437,6 +437,7 @@ func Run(runOpts RunOptions) { // Apply pooling options. opts = withEncodingAndPoolingOptions(cfg, logger, schema, opts, cfg.PoolingPolicy) + opts = opts.SetCommitLogOptions(opts.CommitLogOptions(). SetInstrumentOptions(opts.InstrumentOptions()). SetFilesystemOptions(fsopts). @@ -510,6 +511,51 @@ func Run(runOpts RunOptions) { opts = opts.SetNamespaceInitializer(envCfg.NamespaceInitializer) + // Set tchannelthrift options. + ttopts := tchannelthrift.NewOptions(). + SetClockOptions(opts.ClockOptions()). + SetInstrumentOptions(opts.InstrumentOptions()). + SetTopologyInitializer(envCfg.TopologyInitializer). + SetIdentifierPool(opts.IdentifierPool()). + SetTagEncoderPool(tagEncoderPool). + SetTagDecoderPool(tagDecoderPool) + + // Start servers before constructing the DB so orchestration tools can check health endpoints + // before topology is set. + var ( + contextPool = opts.ContextPool() + tchannelOpts = xtchannel.NewDefaultChannelOptions() + // Pass nil for the database argument because we haven't constructed it yet. We'll call + // SetDatabase() once we've initialized it. + service = ttnode.NewService(nil, ttopts) + ) + tchannelthriftNodeClose, err := ttnode.NewServer(service, + cfg.ListenAddress, contextPool, tchannelOpts).ListenAndServe() + if err != nil { + logger.Fatal("could not open tchannelthrift interface", + zap.String("address", cfg.ListenAddress), zap.Error(err)) + } + defer tchannelthriftNodeClose() + logger.Info("node tchannelthrift: listening", zap.String("address", cfg.ListenAddress)) + + httpjsonNodeClose, err := hjnode.NewServer(service, + cfg.HTTPNodeListenAddress, contextPool, nil).ListenAndServe() + if err != nil { + logger.Fatal("could not open httpjson interface", + zap.String("address", cfg.HTTPNodeListenAddress), zap.Error(err)) + } + defer httpjsonNodeClose() + logger.Info("node httpjson: listening", zap.String("address", cfg.HTTPNodeListenAddress)) + + if cfg.DebugListenAddress != "" { + go func() { + if err := http.ListenAndServe(cfg.DebugListenAddress, nil); err != nil { + logger.Error("debug server could not listen", + zap.String("address", cfg.DebugListenAddress), zap.Error(err)) + } + }() + } + topo, err := envCfg.TopologyInitializer.Init() if err != nil { logger.Fatal("could not initialize m3db topology", zap.Error(err)) @@ -533,10 +579,9 @@ func Run(runOpts RunOptions) { }, func(opts client.AdminOptions) client.AdminOptions { if cfg.Proto != nil { - return opts.SetEncodingProto( - schema, - encoding.NewOptions(), - ).(client.AdminOptions) + adminOpts := opts.SetEncodingProto(schema, + encoding.NewOptions()) + return adminOpts.(client.AdminOptions) } return opts }, @@ -558,12 +603,6 @@ func Run(runOpts RunOptions) { // Feature currently not working. 
SetRepairEnabled(false) - // Set tchannelthrift options - ttopts := tchannelthrift.NewOptions(). - SetInstrumentOptions(opts.InstrumentOptions()). - SetTagEncoderPool(tagEncoderPool). - SetTagDecoderPool(tagDecoderPool) - // Set bootstrap options - We need to create a topology map provider from the // same topology that will be passed to the cluster so that when we make // bootstrapping decisions they are in sync with the clustered database @@ -597,34 +636,7 @@ func Run(runOpts RunOptions) { bs.SetBootstrapperProvider(updated.BootstrapperProvider()) }) - // Initialize clustered database - clusterTopoWatch, err := topo.Watch() - if err != nil { - logger.Fatal("could not create cluster topology watch", zap.Error(err)) - } - db, err := cluster.NewDatabase(hostID, topo, clusterTopoWatch, opts) - if err != nil { - logger.Fatal("could not construct database", zap.Error(err)) - } - - if err := db.Open(); err != nil { - logger.Fatal("could not open database", zap.Error(err)) - } - - contextPool := opts.ContextPool() - - tchannelOpts := xtchannel.NewDefaultChannelOptions() - service := ttnode.NewService(db, ttopts) - - tchannelthriftNodeClose, err := ttnode.NewServer(service, - cfg.ListenAddress, contextPool, tchannelOpts).ListenAndServe() - if err != nil { - logger.Fatal("could not open tchannelthrift interface", - zap.String("address", cfg.ListenAddress), zap.Error(err)) - } - defer tchannelthriftNodeClose() - logger.Info("node tchannelthrift: listening", zap.String("address", cfg.ListenAddress)) - + // Start the cluster services now that the M3DB client is available. tchannelthriftClusterClose, err := ttcluster.NewServer(m3dbClient, cfg.ClusterListenAddress, contextPool, tchannelOpts).ListenAndServe() if err != nil { @@ -634,15 +646,6 @@ func Run(runOpts RunOptions) { defer tchannelthriftClusterClose() logger.Info("cluster tchannelthrift: listening", zap.String("address", cfg.ClusterListenAddress)) - httpjsonNodeClose, err := hjnode.NewServer(service, - cfg.HTTPNodeListenAddress, contextPool, nil).ListenAndServe() - if err != nil { - logger.Fatal("could not open httpjson interface", - zap.String("address", cfg.HTTPNodeListenAddress), zap.Error(err)) - } - defer httpjsonNodeClose() - logger.Info("node httpjson: listening", zap.String("address", cfg.HTTPNodeListenAddress)) - httpjsonClusterClose, err := hjcluster.NewServer(m3dbClient, cfg.HTTPClusterListenAddress, contextPool, nil).ListenAndServe() if err != nil { @@ -652,15 +655,24 @@ func Run(runOpts RunOptions) { defer httpjsonClusterClose() logger.Info("cluster httpjson: listening", zap.String("address", cfg.HTTPClusterListenAddress)) - if cfg.DebugListenAddress != "" { - go func() { - if err := http.ListenAndServe(cfg.DebugListenAddress, nil); err != nil { - logger.Error("debug server could not listen", - zap.String("address", cfg.DebugListenAddress), zap.Error(err)) - } - }() + // Initialize clustered database. + clusterTopoWatch, err := topo.Watch() + if err != nil { + logger.Fatal("could not create cluster topology watch", zap.Error(err)) + } + + db, err := cluster.NewDatabase(hostID, topo, clusterTopoWatch, opts) + if err != nil { + logger.Fatal("could not construct database", zap.Error(err)) + } + + if err := db.Open(); err != nil { + logger.Fatal("could not open database", zap.Error(err)) } + // Now that we've initialized the database we can set it on the service. 
+	if err := service.SetDatabase(db); err != nil {
+		logger.Fatal("could not set database on service", zap.Error(err))
+	}
+
 	go func() {
 		if runOpts.BootstrapCh != nil {
 			// Notify on bootstrap chan if specified
@@ -1060,8 +1072,17 @@ func withEncodingAndPoolingOptions(
 		logger.Fatal("unrecognized pooling type", zap.Any("type", policy.Type))
 	}
 
-	logger.Sugar().Infof("bytes pool %s init", policy.Type)
-	bytesPool.Init()
+	{
+		// Avoid polluting the rest of the function with `l` var
+		l := logger
+		if t := policy.Type; t != nil {
+			l = l.With(zap.String("policy", string(*t)))
+		}
+
+		l.Info("bytes pool init")
+		bytesPool.Init()
+		l.Info("bytes pool init done")
+	}
 
 	segmentReaderPool := xio.NewSegmentReaderPool(
 		poolOptions(
diff --git a/src/dbnode/topology/dynamic.go b/src/dbnode/topology/dynamic.go
index 21cc81ca15..2c9d06251b 100644
--- a/src/dbnode/topology/dynamic.go
+++ b/src/dbnode/topology/dynamic.go
@@ -24,6 +24,7 @@ import (
 	"errors"
 	"sync"
 
+	"github.com/m3db/m3/src/cluster/kv"
 	"github.com/m3db/m3/src/cluster/placement"
 	"github.com/m3db/m3/src/cluster/services"
 	"github.com/m3db/m3/src/cluster/shard"
@@ -73,6 +74,25 @@ func (i *dynamicInitializer) Init() (Topology, error) {
 	return i.topo, nil
 }
 
+func (i *dynamicInitializer) TopologyIsSet() (bool, error) {
+	svcs, err := i.opts.ConfigServiceClient().Services(i.opts.ServicesOverrideOptions())
+	if err != nil {
+		return false, err
+	}
+
+	_, err = svcs.Query(i.opts.ServiceID(), i.opts.QueryOptions())
+	if err != nil {
+		if err == kv.ErrNotFound {
+			// Valid, just means topology is not set
+			return false, nil
+		}
+
+		return false, err
+	}
+
+	return true, nil
+}
+
 type dynamicTopology struct {
 	sync.RWMutex
 	opts DynamicOptions
diff --git a/src/dbnode/topology/static.go b/src/dbnode/topology/static.go
index e748c7adac..c3bd9515a4 100644
--- a/src/dbnode/topology/static.go
+++ b/src/dbnode/topology/static.go
@@ -34,7 +34,7 @@ type staticInitializer struct {
 	opts StaticOptions
 }
 
-// NewStaticInitializer creates a static topology initializer
+// NewStaticInitializer creates a static topology initializer.
 func NewStaticInitializer(opts StaticOptions) Initializer {
 	return staticInitializer{opts}
 }
@@ -46,11 +46,16 @@ func (i staticInitializer) Init() (Topology, error) {
 	return NewStaticTopology(i.opts), nil
 }
 
+func (i staticInitializer) TopologyIsSet() (bool, error) {
+	// Always has the specified static topology ready.
+	return true, nil
+}
+
 type staticTopology struct {
 	w xwatch.Watchable
 }
 
-// NewStaticTopology creates a static topology
+// NewStaticTopology creates a static topology.
 func NewStaticTopology(opts StaticOptions) Topology {
 	w := xwatch.NewWatchable()
 	w.Update(NewStaticMap(opts))
@@ -62,7 +67,7 @@ func (t *staticTopology) Get() Map {
 }
 
 func (t *staticTopology) Watch() (MapWatch, error) {
-	// Topology is static, the returned watch will not receive any updates
+	// Topology is static, the returned watch will not receive any updates.
 	_, w, err := t.w.Watch()
 	if err != nil {
 		return nil, err
diff --git a/src/dbnode/topology/topology_mock.go b/src/dbnode/topology/topology_mock.go
index 66caf9d9d6..18ce1a9d23 100644
--- a/src/dbnode/topology/topology_mock.go
+++ b/src/dbnode/topology/topology_mock.go
@@ -1,7 +1,7 @@
 // Code generated by MockGen. DO NOT EDIT.
 // Source: github.com/m3db/m3/src/dbnode/topology/types.go
 
-// Copyright (c) 2018 Uber Technologies, Inc.
+// Copyright (c) 2019 Uber Technologies, Inc.
 //
 // Permission is hereby granted, free of charge, to any person obtaining a copy
 // of this software and associated documentation files (the "Software"), to deal
@@ -190,6 +190,21 @@ func (mr *MockInitializerMockRecorder) Init() *gomock.Call {
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockInitializer)(nil).Init))
 }
 
+// TopologyIsSet mocks base method
+func (m *MockInitializer) TopologyIsSet() (bool, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "TopologyIsSet")
+	ret0, _ := ret[0].(bool)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// TopologyIsSet indicates an expected call of TopologyIsSet
+func (mr *MockInitializerMockRecorder) TopologyIsSet() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TopologyIsSet", reflect.TypeOf((*MockInitializer)(nil).TopologyIsSet))
+}
+
 // MockTopology is a mock of Topology interface
 type MockTopology struct {
 	ctrl *gomock.Controller
diff --git a/src/dbnode/topology/types.go b/src/dbnode/topology/types.go
index cf47649f09..9c3ffbd18f 100644
--- a/src/dbnode/topology/types.go
+++ b/src/dbnode/topology/types.go
@@ -54,6 +54,11 @@ type HostShardSet interface {
 type Initializer interface {
 	// Init will return a new topology
 	Init() (Topology, error)
+
+	// TopologyIsSet returns whether the topology is available immediately
+	// on initialization, or whether Init will instead block and wait for
+	// the topology to be set.
+	TopologyIsSet() (bool, error)
 }
 
 // Topology is a container of a topology map and disseminates topology map changes
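
Taken together, the service.go and server.go changes above reduce to one lifecycle pattern: start the tchannel/HTTP servers with a nil database so orchestration tools can probe health immediately, route every data RPC through a single startRPCWithDB guard, and flip the node to serving by calling SetDatabase exactly once after the clustered database is opened. The sketch below restates that control flow in isolation. It is a minimal illustration under simplified assumptions: Database, Initializer, serviceState, and noPlacement are hypothetical stand-ins for m3's storage.Database, topology.Initializer, and the service's internal state, not the actual implementation.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Database is a stand-in for the subset of storage.Database the guard uses.
type Database interface {
	IsOverloaded() bool
	IsBootstrappedAndDurable() bool
}

// Initializer is a stand-in for topology.Initializer with TopologyIsSet.
type Initializer interface {
	TopologyIsSet() (bool, error)
}

var (
	errDatabaseIsNotInitializedYet = errors.New("database is not yet initialized")
	errDatabaseHasAlreadyBeenSet   = errors.New("database has already been set")
	errServerIsOverloaded          = errors.New("server is overloaded")
	errNodeIsNotBootstrapped       = errors.New("node is not bootstrapped")
)

// serviceState guards the database reference, which starts out nil because
// the servers begin listening before the clustered database is constructed.
type serviceState struct {
	sync.RWMutex
	db Database
}

func (s *serviceState) DB() (Database, bool) {
	s.RLock()
	defer s.RUnlock()
	return s.db, s.db != nil
}

type service struct {
	state    serviceState
	topoInit Initializer
}

// SetDatabase flips the node to serving; it may only succeed once.
func (s *service) SetDatabase(db Database) error {
	s.state.Lock()
	defer s.state.Unlock()
	if s.state.db != nil {
		return errDatabaseHasAlreadyBeenSet
	}
	s.state.db = db
	return nil
}

// startRPCWithDB is the guard every data endpoint runs first: fail fast
// until the database is set, then apply the old overload rejection.
func (s *service) startRPCWithDB() (Database, error) {
	db, ok := s.state.DB()
	if !ok {
		return nil, errDatabaseIsNotInitializedYet
	}
	if db.IsOverloaded() {
		return nil, errServerIsOverloaded
	}
	return db, nil
}

// BootstrappedInPlacementOrNoPlacement mirrors the new health check: a node
// with no placement is trivially ready; otherwise the database must exist
// and be bootstrapped and durable.
func (s *service) BootstrappedInPlacementOrNoPlacement() error {
	inPlacement, err := s.topoInit.TopologyIsSet()
	if err != nil {
		return err
	}
	if !inPlacement {
		return nil
	}
	db, ok := s.state.DB()
	if !ok {
		return errDatabaseIsNotInitializedYet
	}
	if !db.IsBootstrappedAndDurable() {
		return errNodeIsNotBootstrapped
	}
	return nil
}

// noPlacement simulates a freshly provisioned node before placement init.
type noPlacement struct{}

func (noPlacement) TopologyIsSet() (bool, error) { return false, nil }

func main() {
	svc := &service{topoInit: noPlacement{}}
	fmt.Println(svc.BootstrappedInPlacementOrNoPlacement()) // <nil>: ready, no placement yet
	_, err := svc.startRPCWithDB()
	fmt.Println(err) // database is not yet initialized
}
```

Note how the health-style check reads s.state.DB() directly rather than going through startRPCWithDB: a readiness probe should be able to distinguish "server up, waiting for topology" from "bootstrapping" without being rejected by the overload guard, while data RPCs fail fast with an internal error until SetDatabase has run.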