Skip to content

Commit

Permalink
Add ocs cache warmup strategy for first request from the user
Browse files Browse the repository at this point in the history
  • Loading branch information
ishank011 committed Oct 21, 2021
1 parent 1c2ecf2 commit 8ee808f
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 44 deletions.
3 changes: 3 additions & 0 deletions changelog/unreleased/ocs-first-req-warmup.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Enhancement: Add ocs cache warmup strategy for first request from the user

https://github.com/cs3org/reva/pull/2117
1 change: 1 addition & 0 deletions cmd/revad/runtime/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
_ "github.com/cs3org/reva/pkg/ocm/share/manager/loader"
_ "github.com/cs3org/reva/pkg/publicshare/manager/loader"
_ "github.com/cs3org/reva/pkg/rhttp/datatx/manager/loader"
_ "github.com/cs3org/reva/pkg/share/cache/loader"
_ "github.com/cs3org/reva/pkg/share/manager/loader"
_ "github.com/cs3org/reva/pkg/storage/fs/loader"
_ "github.com/cs3org/reva/pkg/storage/registry/loader"
Expand Down
52 changes: 52 additions & 0 deletions internal/http/services/owncloud/ocs/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

package ocs

import (
"context"
"net/http"
"net/http/httptest"

ctxpkg "github.com/cs3org/reva/pkg/ctx"
"google.golang.org/grpc/metadata"
)

func (s *svc) cacheWarmup(w http.ResponseWriter, r *http.Request) {
if s.warmupCache != nil {
u := ctxpkg.ContextMustGetUser(r.Context())
tkn := ctxpkg.ContextMustGetToken(r.Context())

ctx := context.Background()
ctx = ctxpkg.ContextSetUser(ctx, u)
ctx = ctxpkg.ContextSetToken(ctx, tkn)
ctx = metadata.AppendToOutgoingContext(ctx, ctxpkg.TokenHeader, tkn)
req := r.Clone(ctx)
req.Method = http.MethodGet

id := u.Id.OpaqueId
if _, err := s.warmupCache.Get(id); err != nil {
p := httptest.NewRecorder()
_ = s.warmupCache.Set(id, true)
req.URL.Path = "/apps/files_sharing/api/v1/shares"
go s.router.ServeHTTP(p, req)
req.URL.Path = "/apps/files_sharing/api/v1/shares?shared_with_me=true"
go s.router.ServeHTTP(p, req)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,14 @@ func (h *Handler) Init(c *config.Config) {
}

func (h *Handler) startCacheWarmup(c cache.Warmup) {
time.Sleep(2 * time.Second)
infos, err := c.GetResourceInfos()
if err != nil {
return
}
for _, r := range infos {
key := wrapResourceID(r.Id)
_ = h.resourceInfoCache.SetWithExpire(key, r, time.Second*h.resourceInfoCacheTTL)
_ = h.resourceInfoCache.SetWithExpire(key, r, h.resourceInfoCacheTTL)
}
}

Expand Down Expand Up @@ -493,11 +494,11 @@ func (h *Handler) ListShares(w http.ResponseWriter, r *http.Request) {
response.WriteOCSError(w, r, response.MetaServerError.StatusCode, "error mapping share data", err)
}
if listSharedWithMe {
h.listSharesWithMe(w, r)
h.ListSharesWithMe(w, r)
return
}
}
h.listSharesWithOthers(w, r)
h.ListSharesWithOthers(w, r)
}

const (
Expand All @@ -507,7 +508,8 @@ const (
ocsStateRejected = 2
)

func (h *Handler) listSharesWithMe(w http.ResponseWriter, r *http.Request) {
// ListSharesWithMe returns the shares receieved by the user present in the context
func (h *Handler) ListSharesWithMe(w http.ResponseWriter, r *http.Request) {
// which pending state to list
stateFilter := getStateFilter(r.FormValue("state"))

Expand Down Expand Up @@ -704,7 +706,9 @@ func findMatch(shareJailInfos []*provider.ResourceInfo, id *provider.ResourceId)
return nil
}

func (h *Handler) listSharesWithOthers(w http.ResponseWriter, r *http.Request) {
// ListSharesWithOthers returns the public, user, group and federated shares
// created by the user present in the context
func (h *Handler) ListSharesWithOthers(w http.ResponseWriter, r *http.Request) {
shares := make([]*conversions.ShareData, 0)

filters := []*collaboration.Filter{}
Expand Down
16 changes: 14 additions & 2 deletions internal/http/services/owncloud/ocs/ocs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ package ocs

import (
"net/http"
"time"

"github.com/ReneKroon/ttlcache/v2"
"github.com/cs3org/reva/internal/http/services/owncloud/ocs/config"
"github.com/cs3org/reva/internal/http/services/owncloud/ocs/handlers/apps/sharing/sharees"
"github.com/cs3org/reva/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares"
Expand All @@ -41,8 +43,9 @@ func init() {
}

type svc struct {
c *config.Config
router *chi.Mux
c *config.Config
router *chi.Mux
warmupCache *ttlcache.Cache
}

func New(m map[string]interface{}, log *zerolog.Logger) (global.Service, error) {
Expand All @@ -63,6 +66,11 @@ func New(m map[string]interface{}, log *zerolog.Logger) (global.Service, error)
return nil, err
}

if conf.CacheWarmupDriver == "first-request" && conf.ResourceInfoCacheTTL > 0 {
s.warmupCache = ttlcache.NewCache()
_ = s.warmupCache.SetTTL(time.Second * time.Duration(conf.ResourceInfoCacheTTL))
}

return s, nil
}

Expand Down Expand Up @@ -138,6 +146,10 @@ func (s *svc) Handler() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
log := appctx.GetLogger(r.Context())
log.Debug().Str("path", r.URL.Path).Msg("ocs routing")

// Warmup the share cache for the user
s.cacheWarmup(w, r)

// unset raw path, otherwise chi uses it to route and then fails to match percent encoded path segments
r.URL.RawPath = ""
s.router.ServeHTTP(w, r)
Expand Down
82 changes: 45 additions & 37 deletions pkg/share/cache/cbox/cbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,25 @@
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

package eos
package cbox

import (
"context"
"database/sql"
"fmt"

userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/cs3org/reva/pkg/auth/scope"
ctxpkg "github.com/cs3org/reva/pkg/ctx"
"github.com/cs3org/reva/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/pkg/share/cache"
"github.com/cs3org/reva/pkg/share/cache/registry"
"github.com/cs3org/reva/pkg/storage/fs/eos"
"github.com/cs3org/reva/pkg/token/manager/jwt"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"google.golang.org/grpc/metadata"

// Provides mysql drivers
_ "github.com/go-sql-driver/mysql"
Expand All @@ -49,6 +52,7 @@ type config struct {
DbName string `mapstructure:"db_name"`
EOSNamespace string `mapstructure:"namespace"`
GatewaySvc string `mapstructure:"gatewaysvc"`
JWTSecret string `mapstructure:"jwt_secret"`
}

type manager struct {
Expand Down Expand Up @@ -90,52 +94,56 @@ func (m *manager) GetResourceInfos() ([]*provider.ResourceInfo, error) {
}
defer rows.Close()

tokenManager, err := jwt.New(map[string]interface{}{
"secret": m.conf.JWTSecret,
})
if err != nil {
return nil, err
}

u := &userpb.User{
Id: &userpb.UserId{
OpaqueId: "root",
},
UidNumber: 0,
GidNumber: 0,
}
scope, err := scope.AddOwnerScope(nil)
if err != nil {
return nil, err
}

tkn, err := tokenManager.MintToken(context.Background(), u, scope)
if err != nil {
return nil, err
}
ctx := metadata.AppendToOutgoingContext(context.Background(), ctxpkg.TokenHeader, tkn)

client, err := pool.GetGatewayServiceClient(m.conf.GatewaySvc)
if err != nil {
return nil, err
}

infos := []*provider.ResourceInfo{}
for rows.Next() {
var storageID, nodeID string
if err := rows.Scan(&storageID, &nodeID); err != nil {
continue
}

eosOpts := map[string]interface{}{
"namespace": m.conf.EOSNamespace,
"master_url": fmt.Sprintf("root://%s.cern.ch", storageID),
"version_invariant": true,
"gatewaysvc": m.conf.GatewaySvc,
}
eos, err := eos.New(eosOpts)
if err != nil {
return nil, err
}

ctx := ctxpkg.ContextSetUser(context.Background(), &userpb.User{
Id: &userpb.UserId{
OpaqueId: "root",
},
Opaque: &types.Opaque{
Map: map[string]*types.OpaqueEntry{
"uid": {
Decoder: "plain",
Value: []byte("0"),
},
"gid": {
Decoder: "plain",
Value: []byte("0"),
},
},
},
})

inf, err := eos.GetMD(ctx, &provider.Reference{
statReq := provider.StatRequest{Ref: &provider.Reference{
ResourceId: &provider.ResourceId{
StorageId: storageID,
OpaqueId: nodeID,
},
}, []string{})
if err != nil {
return nil, err
}}

statRes, err := client.Stat(ctx, &statReq)
if err != nil || statRes.Status.Code != rpc.Code_CODE_OK {
continue
}
infos = append(infos, inf)

infos = append(infos, statRes.Info)
}

if err = rows.Err(); err != nil {
Expand Down
25 changes: 25 additions & 0 deletions pkg/share/cache/loader/loader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

package loader

import (
// Load share cache drivers.
_ "github.com/cs3org/reva/pkg/share/cache/cbox"
// Add your own here
)
3 changes: 3 additions & 0 deletions pkg/storage/utils/eosfs/eosfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1771,6 +1771,9 @@ func getResourceType(isDir bool) provider.ResourceType {
}

func (fs *eosfs) extractUIDAndGID(u *userpb.User) (eosclient.Authorization, error) {
if u.Id.OpaqueId == "root" {
return eosclient.Authorization{Role: eosclient.Role{UID: "0", GID: "0"}}, nil
}
if u.UidNumber == 0 {
return eosclient.Authorization{}, errors.New("eosfs: uid missing for user")
}
Expand Down

0 comments on commit 8ee808f

Please sign in to comment.