This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit a40cd68

Cache results in ReadDestination and ReadConsumerGroup metadata calls redux (#340)
thuningxu authored Dec 7, 2017
1 parent d93a4df commit a40cd68
Showing 3 changed files with 99 additions and 24 deletions.
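
The change itself is a small read-through cache: the two hottest metadata reads (ReadDestination and the consumer-group lookups) now consult an in-process cache of up to 1,048,576 entries with a one-second TTL before querying Cassandra, and every write path deletes the affected entries. Below is a minimal sketch of that read-through pattern; it reuses the cache.New/Get/Put calls that appear in the diff, but the record type and the Cassandra stand-in are hypothetical, not cherami-server code.

package main

import (
    "fmt"
    "time"

    "github.com/uber/cherami-server/common/cache"
)

const cacheTTL = time.Second // entries expire on their own after a second

type record struct{ Path string }

// loadFromCassandra stands in for the real CQL query.Scan on a cache miss.
func loadFromCassandra(path string) *record {
    return &record{Path: path}
}

func readDestination(c cache.Cache, path string) *record {
    key := "dstpath:" + path // keys are namespaced, as in the diff
    if cached := c.Get(key); cached != nil {
        return cached.(*record) // hit: skip Cassandra entirely
    }
    result := loadFromCassandra(path) // miss: query, then cache
    c.Put(key, result)
    return result
}

func main() {
    c := cache.New(1024, &cache.Options{TTL: cacheTTL})
    fmt.Println(readDestination(c, "/test/dest")) // first call misses
    fmt.Println(readDestination(c, "/test/dest")) // served from cache until the TTL lapses
}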
1 change: 1 addition & 0 deletions .gitignore
@@ -25,3 +25,4 @@ cherami-server
cherami-replicator-server
cdb
cherami-store-tool
cmq
66 changes: 65 additions & 1 deletion clients/metadata/metadata_cassandra.go
@@ -29,6 +29,7 @@ import (
"time"

"github.com/uber/cherami-server/common"
"github.com/uber/cherami-server/common/cache"
"github.com/uber/cherami-server/common/configure"
m "github.com/uber/cherami-thrift/.generated/go/metadata"
"github.com/uber/cherami-thrift/.generated/go/shared"
@@ -177,6 +178,10 @@ const cassandraProtoVersion = 4
const deleteExtentTTLSeconds = int64(time.Hour*24) / int64(time.Second)
const defaultDeleteTTLSeconds = int64(time.Hour*24*30) / int64(time.Second)

const destinationCacheSize = 1048576
const consumerGroupCacheSize = 1048576
const cacheTTL = time.Second

// CassandraMetadataService Implements TChanMetadataServiceClient interface
// TODO: Convert all errors to the ones defined in the thrift API.
type CassandraMetadataService struct {
@@ -186,6 +191,9 @@ type CassandraMetadataService struct {
highConsLevel gocql.Consistency // Strongest cons level that can be used for this session
clusterName string
log bark.Logger

destinationCache cache.Cache
consumerGroupCache cache.Cache
}

// interface implementation check
@@ -278,6 +286,9 @@ func NewCassandraMetadataService(cfg configure.CommonMetadataConfig, log bark.Lo
highConsLevel: highCons,
clusterName: clusterName,
log: log.WithField(common.TagModule, `metadata`),

destinationCache: cache.New(destinationCacheSize, &cache.Options{TTL: cacheTTL}),
consumerGroupCache: cache.New(consumerGroupCacheSize, &cache.Options{TTL: cacheTTL}),
}, nil
}

@@ -669,6 +680,18 @@ func (s *CassandraMetadataService) ReadDestination(ctx thrift.Context, getReques
}
}

var key string
if getRequest.Path != nil {
key = "dstpath:" + getRequest.GetPath()
} else {
key = "dstuuid:" + getRequest.GetDestinationUUID()
}

cached := s.destinationCache.Get(key)
if cached != nil {
return cached.(*shared.DestinationDescription), nil
}

result = getUtilDestinationDescription()
var zoneConfigsData []map[string]interface{}
if getRequest.Path != nil {
@@ -735,6 +758,7 @@ func (s *CassandraMetadataService) ReadDestination(ctx thrift.Context, getReques
*result.DLQPurgeBefore = int64(cqlTimestampToUnixNano(*result.DLQPurgeBefore))
*result.DLQMergeBefore = int64(cqlTimestampToUnixNano(*result.DLQMergeBefore))

s.destinationCache.Put(key, result)
return result, nil
}

@@ -856,6 +880,9 @@ func (s *CassandraMetadataService) UpdateDestination(ctx thrift.Context, updateR
existing.ZoneConfigs = updateRequest.GetZoneConfigs()
}
existing.IsMultiZone = common.BoolPtr(isMultiZone)

s.destinationCache.Delete("dstpath:" + existing.GetPath())
s.destinationCache.Delete("dstuuid:" + updateRequest.GetDestinationUUID())
return existing, nil
}

@@ -909,6 +936,9 @@ func (s *CassandraMetadataService) DeleteDestination(ctx thrift.Context, deleteR
opsDelete,
time.Now(),
marshalRequest(deleteRequest))

s.destinationCache.Delete("dstpath:" + existing.GetPath())
s.destinationCache.Delete("dstuuid:" + existing.GetDestinationUUID())
return nil
}

@@ -974,6 +1004,8 @@ func (s *CassandraMetadataService) DeleteDestinationUUID(ctx thrift.Context, del
time.Now(),
marshalRequest(deleteRequest))

s.destinationCache.Delete("dstpath:" + existing.GetPath())
s.destinationCache.Delete("dstuuid:" + existing.GetDestinationUUID())
return nil
}

@@ -1487,6 +1519,13 @@ func (s *CassandraMetadataService) createDlqDestination(cgUUID string, cgName st

func (s *CassandraMetadataService) readConsumerGroupByDstUUID(dstUUID string, cgName string) (*shared.ConsumerGroupDescription, error) {
result := getUtilConsumerGroupDescription()

key := "dstuuid_cgname:" + dstUUID + cgName
cached := s.consumerGroupCache.Get(key)
if cached != nil {
return cached.(*shared.ConsumerGroupDescription), nil
}

var zoneConfigsData []map[string]interface{}
query := s.session.Query(sqlGetCGByName, dstUUID, cgName).Consistency(s.lowConsLevel)
if err := query.Scan(
@@ -1515,7 +1554,13 @@ func (s *CassandraMetadataService) readConsumerGroupByDstUUID(dstUUID string, cg
Message: err.Error(),
}
}

result.ZoneConfigs = unmarshalCgZoneConfigs(zoneConfigsData)
s.consumerGroupCache.Put(key, result)
if result.ConsumerGroupUUID != nil {
s.consumerGroupCache.Put("cguuid:"+*result.ConsumerGroupUUID, result)
}

return result, nil
}

@@ -1558,6 +1603,12 @@ func (s *CassandraMetadataService) ReadConsumerGroupByUUID(ctx thrift.Context, r
return nil, &shared.BadRequestError{Message: "ConsumerGroupUUID cannot be nil"}
}

key := "cguuid:" + request.GetConsumerGroupUUID()
cached := s.consumerGroupCache.Get(key)
if cached != nil {
return cached.(*shared.ConsumerGroupDescription), nil
}

result := getUtilConsumerGroupDescription()
var zoneConfigsData []map[string]interface{}
query := s.session.Query(sqlGetCGByUUID, request.GetConsumerGroupUUID()).Consistency(s.lowConsLevel)
@@ -1587,8 +1638,12 @@ func (s *CassandraMetadataService) ReadConsumerGroupByUUID(ctx thrift.Context, r
Message: err.Error(),
}
}
result.ZoneConfigs = unmarshalCgZoneConfigs(zoneConfigsData)

result.ZoneConfigs = unmarshalCgZoneConfigs(zoneConfigsData)
s.consumerGroupCache.Put(key, result)
if result.DestinationUUID != nil && result.ConsumerGroupName != nil {
s.consumerGroupCache.Put("dstuuid_cgname:"+*result.DestinationUUID+*result.ConsumerGroupName, result)
}
return result, nil
}

@@ -1743,6 +1798,9 @@ func (s *CassandraMetadataService) UpdateConsumerGroup(ctx thrift.Context, reque
time.Now(),
marshalRequest(request))

s.consumerGroupCache.Delete("dstuuid_cgname:" + newCG.GetDestinationUUID() + newCG.GetConsumerGroupName())
s.consumerGroupCache.Delete("cguuid:" + newCG.GetConsumerGroupUUID())

return newCG, nil
}

@@ -1871,6 +1929,10 @@ func (s *CassandraMetadataService) DeleteConsumerGroup(ctx thrift.Context, reque
time.Now(),
marshalRequest(request))

s.consumerGroupCache.Delete("dstuuid_cgname:" + existingCG.GetDestinationUUID() + existingCG.GetConsumerGroupName())
s.consumerGroupCache.Delete("cguuid:" + existingCG.GetConsumerGroupUUID())
s.destinationCache.Delete("dstuuid:" + dlqDstID)

return nil
}

@@ -1934,6 +1996,8 @@ func (s *CassandraMetadataService) DeleteConsumerGroupUUID(ctx thrift.Context, r
time.Now(),
marshalRequest(request))

s.consumerGroupCache.Delete("dstuuid_cgname:" + existing.GetDestinationUUID() + existing.GetConsumerGroupName())
s.consumerGroupCache.Delete("cguuid:" + existing.GetConsumerGroupUUID())
return nil
}
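
One detail worth calling out in the file above: every entry is reachable under two cache keys, so every write path deletes both aliases. Destinations are keyed by "dstpath:" and "dstuuid:", consumer groups by "dstuuid_cgname:" and "cguuid:", and DeleteConsumerGroup additionally drops the cached DLQ destination by UUID. There is also an asymmetry on the read side: ReadDestination caches only under whichever key the request arrived with, while the consumer-group reads Put the result under both keys. A sketch of the dual-key invalidation follows, assuming only the cache.Cache Delete(key) method used in the diff; the helper names are illustrative, not cherami-server code.

package sketch

import "github.com/uber/cherami-server/common/cache"

// invalidateDestination mirrors the Delete pairs in UpdateDestination and
// DeleteDestination: either alias may still hold the stale entry.
func invalidateDestination(c cache.Cache, path, uuid string) {
    c.Delete("dstpath:" + path)
    c.Delete("dstuuid:" + uuid)
}

// invalidateConsumerGroup mirrors the Delete pairs in UpdateConsumerGroup
// and DeleteConsumerGroup.
func invalidateConsumerGroup(c cache.Cache, dstUUID, cgName, cgUUID string) {
    c.Delete("dstuuid_cgname:" + dstUUID + cgName)
    c.Delete("cguuid:" + cgUUID)
}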

56 changes: 33 additions & 23 deletions test/integration/base.go
@@ -50,15 +50,17 @@ import (

type (
testBase struct {
Frontends map[string]*frontendhost.Frontend
InputHosts map[string]*inputhost.InputHost
OutputHosts map[string]*outputhost.OutputHost
StoreHosts map[string]*storehost.StoreHost
Controllers map[string]*controllerhost.Mcp
mClient *metadata.CassandraMetadataService
UUIDResolver common.UUIDResolver
keyspace string
storageBaseDir string
Frontends map[string]*frontendhost.Frontend
InputHosts map[string]*inputhost.InputHost
OutputHosts map[string]*outputhost.OutputHost
StoreHosts map[string]*storehost.StoreHost
Controllers map[string]*controllerhost.Mcp
mClient *metadata.CassandraMetadataService
UUIDResolver common.UUIDResolver
keyspace string
storageBaseDir string
auth configure.Authentication

*require.Assertions // override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test, not merely log an error
suite.Suite
}
@@ -159,23 +161,17 @@ func (tb *testBase) setupSuiteImpl(t *testing.T) {
tb.keyspace = "integration_test"
tb.Assertions = require.New(tb.T())

auth := configure.Authentication{
tb.auth = configure.Authentication{
Enabled: true,
Username: "cassandra",
Password: "cassandra",
}

// create the keyspace first
err := metadata.CreateKeyspaceNoSession("127.0.0.1", 9042, tb.keyspace, 1, true, auth)
err := metadata.CreateKeyspaceNoSession("127.0.0.1", 9042, tb.keyspace, 1, true, tb.auth)
tb.NoError(err)

tb.mClient, _ = metadata.NewCassandraMetadataService(&configure.MetadataConfig{
CassandraHosts: "127.0.0.1",
Port: 9042,
Keyspace: tb.keyspace,
Consistency: "One",
Authentication: auth,
}, nil)
tb.mClient = tb.GetNewMetadataClient()
tb.NotNil(tb.mClient)

// Drop the keyspace, if it exists. This preserves the keyspace for inspection if the test fails, and simplifies cleanup
@@ -198,6 +194,18 @@ func (tb *testBase) setupSuiteImpl(t *testing.T) {
cassConfig.SetRefreshInterval(10 * time.Millisecond)
}

func (tb *testBase) GetNewMetadataClient() *metadata.CassandraMetadataService {
s, _ := metadata.NewCassandraMetadataService(&configure.MetadataConfig{
CassandraHosts: "127.0.0.1",
Port: 9042,
Keyspace: tb.keyspace,
Consistency: "One",
Authentication: tb.auth,
}, nil)

return s
}

func (tb *testBase) TearDownSuite() {
}

@@ -208,6 +216,8 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) {
tb.storageBaseDir, err = ioutil.TempDir("", "cherami_integration_test_")
tb.NoError(err)

tb.mClient = tb.GetNewMetadataClient()
tb.NotNil(tb.mClient)
tb.UUIDResolver = common.NewUUIDResolver(tb.mClient)
hwInfoReader := common.NewHostHardwareInfoReader(tb.mClient)

@@ -232,7 +242,7 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) {
dClient := dconfig.NewDconfigClient(configure.NewCommonServiceConfig(), common.StoreServiceName)
sCommon := common.NewService(common.StoreServiceName, hostID, cfg, tb.UUIDResolver, hwInfoReader, reporter, dClient, common.NewBypassAuthManager())
log.Infof("store ringHosts: %v", cfg.GetRingHosts())
sh, tc := storehost.NewStoreHost(common.StoreServiceName, sCommon, tb.mClient, storehostOpts)
sh, tc := storehost.NewStoreHost(common.StoreServiceName, sCommon, tb.GetNewMetadataClient(), storehostOpts)
sh.Start(tc)

// start websocket server
@@ -256,7 +266,7 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) {
dClient := dconfig.NewDconfigClient(configure.NewCommonServiceConfig(), common.InputServiceName)
sCommon := common.NewService(common.InputServiceName, hostID, cfg, tb.UUIDResolver, hwInfoReader, reporter, dClient, common.NewBypassAuthManager())
log.Infof("input ringHosts: %v", cfg.GetRingHosts())
ih, tc := inputhost.NewInputHost(common.InputServiceName, sCommon, tb.mClient, nil)
ih, tc := inputhost.NewInputHost(common.InputServiceName, sCommon, tb.GetNewMetadataClient(), nil)
ih.Start(tc)
// start websocket server
common.WSStart(cfg.GetListenAddress().String(), cfg.GetWebsocketPort(), ih)
@@ -272,7 +282,7 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) {

sCommon := common.NewService(common.FrontendServiceName, hostID, cfg, tb.UUIDResolver, hwInfoReader, reporter, dClient, common.NewBypassAuthManager())
log.Infof("front ringHosts: %v", cfg.GetRingHosts())
fh, tc := frontendhost.NewFrontendHost(common.FrontendServiceName, sCommon, tb.mClient, cfgMap[common.FrontendServiceName][i])
fh, tc := frontendhost.NewFrontendHost(common.FrontendServiceName, sCommon, tb.GetNewMetadataClient(), cfgMap[common.FrontendServiceName][i])
fh.Start(tc)
tb.Frontends[hostID] = fh
frontendForOut = fh
@@ -288,7 +298,7 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) {
oh, tc := outputhost.NewOutputHost(
common.OutputServiceName,
sCommon,
tb.mClient,
tb.GetNewMetadataClient(),
frontendForOut,
nil,
cfgMap[common.OutputServiceName][i].GetKafkaConfig(),
@@ -308,7 +318,7 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) {
reporter := common.NewTestMetricsReporter()
dClient := dconfig.NewDconfigClient(configure.NewCommonServiceConfig(), common.ControllerServiceName)
sVice := common.NewService(serviceName, uuid.New(), cfg.ServiceConfig[serviceName], tb.UUIDResolver, hwInfoReader, reporter, dClient, common.NewBypassAuthManager())
ch, tc := controllerhost.NewController(cfg, sVice, tb.mClient, common.NewDummyZoneFailoverManager())
ch, tc := controllerhost.NewController(cfg, sVice, tb.GetNewMetadataClient(), common.NewDummyZoneFailoverManager())
ch.Start(tc)
tb.Controllers[hostID] = ch
}
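
The test-harness changes complement the cache. base.go previously shared a single CassandraMetadataService across every simulated host; now the suite and each store, input, frontend, output, and controller host get their own client from GetNewMetadataClient, and with it an independent pair of caches, which is closer to how separate processes behave in production. One consequence of that split: a metadata write made through one host's client can lag by up to cacheTTL (one second) when read through another host's cache, so a test that writes and then immediately reads through a different host may need to poll. A hypothetical helper for that, not part of this commit:

package sketch

import "time"

const cacheTTL = time.Second // matches the TTL set in metadata_cassandra.go

// waitForCaches polls check until it passes or until the one-second
// metadata caches must have expired, with a little slack.
func waitForCaches(check func() bool) bool {
    deadline := time.Now().Add(2 * cacheTTL)
    for !check() {
        if time.Now().After(deadline) {
            return false
        }
        time.Sleep(20 * time.Millisecond)
    }
    return true
}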
