From c72ca9dde992cb45ad17b859852c140d24a4e1f8 Mon Sep 17 00:00:00 2001
From: Arnaud Rebillout
Date: Tue, 10 Oct 2023 13:16:48 +0700
Subject: [PATCH 1/7] Add missing error checking in scan.go

---
 scan/scan.go | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/scan/scan.go b/scan/scan.go
index 06fb2c05..621a6dfd 100644
--- a/scan/scan.go
+++ b/scan/scan.go
@@ -523,6 +523,9 @@ func ScanSource(r *database.Redis, forceRehash bool, stop <-chan struct{}) (err

 	// Do a diff between the sets to get the removed files
 	toremove, err := redis.Values(conn.Do("SDIFF", "FILES", "FILES_TMP"))
+	if err != nil {
+		return err
+	}

 	// Create/Update the files' hash keys with the fresh infos
 	conn.Send("MULTI")

From f5625c4a45283e036257719e726f97249e7e4d88 Mon Sep 17 00:00:00 2001
From: Arnaud Rebillout
Date: Mon, 9 Oct 2023 16:50:01 +0700
Subject: [PATCH 2/7] Split big multi transactions - part 1: RemoveMirror()

---
 database/redis.go |  1 +
 rpc/rpc.go        | 16 +++++++++++++++-
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/database/redis.go b/database/redis.go
index dd7e3429..f7b27365 100644
--- a/database/redis.go
+++ b/database/redis.go
@@ -20,6 +20,7 @@ import (
 const (
 	redisConnectionTimeout = 200 * time.Millisecond
 	redisReadWriteTimeout  = 300 * time.Second
+	RedisMultiMaxSize      = 5000
 )

 var (
diff --git a/rpc/rpc.go b/rpc/rpc.go
index d31de05c..f9f4288c 100644
--- a/rpc/rpc.go
+++ b/rpc/rpc.go
@@ -460,12 +460,26 @@ func (c *CLI) RemoveMirror(ctx context.Context, in *MirrorIDRequest) (*empty.Emp
 	conn.Send("MULTI")

 	// Remove each FILEINFO / FILEMIRRORS
-	for _, file := range files {
+	for count, file := range files {
 		conn.Send("DEL", fmt.Sprintf("FILEINFO_%d_%s", in.ID, file))
 		conn.Send("SREM", fmt.Sprintf("FILEMIRRORS_%s", file), in.ID)
 		conn.Send("PUBLISH", database.MIRROR_FILE_UPDATE, fmt.Sprintf("%d %s", in.ID, file))
+		if count > 0 && count % database.RedisMultiMaxSize == 0 {
+			_, err = conn.Do("EXEC")
+			if err != nil {
+				return nil, errors.Wrap(err, "operation failed")
+			}
+			conn.Send("MULTI")
+		}
 	}

+	_, err = conn.Do("EXEC")
+	if err != nil {
+		return nil, errors.Wrap(err, "operation failed")
+	}
+
+	conn.Send("MULTI")
+
 	// Remove all other keys
 	conn.Send("DEL",
 		fmt.Sprintf("MIRROR_%d", in.ID),

From d67d0b434a757a3d0372af3ee49fa5283ec87fc9 Mon Sep 17 00:00:00 2001
From: Arnaud Rebillout
Date: Mon, 9 Oct 2023 16:55:21 +0700
Subject: [PATCH 3/7] Split big multi transactions - part 2: ScanSource()

Remove the standalone variable 'count'; we now only use 'count' as the
loop index. Hence, in the final log, 'count' is replaced with
'len(sourceFiles)'.

With this commit, we break up two MULTI transactions, so let's look at
them in detail.

The first MULTI transaction is:
- DEL FILES_TMP
- loop on files:
  - SADD FILES_TMP

I think there's no problem in breaking this into chunks, as we just
iterate over a temporary key. It doesn't matter if the program is
interrupted and we leave a partially updated key behind.

Then comes a lone SDIFF FILES FILES_TMP command, which gives us the
list of files to remove.

The second MULTI transaction is:
- loop on files:
  - HMSET FILE_
  - publish FILE_UPDATE
- loop on removed files:
  - DEL FILE_
  - publish FILE_UPDATE
- RENAME FILES_TMP FILES

I don't think it's really needed to have all of that in a single MULTI
transaction; I *think* it's OK to break the two loops into chunks. What
really matters is that we rename the key FILES_TMP to FILES in the
last step.
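To make the pattern used in this series concrete, here is a minimal
sketch (illustrative only, not code from the patch): it assumes the
redigo client, and the helper name addPathsInChunks and the constant
maxMultiSize are invented for the example, standing in for
database.RedisMultiMaxSize.

  // Minimal sketch of the chunked MULTI/EXEC pattern (illustrative only).
  package sketch

  import "github.com/gomodule/redigo/redis"

  const maxMultiSize = 5000 // stands in for database.RedisMultiMaxSize

  func addPathsInChunks(conn redis.Conn, key string, paths []string) error {
      conn.Send("MULTI")
      for count, p := range paths {
          // Queue the command client-side; nothing is executed yet.
          conn.Send("SADD", key, p)

          // Every maxMultiSize commands, execute the pending transaction
          // and start a new one.
          if count > 0 && count%maxMultiSize == 0 {
              if _, err := conn.Do("EXEC"); err != nil {
                  return err
              }
              conn.Send("MULTI")
          }
      }
      // Execute whatever remains in the last chunk.
      _, err := conn.Do("EXEC")
      return err
  }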
---
 scan/scan.go | 56 +++++++++++++++++++++++++++++++++++++++++-----------
 1 file changed, 44 insertions(+), 12 deletions(-)

diff --git a/scan/scan.go b/scan/scan.go
index 621a6dfd..9611794a 100644
--- a/scan/scan.go
+++ b/scan/scan.go
@@ -504,16 +504,24 @@ func ScanSource(r *database.Redis, forceRehash bool, stop <-chan struct{}) (err

 	defer lock.Release()

-	conn.Send("MULTI")
-
 	// Remove any left over
-	conn.Send("DEL", "FILES_TMP")
+	_, err = conn.Do("DEL", "FILES_TMP")
+	if err != nil {
+		return err
+	}

 	// Add all the files to a temporary key
-	count := 0
-	for _, e := range sourceFiles {
+	conn.Send("MULTI")
+	for count, e := range sourceFiles {
 		conn.Send("SADD", "FILES_TMP", e.path)
-		count++
+
+		if count > 0 && count % database.RedisMultiMaxSize == 0 {
+			_, err := conn.Do("EXEC")
+			if err != nil {
+				return err
+			}
+			conn.Send("MULTI")
+		}
 	}

 	_, err = conn.Do("EXEC")
@@ -529,7 +537,7 @@ func ScanSource(r *database.Redis, forceRehash bool, stop <-chan struct{}) (err

 	// Create/Update the files' hash keys with the fresh infos
 	conn.Send("MULTI")
-	for _, e := range sourceFiles {
+	for count, e := range sourceFiles {
 		conn.Send("HMSET", fmt.Sprintf("FILE_%s", e.path),
 			"size", e.size,
 			"modTime", e.modTime,
@@ -539,28 +547,52 @@ func ScanSource(r *database.Redis, forceRehash bool, stop <-chan struct{}) (err

 		// Publish update
 		database.SendPublish(conn, database.FILE_UPDATE, e.path)
+
+		if count > 0 && count % database.RedisMultiMaxSize == 0 {
+			_, err := conn.Do("EXEC")
+			if err != nil {
+				return err
+			}
+			conn.Send("MULTI")
+		}
+	}
+
+	_, err = conn.Do("EXEC")
+	if err != nil {
+		return err
 	}

 	// Remove old keys
 	if len(toremove) > 0 {
-		for _, e := range toremove {
+		conn.Send("MULTI")
+		for count, e := range toremove {
 			conn.Send("DEL", fmt.Sprintf("FILE_%s", e))

 			// Publish update
 			database.SendPublish(conn, database.FILE_UPDATE, fmt.Sprintf("%s", e))
+
+			if count > 0 && count % database.RedisMultiMaxSize == 0 {
+				_, err = conn.Do("EXEC")
+				if err != nil {
+					return err
+				}
+				conn.Send("MULTI")
+			}
+		}
+		_, err = conn.Do("EXEC")
+		if err != nil {
+			return err
 		}
 	}

 	// Finally rename the temporary sets containing the list
 	// of files to the production key
-	conn.Send("RENAME", "FILES_TMP", "FILES")
-
-	_, err = conn.Do("EXEC")
+	_, err = conn.Do("RENAME", "FILES_TMP", "FILES")
 	if err != nil {
 		return err
 	}

-	log.Infof("[source] Scanned %d files", count)
+	log.Infof("[source] Scanned %d files", len(sourceFiles))

 	return nil
 }

From cf806da0684188365f2a13fd6ee48b7260846e0b Mon Sep 17 00:00:00 2001
From: Arnaud Rebillout
Date: Tue, 10 Oct 2023 14:38:06 +0700
Subject: [PATCH 4/7] Split big multi transactions - part 3a: Scan() (rework
 internals)

Rework how the files are committed to the db.

Before: we'd create a MULTI, then scan. The scan function iterates over
the scan results and calls ScannerAddFile(), which would send commands
to Redis. In case of failure, we'd discard the MULTI transaction,
remove the temporary key, and bail out. In case of success, we'd
finally call ScannerCommit(), which was just about calling EXEC to
execute the MULTI transaction.

With this commit: we now keep an internal slice of filedata. Calling
ScannerAddFile() just adds a filedata to the slice. In case of failure,
it's easier: we can just return. In case of success, it's now the
ScannerCommit() function that does the bulk of the job: send a MULTI
command, then iterate over the files to enqueue all the commands, and
finally EXEC.

This change of behaviour is needed for what comes next: breaking the
MULTI transaction into chunks.
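As a rough sketch of the resulting shape (simplified, not the actual
mirrorbits code; the type, field and key names below are illustrative):

  // Simplified sketch: AddFile only buffers entries, Commit does all the
  // Redis work in one place.
  package sketch

  import (
      "fmt"

      "github.com/gomodule/redigo/redis"
  )

  type fileEntry struct {
      path string
      size int64
  }

  type bufferedScan struct {
      conn        redis.Conn
      filesTmpKey string
      files       []fileEntry
  }

  // AddFile no longer talks to Redis; on a failed scan the caller can
  // simply return and the buffered entries are dropped.
  func (s *bufferedScan) AddFile(f fileEntry) {
      s.files = append(s.files, f)
  }

  // Commit queues every command inside a MULTI and executes it once.
  // (The next commits in the series split this into chunks.)
  func (s *bufferedScan) Commit() error {
      s.conn.Send("MULTI")
      for _, f := range s.files {
          s.conn.Send("SADD", s.filesTmpKey, f.path)
          s.conn.Send("HMSET", fmt.Sprintf("FILEINFO_%s", f.path), "size", f.size)
      }
      _, err := s.conn.Do("EXEC")
      return err
  }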
---
 scan/scan.go | 50 +++++++++++++++++++++++++-------------------------
 1 file changed, 25 insertions(+), 25 deletions(-)

diff --git a/scan/scan.go b/scan/scan.go
index 9611794a..d837d0e5 100644
--- a/scan/scan.go
+++ b/scan/scan.go
@@ -54,6 +54,7 @@ type scan struct {
 	conn        redis.Conn
 	mirrorid    int
 	filesTmpKey string
+	files       []filedata
 	count       int64
 }

@@ -82,6 +83,7 @@ func Scan(typ core.ScannerType, r *database.Redis, c *mirrors.Cache, url string,
 		mirrorid: id,
 		conn:     conn,
 		cache:    c,
+		files:    make([]filedata, 0, 1000),
 	}

 	var scanner Scanner
@@ -128,28 +130,24 @@ func Scan(typ core.ScannerType, r *database.Redis, c *mirrors.Cache, url string,
 		}
 	}(&err)

-	conn.Send("MULTI")
-
 	filesKey := fmt.Sprintf("MIRRORFILES_%d", id)
 	s.filesTmpKey = fmt.Sprintf("MIRRORFILESTMP_%d", id)

 	// Remove any left over
-	conn.Send("DEL", s.filesTmpKey)
+	_, err = conn.Do("DEL", s.filesTmpKey)
+	if err != nil {
+		return nil, err
+	}
+
 	// Scan the mirror
 	var precision core.Precision
 	precision, err = scanner.Scan(url, name, conn, stop)
 	if err != nil {
-		// Discard MULTI
-		s.ScannerDiscard()
-
-		// Remove the temporary key
-		conn.Do("DEL", s.filesTmpKey)
-
 		log.Errorf("[%s] %s", name, err.Error())
 		return nil, err
 	}

-	// Exec multi
+	// Commit changes
 	s.ScannerCommit()

 	// Get the list of files no more present on this mirror
@@ -224,28 +222,30 @@ func Scan(typ core.ScannerType, r *database.Redis, c *mirrors.Cache, url string,

 func (s *scan) ScannerAddFile(f filedata) {
 	s.count++
+	s.files = append(s.files, f)
+}

-	// Add all the files to a temporary key
-	s.conn.Send("SADD", s.filesTmpKey, f.path)
+func (s *scan) ScannerCommit() error {
+	s.conn.Send("MULTI")

-	// Mark the file as being supported by this mirror
-	rk := fmt.Sprintf("FILEMIRRORS_%s", f.path)
-	s.conn.Send("SADD", rk, s.mirrorid)
+	for _, f := range s.files {
+		// Add all the files to a temporary key
+		s.conn.Send("SADD", s.filesTmpKey, f.path)

-	// Save the size of the current file found on this mirror
-	ik := fmt.Sprintf("FILEINFO_%d_%s", s.mirrorid, f.path)
-	s.conn.Send("HMSET", ik, "size", f.size, "modTime", f.modTime)
+		// Mark the file as being supported by this mirror
+		rk := fmt.Sprintf("FILEMIRRORS_%s", f.path)
+		s.conn.Send("SADD", rk, s.mirrorid)

-	// Publish update
-	database.SendPublish(s.conn, database.MIRROR_FILE_UPDATE, fmt.Sprintf("%d %s", s.mirrorid, f.path))
-}
+		// Save the size of the current file found on this mirror
+		ik := fmt.Sprintf("FILEINFO_%d_%s", s.mirrorid, f.path)
+		s.conn.Send("HMSET", ik, "size", f.size, "modTime", f.modTime)

-func (s *scan) ScannerDiscard() {
-	s.conn.Do("DISCARD")
-}
+		// Publish update
+		database.SendPublish(s.conn, database.MIRROR_FILE_UPDATE, fmt.Sprintf("%d %s", s.mirrorid, f.path))
+	}

-func (s *scan) ScannerCommit() error {
 	_, err := s.conn.Do("EXEC")
+
 	return err
 }

From 7d0fc2822978f2b8977b75caf09b74e875122971 Mon Sep 17 00:00:00 2001
From: Arnaud Rebillout
Date: Tue, 10 Oct 2023 14:48:58 +0700
Subject: [PATCH 5/7] Split big multi transactions - part 3b: Scan() (drop
 s.count)

We don't need to maintain a counter, as we keep a slice with the files
returned by the scan, so len() does the job.
---
 scan/scan.go | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/scan/scan.go b/scan/scan.go
index d837d0e5..24d5cf5a 100644
--- a/scan/scan.go
+++ b/scan/scan.go
@@ -55,7 +55,6 @@ type scan struct {
 	mirrorid    int
 	filesTmpKey string
 	files       []filedata
-	count       int64
 }

 type ScanResult struct {
@@ -176,7 +175,7 @@ func Scan(typ core.ScannerType, r *database.Redis, c *mirrors.Cache, url string,

 	// Finally rename the temporary sets containing the list
 	// of files for this mirror to the production key
-	if s.count > 0 {
+	if len(s.files) > 0 {
 		_, err = conn.Do("RENAME", s.filesTmpKey, filesKey)
 		if err != nil {
 			return nil, err
@@ -200,11 +199,11 @@ func Scan(typ core.ScannerType, r *database.Redis, c *mirrors.Cache, url string,
 		log.Warningf("Unable to check timezone shifts: %s", err)
 	}

-	log.Infof("[%s] Indexed %d files (%d known), %d removed", name, s.count, common, len(toremove))
+	log.Infof("[%s] Indexed %d files (%d known), %d removed", name, len(s.files), common, len(toremove))
 	res := &ScanResult{
 		MirrorID:     id,
 		MirrorName:   name,
-		FilesIndexed: s.count,
+		FilesIndexed: int64(len(s.files)),
 		KnownIndexed: common,
 		Removed:      int64(len(toremove)),
 		TZOffsetMs:   tzoffset,
@@ -221,7 +220,6 @@ func Scan(typ core.ScannerType, r *database.Redis, c *mirrors.Cache, url string,
 }

 func (s *scan) ScannerAddFile(f filedata) {
-	s.count++
 	s.files = append(s.files, f)
 }

From e2866327bf1979b7950d022f12c6346bc7d2678e Mon Sep 17 00:00:00 2001
From: Arnaud Rebillout
Date: Tue, 10 Oct 2023 14:51:07 +0700
Subject: [PATCH 6/7] Split big multi transactions - part 3c: Scan() (split
 multi)

With this commit, we split two big MULTI transactions into chunks.
Let's have a look at them in detail.

The first one removes the files that are no longer present on the
mirror. I don't think it really matters whether it's all done at once
or in several transactions.

The other one, in ScannerCommit(), commits all the files that were
returned by the scan. Once again, I have the impression that it
doesn't really matter whether it's done all at once or the transaction
is split into chunks.
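For background (an assumption about why chunking helps, not something
stated in this patch): with the redigo client, Send only writes a
command to the client-side output buffer, Do flushes the buffer and
waits for the replies, and Redis queues every command of a MULTI in
server memory until EXEC, which then runs the whole queue atomically.
A very large transaction therefore costs memory on both ends and one
long, blocking EXEC on the server; capping chunks at RedisMultiMaxSize
commands bounds that. A minimal sketch of those semantics:

  // Sketch of redigo's pipelining semantics (illustrative only).
  package sketch

  import "github.com/gomodule/redigo/redis"

  func smallTransaction(conn redis.Conn) error {
      conn.Send("MULTI")          // written to the client-side buffer
      conn.Send("SET", "k1", "a") // buffered; queued by Redis once received
      conn.Send("SET", "k2", "b") // buffered as well
      // Do flushes the buffer and waits for the replies; the queued
      // commands run atomically when EXEC is processed.
      _, err := conn.Do("EXEC")
      return err
  }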
---
 scan/scan.go | 21 +++++++++++++++++++--
 1 file changed, 19 insertions(+), 2 deletions(-)

diff --git a/scan/scan.go b/scan/scan.go
index 24d5cf5a..97200fd6 100644
--- a/scan/scan.go
+++ b/scan/scan.go
@@ -159,13 +159,21 @@ func Scan(typ core.ScannerType, r *database.Redis, c *mirrors.Cache, url string,

 	// Remove this mirror from the given file SET
 	if len(toremove) > 0 {
 		conn.Send("MULTI")
-		for _, e := range toremove {
+		for count, e := range toremove {
 			log.Debugf("[%s] Removing %s from mirror", name, e)
 			conn.Send("SREM", fmt.Sprintf("FILEMIRRORS_%s", e), id)
 			conn.Send("DEL", fmt.Sprintf("FILEINFO_%d_%s", id, e))
+
 			// Publish update
 			database.SendPublish(conn, database.MIRROR_FILE_UPDATE, fmt.Sprintf("%d %s", id, e))
+			if count > 0 && count % database.RedisMultiMaxSize == 0 {
+				_, err = conn.Do("EXEC")
+				if err != nil {
+					return nil, err
+				}
+				conn.Send("MULTI")
+			}
 		}
 		_, err = conn.Do("EXEC")
 		if err != nil {
@@ -226,7 +234,7 @@ func (s *scan) ScannerAddFile(f filedata) {
 func (s *scan) ScannerCommit() error {
 	s.conn.Send("MULTI")

-	for _, f := range s.files {
+	for count, f := range s.files {
 		// Add all the files to a temporary key
 		s.conn.Send("SADD", s.filesTmpKey, f.path)

@@ -240,6 +248,15 @@ func (s *scan) ScannerCommit() error {

 		// Publish update
 		database.SendPublish(s.conn, database.MIRROR_FILE_UPDATE, fmt.Sprintf("%d %s", s.mirrorid, f.path))
+
+		// Execute the transaction if enough files were added
+		if count > 0 && count % database.RedisMultiMaxSize == 0 {
+			_, err := s.conn.Do("EXEC")
+			if err != nil {
+				return err
+			}
+			s.conn.Send("MULTI")
+		}
 	}

 	_, err := s.conn.Do("EXEC")

From 7c1e44e11a6a3f8d005b334a0d85cf5b3c1c86f6 Mon Sep 17 00:00:00 2001
From: Arnaud Rebillout
Date: Tue, 10 Oct 2023 13:44:15 +0700
Subject: [PATCH 7/7] Minor improvements in logs

Looking at the logs when the source is scanned:

  scan.go:494 2023/10/10 06:18:58.633 UTC [source] Scanning the filesystem...
  scan.go:512 2023/10/10 06:19:22.079 UTC [source] Indexing the files...
  scan.go:624 2023/10/10 06:19:27.745 UTC [source] Scanned 544001 files

And the logs when a mirror is scanned:

  rsync.go:89 2023/10/10 06:13:23.634 UTC [ftp.jaist.ac.jp] Requesting file list via rsync...
  trace.go:129 2023/10/10 06:13:23.979 UTC [ftp.jaist.ac.jp] trace last sync: 2023-10-10 00:00:01 +0000 UTC
  scan.go:221 2023/10/10 06:18:49.781 UTC [ftp.jaist.ac.jp] Indexed 544001 files (544000 known), 0 removed

This commit brings two minor improvements:

* Log the number of files that were removed for the source, similar to
  how it's done for mirrors.
* Add an "Indexing the files..." log after the mirror scan returns,
  similar to how it's done for the source.
---
 scan/scan.go | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/scan/scan.go b/scan/scan.go
index 97200fd6..f2e992df 100644
--- a/scan/scan.go
+++ b/scan/scan.go
@@ -146,6 +146,8 @@ func Scan(typ core.ScannerType, r *database.Redis, c *mirrors.Cache, url string,
 		return nil, err
 	}

+	log.Infof("[%s] Indexing the files...", name)
+
 	// Commit changes
 	s.ScannerCommit()

@@ -607,7 +609,7 @@ func ScanSource(r *database.Redis, forceRehash bool, stop <-chan struct{}) (err
 		return err
 	}

-	log.Infof("[source] Scanned %d files", len(sourceFiles))
+	log.Infof("[source] Scanned %d files, %d removed", len(sourceFiles), len(toremove))

 	return nil
 }