From 93c7cb40d74b650403290ea2c62425b864769b07 Mon Sep 17 00:00:00 2001 From: Patrick Pfeiffer Date: Thu, 18 Apr 2024 15:23:57 +0200 Subject: [PATCH] (NOBIDS) compress mempool in cache --- cache/redis_cache.go | 14 ++++++++++++-- cache/tiered_cache.go | 30 ++++++++++++++++++++++++++++++ go.mod | 1 + go.sum | 2 ++ handlers/api.go | 4 ++++ services/services.go | 32 +++++++++++++++++++++++--------- utils/compress.go | 32 ++++++++++++++++++++++++++++++++ 7 files changed, 104 insertions(+), 11 deletions(-) create mode 100644 utils/compress.go diff --git a/cache/redis_cache.go b/cache/redis_cache.go index 319f8f4834..b1592b696d 100644 --- a/cache/redis_cache.go +++ b/cache/redis_cache.go @@ -31,17 +31,27 @@ func InitRedisCache(ctx context.Context, redisAddress string) (*RedisCache, erro return r, nil } +func (cache *RedisCache) SetBytes(ctx context.Context, key string, value []byte, expiration time.Duration) error { + return cache.redisRemoteCache.Set(ctx, key, value, expiration).Err() +} + +func (cache *RedisCache) GetBytes(ctx context.Context, key string) ([]byte, error) { + value, err := cache.redisRemoteCache.Get(ctx, key).Bytes() + if err != nil { + return nil, err + } + return value, nil +} + func (cache *RedisCache) SetString(ctx context.Context, key, value string, expiration time.Duration) error { return cache.redisRemoteCache.Set(ctx, key, value, expiration).Err() } func (cache *RedisCache) GetString(ctx context.Context, key string) (string, error) { - value, err := cache.redisRemoteCache.Get(ctx, key).Result() if err != nil { return "", err } - return value, nil } diff --git a/cache/tiered_cache.go b/cache/tiered_cache.go index 531c7bf837..c460e69265 100644 --- a/cache/tiered_cache.go +++ b/cache/tiered_cache.go @@ -20,11 +20,13 @@ type tieredCache struct { type RemoteCache interface { Set(ctx context.Context, key string, value any, expiration time.Duration) error + SetBytes(ctx context.Context, key string, value []byte, expiration time.Duration) error SetString(ctx context.Context, key, value string, expiration time.Duration) error SetUint64(ctx context.Context, key string, value uint64, expiration time.Duration) error SetBool(ctx context.Context, key string, value bool, expiration time.Duration) error Get(ctx context.Context, key string, returnValue any) (any, error) + GetBytes(ctx context.Context, key string) ([]byte, error) GetString(ctx context.Context, key string) (string, error) GetUint64(ctx context.Context, key string) (uint64, error) GetBool(ctx context.Context, key string) (bool, error) @@ -47,6 +49,34 @@ func MustInitTieredCache(redisAddress string) { } } +func (cache *tieredCache) SetBytes(key string, value []byte, expiration time.Duration) error { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + + cache.localGoCache.Set([]byte(key), value, int(expiration.Seconds())) + return cache.remoteCache.SetBytes(ctx, key, value, expiration) +} + +func (cache *tieredCache) GetBytesWithLocalTimeout(key string, localExpiration time.Duration) ([]byte, error) { + // try to retrieve the key from the local cache + wanted, err := cache.localGoCache.Get([]byte(key)) + if err == nil { + return wanted, nil + } + + // retrieve the key from the remote cache + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + + value, err := cache.remoteCache.GetBytes(ctx, key) + if err != nil { + return nil, err + } + + cache.localGoCache.Set([]byte(key), value, int(localExpiration.Seconds())) + return value, nil +} + func (cache *tieredCache) SetString(key, value string, expiration time.Duration) error { ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) defer cancel() diff --git a/go.mod b/go.mod index c88c4846cd..138aada0cd 100644 --- a/go.mod +++ b/go.mod @@ -167,6 +167,7 @@ require ( github.com/jackc/pgproto3/v2 v2.3.2 // indirect github.com/jackc/puddle/v2 v2.2.1 // indirect github.com/jbenet/goprocess v0.1.4 // indirect + github.com/klauspost/pgzip v1.2.6 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect github.com/libp2p/go-buffer-pool v0.1.0 // indirect diff --git a/go.sum b/go.sum index 649de3831e..f041c3ddf3 100644 --- a/go.sum +++ b/go.sum @@ -1079,6 +1079,8 @@ github.com/klauspost/cpuid/v2 v2.1.1/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8t github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc= github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/klauspost/pgzip v1.2.5/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= +github.com/klauspost/pgzip v1.2.6 h1:8RXeL5crjEUFnR2/Sn6GJNWtSQ3Dk8pq4CL3jvdDyjU= +github.com/klauspost/pgzip v1.2.6/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= diff --git a/handlers/api.go b/handlers/api.go index 5d0d164bfd..3c39bb1eee 100644 --- a/handlers/api.go +++ b/handlers/api.go @@ -2334,6 +2334,10 @@ func ApiValidatorPerformance(w http.ResponseWriter, r *http.Request) { continue } + if len(latestBalances[uint64(validatorIndex)]) == 0 { + logger.WithField("validatorIndex", validatorIndex).Errorf("error latestBalances map has no entry for validator") + continue + } eMap["balance"] = latestBalances[uint64(validatorIndex)][0].Balance eMap["performancetoday"] = currentDayIncome[uint64(validatorIndex)] eMap["performancetotal"] = eMap["performancetotal"].(int64) + currentDayIncome[uint64(validatorIndex)] diff --git a/services/services.go b/services/services.go index 9eb3af9e7f..36176223ac 100644 --- a/services/services.go +++ b/services/services.go @@ -1023,14 +1023,19 @@ func LatestProposedSlot() uint64 { } func LatestMempoolTransactions() *types.RawMempoolResponse { - wanted := &types.RawMempoolResponse{} - cacheKey := fmt.Sprintf("%d:frontend:mempool", utils.Config.Chain.ClConfig.DepositChainID) - if wanted, err := cache.TieredCache.GetWithLocalTimeout(cacheKey, time.Minute, wanted); err == nil { - return wanted.(*types.RawMempoolResponse) - } else { + cacheKey := fmt.Sprintf("%d:frontend:mempoolCompressed", utils.Config.Chain.ClConfig.DepositChainID) + mempoolCompressed, err := cache.TieredCache.GetBytesWithLocalTimeout(cacheKey, time.Minute) + if err != nil { logger.Errorf("error retrieving mempool data from cache: %v", err) + return &types.RawMempoolResponse{} + } + mempool := &types.RawMempoolResponse{} + err = utils.Decompress(mempoolCompressed, mempool) + if err != nil { + logger.Errorf("error decompressing mempool data: %v", err) + return &types.RawMempoolResponse{} } - return &types.RawMempoolResponse{} + return mempool } func LatestBurnData() *types.BurnPageData { @@ -1297,7 +1302,7 @@ func getGasNowData() (*types.GasNowPageData, error) { var raw json.RawMessage err = client.Call(&raw, "eth_getBlockByNumber", "pending", true) if err != nil { - return nil, fmt.Errorf("error retrieving pending block data: %v", err) + return nil, fmt.Errorf("error retrieving pending block data: %.1000w", err) // limit error message to 1000 characters } // var res map[string]interface{} @@ -1518,8 +1523,17 @@ func mempoolUpdater(wg *sync.WaitGroup) { } } - cacheKey := fmt.Sprintf("%d:frontend:mempool", utils.Config.Chain.ClConfig.DepositChainID) - err = cache.TieredCache.Set(cacheKey, mempoolTx, utils.Day) + cacheKey := fmt.Sprintf("%d:frontend:mempoolCompressed", utils.Config.Chain.ClConfig.DepositChainID) + mempoolTxCompressed, err := utils.Compress(mempoolTx) + if err != nil { + logger.Errorf("error compressing mempool data: %v", err) + continue + } + if len(mempoolTxCompressed) > 500_000_000 { + logger.Errorf("error mempool data is bigger than 500_000_000: %v", len(mempoolTxCompressed)) + continue + } + err = cache.TieredCache.SetBytes(cacheKey, mempoolTxCompressed, utils.Day) if err != nil { logger.Errorf("error caching mempool data: %v", err) } diff --git a/utils/compress.go b/utils/compress.go new file mode 100644 index 0000000000..39e31c5d9b --- /dev/null +++ b/utils/compress.go @@ -0,0 +1,32 @@ +package utils + +import ( + "bytes" + "encoding/json" + + "github.com/klauspost/pgzip" +) + +func Compress(d any) ([]byte, error) { + var b bytes.Buffer + zw := pgzip.NewWriter(&b) + err := json.NewEncoder(zw).Encode(d) + if err != nil { + return nil, err + } + zw.Close() + return b.Bytes(), nil +} + +func Decompress(d []byte, dest any) error { + var b bytes.Buffer + zr, err := pgzip.NewReader(&b) + if err != nil { + return err + } + err = json.NewDecoder(zr).Decode(dest) + if err != nil { + return err + } + return nil +}