From 0c872fda4107189da72d3e3f18206e2570166548 Mon Sep 17 00:00:00 2001 From: Cameron Dunn Date: Fri, 10 Dec 2021 12:19:30 -0800 Subject: [PATCH] Allow GateKeeper to be optional --- src/go/cmd/strelka-frontend/main.go | 57 ++++++++++++++++++----------- 1 file changed, 35 insertions(+), 22 deletions(-) diff --git a/src/go/cmd/strelka-frontend/main.go b/src/go/cmd/strelka-frontend/main.go index fb62db47..7164723d 100644 --- a/src/go/cmd/strelka-frontend/main.go +++ b/src/go/cmd/strelka-frontend/main.go @@ -34,7 +34,7 @@ type gate struct { type server struct { coordinator coord - gatekeeper gate + gatekeeper *gate responses chan<- *strelka.ScanResponse } @@ -107,7 +107,7 @@ func (s *server) ScanFile(stream strelka.Frontend_ScanFileServer) error { Time: time.Now().Unix(), } - if req.Gatekeeper { + if req.Gatekeeper && s.gatekeeper != nil { lrange := s.gatekeeper.cli.LRange(stream.Context(), sha, 0, -1).Val() if len(lrange) > 0 { for _, e := range lrange { @@ -140,7 +140,7 @@ func (s *server) ScanFile(stream strelka.Frontend_ScanFileServer) error { } if err := s.coordinator.cli.ZAdd( - stream.Context(), + stream.Context(), "tasks", &redis.Z{ Score: float64(deadline.Unix()), @@ -150,8 +150,12 @@ func (s *server) ScanFile(stream strelka.Frontend_ScanFileServer) error { return err } - tx := s.gatekeeper.cli.TxPipeline() - tx.Del(stream.Context(), sha) + var tx *redis.Pipeliner + if s.gatekeeper != nil { + pipeliner := s.gatekeeper.cli.TxPipeline() + tx = &pipeliner + (*tx).Del(stream.Context(), sha) + } for { lpop, err := s.coordinator.cli.LPop(stream.Context(), keye).Result() @@ -163,7 +167,9 @@ func (s *server) ScanFile(stream strelka.Frontend_ScanFileServer) error { break } - tx.RPush(stream.Context(), sha, lpop) + if tx != nil { + (*tx).RPush(stream.Context(), sha, lpop) + } if err := json.Unmarshal([]byte(lpop), &em); err != nil { return err } @@ -184,9 +190,11 @@ func (s *server) ScanFile(stream strelka.Frontend_ScanFileServer) error { } } - tx.Expire(stream.Context(), sha, s.gatekeeper.ttl) - if _, err := tx.Exec(stream.Context()); err != nil { - return err + if tx != nil { + (*tx).Expire(stream.Context(), sha, s.gatekeeper.ttl) + if _, err := (*tx).Exec(stream.Context()); err != nil { + return err + } } return nil @@ -245,14 +253,22 @@ func main() { log.Fatalf("failed to connect to coordinator: %v", err) } - gk := redis.NewClient(&redis.Options{ - Addr: conf.Gatekeeper.Addr, - DB: conf.Gatekeeper.DB, - PoolSize: conf.Gatekeeper.Pool, - ReadTimeout: conf.Gatekeeper.Read, - }) - if err := gk.Ping(gk.Context()).Err(); err != nil { - log.Fatalf("failed to connect to gatekeeper: %v", err) + var gatekeeper *gate + if conf.Gatekeeper.Addr != "" { + gk := redis.NewClient(&redis.Options{ + Addr: conf.Gatekeeper.Addr, + DB: conf.Gatekeeper.DB, + PoolSize: conf.Gatekeeper.Pool, + ReadTimeout: conf.Gatekeeper.Read, + }) + if err := gk.Ping(gk.Context()).Err(); err != nil { + log.Fatalf("failed to connect to gatekeeper: %v", err) + } + + gatekeeper = &gate{ + cli: gk, + ttl: conf.Gatekeeper.TTL, + } } s := grpc.NewServer() @@ -260,11 +276,8 @@ func main() { coordinator: coord{ cli: cd, }, - gatekeeper: gate{ - cli: gk, - ttl: conf.Gatekeeper.TTL, - }, - responses: responses, + gatekeeper: gatekeeper, + responses: responses, } strelka.RegisterFrontendServer(s, opts)