This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Cache results in ReadDestination and ReadConsumerGroup metadata calls #337

Merged (1 commit, merged Nov 30, 2017)
1 change: 1 addition & 0 deletions .gitignore
@@ -25,3 +25,4 @@ cherami-server
cherami-replicator-server
cdb
cherami-store-tool
+cmq
124 changes: 108 additions & 16 deletions clients/metadata/metadata_cassandra.go
@@ -29,6 +29,7 @@ import (
	"time"

	"github.com/uber/cherami-server/common"
+	"github.com/uber/cherami-server/common/cache"
	"github.com/uber/cherami-server/common/configure"
	m "github.com/uber/cherami-thrift/.generated/go/metadata"
	"github.com/uber/cherami-thrift/.generated/go/shared"
@@ -177,6 +178,10 @@ const cassandraProtoVersion = 4
const deleteExtentTTLSeconds = int64(time.Hour*24) / int64(time.Second)
const defaultDeleteTTLSeconds = int64(time.Hour*24*30) / int64(time.Second)

+const destinationCacheSize = 1048576
+const consumerGroupCacheSize = 1048576
+const cacheTTL = time.Second

// CassandraMetadataService Implements TChanMetadataServiceClient interface
// TODO: Convert all errors to the ones defined in the thrift API.
type CassandraMetadataService struct {
@@ -186,6 +191,9 @@ type CassandraMetadataService struct {
highConsLevel gocql.Consistency // Strongest cons level that can be used for this session
clusterName string
log bark.Logger

+	destinationCache   cache.Cache
+	consumerGroupCache cache.Cache
}

// interface implementation check
@@ -278,6 +286,9 @@ func NewCassandraMetadataService(cfg configure.CommonMetadataConfig, log bark.Lo
highConsLevel: highCons,
clusterName: clusterName,
log: log.WithField(common.TagModule, `metadata`),

+		destinationCache:   cache.New(destinationCacheSize, &cache.Options{TTL: cacheTTL}),
+		consumerGroupCache: cache.New(consumerGroupCacheSize, &cache.Options{TTL: cacheTTL}),
}, nil
}
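The new fields put a one-second, per-instance response cache in front of Cassandra for destination and consumer-group reads. Both caches are bounded at 1048576 (2^20) entries, and the diff exercises exactly four operations on `common/cache`'s `cache.Cache`: `New` with a TTL option, `Get`, `Put`, and `Delete`. As a mental model only, not cherami's actual implementation (the real `common/cache` is also size-bounded with LRU-style eviction), a cache with the same observable Get/Put/Delete behavior minus the size bound might look like:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// ttlCache is a hypothetical stand-in for the cache.Cache used above:
// a Get served within ttl of the last Put returns the stored value;
// afterwards it misses. The size bound / eviction of the real
// common/cache is omitted for brevity.
type ttlCache struct {
	mu   sync.Mutex
	ttl  time.Duration
	data map[string]entry
}

type entry struct {
	value    interface{}
	deadline time.Time
}

func newTTLCache(ttl time.Duration) *ttlCache {
	return &ttlCache{ttl: ttl, data: make(map[string]entry)}
}

func (c *ttlCache) Put(key string, value interface{}) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.data[key] = entry{value: value, deadline: time.Now().Add(c.ttl)}
}

func (c *ttlCache) Get(key string) interface{} {
	c.mu.Lock()
	defer c.mu.Unlock()
	e, ok := c.data[key]
	if !ok || time.Now().After(e.deadline) {
		delete(c.data, key) // lazily drop the expired entry
		return nil
	}
	return e.value
}

func (c *ttlCache) Delete(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.data, key)
}

func main() {
	c := newTTLCache(time.Second) // mirrors cacheTTL above
	c.Put("dest-uuid", "cached description")
	fmt.Println(c.Get("dest-uuid")) // hit: "cached description"
	time.Sleep(1100 * time.Millisecond)
	fmt.Println(c.Get("dest-uuid")) // nil: expired after ~1s
}
```

With `cacheTTL` at one second, staleness on any read path is bounded to about a second even where explicit invalidation misses a key; the large size bound presumably just guards against unbounded growth under heavy key churn.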

@@ -649,6 +660,25 @@ func unmarshalDstZoneConfigs(configsData []map[string]interface{}) []*shared.Des
return configs
}

+type readDestinationResponse struct {
+	result *shared.DestinationDescription
+	err    error
+}
+
+func (s *CassandraMetadataService) cacheReadDestinationResponse(key string, result *shared.DestinationDescription,
+	err error) (*shared.DestinationDescription, error) {
+	item := &readDestinationResponse{
+		result: result,
+		err:    err,
+	}
+
+	if key != "" {
+		s.destinationCache.Put(key, item)
+	}
+
+	return result, err
+}
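Note that `cacheReadDestinationResponse` stores the (result, err) pair unconditionally, so negative answers are cached too: an `EntityNotExistsError` is served from cache just like a hit. Nothing in this diff touches the create path, so a reader that misses right before a destination is created can presumably keep seeing "does not exist" for up to `cacheTTL`. A self-contained sketch of that window (plain maps standing in for the real cache and for Cassandra; names hypothetical):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errNotExists = errors.New("destination /foo does not exist")

// store models Cassandra; cache models destinationCache, with the
// written map standing in for per-entry TTL bookkeeping.
var (
	store   = map[string]string{}
	cache   = map[string]error{} // cached error results (negative entries)
	written = map[string]time.Time{}
	ttl     = time.Second // mirrors cacheTTL
)

func read(path string) error {
	if err, ok := cache[path]; ok && time.Since(written[path]) < ttl {
		return err // served from cache; Cassandra never consulted
	}
	if _, ok := store[path]; !ok {
		cache[path], written[path] = errNotExists, time.Now()
		return errNotExists
	}
	return nil
}

func main() {
	fmt.Println(read("/foo")) // miss: the error itself gets cached
	store["/foo"] = "created" // the destination is created right after
	fmt.Println(read("/foo")) // still the cached error, inside the 1s window
	time.Sleep(1100 * time.Millisecond)
	fmt.Println(read("/foo")) // nil: TTL elapsed, read fell through to the store
}
```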

// ReadDestination implements the corresponding TChanMetadataServiceClient API
// Either path or destinationUUID can be specified.
// Deleted destinations are returned with DELETED status only when destinationUUID is used.
@@ -669,6 +699,18 @@ func (s *CassandraMetadataService) ReadDestination(ctx thrift.Context, getReques
}
}

+	var key string
+	if getRequest.Path != nil {
+		key = getRequest.GetPath()
+	} else {
+		key = getRequest.GetDestinationUUID()
+	}
+
+	cached := s.destinationCache.Get(key)
+	if cached != nil {
+		return cached.(*readDestinationResponse).result, cached.(*readDestinationResponse).err
+	}

result = getUtilDestinationDescription()
var zoneConfigsData []map[string]interface{}
if getRequest.Path != nil {
@@ -722,20 +764,20 @@
dest = getRequest.GetDestinationUUID()
}

-			return nil, &shared.EntityNotExistsError{
+			return s.cacheReadDestinationResponse(key, nil, &shared.EntityNotExistsError{
				Message: fmt.Sprintf("Destination %s does not exist", dest),
-			}
+			})
}

-		return nil, &shared.InternalServiceError{
+		return s.cacheReadDestinationResponse(key, nil, &shared.InternalServiceError{
			Message: err.Error(),
-		}
+		})
}

*result.DLQPurgeBefore = int64(cqlTimestampToUnixNano(*result.DLQPurgeBefore))
*result.DLQMergeBefore = int64(cqlTimestampToUnixNano(*result.DLQMergeBefore))

-	return result, nil
+	return s.cacheReadDestinationResponse(key, result, nil)
}
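ReadDestination keys the cache on whichever identifier the request carries, preferring the path when both are set. The same destination can therefore occupy two independent cache entries, one per identifier, which is why every write path below purges both keys. A sketch of the selection rule (`cacheKeyForRead` is a hypothetical helper; the diff inlines this logic):

```go
package main

import "fmt"

// cacheKeyForRead mirrors the key choice in ReadDestination above:
// the path wins when both fields are set; an empty key is never cached.
func cacheKeyForRead(path, uuid *string) string {
	if path != nil {
		return *path
	}
	if uuid != nil {
		return *uuid
	}
	return ""
}

func main() {
	path := "/prefix/my-dest"
	uuid := "11111111-2222-3333-4444-555555555555" // hypothetical UUID
	fmt.Println(cacheKeyForRead(&path, &uuid))     // "/prefix/my-dest": path wins
	fmt.Println(cacheKeyForRead(nil, &uuid))       // falls back to the UUID form
}
```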

// UpdateDestination implements the corresponding TChanMetadataServiceClient API
@@ -856,6 +898,9 @@ func (s *CassandraMetadataService) UpdateDestination(ctx thrift.Context, updateR
existing.ZoneConfigs = updateRequest.GetZoneConfigs()
}
existing.IsMultiZone = common.BoolPtr(isMultiZone)

+	s.destinationCache.Delete(existing.GetPath())
+	s.destinationCache.Delete(updateRequest.GetDestinationUUID())
return existing, nil
}

@@ -909,6 +954,9 @@ func (s *CassandraMetadataService) DeleteDestination(ctx thrift.Context, deleteR
opsDelete,
time.Now(),
marshalRequest(deleteRequest))

+	s.destinationCache.Delete(existing.GetPath())
+	s.destinationCache.Delete(existing.GetDestinationUUID())
return nil
}

@@ -974,6 +1022,8 @@ func (s *CassandraMetadataService) DeleteDestinationUUID(ctx thrift.Context, del
time.Now(),
marshalRequest(deleteRequest))

+	s.destinationCache.Delete(existing.GetPath())
+	s.destinationCache.Delete(existing.GetDestinationUUID())
return nil
}
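All three destination write paths above (UpdateDestination, DeleteDestination, DeleteDestinationUUID) finish the same way: commit the Cassandra write, then purge both cache keys the destination may be stored under. Since the purge happens after the write, a read already in flight can still re-populate the cache with the pre-write row; that stale entry then lives at most `cacheTTL`. A sketch of the shared pattern (`invalidateDestination` is a hypothetical helper; the diff inlines the two Deletes at each call site):

```go
package main

import "fmt"

// deleter is the one cache operation the write paths need;
// cherami's cache.Cache satisfies it.
type deleter interface {
	Delete(key string)
}

type logDeleter struct{}

func (logDeleter) Delete(key string) { fmt.Println("purged:", key) }

// invalidateDestination purges both keys ReadDestination may have
// cached a destination under. Dropping either Delete would let
// readers using the other identifier keep seeing the stale entry
// until the one-second TTL expires.
func invalidateDestination(c deleter, path, uuid string) {
	c.Delete(path)
	c.Delete(uuid)
}

func main() {
	invalidateDestination(logDeleter{},
		"/prefix/my-dest",
		"11111111-2222-3333-4444-555555555555") // hypothetical UUID
}
```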

@@ -1473,8 +1523,34 @@ func (s *CassandraMetadataService) createDlqDestination(cgUUID string, cgName st
return dlqDestDesc, err
}

+type readConsumerGroupResponse struct {
+	result *shared.ConsumerGroupDescription
+	err    error
+}
+
+func (s *CassandraMetadataService) cacheReadConsumerGroupResponse(key string, result *shared.ConsumerGroupDescription,
+	err error) (*shared.ConsumerGroupDescription, error) {
+	item := &readConsumerGroupResponse{
+		result: result,
+		err:    err,
+	}
+
+	if key != "" {
+		s.consumerGroupCache.Put(key, item)
+	}
+
+	return result, err
+}

func (s *CassandraMetadataService) readConsumerGroupByDstUUID(dstUUID string, cgName string) (*shared.ConsumerGroupDescription, error) {
result := getUtilConsumerGroupDescription()

+	key := dstUUID + cgName
+	cached := s.consumerGroupCache.Get(key)
+	if cached != nil {
+		return cached.(*readConsumerGroupResponse).result, cached.(*readConsumerGroupResponse).err
+	}

var zoneConfigsData []map[string]interface{}
query := s.session.Query(sqlGetCGByName, dstUUID, cgName).Consistency(s.lowConsLevel)
if err := query.Scan(
@@ -1494,17 +1570,18 @@ func (s *CassandraMetadataService) readConsumerGroupByDstUUID(dstUUID string, cg
&zoneConfigsData,
&result.Options); err != nil {
if err == gocql.ErrNotFound {
-			return nil, &shared.EntityNotExistsError{
+			return s.cacheReadConsumerGroupResponse(key, nil, &shared.EntityNotExistsError{
				Message: fmt.Sprintf("ConsumerGroup %s of destinationUUID %s does not exist", cgName, dstUUID),
-			}
+			})
}

-		return nil, &shared.InternalServiceError{
+		return s.cacheReadConsumerGroupResponse(key, nil, &shared.InternalServiceError{
			Message: err.Error(),
-		}
+		})
}

result.ZoneConfigs = unmarshalCgZoneConfigs(zoneConfigsData)
-	return result, nil
+	return s.cacheReadConsumerGroupResponse(key, result, nil)
}
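Consumer groups are double-keyed as well: `dstUUID + cgName` here (bare concatenation, which stays unambiguous because destination UUIDs are fixed-width), and the group's own UUID in ReadConsumerGroupByUUID below. Both cache helpers implement the same read-through shape; a generic rendering of it, as a sketch only (modern Go generics, whereas the code above spells the pattern out per response type):

```go
package main

import "fmt"

// response pairs a result with an error, like readDestinationResponse
// and readConsumerGroupResponse above.
type response[T any] struct {
	result T
	err    error
}

// getPutter is the slice of cache.Cache this pattern needs.
type getPutter interface {
	Get(key string) interface{}
	Put(key string, value interface{})
}

type mapCache map[string]interface{}

func (m mapCache) Get(k string) interface{}    { return m[k] }
func (m mapCache) Put(k string, v interface{}) { m[k] = v }

// readThrough: serve from cache when possible; otherwise run the
// query and cache the (result, err) pair, errors included, before
// returning it. Empty keys are never cached, matching the diff.
func readThrough[T any](c getPutter, key string, query func() (T, error)) (T, error) {
	if v := c.Get(key); v != nil {
		r := v.(*response[T])
		return r.result, r.err
	}
	result, err := query()
	if key != "" {
		c.Put(key, &response[T]{result: result, err: err})
	}
	return result, err
}

func main() {
	c := mapCache{}
	calls := 0
	dstUUID, cgName := "11111111-2222-3333-4444-555555555555", "/my/cg" // hypothetical
	query := func() (string, error) { calls++; return "cg-description", nil }

	readThrough(c, dstUUID+cgName, query)
	readThrough(c, dstUUID+cgName, query) // second call served from cache
	fmt.Println("backing queries:", calls) // 1
}
```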

// ReadConsumerGroup returns the ConsumerGroupDescription for the [destinationPath, groupName].
@@ -1546,6 +1623,12 @@ func (s *CassandraMetadataService) ReadConsumerGroupByUUID(ctx thrift.Context, r
return nil, &shared.BadRequestError{Message: "ConsumerGroupUUID cannot be nil"}
}

+	key := request.GetConsumerGroupUUID()
+	cached := s.consumerGroupCache.Get(key)
+	if cached != nil {
+		return cached.(*readConsumerGroupResponse).result, cached.(*readConsumerGroupResponse).err
+	}

result := getUtilConsumerGroupDescription()
var zoneConfigsData []map[string]interface{}
query := s.session.Query(sqlGetCGByUUID, request.GetConsumerGroupUUID()).Consistency(s.lowConsLevel)
@@ -1566,18 +1649,18 @@ func (s *CassandraMetadataService) ReadConsumerGroupByUUID(ctx thrift.Context, r
&zoneConfigsData,
&result.Options); err != nil {
if err == gocql.ErrNotFound {
-			return nil, &shared.EntityNotExistsError{
+			return s.cacheReadConsumerGroupResponse(key, nil, &shared.EntityNotExistsError{
				Message: fmt.Sprintf("ConsumerGroup %s does not exist", *request.ConsumerGroupUUID),
-			}
+			})
}

-		return nil, &shared.InternalServiceError{
+		return s.cacheReadConsumerGroupResponse(key, nil, &shared.InternalServiceError{
			Message: err.Error(),
-		}
+		})
}
-	result.ZoneConfigs = unmarshalCgZoneConfigs(zoneConfigsData)

-	return result, nil
+	result.ZoneConfigs = unmarshalCgZoneConfigs(zoneConfigsData)
+	return s.cacheReadConsumerGroupResponse(key, result, nil)
}

func updateCGDescIfChanged(req *shared.UpdateConsumerGroupRequest, cgDesc *shared.ConsumerGroupDescription) bool {
@@ -1731,6 +1814,9 @@ func (s *CassandraMetadataService) UpdateConsumerGroup(ctx thrift.Context, reque
time.Now(),
marshalRequest(request))

+	s.consumerGroupCache.Delete(newCG.GetDestinationUUID() + newCG.GetConsumerGroupName())
+	s.consumerGroupCache.Delete(newCG.GetConsumerGroupUUID())

return newCG, nil
}

@@ -1859,6 +1945,10 @@ func (s *CassandraMetadataService) DeleteConsumerGroup(ctx thrift.Context, reque
time.Now(),
marshalRequest(request))

+	s.consumerGroupCache.Delete(existingCG.GetDestinationUUID() + existingCG.GetConsumerGroupName())
+	s.consumerGroupCache.Delete(existingCG.GetConsumerGroupUUID())
+	s.destinationCache.Delete(dlqDstID)

return nil
}
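Worth noting above: DeleteConsumerGroup also purges `dlqDstID` from the destination cache, presumably because deleting a consumer group cascades to its dead-letter-queue destination; without that Delete, the DLQ destination could keep resolving from cache for up to a second after the group is gone.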

@@ -1922,6 +2012,8 @@ func (s *CassandraMetadataService) DeleteConsumerGroupUUID(ctx thrift.Context, r
time.Now(),
marshalRequest(request))

+	s.consumerGroupCache.Delete(existing.GetDestinationUUID() + existing.GetConsumerGroupName())
+	s.consumerGroupCache.Delete(existing.GetConsumerGroupUUID())
return nil
}

56 changes: 33 additions & 23 deletions test/integration/base.go
@@ -50,15 +50,17 @@ import (

type (
testBase struct {
-		Frontends      map[string]*frontendhost.Frontend
-		InputHosts     map[string]*inputhost.InputHost
-		OutputHosts    map[string]*outputhost.OutputHost
-		StoreHosts     map[string]*storehost.StoreHost
-		Controllers    map[string]*controllerhost.Mcp
-		mClient        *metadata.CassandraMetadataService
-		UUIDResolver   common.UUIDResolver
-		keyspace       string
-		storageBaseDir string
+		Frontends      map[string]*frontendhost.Frontend
+		InputHosts     map[string]*inputhost.InputHost
+		OutputHosts    map[string]*outputhost.OutputHost
+		StoreHosts     map[string]*storehost.StoreHost
+		Controllers    map[string]*controllerhost.Mcp
+		mClient        *metadata.CassandraMetadataService
+		UUIDResolver   common.UUIDResolver
+		keyspace       string
+		storageBaseDir string
+		auth           configure.Authentication

*require.Assertions // override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test, not merely log an error
suite.Suite
}
@@ -159,23 +161,17 @@ func (tb *testBase) setupSuiteImpl(t *testing.T) {
tb.keyspace = "integration_test"
tb.Assertions = require.New(tb.T())

-	auth := configure.Authentication{
+	tb.auth = configure.Authentication{
Enabled: true,
Username: "cassandra",
Password: "cassandra",
}

// create the keyspace first
-	err := metadata.CreateKeyspaceNoSession("127.0.0.1", 9042, tb.keyspace, 1, true, auth)
+	err := metadata.CreateKeyspaceNoSession("127.0.0.1", 9042, tb.keyspace, 1, true, tb.auth)
tb.NoError(err)

-	tb.mClient, _ = metadata.NewCassandraMetadataService(&configure.MetadataConfig{
-		CassandraHosts: "127.0.0.1",
-		Port:           9042,
-		Keyspace:       tb.keyspace,
-		Consistency:    "One",
-		Authentication: auth,
-	}, nil)
+	tb.mClient = tb.GetNewMetadataClient()
tb.NotNil(tb.mClient)

// Drop the keyspace, if it exists. This preserves the keyspace for inspection if the test fails, and simplifies cleanup
@@ -198,6 +194,18 @@ func (tb *testBase) setupSuiteImpl(t *testing.T) {
cassConfig.SetRefreshInterval(10 * time.Millisecond)
}

+func (tb *testBase) GetNewMetadataClient() *metadata.CassandraMetadataService {
+	s, _ := metadata.NewCassandraMetadataService(&configure.MetadataConfig{
+		CassandraHosts: "127.0.0.1",
+		Port:           9042,
+		Keyspace:       tb.keyspace,
+		Consistency:    "One",
+		Authentication: tb.auth,
+	}, nil)
+
+	return s
+}
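The test-side change follows from the caching: every CassandraMetadataService instance now carries private one-second caches, so the suite hands each host its own client via `GetNewMetadataClient` instead of sharing `tb.mClient` everywhere, mirroring production where each host keeps its own metadata caches. The model below shows the cross-client staleness this implies, which the real code bounds with the one-second TTL (names hypothetical, TTL omitted):

```go
package main

import "fmt"

var store = map[string]string{"dest": "v1"} // shared backing store (Cassandra)

// client models CassandraMetadataService: a private cache in front of
// the shared store. Invalidation on write only reaches its own cache.
type client struct{ cache map[string]string }

func newClient() *client { return &client{cache: map[string]string{}} }

func (c *client) read(key string) string {
	if v, ok := c.cache[key]; ok {
		return v
	}
	v := store[key]
	c.cache[key] = v
	return v
}

func (c *client) update(key, v string) {
	store[key] = v
	delete(c.cache, key) // purges this client's cache only
}

func main() {
	a, b := newClient(), newClient() // e.g. a storehost's and a controller's clients
	fmt.Println(a.read("dest"))      // "v1", warms a's cache
	b.update("dest", "v2")           // b writes and purges only b's cache
	fmt.Println(a.read("dest"))      // still "v1" until a's TTL would lapse
}
```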

func (tb *testBase) TearDownSuite() {
}

@@ -208,6 +216,8 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) {
tb.storageBaseDir, err = ioutil.TempDir("", "cherami_integration_test_")
tb.NoError(err)

+	tb.mClient = tb.GetNewMetadataClient()
+	tb.NotNil(tb.mClient)
tb.UUIDResolver = common.NewUUIDResolver(tb.mClient)
hwInfoReader := common.NewHostHardwareInfoReader(tb.mClient)

@@ -232,7 +242,7 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) {
dClient := dconfig.NewDconfigClient(configure.NewCommonServiceConfig(), common.StoreServiceName)
sCommon := common.NewService(common.StoreServiceName, hostID, cfg, tb.UUIDResolver, hwInfoReader, reporter, dClient, common.NewBypassAuthManager())
log.Infof("store ringHosts: %v", cfg.GetRingHosts())
-		sh, tc := storehost.NewStoreHost(common.StoreServiceName, sCommon, tb.mClient, storehostOpts)
+		sh, tc := storehost.NewStoreHost(common.StoreServiceName, sCommon, tb.GetNewMetadataClient(), storehostOpts)
sh.Start(tc)

// start websocket server
@@ -256,7 +266,7 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) {
dClient := dconfig.NewDconfigClient(configure.NewCommonServiceConfig(), common.InputServiceName)
sCommon := common.NewService(common.InputServiceName, hostID, cfg, tb.UUIDResolver, hwInfoReader, reporter, dClient, common.NewBypassAuthManager())
log.Infof("input ringHosts: %v", cfg.GetRingHosts())
-		ih, tc := inputhost.NewInputHost(common.InputServiceName, sCommon, tb.mClient, nil)
+		ih, tc := inputhost.NewInputHost(common.InputServiceName, sCommon, tb.GetNewMetadataClient(), nil)
ih.Start(tc)
// start websocket server
common.WSStart(cfg.GetListenAddress().String(), cfg.GetWebsocketPort(), ih)
@@ -272,7 +282,7 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) {

sCommon := common.NewService(common.FrontendServiceName, hostID, cfg, tb.UUIDResolver, hwInfoReader, reporter, dClient, common.NewBypassAuthManager())
log.Infof("front ringHosts: %v", cfg.GetRingHosts())
-		fh, tc := frontendhost.NewFrontendHost(common.FrontendServiceName, sCommon, tb.mClient, cfgMap[common.FrontendServiceName][i])
+		fh, tc := frontendhost.NewFrontendHost(common.FrontendServiceName, sCommon, tb.GetNewMetadataClient(), cfgMap[common.FrontendServiceName][i])
fh.Start(tc)
tb.Frontends[hostID] = fh
frontendForOut = fh
@@ -288,7 +298,7 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) {
oh, tc := outputhost.NewOutputHost(
common.OutputServiceName,
sCommon,
-			tb.mClient,
+			tb.GetNewMetadataClient(),
frontendForOut,
nil,
cfgMap[common.OutputServiceName][i].GetKafkaConfig(),
@@ -308,7 +318,7 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) {
reporter := common.NewTestMetricsReporter()
dClient := dconfig.NewDconfigClient(configure.NewCommonServiceConfig(), common.ControllerServiceName)
sVice := common.NewService(serviceName, uuid.New(), cfg.ServiceConfig[serviceName], tb.UUIDResolver, hwInfoReader, reporter, dClient, common.NewBypassAuthManager())
-		ch, tc := controllerhost.NewController(cfg, sVice, tb.mClient, common.NewDummyZoneFailoverManager())
+		ch, tc := controllerhost.NewController(cfg, sVice, tb.GetNewMetadataClient(), common.NewDummyZoneFailoverManager())
ch.Start(tc)
tb.Controllers[hostID] = ch
}