diff --git a/.gitignore b/.gitignore index 6709caed..a973180d 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,4 @@ cherami-server cherami-replicator-server cdb cherami-store-tool +cmq diff --git a/clients/metadata/metadata_cassandra.go b/clients/metadata/metadata_cassandra.go index 60eba531..354b2b3b 100644 --- a/clients/metadata/metadata_cassandra.go +++ b/clients/metadata/metadata_cassandra.go @@ -29,6 +29,7 @@ import ( "time" "github.com/uber/cherami-server/common" + "github.com/uber/cherami-server/common/cache" "github.com/uber/cherami-server/common/configure" m "github.com/uber/cherami-thrift/.generated/go/metadata" "github.com/uber/cherami-thrift/.generated/go/shared" @@ -177,6 +178,10 @@ const cassandraProtoVersion = 4 const deleteExtentTTLSeconds = int64(time.Hour*24) / int64(time.Second) const defaultDeleteTTLSeconds = int64(time.Hour*24*30) / int64(time.Second) +const destinationCacheSize = 1048576 +const consumerGroupCacheSize = 1048576 +const cacheTTL = time.Second + // CassandraMetadataService Implements TChanMetadataServiceClient interface // TODO: Convert all errors to the ones defined in the thrift API. type CassandraMetadataService struct { @@ -186,6 +191,9 @@ type CassandraMetadataService struct { highConsLevel gocql.Consistency // Strongest cons level that can be used for this session clusterName string log bark.Logger + + destinationCache cache.Cache + consumerGroupCache cache.Cache } // interface implementation check @@ -278,6 +286,9 @@ func NewCassandraMetadataService(cfg configure.CommonMetadataConfig, log bark.Lo highConsLevel: highCons, clusterName: clusterName, log: log.WithField(common.TagModule, `metadata`), + + destinationCache: cache.New(destinationCacheSize, &cache.Options{TTL: cacheTTL}), + consumerGroupCache: cache.New(consumerGroupCacheSize, &cache.Options{TTL: cacheTTL}), }, nil } @@ -669,6 +680,18 @@ func (s *CassandraMetadataService) ReadDestination(ctx thrift.Context, getReques } } + var key string + if getRequest.Path != nil { + key = "dstpath:" + getRequest.GetPath() + } else { + key = "dstuuid:" + getRequest.GetDestinationUUID() + } + + cached := s.destinationCache.Get(key) + if cached != nil { + return cached.(*shared.DestinationDescription), nil + } + result = getUtilDestinationDescription() var zoneConfigsData []map[string]interface{} if getRequest.Path != nil { @@ -735,6 +758,7 @@ func (s *CassandraMetadataService) ReadDestination(ctx thrift.Context, getReques *result.DLQPurgeBefore = int64(cqlTimestampToUnixNano(*result.DLQPurgeBefore)) *result.DLQMergeBefore = int64(cqlTimestampToUnixNano(*result.DLQMergeBefore)) + s.destinationCache.Put(key, result) return result, nil } @@ -856,6 +880,9 @@ func (s *CassandraMetadataService) UpdateDestination(ctx thrift.Context, updateR existing.ZoneConfigs = updateRequest.GetZoneConfigs() } existing.IsMultiZone = common.BoolPtr(isMultiZone) + + s.destinationCache.Delete("dstpath:" + existing.GetPath()) + s.destinationCache.Delete("dstuuid:" + updateRequest.GetDestinationUUID()) return existing, nil } @@ -909,6 +936,9 @@ func (s *CassandraMetadataService) DeleteDestination(ctx thrift.Context, deleteR opsDelete, time.Now(), marshalRequest(deleteRequest)) + + s.destinationCache.Delete("dstpath:" + existing.GetPath()) + s.destinationCache.Delete("dstuuid:" + existing.GetDestinationUUID()) return nil } @@ -974,6 +1004,8 @@ func (s *CassandraMetadataService) DeleteDestinationUUID(ctx thrift.Context, del time.Now(), marshalRequest(deleteRequest)) + s.destinationCache.Delete("dstpath:" + existing.GetPath()) + s.destinationCache.Delete("dstuuid:" + existing.GetDestinationUUID()) return nil } @@ -1487,6 +1519,13 @@ func (s *CassandraMetadataService) createDlqDestination(cgUUID string, cgName st func (s *CassandraMetadataService) readConsumerGroupByDstUUID(dstUUID string, cgName string) (*shared.ConsumerGroupDescription, error) { result := getUtilConsumerGroupDescription() + + key := "dstuuid_cgname:" + dstUUID + cgName + cached := s.consumerGroupCache.Get(key) + if cached != nil { + return cached.(*shared.ConsumerGroupDescription), nil + } + var zoneConfigsData []map[string]interface{} query := s.session.Query(sqlGetCGByName, dstUUID, cgName).Consistency(s.lowConsLevel) if err := query.Scan( @@ -1515,7 +1554,13 @@ func (s *CassandraMetadataService) readConsumerGroupByDstUUID(dstUUID string, cg Message: err.Error(), } } + result.ZoneConfigs = unmarshalCgZoneConfigs(zoneConfigsData) + s.consumerGroupCache.Put(key, result) + if result.ConsumerGroupUUID != nil { + s.consumerGroupCache.Put("cguuid:"+*result.ConsumerGroupUUID, result) + } + return result, nil } @@ -1558,6 +1603,12 @@ func (s *CassandraMetadataService) ReadConsumerGroupByUUID(ctx thrift.Context, r return nil, &shared.BadRequestError{Message: "ConsumerGroupUUID cannot be nil"} } + key := "cguuid:" + request.GetConsumerGroupUUID() + cached := s.consumerGroupCache.Get(key) + if cached != nil { + return cached.(*shared.ConsumerGroupDescription), nil + } + result := getUtilConsumerGroupDescription() var zoneConfigsData []map[string]interface{} query := s.session.Query(sqlGetCGByUUID, request.GetConsumerGroupUUID()).Consistency(s.lowConsLevel) @@ -1587,8 +1638,12 @@ func (s *CassandraMetadataService) ReadConsumerGroupByUUID(ctx thrift.Context, r Message: err.Error(), } } - result.ZoneConfigs = unmarshalCgZoneConfigs(zoneConfigsData) + result.ZoneConfigs = unmarshalCgZoneConfigs(zoneConfigsData) + s.consumerGroupCache.Put(key, result) + if result.DestinationUUID != nil && result.ConsumerGroupName != nil { + s.consumerGroupCache.Put("dstuuid_cgname:"+*result.DestinationUUID+*result.ConsumerGroupName, result) + } return result, nil } @@ -1743,6 +1798,9 @@ func (s *CassandraMetadataService) UpdateConsumerGroup(ctx thrift.Context, reque time.Now(), marshalRequest(request)) + s.consumerGroupCache.Delete("dstuuid_cgname:" + newCG.GetDestinationUUID() + newCG.GetConsumerGroupName()) + s.consumerGroupCache.Delete("cguuid:" + newCG.GetConsumerGroupUUID()) + return newCG, nil } @@ -1871,6 +1929,10 @@ func (s *CassandraMetadataService) DeleteConsumerGroup(ctx thrift.Context, reque time.Now(), marshalRequest(request)) + s.consumerGroupCache.Delete("dstuuid_cgname:" + existingCG.GetDestinationUUID() + existingCG.GetConsumerGroupName()) + s.consumerGroupCache.Delete("cguuid:" + existingCG.GetConsumerGroupUUID()) + s.destinationCache.Delete("dstuuid:" + dlqDstID) + return nil } @@ -1934,6 +1996,8 @@ func (s *CassandraMetadataService) DeleteConsumerGroupUUID(ctx thrift.Context, r time.Now(), marshalRequest(request)) + s.consumerGroupCache.Delete("dstuuid_cgname:" + existing.GetDestinationUUID() + existing.GetConsumerGroupName()) + s.consumerGroupCache.Delete("cguuid:" + existing.GetConsumerGroupUUID()) return nil } diff --git a/test/integration/base.go b/test/integration/base.go index 012bbb28..087a8638 100644 --- a/test/integration/base.go +++ b/test/integration/base.go @@ -50,15 +50,17 @@ import ( type ( testBase struct { - Frontends map[string]*frontendhost.Frontend - InputHosts map[string]*inputhost.InputHost - OutputHosts map[string]*outputhost.OutputHost - StoreHosts map[string]*storehost.StoreHost - Controllers map[string]*controllerhost.Mcp - mClient *metadata.CassandraMetadataService - UUIDResolver common.UUIDResolver - keyspace string - storageBaseDir string + Frontends map[string]*frontendhost.Frontend + InputHosts map[string]*inputhost.InputHost + OutputHosts map[string]*outputhost.OutputHost + StoreHosts map[string]*storehost.StoreHost + Controllers map[string]*controllerhost.Mcp + mClient *metadata.CassandraMetadataService + UUIDResolver common.UUIDResolver + keyspace string + storageBaseDir string + auth configure.Authentication + *require.Assertions // override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test, not merely log an error suite.Suite } @@ -159,23 +161,17 @@ func (tb *testBase) setupSuiteImpl(t *testing.T) { tb.keyspace = "integration_test" tb.Assertions = require.New(tb.T()) - auth := configure.Authentication{ + tb.auth = configure.Authentication{ Enabled: true, Username: "cassandra", Password: "cassandra", } // create the keyspace first - err := metadata.CreateKeyspaceNoSession("127.0.0.1", 9042, tb.keyspace, 1, true, auth) + err := metadata.CreateKeyspaceNoSession("127.0.0.1", 9042, tb.keyspace, 1, true, tb.auth) tb.NoError(err) - tb.mClient, _ = metadata.NewCassandraMetadataService(&configure.MetadataConfig{ - CassandraHosts: "127.0.0.1", - Port: 9042, - Keyspace: tb.keyspace, - Consistency: "One", - Authentication: auth, - }, nil) + tb.mClient = tb.GetNewMetadataClient() tb.NotNil(tb.mClient) // Drop the keyspace, if it exists. This preserves the keyspace for inspection if the test fails, and simplifies cleanup @@ -198,6 +194,18 @@ func (tb *testBase) setupSuiteImpl(t *testing.T) { cassConfig.SetRefreshInterval(10 * time.Millisecond) } +func (tb *testBase) GetNewMetadataClient() *metadata.CassandraMetadataService { + s, _ := metadata.NewCassandraMetadataService(&configure.MetadataConfig{ + CassandraHosts: "127.0.0.1", + Port: 9042, + Keyspace: tb.keyspace, + Consistency: "One", + Authentication: tb.auth, + }, nil) + + return s +} + func (tb *testBase) TearDownSuite() { } @@ -208,6 +216,8 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) { tb.storageBaseDir, err = ioutil.TempDir("", "cherami_integration_test_") tb.NoError(err) + tb.mClient = tb.GetNewMetadataClient() + tb.NotNil(tb.mClient) tb.UUIDResolver = common.NewUUIDResolver(tb.mClient) hwInfoReader := common.NewHostHardwareInfoReader(tb.mClient) @@ -232,7 +242,7 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) { dClient := dconfig.NewDconfigClient(configure.NewCommonServiceConfig(), common.StoreServiceName) sCommon := common.NewService(common.StoreServiceName, hostID, cfg, tb.UUIDResolver, hwInfoReader, reporter, dClient, common.NewBypassAuthManager()) log.Infof("store ringHosts: %v", cfg.GetRingHosts()) - sh, tc := storehost.NewStoreHost(common.StoreServiceName, sCommon, tb.mClient, storehostOpts) + sh, tc := storehost.NewStoreHost(common.StoreServiceName, sCommon, tb.GetNewMetadataClient(), storehostOpts) sh.Start(tc) // start websocket server @@ -256,7 +266,7 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) { dClient := dconfig.NewDconfigClient(configure.NewCommonServiceConfig(), common.InputServiceName) sCommon := common.NewService(common.InputServiceName, hostID, cfg, tb.UUIDResolver, hwInfoReader, reporter, dClient, common.NewBypassAuthManager()) log.Infof("input ringHosts: %v", cfg.GetRingHosts()) - ih, tc := inputhost.NewInputHost(common.InputServiceName, sCommon, tb.mClient, nil) + ih, tc := inputhost.NewInputHost(common.InputServiceName, sCommon, tb.GetNewMetadataClient(), nil) ih.Start(tc) // start websocket server common.WSStart(cfg.GetListenAddress().String(), cfg.GetWebsocketPort(), ih) @@ -272,7 +282,7 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) { sCommon := common.NewService(common.FrontendServiceName, hostID, cfg, tb.UUIDResolver, hwInfoReader, reporter, dClient, common.NewBypassAuthManager()) log.Infof("front ringHosts: %v", cfg.GetRingHosts()) - fh, tc := frontendhost.NewFrontendHost(common.FrontendServiceName, sCommon, tb.mClient, cfgMap[common.FrontendServiceName][i]) + fh, tc := frontendhost.NewFrontendHost(common.FrontendServiceName, sCommon, tb.GetNewMetadataClient(), cfgMap[common.FrontendServiceName][i]) fh.Start(tc) tb.Frontends[hostID] = fh frontendForOut = fh @@ -288,7 +298,7 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) { oh, tc := outputhost.NewOutputHost( common.OutputServiceName, sCommon, - tb.mClient, + tb.GetNewMetadataClient(), frontendForOut, nil, cfgMap[common.OutputServiceName][i].GetKafkaConfig(), @@ -308,7 +318,7 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) { reporter := common.NewTestMetricsReporter() dClient := dconfig.NewDconfigClient(configure.NewCommonServiceConfig(), common.ControllerServiceName) sVice := common.NewService(serviceName, uuid.New(), cfg.ServiceConfig[serviceName], tb.UUIDResolver, hwInfoReader, reporter, dClient, common.NewBypassAuthManager()) - ch, tc := controllerhost.NewController(cfg, sVice, tb.mClient, common.NewDummyZoneFailoverManager()) + ch, tc := controllerhost.NewController(cfg, sVice, tb.GetNewMetadataClient(), common.NewDummyZoneFailoverManager()) ch.Start(tc) tb.Controllers[hostID] = ch }