Skip to content

Commit

Permalink
Add ocs cache warmup strategy for first request from the user (#2117)
Browse files Browse the repository at this point in the history
  • Loading branch information
ishank011 authored Oct 26, 2021
1 parent 5b66c1b commit a88999e
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 42 deletions.
3 changes: 3 additions & 0 deletions changelog/unreleased/ocs-first-req-warmup.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Enhancement: Add ocs cache warmup strategy for first request from the user

https://github.com/cs3org/reva/pull/2117
1 change: 1 addition & 0 deletions cmd/revad/runtime/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
_ "github.com/cs3org/reva/pkg/ocm/share/manager/loader"
_ "github.com/cs3org/reva/pkg/publicshare/manager/loader"
_ "github.com/cs3org/reva/pkg/rhttp/datatx/manager/loader"
_ "github.com/cs3org/reva/pkg/share/cache/loader"
_ "github.com/cs3org/reva/pkg/share/manager/loader"
_ "github.com/cs3org/reva/pkg/storage/fs/loader"
_ "github.com/cs3org/reva/pkg/storage/registry/loader"
Expand Down
5 changes: 4 additions & 1 deletion internal/grpc/services/gateway/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -1945,7 +1945,10 @@ func (s *svc) getPath(ctx context.Context, ref *provider.Reference, keys ...stri
if ref.ResourceId != nil {
req := &provider.StatRequest{Ref: ref, ArbitraryMetadataKeys: keys}
res, err := s.stat(ctx, req)
if (res != nil && res.Status.Code != rpc.Code_CODE_OK) || err != nil {
if err != nil {
return "", status.NewStatusFromErrType(ctx, "getPath ref="+ref.String(), err)
}
if res != nil && res.Status.Code != rpc.Code_CODE_OK {
return "", res.Status
}

Expand Down
69 changes: 69 additions & 0 deletions internal/http/services/owncloud/ocs/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

package ocs

import (
"context"
"net/http"
"net/http/httptest"

"github.com/cs3org/reva/pkg/appctx"
ctxpkg "github.com/cs3org/reva/pkg/ctx"
"google.golang.org/grpc/metadata"
)

func (s *svc) cacheWarmup(w http.ResponseWriter, r *http.Request) {
if s.warmupCacheTracker != nil {
u := ctxpkg.ContextMustGetUser(r.Context())
tkn := ctxpkg.ContextMustGetToken(r.Context())
log := appctx.GetLogger(r.Context())

// We make a copy of the context because the original one comes with its cancel channel,
// so once the initial request is finished, this ctx gets cancelled as well.
// And in most of the cases, the warmup takes a longer amount of time to complete than the original request.
// TODO: Check if we can come up with a better solution, eg, https://stackoverflow.com/a/54132324
ctx := context.Background()
ctx = appctx.WithLogger(ctx, log)
ctx = ctxpkg.ContextSetUser(ctx, u)
ctx = ctxpkg.ContextSetToken(ctx, tkn)
ctx = metadata.AppendToOutgoingContext(ctx, ctxpkg.TokenHeader, tkn)

req, _ := http.NewRequest("GET", "", nil)
req = req.WithContext(ctx)
req.URL = r.URL

id := u.Id.OpaqueId
if _, err := s.warmupCacheTracker.Get(id); err != nil {
p := httptest.NewRecorder()
_ = s.warmupCacheTracker.Set(id, true)

log.Info().Msgf("cache warmup getting created shares for user %s", id)
req.URL.Path = "/v1.php/apps/files_sharing/api/v1/shares"
s.router.ServeHTTP(p, req)

log.Info().Msgf("cache warmup getting received shares for user %s", id)
req.URL.Path = "/v1.php/apps/files_sharing/api/v1/shares"
q := req.URL.Query()
q.Set("shared_with_me", "true")
q.Set("state", "all")
req.URL.RawQuery = q.Encode()
s.router.ServeHTTP(p, req)
}
}
}
5 changes: 5 additions & 0 deletions internal/http/services/owncloud/ocs/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type Config struct {
CacheWarmupDrivers map[string]map[string]interface{} `mapstructure:"cache_warmup_drivers"`
ResourceInfoCacheSize int `mapstructure:"resource_info_cache_size"`
ResourceInfoCacheTTL int `mapstructure:"resource_info_cache_ttl"`
UserIdentifierCacheTTL int `mapstructure:"user_identifier_cache_ttl"`
}

// Init sets sane defaults
Expand Down Expand Up @@ -66,5 +67,9 @@ func (c *Config) Init() {
c.ResourceInfoCacheSize = 1000000
}

if c.UserIdentifierCacheTTL == 0 {
c.UserIdentifierCacheTTL = 60
}

c.GatewaySvc = sharedconf.GetGatewaySVC(c.GatewaySvc)
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (h *Handler) Init(c *config.Config) {
h.additionalInfoTemplate, _ = template.New("additionalInfo").Parse(c.AdditionalInfoAttribute)

h.userIdentifierCache = ttlcache.NewCache()
_ = h.userIdentifierCache.SetTTL(time.Second * 60)
_ = h.userIdentifierCache.SetTTL(time.Second * time.Duration(c.UserIdentifierCacheTTL))

if h.resourceInfoCacheTTL > 0 {
cwm, err := getCacheWarmupManager(c)
Expand All @@ -111,13 +111,14 @@ func (h *Handler) Init(c *config.Config) {
}

func (h *Handler) startCacheWarmup(c cache.Warmup) {
time.Sleep(2 * time.Second)
infos, err := c.GetResourceInfos()
if err != nil {
return
}
for _, r := range infos {
key := wrapResourceID(r.Id)
_ = h.resourceInfoCache.SetWithExpire(key, r, time.Second*h.resourceInfoCacheTTL)
_ = h.resourceInfoCache.SetWithExpire(key, r, h.resourceInfoCacheTTL)
}
}

Expand Down Expand Up @@ -511,6 +512,7 @@ func (h *Handler) listSharesWithMe(w http.ResponseWriter, r *http.Request) {
// which pending state to list
stateFilter := getStateFilter(r.FormValue("state"))

log := appctx.GetLogger(r.Context())
client, err := pool.GetGatewayServiceClient(h.gatewayAddr)
if err != nil {
response.WriteOCSError(w, r, response.MetaServerError.StatusCode, "error getting grpc gateway client", err)
Expand Down Expand Up @@ -1021,6 +1023,7 @@ func (h *Handler) getResourceInfo(ctx context.Context, client gateway.GatewayAPI
pinfo = infoIf.(*provider.ResourceInfo)
status = &rpc.Status{Code: rpc.Code_CODE_OK}
} else {
logger.Debug().Msgf("cache miss for resource %+v, statting", key)
statReq := &provider.StatRequest{
Ref: ref,
}
Expand Down
16 changes: 14 additions & 2 deletions internal/http/services/owncloud/ocs/ocs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ package ocs

import (
"net/http"
"time"

"github.com/ReneKroon/ttlcache/v2"
"github.com/cs3org/reva/internal/http/services/owncloud/ocs/config"
"github.com/cs3org/reva/internal/http/services/owncloud/ocs/handlers/apps/sharing/sharees"
"github.com/cs3org/reva/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares"
Expand All @@ -41,8 +43,9 @@ func init() {
}

type svc struct {
c *config.Config
router *chi.Mux
c *config.Config
router *chi.Mux
warmupCacheTracker *ttlcache.Cache
}

func New(m map[string]interface{}, log *zerolog.Logger) (global.Service, error) {
Expand All @@ -63,6 +66,11 @@ func New(m map[string]interface{}, log *zerolog.Logger) (global.Service, error)
return nil, err
}

if conf.CacheWarmupDriver == "first-request" && conf.ResourceInfoCacheTTL > 0 {
s.warmupCacheTracker = ttlcache.NewCache()
_ = s.warmupCacheTracker.SetTTL(time.Second * time.Duration(conf.ResourceInfoCacheTTL))
}

return s, nil
}

Expand Down Expand Up @@ -138,6 +146,10 @@ func (s *svc) Handler() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
log := appctx.GetLogger(r.Context())
log.Debug().Str("path", r.URL.Path).Msg("ocs routing")

// Warmup the share cache for the user
go s.cacheWarmup(w, r)

// unset raw path, otherwise chi uses it to route and then fails to match percent encoded path segments
r.URL.RawPath = ""
s.router.ServeHTTP(w, r)
Expand Down
82 changes: 45 additions & 37 deletions pkg/share/cache/cbox/cbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,25 @@
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

package eos
package cbox

import (
"context"
"database/sql"
"fmt"

userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/cs3org/reva/pkg/auth/scope"
ctxpkg "github.com/cs3org/reva/pkg/ctx"
"github.com/cs3org/reva/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/pkg/share/cache"
"github.com/cs3org/reva/pkg/share/cache/registry"
"github.com/cs3org/reva/pkg/storage/fs/eos"
"github.com/cs3org/reva/pkg/token/manager/jwt"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"google.golang.org/grpc/metadata"

// Provides mysql drivers
_ "github.com/go-sql-driver/mysql"
Expand All @@ -49,6 +52,7 @@ type config struct {
DbName string `mapstructure:"db_name"`
EOSNamespace string `mapstructure:"namespace"`
GatewaySvc string `mapstructure:"gatewaysvc"`
JWTSecret string `mapstructure:"jwt_secret"`
}

type manager struct {
Expand Down Expand Up @@ -90,52 +94,56 @@ func (m *manager) GetResourceInfos() ([]*provider.ResourceInfo, error) {
}
defer rows.Close()

tokenManager, err := jwt.New(map[string]interface{}{
"secret": m.conf.JWTSecret,
})
if err != nil {
return nil, err
}

u := &userpb.User{
Id: &userpb.UserId{
OpaqueId: "root",
},
UidNumber: 0,
GidNumber: 0,
}
scope, err := scope.AddOwnerScope(nil)
if err != nil {
return nil, err
}

tkn, err := tokenManager.MintToken(context.Background(), u, scope)
if err != nil {
return nil, err
}
ctx := metadata.AppendToOutgoingContext(context.Background(), ctxpkg.TokenHeader, tkn)

client, err := pool.GetGatewayServiceClient(m.conf.GatewaySvc)
if err != nil {
return nil, err
}

infos := []*provider.ResourceInfo{}
for rows.Next() {
var storageID, nodeID string
if err := rows.Scan(&storageID, &nodeID); err != nil {
continue
}

eosOpts := map[string]interface{}{
"namespace": m.conf.EOSNamespace,
"master_url": fmt.Sprintf("root://%s.cern.ch", storageID),
"version_invariant": true,
"gatewaysvc": m.conf.GatewaySvc,
}
eos, err := eos.New(eosOpts)
if err != nil {
return nil, err
}

ctx := ctxpkg.ContextSetUser(context.Background(), &userpb.User{
Id: &userpb.UserId{
OpaqueId: "root",
},
Opaque: &types.Opaque{
Map: map[string]*types.OpaqueEntry{
"uid": {
Decoder: "plain",
Value: []byte("0"),
},
"gid": {
Decoder: "plain",
Value: []byte("0"),
},
},
},
})

inf, err := eos.GetMD(ctx, &provider.Reference{
statReq := provider.StatRequest{Ref: &provider.Reference{
ResourceId: &provider.ResourceId{
StorageId: storageID,
OpaqueId: nodeID,
},
}, []string{})
if err != nil {
return nil, err
}}

statRes, err := client.Stat(ctx, &statReq)
if err != nil || statRes.Status.Code != rpc.Code_CODE_OK {
continue
}
infos = append(infos, inf)

infos = append(infos, statRes.Info)
}

if err = rows.Err(); err != nil {
Expand Down
25 changes: 25 additions & 0 deletions pkg/share/cache/loader/loader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

package loader

import (
// Load share cache drivers.
_ "github.com/cs3org/reva/pkg/share/cache/cbox"
// Add your own here
)

0 comments on commit a88999e

Please sign in to comment.