Split MULTI transaction in batches (fix #149) #148

Open · wants to merge 7 commits into master
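For context, the whole change applies one pattern: instead of queuing every command into a single Redis MULTI/EXEC transaction, it commits the transaction every RedisMultiMaxSize commands and then opens a new one, so a transaction never grows unbounded. A minimal sketch of that pattern, assuming the redigo client used by the project (the deleteKeysInBatches helper and maxBatchSize constant are illustrative names, not part of the PR):

    package main

    import (
        "fmt"

        "github.com/gomodule/redigo/redis"
    )

    // The PR introduces database.RedisMultiMaxSize with this value.
    const maxBatchSize = 5000

    // deleteKeysInBatches queues DEL commands inside MULTI/EXEC blocks,
    // committing every maxBatchSize commands so no single transaction
    // grows unbounded.
    func deleteKeysInBatches(conn redis.Conn, keys []string) error {
        conn.Send("MULTI")
        for count, key := range keys {
            conn.Send("DEL", key)

            // Commit the current batch and open a new transaction.
            if count > 0 && count%maxBatchSize == 0 {
                if _, err := conn.Do("EXEC"); err != nil {
                    return fmt.Errorf("batch exec failed: %w", err)
                }
                conn.Send("MULTI")
            }
        }
        // Commit whatever remains in the final, possibly partial batch.
        _, err := conn.Do("EXEC")
        return err
    }

The trailing EXEC outside the loop is what flushes the last partial batch; every batched loop in the diff below follows the same shape.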
1 change: 1 addition & 0 deletions database/redis.go
@@ -20,6 +20,7 @@ import (
const (
redisConnectionTimeout = 200 * time.Millisecond
redisReadWriteTimeout = 300 * time.Second
RedisMultiMaxSize = 5000
)

var (
16 changes: 15 additions & 1 deletion rpc/rpc.go
@@ -460,12 +460,26 @@ func (c *CLI) RemoveMirror(ctx context.Context, in *MirrorIDRequest) (*empty.Emp
conn.Send("MULTI")

// Remove each FILEINFO / FILEMIRRORS
for _, file := range files {
for count, file := range files {
conn.Send("DEL", fmt.Sprintf("FILEINFO_%d_%s", in.ID, file))
conn.Send("SREM", fmt.Sprintf("FILEMIRRORS_%s", file), in.ID)
conn.Send("PUBLISH", database.MIRROR_FILE_UPDATE, fmt.Sprintf("%d %s", in.ID, file))
if count > 0 && count % database.RedisMultiMaxSize == 0 {
_, err = conn.Do("EXEC")
if err != nil {
return nil, errors.Wrap(err, "operation failed")
}
conn.Send("MULTI")
}
}

_, err = conn.Do("EXEC")
if err != nil {
return nil, errors.Wrap(err, "operation failed")
}

conn.Send("MULTI")

// Remove all other keys
conn.Send("DEL",
fmt.Sprintf("MIRROR_%d", in.ID),
138 changes: 95 additions & 43 deletions scan/scan.go
@@ -54,7 +54,7 @@ type scan struct {
conn redis.Conn
mirrorid int
filesTmpKey string
count int64
files []filedata
}

type ScanResult struct {
@@ -82,6 +82,7 @@ func Scan(typ core.ScannerType, r *database.Redis, c *mirrors.Cache, url string,
mirrorid: id,
conn: conn,
cache: c,
files: make([]filedata, 0, 1000),
}

var scanner Scanner
@@ -128,28 +129,26 @@ func Scan(typ core.ScannerType, r *database.Redis, c *mirrors.Cache, url string,
}
}(&err)

conn.Send("MULTI")

filesKey := fmt.Sprintf("MIRRORFILES_%d", id)
s.filesTmpKey = fmt.Sprintf("MIRRORFILESTMP_%d", id)

// Remove any left over
conn.Send("DEL", s.filesTmpKey)
_, err = conn.Do("DEL", s.filesTmpKey)
if err != nil {
return nil, err
}

// Scan the mirror
var precision core.Precision
precision, err = scanner.Scan(url, name, conn, stop)
if err != nil {
// Discard MULTI
s.ScannerDiscard()

// Remove the temporary key
conn.Do("DEL", s.filesTmpKey)

log.Errorf("[%s] %s", name, err.Error())
return nil, err
}

// Exec multi
log.Infof("[%s] Indexing the files...", name)

// Commit changes
s.ScannerCommit()

// Get the list of files no more present on this mirror
@@ -162,13 +161,21 @@
// Remove this mirror from the given file SET
if len(toremove) > 0 {
conn.Send("MULTI")
for _, e := range toremove {
for count, e := range toremove {
log.Debugf("[%s] Removing %s from mirror", name, e)
conn.Send("SREM", fmt.Sprintf("FILEMIRRORS_%s", e), id)
conn.Send("DEL", fmt.Sprintf("FILEINFO_%d_%s", id, e))

// Publish update
database.SendPublish(conn, database.MIRROR_FILE_UPDATE, fmt.Sprintf("%d %s", id, e))

if count > 0 && count % database.RedisMultiMaxSize == 0 {
_, err = conn.Do("EXEC")
if err != nil {
return nil, err
}
conn.Send("MULTI")
}
}
_, err = conn.Do("EXEC")
if err != nil {
@@ -178,7 +185,7 @@ func Scan(typ core.ScannerType, r *database.Redis, c *mirrors.Cache, url string,

// Finally rename the temporary sets containing the list
// of files for this mirror to the production key
if s.count > 0 {
if len(s.files) > 0 {
_, err = conn.Do("RENAME", s.filesTmpKey, filesKey)
if err != nil {
return nil, err
@@ -202,11 +209,11 @@ func Scan(typ core.ScannerType, r *database.Redis, c *mirrors.Cache, url string,
log.Warningf("Unable to check timezone shifts: %s", err)
}

log.Infof("[%s] Indexed %d files (%d known), %d removed", name, s.count, common, len(toremove))
log.Infof("[%s] Indexed %d files (%d known), %d removed", name, len(s.files), common, len(toremove))
res := &ScanResult{
MirrorID: id,
MirrorName: name,
FilesIndexed: s.count,
FilesIndexed: int64(len(s.files)),
KnownIndexed: common,
Removed: int64(len(toremove)),
TZOffsetMs: tzoffset,
@@ -223,29 +230,39 @@ func Scan(typ core.ScannerType, r *database.Redis, c *mirrors.Cache, url string,
}

func (s *scan) ScannerAddFile(f filedata) {
s.count++
s.files = append(s.files, f)
}

// Add all the files to a temporary key
s.conn.Send("SADD", s.filesTmpKey, f.path)
func (s *scan) ScannerCommit() error {
s.conn.Send("MULTI")

// Mark the file as being supported by this mirror
rk := fmt.Sprintf("FILEMIRRORS_%s", f.path)
s.conn.Send("SADD", rk, s.mirrorid)
for count, f := range s.files {
// Add all the files to a temporary key
s.conn.Send("SADD", s.filesTmpKey, f.path)

// Save the size of the current file found on this mirror
ik := fmt.Sprintf("FILEINFO_%d_%s", s.mirrorid, f.path)
s.conn.Send("HMSET", ik, "size", f.size, "modTime", f.modTime)
// Mark the file as being supported by this mirror
rk := fmt.Sprintf("FILEMIRRORS_%s", f.path)
s.conn.Send("SADD", rk, s.mirrorid)

// Publish update
database.SendPublish(s.conn, database.MIRROR_FILE_UPDATE, fmt.Sprintf("%d %s", s.mirrorid, f.path))
}
// Save the size of the current file found on this mirror
ik := fmt.Sprintf("FILEINFO_%d_%s", s.mirrorid, f.path)
s.conn.Send("HMSET", ik, "size", f.size, "modTime", f.modTime)

func (s *scan) ScannerDiscard() {
s.conn.Do("DISCARD")
}
// Publish update
database.SendPublish(s.conn, database.MIRROR_FILE_UPDATE, fmt.Sprintf("%d %s", s.mirrorid, f.path))

// Execute the transaction if enough files were added
if count > 0 && count % database.RedisMultiMaxSize == 0 {
_, err := s.conn.Do("EXEC")
if err != nil {
return err
}
s.conn.Send("MULTI")
}
}

func (s *scan) ScannerCommit() error {
_, err := s.conn.Do("EXEC")

return err
}

@@ -504,16 +521,24 @@ func ScanSource(r *database.Redis, forceRehash bool, stop <-chan struct{}) (err

defer lock.Release()

conn.Send("MULTI")

// Remove any left over
conn.Send("DEL", "FILES_TMP")
_, err = conn.Do("DEL", "FILES_TMP")
if err != nil {
return err
}

// Add all the files to a temporary key
count := 0
for _, e := range sourceFiles {
conn.Send("MULTI")
for count, e := range sourceFiles {
conn.Send("SADD", "FILES_TMP", e.path)
count++

if count > 0 && count % database.RedisMultiMaxSize == 0 {
_, err := conn.Do("EXEC")
if err != nil {
return err
}
conn.Send("MULTI")
}
}

_, err = conn.Do("EXEC")
Expand All @@ -523,10 +548,13 @@ func ScanSource(r *database.Redis, forceRehash bool, stop <-chan struct{}) (err

// Do a diff between the sets to get the removed files
toremove, err := redis.Values(conn.Do("SDIFF", "FILES", "FILES_TMP"))
if err != nil {
return err
}

// Create/Update the files' hash keys with the fresh infos
conn.Send("MULTI")
for _, e := range sourceFiles {
for count, e := range sourceFiles {
conn.Send("HMSET", fmt.Sprintf("FILE_%s", e.path),
"size", e.size,
"modTime", e.modTime,
@@ -536,28 +564,52 @@ func ScanSource(r *database.Redis, forceRehash bool, stop <-chan struct{}) (err

// Publish update
database.SendPublish(conn, database.FILE_UPDATE, e.path)

if count > 0 && count % database.RedisMultiMaxSize == 0 {
_, err := conn.Do("EXEC")
if err != nil {
return err
}
conn.Send("MULTI")
}
}

_, err = conn.Do("EXEC")
if err != nil {
return err
}

// Remove old keys
if len(toremove) > 0 {
for _, e := range toremove {
conn.Send("MULTI")
for count, e := range toremove {
conn.Send("DEL", fmt.Sprintf("FILE_%s", e))

// Publish update
database.SendPublish(conn, database.FILE_UPDATE, fmt.Sprintf("%s", e))

if count > 0 && count % database.RedisMultiMaxSize == 0 {
_, err = conn.Do("EXEC")
if err != nil {
return err
}
conn.Send("MULTI")
}
}
_, err = conn.Do("EXEC")
if err != nil {
return err
}
}

// Finally rename the temporary sets containing the list
// of files to the production key
conn.Send("RENAME", "FILES_TMP", "FILES")

_, err = conn.Do("EXEC")
_, err = conn.Do("RENAME", "FILES_TMP", "FILES")
if err != nil {
return err
}

log.Infof("[source] Scanned %d files", count)
log.Infof("[source] Scanned %d files, %d removed", len(sourceFiles), len(toremove))

return nil
}