Commit 7e6d25f

historyarchive: Cache bucket files from history archives on disk. (stellar#5171)

* go mod tidy
* Add double-close protection
* Add request tracking when cache invokes upstream download
* Add cache hit tracking
* Move stat tracking to a separate file
* Modify test to track stats+integrity after caching
* Stop double-closing identical XDR stream readers
Shaptic authored Jan 19, 2024
1 parent 477db6f commit 7e6d25f
Showing 13 changed files with 371 additions and 78 deletions.
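
Before the diff itself, a minimal usage sketch of how a caller might opt into the new on-disk cache via the CacheConfig field added to ConnectOptions below. The archive URL and cache path are hypothetical; the field and type names come from the diff.

package main

import (
	"log"

	"github.com/stellar/go/historyarchive"
)

func main() {
	// CacheConfig is the new ConnectOptions field added by this commit; its
	// CacheOptions fields (Cache, Path, MaxFiles) are defined in the new
	// historyarchive/archive_cache.go below.
	archive, err := historyarchive.Connect(
		"https://history.example.org/core", // hypothetical archive URL
		historyarchive.ConnectOptions{
			CacheConfig: historyarchive.CacheOptions{
				Cache:    true,                // enable the on-disk bucket cache
				Path:     "/tmp/bucket-cache", // hypothetical cache directory
				MaxFiles: 100,                 // LRU capacity, in files
			},
		},
	)
	if err != nil {
		log.Fatal(err)
	}
	_ = archive // GetXdrStream and BucketExists now consult the cache first
}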
2 changes: 1 addition & 1 deletion go.mod
@@ -102,7 +102,7 @@ require (
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/go-querystring v0.0.0-20160401233042-9235644dd9e5 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/hashicorp/golang-lru v1.0.2
github.com/imkira/go-interpol v1.1.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
106 changes: 50 additions & 56 deletions historyarchive/archive.go
@@ -10,14 +10,12 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/url"
"path"
"regexp"
"strconv"
"strings"
"sync"
"sync/atomic"

log "github.com/sirupsen/logrus"

@@ -52,6 +50,8 @@ type ConnectOptions struct {
CheckpointFrequency uint32
// UserAgent is the value of `User-Agent` header. Applicable only for HTTP client.
UserAgent string
// CacheConfig controls how/if bucket files are cached on the disk.
CacheConfig CacheOptions
}

type Ledger struct {
@@ -60,51 +60,6 @@ type Ledger struct {
TransactionResult xdr.TransactionHistoryResultEntry
}

// Go will automatically wrap these back to 0 if they overflow after addition.
type archiveStats struct {
requests atomic.Uint32
fileDownloads atomic.Uint32
fileUploads atomic.Uint32
backendName string
}

type ArchiveStats interface {
GetRequests() uint32
GetDownloads() uint32
GetUploads() uint32
GetBackendName() string
}

func (as *archiveStats) incrementDownloads() {
as.fileDownloads.Add(1)
as.incrementRequests()
}

func (as *archiveStats) incrementUploads() {
as.fileUploads.Add(1)
as.incrementRequests()
}

func (as *archiveStats) incrementRequests() {
as.requests.Add(1)
}

func (as *archiveStats) GetRequests() uint32 {
return as.requests.Load()
}

func (as *archiveStats) GetDownloads() uint32 {
return as.fileDownloads.Load()
}

func (as *archiveStats) GetUploads() uint32 {
return as.fileUploads.Load()
}

func (as *archiveStats) GetBackendName() string {
return as.backendName
}

type ArchiveBackend interface {
Exists(path string) (bool, error)
Size(path string) (int64, error)
@@ -162,6 +117,7 @@ type Archive struct {
checkpointManager CheckpointManager

backend ArchiveBackend
cache *ArchiveBucketCache
stats archiveStats
}

@@ -216,13 +172,11 @@ func (a *Archive) PutPathHAS(path string, has HistoryArchiveState, opts *Command
return err
}
a.stats.incrementUploads()
return a.backend.PutFile(path,
ioutil.NopCloser(bytes.NewReader(buf)))
return a.backend.PutFile(path, io.NopCloser(bytes.NewReader(buf)))
}

func (a *Archive) BucketExists(bucket Hash) (bool, error) {
a.stats.incrementRequests()
return a.backend.Exists(BucketPath(bucket))
return a.cachedExists(BucketPath(bucket))
}

func (a *Archive) BucketSize(bucket Hash) (int64, error) {
@@ -395,8 +349,8 @@ func (a *Archive) ListCategoryCheckpoints(cat string, pth string) (chan uint32,
ext := categoryExt(cat)
rx := regexp.MustCompile(cat + hexPrefixPat + cat +
"-([0-9a-f]{8})\\." + regexp.QuoteMeta(ext) + "$")
sch, errs := a.backend.ListFiles(path.Join(cat, pth))
a.stats.incrementRequests()
sch, errs := a.backend.ListFiles(path.Join(cat, pth))
ch := make(chan uint32)
errs = makeErrorPump(errs)

@@ -433,14 +387,42 @@ func (a *Archive) GetXdrStream(pth string) (*XdrStream, error) {
if !strings.HasSuffix(pth, ".xdr.gz") {
return nil, errors.New("File has non-.xdr.gz suffix: " + pth)
}
rdr, err := a.backend.GetFile(pth)
a.stats.incrementDownloads()
rdr, err := a.cachedGet(pth)
if err != nil {
return nil, err
}
return NewXdrGzStream(rdr)
}

func (a *Archive) cachedGet(pth string) (io.ReadCloser, error) {
if a.cache != nil {
rdr, foundInCache, err := a.cache.GetFile(pth, a.backend)
if !foundInCache {
a.stats.incrementDownloads()
} else {
a.stats.incrementCacheHits()
}
if err == nil {
return rdr, nil
}

// If there's an error, retry with the uncached backend.
a.cache.Evict(pth)
}

a.stats.incrementDownloads()
return a.backend.GetFile(pth)
}

func (a *Archive) cachedExists(pth string) (bool, error) {
if a.cache != nil && a.cache.Exists(pth) {
return true, nil
}

a.stats.incrementRequests()
return a.backend.Exists(pth)
}

func Connect(u string, opts ConnectOptions) (*Archive, error) {
arch := Archive{
networkPassphrase: opts.NetworkPassphrase,
@@ -490,9 +472,21 @@ func Connect(u string, opts ConnectOptions) (*Archive, error) {
err = errors.New("unknown URL scheme: '" + parsed.Scheme + "'")
}

arch.stats = archiveStats{backendName: parsed.String()}
if err != nil {
return &arch, err
}

return &arch, err
if opts.CacheConfig.Cache {
cache, innerErr := MakeArchiveBucketCache(opts.CacheConfig)
if innerErr != nil {
return &arch, innerErr
}

arch.cache = cache
}

arch.stats = archiveStats{backendName: parsed.String()}
return &arch, nil
}

func MustConnect(u string, opts ConnectOptions) *Archive {
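Continuing the usage sketch from the top: with the cache enabled, the first read of a bucket goes through cachedGet as a download, while a repeat read of the same path is counted as a cache hit and served from disk. The bucket path below is hypothetical, and XdrStream's Close comes from the package's existing stream reader.

	// First read: cache miss; cachedGet downloads from the backend and
	// tees the bytes into the on-disk cache.
	stream, err := archive.GetXdrStream(
		"bucket/ab/cd/ef/bucket-abcdef0123456789.xdr.gz") // hypothetical path
	if err != nil {
		log.Fatal(err)
	}
	stream.Close()

	// Second read of the same path: served from disk and counted as a
	// cache hit rather than a download.
	stream, err = archive.GetXdrStream(
		"bucket/ab/cd/ef/bucket-abcdef0123456789.xdr.gz")
	if err != nil {
		log.Fatal(err)
	}
	stream.Close()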
202 changes: 202 additions & 0 deletions historyarchive/archive_cache.go
@@ -0,0 +1,202 @@
package historyarchive

import (
"io"
"os"
"path"

lru "github.com/hashicorp/golang-lru"
log "github.com/sirupsen/logrus"
)

type CacheOptions struct {
Cache bool

Path string
MaxFiles uint
}

type ArchiveBucketCache struct {
path string
lru *lru.Cache
log *log.Entry
}

// MakeArchiveBucketCache creates a cache on the disk at the given path that
// acts as an LRU cache, mimicking a particular upstream.
func MakeArchiveBucketCache(opts CacheOptions) (*ArchiveBucketCache, error) {
log_ := log.
WithField("subservice", "fs-cache").
WithField("path", opts.Path).
WithField("cap", opts.MaxFiles)

if _, err := os.Stat(opts.Path); err == nil || os.IsExist(err) {
log_.Warnf("Cache directory already exists, removing")
os.RemoveAll(opts.Path)
}

backend := &ArchiveBucketCache{
path: opts.Path,
log: log_,
}

cache, err := lru.NewWithEvict(int(opts.MaxFiles), backend.onEviction)
if err != nil {
return &ArchiveBucketCache{}, err
}
backend.lru = cache

log_.Info("Bucket cache initialized")
return backend, nil
}

// GetFile retrieves the file contents from the local cache if present.
// Otherwise, it returns the same result as the upstream, adding that result
// into the local cache if possible. It returns a 3-tuple of a reader (which may
// be nil on an error), an indication of whether or not it was *found* in the
// cache, and any error.
func (abc *ArchiveBucketCache) GetFile(
filepath string,
upstream ArchiveBackend,
) (io.ReadCloser, bool, error) {
L := abc.log.WithField("key", filepath)
localPath := path.Join(abc.path, filepath)

// If the lockfile exists, we should defer to the remote source but *not*
// update the cache, as it means there's an in-progress sync of the same
// file.
_, statErr := os.Stat(NameLockfile(localPath))
if statErr == nil || os.IsExist(statErr) {
L.Info("Incomplete file in on-disk cache: deferring")
reader, err := upstream.GetFile(filepath)
return reader, false, err
} else if _, ok := abc.lru.Get(localPath); !ok {
L.Info("File does not exist in the cache: downloading")

// Since it's not on-disk, pull it from the remote backend, shove it
// into the cache, and write it to disk.
remote, err := upstream.GetFile(filepath)
if err != nil {
return remote, false, err
}

local, err := abc.createLocal(filepath)
if err != nil {
// If there's some local FS error, we can still continue with the
// remote version, so just log it and continue.
L.WithError(err).Warn("Creating cache file failed")
return remote, false, nil
}

return teeReadCloser(remote, local, func() error {
L.Debug("Download complete: removing lockfile")
return os.Remove(NameLockfile(localPath))
}), false, nil
}

L.Info("Found file in cache")
// The cache claims it exists, so just give it a read and send it.
local, err := os.Open(localPath)
if err != nil {
// Uh-oh, the cache and the disk are not in sync somehow? Let's evict
// this value and try again (recurse) w/ the remote version.
L.WithError(err).Warn("Opening cached file failed")
abc.lru.Remove(localPath)
return abc.GetFile(filepath, upstream)
}

return local, true, nil
}

func (abc *ArchiveBucketCache) Exists(filepath string) bool {
return abc.lru.Contains(path.Join(abc.path, filepath))
}

// Close purges the cache and cleans up the filesystem.
func (abc *ArchiveBucketCache) Close() error {
abc.lru.Purge()
return os.RemoveAll(abc.path)
}

// Evict removes a file from the cache and the filesystem.
func (abc *ArchiveBucketCache) Evict(filepath string) {
log.WithField("key", filepath).Info("Evicting file from the disk")
abc.lru.Remove(path.Join(abc.path, filepath))
}

func (abc *ArchiveBucketCache) onEviction(key, value interface{}) {
path := key.(string)
os.Remove(NameLockfile(path)) // just in case
if err := os.Remove(path); err != nil { // best effort removal
abc.log.WithError(err).
WithField("key", path).
Warn("Removal failed after cache eviction")
}
}

func (abc *ArchiveBucketCache) createLocal(filepath string) (*os.File, error) {
localPath := path.Join(abc.path, filepath)
if err := os.MkdirAll(path.Dir(localPath), 0755 /* drwxr-xr-x */); err != nil {
return nil, err
}

local, err := os.Create(localPath) /* mode -rw-rw-rw- */
if err != nil {
return nil, err
}
_, err = os.Create(NameLockfile(localPath))
if err != nil {
return nil, err
}

abc.lru.Add(localPath, struct{}{}) // just use the cache as an array
return local, nil
}

func NameLockfile(file string) string {
return file + ".lock"
}

// The below is a helper type so that we can use io.TeeReader to write
// data locally immediately as we read it remotely.

type trc struct {
io.Reader
close func() error
closed bool // prevents a double-close
}

func (t trc) Close() error {
if t.closed {
return nil
}

return t.close()
}

func teeReadCloser(r io.ReadCloser, w io.WriteCloser, onClose func() error) io.ReadCloser {
closer := trc{
Reader: io.TeeReader(r, w),
closed: false,
}
closer.close = func() error {
if closer.closed {
return nil
}

// Always run all closers, but return the first error
err1 := r.Close()
err2 := w.Close()
err3 := onClose()

closer.closed = true
if err1 != nil {
return err1
} else if err2 != nil {
return err2
}
return err3
}

return closer
}
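
To see the helper in action, here is a sketch of the tee and double-close behavior, written as an in-package test since teeReadCloser is unexported. The file name is hypothetical, and the lockfile handling mirrors what createLocal and GetFile do above.

package historyarchive

import (
	"io"
	"os"
	"strings"
	"testing"
)

func TestTeeReadCloserSketch(t *testing.T) {
	cached := t.TempDir() + "/bucket.xdr.gz" // hypothetical cache file
	remote := io.NopCloser(strings.NewReader("bucket bytes")) // stand-in upstream

	local, err := os.Create(cached)
	if err != nil {
		t.Fatal(err)
	}
	// createLocal also drops a lockfile next to the cache file.
	if _, err := os.Create(NameLockfile(cached)); err != nil {
		t.Fatal(err)
	}

	rc := teeReadCloser(remote, local, func() error {
		// Mirrors GetFile: remove the lockfile once the download completes.
		return os.Remove(NameLockfile(cached))
	})

	data, err := io.ReadAll(rc) // bytes land in the cache file as they are read
	if err != nil || string(data) != "bucket bytes" {
		t.Fatalf("read failed: %v", err)
	}
	if err := rc.Close(); err != nil { // closes remote and local, removes lockfile
		t.Fatal(err)
	}
	if err := rc.Close(); err != nil { // second close is a guarded no-op
		t.Fatal(err)
	}
}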