From 5d16549f805e537f4b55b661018680476f075ec2 Mon Sep 17 00:00:00 2001
From: Xu Ning
Date: Wed, 29 Nov 2017 18:52:58 -0800
Subject: [PATCH] Cache results in ReadDestination and ReadConsumerGroup metadata calls

---
 .gitignore                             |   1 +
 clients/metadata/metadata_cassandra.go | 124 +++++++++++++++++++++----
 test/integration/base.go               |  56 ++++++-----
 3 files changed, 142 insertions(+), 39 deletions(-)

diff --git a/.gitignore b/.gitignore
index 6709caed..a973180d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -25,3 +25,4 @@ cherami-server
 cherami-replicator-server
 cdb
 cherami-store-tool
+cmq
diff --git a/clients/metadata/metadata_cassandra.go b/clients/metadata/metadata_cassandra.go
index 00605622..4a9d136b 100644
--- a/clients/metadata/metadata_cassandra.go
+++ b/clients/metadata/metadata_cassandra.go
@@ -29,6 +29,7 @@ import (
 	"time"
 
 	"github.com/uber/cherami-server/common"
+	"github.com/uber/cherami-server/common/cache"
 	"github.com/uber/cherami-server/common/configure"
 	m "github.com/uber/cherami-thrift/.generated/go/metadata"
 	"github.com/uber/cherami-thrift/.generated/go/shared"
@@ -177,6 +178,10 @@ const cassandraProtoVersion = 4
 const deleteExtentTTLSeconds = int64(time.Hour*24) / int64(time.Second)
 const defaultDeleteTTLSeconds = int64(time.Hour*24*30) / int64(time.Second)
 
+const destinationCacheSize = 1048576
+const consumerGroupCacheSize = 1048576
+const cacheTTL = time.Second
+
 // CassandraMetadataService Implements TChanMetadataServiceClient interface
 // TODO: Convert all errors to the ones defined in the thrift API.
 type CassandraMetadataService struct {
@@ -186,6 +191,9 @@ type CassandraMetadataService struct {
 	highConsLevel gocql.Consistency // Strongest cons level that can be used for this session
 	clusterName   string
 	log           bark.Logger
+
+	destinationCache   cache.Cache
+	consumerGroupCache cache.Cache
 }
 
 // interface implementation check
@@ -278,6 +286,9 @@ func NewCassandraMetadataService(cfg configure.CommonMetadataConfig, log bark.Lo
 		highConsLevel: highCons,
 		clusterName:   clusterName,
 		log:           log.WithField(common.TagModule, `metadata`),
+
+		destinationCache:   cache.New(destinationCacheSize, &cache.Options{TTL: cacheTTL}),
+		consumerGroupCache: cache.New(consumerGroupCacheSize, &cache.Options{TTL: cacheTTL}),
 	}, nil
 }
 
@@ -649,6 +660,25 @@ func unmarshalDstZoneConfigs(configsData []map[string]interface{}) []*shared.Des
 	return configs
 }
 
+type readDestinationResponse struct {
+	result *shared.DestinationDescription
+	err    error
+}
+
+func (s *CassandraMetadataService) cacheReadDestinationResponse(key string, result *shared.DestinationDescription,
+	err error) (*shared.DestinationDescription, error) {
+	item := &readDestinationResponse{
+		result: result,
+		err:    err,
+	}
+
+	if key != "" {
+		s.destinationCache.Put(key, item)
+	}
+
+	return result, err
+}
+
 // ReadDestination implements the corresponding TChanMetadataServiceClient API
 // Either path or destinationUUID can be specified.
 // Deleted destinations are returned with DELETED status only when destinationUUID is used.
@@ -669,6 +699,18 @@ func (s *CassandraMetadataService) ReadDestination(ctx thrift.Context, getReques
 		}
 	}
 
+	var key string
+	if getRequest.Path != nil {
+		key = getRequest.GetPath()
+	} else {
+		key = getRequest.GetDestinationUUID()
+	}
+
+	cached := s.destinationCache.Get(key)
+	if cached != nil {
+		return cached.(*readDestinationResponse).result, cached.(*readDestinationResponse).err
+	}
+
 	result = getUtilDestinationDescription()
 	var zoneConfigsData []map[string]interface{}
 	if getRequest.Path != nil {
@@ -722,20 +764,20 @@ func (s *CassandraMetadataService) ReadDestination(ctx thrift.Context, getReques
 			dest = getRequest.GetDestinationUUID()
 		}
 
-			return nil, &shared.EntityNotExistsError{
+			return s.cacheReadDestinationResponse(key, nil, &shared.EntityNotExistsError{
 				Message: fmt.Sprintf("Destination %s does not exist", dest),
-			}
+			})
 		}
 
-		return nil, &shared.InternalServiceError{
+		return s.cacheReadDestinationResponse(key, nil, &shared.InternalServiceError{
 			Message: err.Error(),
-		}
+		})
 	}
 
 	*result.DLQPurgeBefore = int64(cqlTimestampToUnixNano(*result.DLQPurgeBefore))
 	*result.DLQMergeBefore = int64(cqlTimestampToUnixNano(*result.DLQMergeBefore))
 
-	return result, nil
+	return s.cacheReadDestinationResponse(key, result, nil)
 }
 
 // UpdateDestination implements the corresponding TChanMetadataServiceClient API
@@ -856,6 +898,9 @@ func (s *CassandraMetadataService) UpdateDestination(ctx thrift.Context, updateR
 		existing.ZoneConfigs = updateRequest.GetZoneConfigs()
 	}
 	existing.IsMultiZone = common.BoolPtr(isMultiZone)
+
+	s.destinationCache.Delete(existing.GetPath())
+	s.destinationCache.Delete(updateRequest.GetDestinationUUID())
 	return existing, nil
 }
 
@@ -909,6 +954,9 @@ func (s *CassandraMetadataService) DeleteDestination(ctx thrift.Context, deleteR
 		opsDelete,
 		time.Now(),
 		marshalRequest(deleteRequest))
+
+	s.destinationCache.Delete(existing.GetPath())
+	s.destinationCache.Delete(existing.GetDestinationUUID())
 	return nil
 }
 
@@ -974,6 +1022,8 @@ func (s *CassandraMetadataService) DeleteDestinationUUID(ctx thrift.Context, del
 		time.Now(),
 		marshalRequest(deleteRequest))
 
+	s.destinationCache.Delete(existing.GetPath())
+	s.destinationCache.Delete(existing.GetDestinationUUID())
 	return nil
 }
 
@@ -1473,8 +1523,34 @@ func (s *CassandraMetadataService) createDlqDestination(cgUUID string, cgName st
 	return dlqDestDesc, err
 }
 
+type readConsumerGroupResponse struct {
+	result *shared.ConsumerGroupDescription
+	err    error
+}
+
+func (s *CassandraMetadataService) cacheReadConsumerGroupResponse(key string, result *shared.ConsumerGroupDescription,
+	err error) (*shared.ConsumerGroupDescription, error) {
+	item := &readConsumerGroupResponse{
+		result: result,
+		err:    err,
+	}
+
+	if key != "" {
+		s.consumerGroupCache.Put(key, item)
+	}
+
+	return result, err
+}
+
 func (s *CassandraMetadataService) readConsumerGroupByDstUUID(dstUUID string, cgName string) (*shared.ConsumerGroupDescription, error) {
 	result := getUtilConsumerGroupDescription()
+
+	key := dstUUID + cgName
+	cached := s.consumerGroupCache.Get(key)
+	if cached != nil {
+		return cached.(*readConsumerGroupResponse).result, cached.(*readConsumerGroupResponse).err
+	}
+
 	var zoneConfigsData []map[string]interface{}
 	query := s.session.Query(sqlGetCGByName, dstUUID, cgName).Consistency(s.lowConsLevel)
 	if err := query.Scan(
@@ -1494,17 +1570,18 @@ func (s *CassandraMetadataService) readConsumerGroupByDstUUID(dstUUID string, cg
 		&zoneConfigsData,
 		&result.Options); err != nil {
 		if err == gocql.ErrNotFound {
-			return nil, &shared.EntityNotExistsError{
+			return s.cacheReadConsumerGroupResponse(key, nil, &shared.EntityNotExistsError{
 				Message: fmt.Sprintf("ConsumerGroup %s of destinationUUID %s does not exist", cgName, dstUUID),
-			}
+			})
 		}
 
-		return nil, &shared.InternalServiceError{
+		return s.cacheReadConsumerGroupResponse(key, nil, &shared.InternalServiceError{
 			Message: err.Error(),
-		}
+		})
 	}
+
 	result.ZoneConfigs = unmarshalCgZoneConfigs(zoneConfigsData)
-	return result, nil
+	return s.cacheReadConsumerGroupResponse(key, result, nil)
 }
 
 // ReadConsumerGroup returns the ConsumerGroupDescription for the [destinationPath, groupName].
@@ -1546,6 +1623,12 @@ func (s *CassandraMetadataService) ReadConsumerGroupByUUID(ctx thrift.Context, r
 		return nil, &shared.BadRequestError{Message: "ConsumerGroupUUID cannot be nil"}
 	}
 
+	key := request.GetConsumerGroupUUID()
+	cached := s.consumerGroupCache.Get(key)
+	if cached != nil {
+		return cached.(*readConsumerGroupResponse).result, cached.(*readConsumerGroupResponse).err
+	}
+
 	result := getUtilConsumerGroupDescription()
 	var zoneConfigsData []map[string]interface{}
 	query := s.session.Query(sqlGetCGByUUID, request.GetConsumerGroupUUID()).Consistency(s.lowConsLevel)
@@ -1566,18 +1649,18 @@ func (s *CassandraMetadataService) ReadConsumerGroupByUUID(ctx thrift.Context, r
 		&zoneConfigsData,
 		&result.Options); err != nil {
 		if err == gocql.ErrNotFound {
-			return nil, &shared.EntityNotExistsError{
+			return s.cacheReadConsumerGroupResponse(key, nil, &shared.EntityNotExistsError{
 				Message: fmt.Sprintf("ConsumerGroup %s does not exist", *request.ConsumerGroupUUID),
-			}
+			})
 		}
 
-		return nil, &shared.InternalServiceError{
+		return s.cacheReadConsumerGroupResponse(key, nil, &shared.InternalServiceError{
 			Message: err.Error(),
-		}
+		})
 	}
 
-	result.ZoneConfigs = unmarshalCgZoneConfigs(zoneConfigsData)
-	return result, nil
+	result.ZoneConfigs = unmarshalCgZoneConfigs(zoneConfigsData)
+	return s.cacheReadConsumerGroupResponse(key, result, nil)
 }
 
 func updateCGDescIfChanged(req *shared.UpdateConsumerGroupRequest, cgDesc *shared.ConsumerGroupDescription) bool {
@@ -1731,6 +1814,9 @@ func (s *CassandraMetadataService) UpdateConsumerGroup(ctx thrift.Context, reque
 		time.Now(),
 		marshalRequest(request))
 
+	s.consumerGroupCache.Delete(newCG.GetDestinationUUID() + newCG.GetConsumerGroupName())
+	s.consumerGroupCache.Delete(newCG.GetConsumerGroupUUID())
+
 	return newCG, nil
 }
 
@@ -1859,6 +1945,10 @@ func (s *CassandraMetadataService) DeleteConsumerGroup(ctx thrift.Context, reque
 		time.Now(),
 		marshalRequest(request))
 
+	s.consumerGroupCache.Delete(existingCG.GetDestinationUUID() + existingCG.GetConsumerGroupName())
+	s.consumerGroupCache.Delete(existingCG.GetConsumerGroupUUID())
+	s.destinationCache.Delete(dlqDstID)
+
 	return nil
 }
 
@@ -1922,6 +2012,8 @@ func (s *CassandraMetadataService) DeleteConsumerGroupUUID(ctx thrift.Context, r
 		time.Now(),
 		marshalRequest(request))
 
+	s.consumerGroupCache.Delete(existing.GetDestinationUUID() + existing.GetConsumerGroupName())
+	s.consumerGroupCache.Delete(existing.GetConsumerGroupUUID())
 	return nil
 }
 
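A note on the metadata changes above: ReadDestination, readConsumerGroupByDstUUID and ReadConsumerGroupByUUID now consult a per-client TTL cache before querying Cassandra, successful results and not-found/internal errors alike are cached for up to cacheTTL (one second), and the update/delete paths invalidate both the name/path-keyed and UUID-keyed entries. This relies on the common/cache package behaving as a size-bounded cache whose entries expire after the configured TTL. The standalone sketch below illustrates those assumed semantics using only the New/Get/Put/Delete calls that appear in the hunks above; the key and value are illustrative, not part of the patch.

package main

import (
	"fmt"
	"time"

	"github.com/uber/cherami-server/common/cache"
)

func main() {
	// Assumed semantics: Put stores an entry, Get returns it until the TTL
	// elapses (after which Get returns nil), and Delete removes it immediately.
	c := cache.New(16, &cache.Options{TTL: time.Second})

	c.Put("/test/dest", "destination-description")
	fmt.Println(c.Get("/test/dest")) // hit: served from the cache within the TTL

	time.Sleep(2 * time.Second)
	fmt.Println(c.Get("/test/dest")) // nil: entry expired, the next read goes back to Cassandra

	c.Put("/test/dest", "destination-description")
	c.Delete("/test/dest") // what the Update*/Delete* paths do so a write is not followed by stale reads on this client
	fmt.Println(c.Get("/test/dest")) // nil: explicitly invalidated
}
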
diff --git a/test/integration/base.go b/test/integration/base.go
index 012bbb28..087a8638 100644
--- a/test/integration/base.go
+++ b/test/integration/base.go
@@ -50,15 +50,17 @@ import (
 
 type (
 	testBase struct {
-		Frontends      map[string]*frontendhost.Frontend
-		InputHosts     map[string]*inputhost.InputHost
-		OutputHosts    map[string]*outputhost.OutputHost
-		StoreHosts     map[string]*storehost.StoreHost
-		Controllers    map[string]*controllerhost.Mcp
-		mClient        *metadata.CassandraMetadataService
-		UUIDResolver   common.UUIDResolver
-		keyspace       string
-		storageBaseDir string
+		Frontends      map[string]*frontendhost.Frontend
+		InputHosts     map[string]*inputhost.InputHost
+		OutputHosts    map[string]*outputhost.OutputHost
+		StoreHosts     map[string]*storehost.StoreHost
+		Controllers    map[string]*controllerhost.Mcp
+		mClient        *metadata.CassandraMetadataService
+		UUIDResolver   common.UUIDResolver
+		keyspace       string
+		storageBaseDir string
+		auth           configure.Authentication
+
 		*require.Assertions // override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test, not merely log an error
 		suite.Suite
 	}
@@ -159,23 +161,17 @@ func (tb *testBase) setupSuiteImpl(t *testing.T) {
 	tb.keyspace = "integration_test"
 	tb.Assertions = require.New(tb.T())
 
-	auth := configure.Authentication{
+	tb.auth = configure.Authentication{
 		Enabled:  true,
 		Username: "cassandra",
 		Password: "cassandra",
 	}
 
 	// create the keyspace first
-	err := metadata.CreateKeyspaceNoSession("127.0.0.1", 9042, tb.keyspace, 1, true, auth)
+	err := metadata.CreateKeyspaceNoSession("127.0.0.1", 9042, tb.keyspace, 1, true, tb.auth)
 	tb.NoError(err)
 
-	tb.mClient, _ = metadata.NewCassandraMetadataService(&configure.MetadataConfig{
-		CassandraHosts: "127.0.0.1",
-		Port:           9042,
-		Keyspace:       tb.keyspace,
-		Consistency:    "One",
-		Authentication: auth,
-	}, nil)
+	tb.mClient = tb.GetNewMetadataClient()
 	tb.NotNil(tb.mClient)
 
 	// Drop the keyspace, if it exists. This preserves the keyspace for inspection if the test fails, and simplifies cleanup
@@ -198,6 +194,18 @@ func (tb *testBase) setupSuiteImpl(t *testing.T) {
 	cassConfig.SetRefreshInterval(10 * time.Millisecond)
 }
 
+func (tb *testBase) GetNewMetadataClient() *metadata.CassandraMetadataService {
+	s, _ := metadata.NewCassandraMetadataService(&configure.MetadataConfig{
+		CassandraHosts: "127.0.0.1",
+		Port:           9042,
+		Keyspace:       tb.keyspace,
+		Consistency:    "One",
+		Authentication: tb.auth,
+	}, nil)
+
+	return s
+}
+
 func (tb *testBase) TearDownSuite() {
 }
 
@@ -208,6 +216,8 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) {
 	tb.storageBaseDir, err = ioutil.TempDir("", "cherami_integration_test_")
 	tb.NoError(err)
 
+	tb.mClient = tb.GetNewMetadataClient()
+	tb.NotNil(tb.mClient)
 	tb.UUIDResolver = common.NewUUIDResolver(tb.mClient)
 	hwInfoReader := common.NewHostHardwareInfoReader(tb.mClient)
 
@@ -232,7 +242,7 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) {
 		dClient := dconfig.NewDconfigClient(configure.NewCommonServiceConfig(), common.StoreServiceName)
 		sCommon := common.NewService(common.StoreServiceName, hostID, cfg, tb.UUIDResolver, hwInfoReader, reporter, dClient, common.NewBypassAuthManager())
 		log.Infof("store ringHosts: %v", cfg.GetRingHosts())
-		sh, tc := storehost.NewStoreHost(common.StoreServiceName, sCommon, tb.mClient, storehostOpts)
+		sh, tc := storehost.NewStoreHost(common.StoreServiceName, sCommon, tb.GetNewMetadataClient(), storehostOpts)
 		sh.Start(tc)
 
 		// start websocket server
@@ -256,7 +266,7 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) {
 		dClient := dconfig.NewDconfigClient(configure.NewCommonServiceConfig(), common.InputServiceName)
 		sCommon := common.NewService(common.InputServiceName, hostID, cfg, tb.UUIDResolver, hwInfoReader, reporter, dClient, common.NewBypassAuthManager())
 		log.Infof("input ringHosts: %v", cfg.GetRingHosts())
-		ih, tc := inputhost.NewInputHost(common.InputServiceName, sCommon, tb.mClient, nil)
+		ih, tc := inputhost.NewInputHost(common.InputServiceName, sCommon, tb.GetNewMetadataClient(), nil)
 		ih.Start(tc)
 		// start websocket server
 		common.WSStart(cfg.GetListenAddress().String(), cfg.GetWebsocketPort(), ih)
@@ -272,7 +282,7 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) {
 
 		sCommon := common.NewService(common.FrontendServiceName, hostID, cfg, tb.UUIDResolver, hwInfoReader, reporter, dClient, common.NewBypassAuthManager())
 		log.Infof("front ringHosts: %v", cfg.GetRingHosts())
-		fh, tc := frontendhost.NewFrontendHost(common.FrontendServiceName, sCommon, tb.mClient, cfgMap[common.FrontendServiceName][i])
+		fh, tc := frontendhost.NewFrontendHost(common.FrontendServiceName, sCommon, tb.GetNewMetadataClient(), cfgMap[common.FrontendServiceName][i])
 		fh.Start(tc)
 		tb.Frontends[hostID] = fh
 		frontendForOut = fh
@@ -288,7 +298,7 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) {
 		oh, tc := outputhost.NewOutputHost(
 			common.OutputServiceName,
 			sCommon,
-			tb.mClient,
+			tb.GetNewMetadataClient(),
 			frontendForOut,
 			nil,
 			cfgMap[common.OutputServiceName][i].GetKafkaConfig(),
@@ -308,7 +318,7 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) {
 		reporter := common.NewTestMetricsReporter()
 		dClient := dconfig.NewDconfigClient(configure.NewCommonServiceConfig(), common.ControllerServiceName)
 		sVice := common.NewService(serviceName, uuid.New(), cfg.ServiceConfig[serviceName], tb.UUIDResolver, hwInfoReader, reporter, dClient, common.NewBypassAuthManager())
-		ch, tc := controllerhost.NewController(cfg, sVice, tb.mClient, common.NewDummyZoneFailoverManager())
+		ch, tc := controllerhost.NewController(cfg, sVice, tb.GetNewMetadataClient(), common.NewDummyZoneFailoverManager())
 		ch.Start(tc)
 		tb.Controllers[hostID] = ch
 	}
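
A note on the integration-test changes: each CassandraMetadataService instance now carries its own one-second read cache, so base.go stops sharing a single client and instead hands every host a dedicated client from GetNewMetadataClient(), re-creating tb.mClient in SetUp as well. One consequence for tests: a write issued through one client and a read issued through another can disagree for up to cacheTTL if the reader had already cached the old value. A hypothetical helper like the one below (not part of this patch; the name and the 1.1s padding are illustrative) is one way for a test to wait out that window before asserting cross-client visibility.

// waitForMetadataCacheTTL is a hypothetical test helper: it sleeps slightly
// longer than the 1s cacheTTL so a read through a different
// CassandraMetadataService instance cannot be served from a stale cache entry.
func (tb *testBase) waitForMetadataCacheTTL() {
	time.Sleep(1100 * time.Millisecond)
}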