diff --git a/database/redis.go b/database/redis.go
index dd7e3429..f7b27365 100644
--- a/database/redis.go
+++ b/database/redis.go
@@ -20,6 +20,7 @@ import (
 const (
 	redisConnectionTimeout = 200 * time.Millisecond
 	redisReadWriteTimeout  = 300 * time.Second
+	RedisMultiMaxSize      = 5000
 )
 
 var (
diff --git a/rpc/rpc.go b/rpc/rpc.go
index d31de05c..f9f4288c 100644
--- a/rpc/rpc.go
+++ b/rpc/rpc.go
@@ -460,12 +460,26 @@ func (c *CLI) RemoveMirror(ctx context.Context, in *MirrorIDRequest) (*empty.Emp
 	conn.Send("MULTI")
 
 	// Remove each FILEINFO / FILEMIRRORS
-	for _, file := range files {
+	for count, file := range files {
 		conn.Send("DEL", fmt.Sprintf("FILEINFO_%d_%s", in.ID, file))
 		conn.Send("SREM", fmt.Sprintf("FILEMIRRORS_%s", file), in.ID)
 		conn.Send("PUBLISH", database.MIRROR_FILE_UPDATE, fmt.Sprintf("%d %s", in.ID, file))
+		if count > 0 && count % database.RedisMultiMaxSize == 0 {
+			_, err = conn.Do("EXEC")
+			if err != nil {
+				return nil, errors.Wrap(err, "operation failed")
+			}
+			conn.Send("MULTI")
+		}
 	}
 
+	_, err = conn.Do("EXEC")
+	if err != nil {
+		return nil, errors.Wrap(err, "operation failed")
+	}
+
+	conn.Send("MULTI")
+
 	// Remove all other keys
 	conn.Send("DEL",
 		fmt.Sprintf("MIRROR_%d", in.ID),
diff --git a/scan/scan.go b/scan/scan.go
index 06fb2c05..f2e992df 100644
--- a/scan/scan.go
+++ b/scan/scan.go
@@ -54,7 +54,7 @@ type scan struct {
 	conn        redis.Conn
 	mirrorid    int
 	filesTmpKey string
-	count       int64
+	files       []filedata
 }
 
 type ScanResult struct {
@@ -82,6 +82,7 @@ func Scan(typ core.ScannerType, r *database.Redis, c *mirrors.Cache, url string,
 		mirrorid: id,
 		conn:     conn,
 		cache:    c,
+		files:    make([]filedata, 0, 1000),
 	}
 
 	var scanner Scanner
@@ -128,28 +129,26 @@ func Scan(typ core.ScannerType, r *database.Redis, c *mirrors.Cache, url string,
 		}
 	}(&err)
 
-	conn.Send("MULTI")
-
 	filesKey := fmt.Sprintf("MIRRORFILES_%d", id)
 	s.filesTmpKey = fmt.Sprintf("MIRRORFILESTMP_%d", id)
 
 	// Remove any left over
-	conn.Send("DEL", s.filesTmpKey)
+	_, err = conn.Do("DEL", s.filesTmpKey)
+	if err != nil {
+		return nil, err
+	}
+
 	// Scan the mirror
 	var precision core.Precision
 	precision, err = scanner.Scan(url, name, conn, stop)
 	if err != nil {
-		// Discard MULTI
-		s.ScannerDiscard()
-
-		// Remove the temporary key
-		conn.Do("DEL", s.filesTmpKey)
-
 		log.Errorf("[%s] %s", name, err.Error())
 		return nil, err
 	}
 
-	// Exec multi
+	log.Infof("[%s] Indexing the files...", name)
+
+	// Commit changes
 	s.ScannerCommit()
 
 	// Get the list of files no more present on this mirror
@@ -162,13 +161,21 @@ func Scan(typ core.ScannerType, r *database.Redis, c *mirrors.Cache, url string,
 	// Remove this mirror from the given file SET
 	if len(toremove) > 0 {
 		conn.Send("MULTI")
-		for _, e := range toremove {
+		for count, e := range toremove {
 			log.Debugf("[%s] Removing %s from mirror", name, e)
 			conn.Send("SREM", fmt.Sprintf("FILEMIRRORS_%s", e), id)
 			conn.Send("DEL", fmt.Sprintf("FILEINFO_%d_%s", id, e))
+
 			// Publish update
 			database.SendPublish(conn, database.MIRROR_FILE_UPDATE, fmt.Sprintf("%d %s", id, e))
+			if count > 0 && count % database.RedisMultiMaxSize == 0 {
+				_, err = conn.Do("EXEC")
+				if err != nil {
+					return nil, err
+				}
+				conn.Send("MULTI")
+			}
 		}
 
 		_, err = conn.Do("EXEC")
 		if err != nil {
@@ -178,7 +185,7 @@ func Scan(typ core.ScannerType, r *database.Redis, c *mirrors.Cache, url string,
 
 	// Finally rename the temporary sets containing the list
 	// of files for this mirror to the production key
-	if s.count > 0 {
+	if len(s.files) > 0 {
 		_, err = conn.Do("RENAME", s.filesTmpKey, filesKey)
 		if err != nil {
 			return nil, err
@@ -202,11 +209,11 @@ func Scan(typ core.ScannerType, r *database.Redis, c *mirrors.Cache, url string,
 		log.Warningf("Unable to check timezone shifts: %s", err)
 	}
 
-	log.Infof("[%s] Indexed %d files (%d known), %d removed", name, s.count, common, len(toremove))
+	log.Infof("[%s] Indexed %d files (%d known), %d removed", name, len(s.files), common, len(toremove))
 	res := &ScanResult{
 		MirrorID:     id,
 		MirrorName:   name,
-		FilesIndexed: s.count,
+		FilesIndexed: int64(len(s.files)),
 		KnownIndexed: common,
 		Removed:      int64(len(toremove)),
 		TZOffsetMs:   tzoffset,
@@ -223,29 +230,39 @@ func Scan(typ core.ScannerType, r *database.Redis, c *mirrors.Cache, url string,
 }
 
 func (s *scan) ScannerAddFile(f filedata) {
-	s.count++
+	s.files = append(s.files, f)
+}
 
-	// Add all the files to a temporary key
-	s.conn.Send("SADD", s.filesTmpKey, f.path)
+func (s *scan) ScannerCommit() error {
+	s.conn.Send("MULTI")
 
-	// Mark the file as being supported by this mirror
-	rk := fmt.Sprintf("FILEMIRRORS_%s", f.path)
-	s.conn.Send("SADD", rk, s.mirrorid)
+	for count, f := range s.files {
+		// Add all the files to a temporary key
+		s.conn.Send("SADD", s.filesTmpKey, f.path)
 
-	// Save the size of the current file found on this mirror
-	ik := fmt.Sprintf("FILEINFO_%d_%s", s.mirrorid, f.path)
-	s.conn.Send("HMSET", ik, "size", f.size, "modTime", f.modTime)
+		// Mark the file as being supported by this mirror
+		rk := fmt.Sprintf("FILEMIRRORS_%s", f.path)
+		s.conn.Send("SADD", rk, s.mirrorid)
 
-	// Publish update
-	database.SendPublish(s.conn, database.MIRROR_FILE_UPDATE, fmt.Sprintf("%d %s", s.mirrorid, f.path))
-}
+		// Save the size of the current file found on this mirror
+		ik := fmt.Sprintf("FILEINFO_%d_%s", s.mirrorid, f.path)
+		s.conn.Send("HMSET", ik, "size", f.size, "modTime", f.modTime)
 
-func (s *scan) ScannerDiscard() {
-	s.conn.Do("DISCARD")
-}
+		// Publish update
+		database.SendPublish(s.conn, database.MIRROR_FILE_UPDATE, fmt.Sprintf("%d %s", s.mirrorid, f.path))
+
+		// Execute the transaction if enough files were added
+		if count > 0 && count % database.RedisMultiMaxSize == 0 {
+			_, err := s.conn.Do("EXEC")
+			if err != nil {
+				return err
+			}
+			s.conn.Send("MULTI")
+		}
+	}
 
-func (s *scan) ScannerCommit() error {
 	_, err := s.conn.Do("EXEC")
+
 	return err
 }
 
@@ -504,16 +521,24 @@ func ScanSource(r *database.Redis, forceRehash bool, stop <-chan struct{}) (err
 
 	defer lock.Release()
 
-	conn.Send("MULTI")
-
 	// Remove any left over
-	conn.Send("DEL", "FILES_TMP")
+	_, err = conn.Do("DEL", "FILES_TMP")
+	if err != nil {
+		return err
+	}
 
 	// Add all the files to a temporary key
-	count := 0
-	for _, e := range sourceFiles {
+	conn.Send("MULTI")
+	for count, e := range sourceFiles {
 		conn.Send("SADD", "FILES_TMP", e.path)
-		count++
+
+		if count > 0 && count % database.RedisMultiMaxSize == 0 {
+			_, err := conn.Do("EXEC")
+			if err != nil {
+				return err
+			}
+			conn.Send("MULTI")
+		}
 	}
 
 	_, err = conn.Do("EXEC")
@@ -523,10 +548,13 @@ func ScanSource(r *database.Redis, forceRehash bool, stop <-chan struct{}) (err
 
 	// Do a diff between the sets to get the removed files
 	toremove, err := redis.Values(conn.Do("SDIFF", "FILES", "FILES_TMP"))
+	if err != nil {
+		return err
+	}
 
 	// Create/Update the files' hash keys with the fresh infos
 	conn.Send("MULTI")
-	for _, e := range sourceFiles {
+	for count, e := range sourceFiles {
 		conn.Send("HMSET", fmt.Sprintf("FILE_%s", e.path),
 			"size", e.size,
 			"modTime", e.modTime,
@@ -536,28 +564,52 @@ func ScanSource(r *database.Redis, forceRehash bool, stop <-chan struct{}) (err
 
 		// Publish update
 		database.SendPublish(conn, database.FILE_UPDATE, e.path)
+
+		if count > 0 && count % database.RedisMultiMaxSize == 0 {
+			_, err := conn.Do("EXEC")
+			if err != nil {
+				return err
+			}
+			conn.Send("MULTI")
+		}
+	}
+
+	_, err = conn.Do("EXEC")
+	if err != nil {
+		return err
 	}
 
 	// Remove old keys
 	if len(toremove) > 0 {
-		for _, e := range toremove {
+		conn.Send("MULTI")
+		for count, e := range toremove {
 			conn.Send("DEL", fmt.Sprintf("FILE_%s", e))
 
 			// Publish update
 			database.SendPublish(conn, database.FILE_UPDATE, fmt.Sprintf("%s", e))
+
+			if count > 0 && count % database.RedisMultiMaxSize == 0 {
+				_, err = conn.Do("EXEC")
+				if err != nil {
+					return err
+				}
+				conn.Send("MULTI")
+			}
+		}
+		_, err = conn.Do("EXEC")
+		if err != nil {
+			return err
 		}
 	}
 
 	// Finally rename the temporary sets containing the list
 	// of files to the production key
-	conn.Send("RENAME", "FILES_TMP", "FILES")
-
-	_, err = conn.Do("EXEC")
+	_, err = conn.Do("RENAME", "FILES_TMP", "FILES")
 	if err != nil {
 		return err
 	}
 
-	log.Infof("[source] Scanned %d files", count)
+	log.Infof("[source] Scanned %d files, %d removed", len(sourceFiles), len(toremove))
 
 	return nil
 }
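
Every hunk above applies the same chunking idea: commands are still pipelined inside a `MULTI` block, but the block is `EXEC`ed every `RedisMultiMaxSize` commands so no single transaction grows with the size of the file list. Below is a minimal standalone sketch of that pattern, assuming the redigo client (`github.com/gomodule/redigo/redis`, the `conn.Send`/`conn.Do` API used in the patch); the server address, key name, file list and batch size are placeholders, not values from the patch:

```go
package main

import (
	"fmt"
	"log"

	"github.com/gomodule/redigo/redis"
)

// Same role as database.RedisMultiMaxSize in the patch; the value is illustrative.
const redisMultiMaxSize = 5000

func main() {
	// Placeholder address; mirrorbits obtains its connection from its own pool.
	conn, err := redis.Dial("tcp", "127.0.0.1:6379")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Stand-in for the scanned file list.
	paths := []string{"/a/file1", "/a/file2", "/b/file3"}

	// Queue commands into a transaction and flush it every
	// redisMultiMaxSize commands so no single MULTI grows unbounded.
	conn.Send("MULTI")
	for count, p := range paths {
		conn.Send("SADD", "FILES_TMP", p)

		if count > 0 && count%redisMultiMaxSize == 0 {
			if _, err := conn.Do("EXEC"); err != nil {
				log.Fatal(err)
			}
			conn.Send("MULTI") // start the next batch
		}
	}

	// Commit whatever remains in the last batch.
	if _, err := conn.Do("EXEC"); err != nil {
		log.Fatal(err)
	}

	fmt.Printf("queued %d files\n", len(paths))
}
```

The trade-off of capping the batch is that atomicity only holds within each batch rather than across the whole scan, in exchange for bounded memory on both the client and the server.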