Skip to content

Commit

Permalink
Allow GateKeeper to be optional (#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
cameron-dunn-sublime authored Dec 13, 2021
1 parent 984639d commit f6f6fd3
Showing 1 changed file with 35 additions and 22 deletions.
57 changes: 35 additions & 22 deletions src/go/cmd/strelka-frontend/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type gate struct {

type server struct {
coordinator coord
gatekeeper gate
gatekeeper *gate
responses chan<- *strelka.ScanResponse
}

Expand Down Expand Up @@ -107,7 +107,7 @@ func (s *server) ScanFile(stream strelka.Frontend_ScanFileServer) error {
Time: time.Now().Unix(),
}

if req.Gatekeeper {
if req.Gatekeeper && s.gatekeeper != nil {
lrange := s.gatekeeper.cli.LRange(stream.Context(), sha, 0, -1).Val()
if len(lrange) > 0 {
for _, e := range lrange {
Expand Down Expand Up @@ -140,7 +140,7 @@ func (s *server) ScanFile(stream strelka.Frontend_ScanFileServer) error {
}

if err := s.coordinator.cli.ZAdd(
stream.Context(),
stream.Context(),
"tasks",
&redis.Z{
Score: float64(deadline.Unix()),
Expand All @@ -150,8 +150,12 @@ func (s *server) ScanFile(stream strelka.Frontend_ScanFileServer) error {
return err
}

tx := s.gatekeeper.cli.TxPipeline()
tx.Del(stream.Context(), sha)
var tx *redis.Pipeliner
if s.gatekeeper != nil {
pipeliner := s.gatekeeper.cli.TxPipeline()
tx = &pipeliner
(*tx).Del(stream.Context(), sha)
}

for {
lpop, err := s.coordinator.cli.LPop(stream.Context(), keye).Result()
Expand All @@ -163,7 +167,9 @@ func (s *server) ScanFile(stream strelka.Frontend_ScanFileServer) error {
break
}

tx.RPush(stream.Context(), sha, lpop)
if tx != nil {
(*tx).RPush(stream.Context(), sha, lpop)
}
if err := json.Unmarshal([]byte(lpop), &em); err != nil {
return err
}
Expand All @@ -184,9 +190,11 @@ func (s *server) ScanFile(stream strelka.Frontend_ScanFileServer) error {
}
}

tx.Expire(stream.Context(), sha, s.gatekeeper.ttl)
if _, err := tx.Exec(stream.Context()); err != nil {
return err
if tx != nil {
(*tx).Expire(stream.Context(), sha, s.gatekeeper.ttl)
if _, err := (*tx).Exec(stream.Context()); err != nil {
return err
}
}

return nil
Expand Down Expand Up @@ -245,26 +253,31 @@ func main() {
log.Fatalf("failed to connect to coordinator: %v", err)
}

gk := redis.NewClient(&redis.Options{
Addr: conf.Gatekeeper.Addr,
DB: conf.Gatekeeper.DB,
PoolSize: conf.Gatekeeper.Pool,
ReadTimeout: conf.Gatekeeper.Read,
})
if err := gk.Ping(gk.Context()).Err(); err != nil {
log.Fatalf("failed to connect to gatekeeper: %v", err)
var gatekeeper *gate
if conf.Gatekeeper.Addr != "" {
gk := redis.NewClient(&redis.Options{
Addr: conf.Gatekeeper.Addr,
DB: conf.Gatekeeper.DB,
PoolSize: conf.Gatekeeper.Pool,
ReadTimeout: conf.Gatekeeper.Read,
})
if err := gk.Ping(gk.Context()).Err(); err != nil {
log.Fatalf("failed to connect to gatekeeper: %v", err)
}

gatekeeper = &gate{
cli: gk,
ttl: conf.Gatekeeper.TTL,
}
}

s := grpc.NewServer()
opts := &server{
coordinator: coord{
cli: cd,
},
gatekeeper: gate{
cli: gk,
ttl: conf.Gatekeeper.TTL,
},
responses: responses,
gatekeeper: gatekeeper,
responses: responses,
}

strelka.RegisterFrontendServer(s, opts)
Expand Down

0 comments on commit f6f6fd3

Please sign in to comment.