Skip to content

Commit

Permalink
fix: uses new http server logic in engine metrics server now
Browse files Browse the repository at this point in the history
  • Loading branch information
norwoodj committed Aug 28, 2021
1 parent 0880940 commit b3a0be8
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 73 deletions.
114 changes: 79 additions & 35 deletions cmd/hashbash-engine/engine.go
Original file line number Diff line number Diff line change
@@ -1,60 +1,74 @@
package main

import (
"context"
"github.com/coreos/go-systemd/activation"
"github.com/gorilla/handlers"
"golang.org/x/sync/errgroup"
"os"
"os/signal"
"sync"
"syscall"
"time"

"github.com/norwoodj/hashbash-backend-go/pkg/dao"
"github.com/norwoodj/hashbash-backend-go/pkg/metrics"
"github.com/norwoodj/hashbash-backend-go/pkg/rabbitmq"
"github.com/norwoodj/hashbash-backend-go/pkg/rainbow"
"github.com/norwoodj/hashbash-backend-go/pkg/util"
log "github.com/sirupsen/logrus"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)

func startConsumersAndHandleSignals(
consumers rabbitmq.HashbashMqConsumerWorkers,
shutdownGraceDuration time.Duration,
waitGroup *sync.WaitGroup,
) {
defer waitGroup.Done()
consumerStartErrorChannels := []chan error{make(chan error), make(chan error), make(chan error)}
quit := make(chan bool)

log.Infof("Starting hashbash consumers...")
go consumers.HashbashDeleteRainbowTableConsumer.ConsumeMessages(quit, consumerStartErrorChannels[0])
go consumers.HashbashGenerateRainbowTableConsumer.ConsumeMessages(quit, consumerStartErrorChannels[1])
go consumers.HashbashSearchRainbowTableConsumer.ConsumeMessages(quit, consumerStartErrorChannels[2])
func toErrFunc(f func(chan error)) func() error {
startErrors := make(chan error)

for _, errorChannel := range consumerStartErrorChannels {
consumerStartError := <-errorChannel
return func() error {
f(startErrors)

if consumerStartError != nil {
log.Error(consumerStartError)
os.Exit(1)
for e := range startErrors {
return e
}

return nil
}
}

gracefulStop := make(chan os.Signal, 1)
signal.Notify(gracefulStop, syscall.SIGTERM)
signal.Notify(gracefulStop, syscall.SIGINT)
func startConsumers(
consumers rabbitmq.HashbashMqConsumerWorkers,
startErrGroup *errgroup.Group,
shutdownErrGroup *errgroup.Group,
) chan bool {
quit := make(chan bool)

shutdownSignal := <-gracefulStop
log.Info().Msg("Starting hashbash consumers...")
startErrGroup.Go(toErrFunc(
func(startErrors chan error) {
consumers.HashbashDeleteRainbowTableConsumer.ConsumeMessages(quit, startErrors)
},
))

startErrGroup.Go(toErrFunc(
func(startErrors chan error) {
consumers.HashbashGenerateRainbowTableConsumer.ConsumeMessages(quit, startErrors)
},
))

startErrGroup.Go(toErrFunc(
func(startErrors chan error) {
consumers.HashbashSearchRainbowTableConsumer.ConsumeMessages(quit, startErrors)
},
))

return quit
}

log.Infof("Received Signal %s, shutting down gracefully, waiting %s for channels to close...", shutdownSignal, shutdownGraceDuration)
func registerShutdownConsumers(quit chan bool, ctx context.Context) {
<-ctx.Done()
close(quit)
time.Sleep(shutdownGraceDuration)
}

func hashbashEngine(_ *cobra.Command, _ []string) {
err := util.SetupLogging()
if err != nil {
log.Error(err)
log.Error().Err(err).Msg("Failed to setup logging")
os.Exit(1)
}

Expand Down Expand Up @@ -107,13 +121,43 @@ func hashbashEngine(_ *cobra.Command, _ []string) {
)

if err != nil {
log.Errorf("Failed to instantiate rabbitmq consumers: %s", err)
log.Error().Err(err).Msg("Failed to instantiate rabbitmq consumers")
os.Exit(1)
}

waitGroup := sync.WaitGroup{}
waitGroup.Add(2)
done, cancel := context.WithCancel(context.Background())
startErrGroup, _ := errgroup.WithContext(done)
shutdownErrGroup, _ := errgroup.WithContext(done)

quit := startConsumers(hashbashConsumers, startErrGroup, shutdownErrGroup)
go registerShutdownConsumers(quit, done)

systemdListenersByName, err := activation.ListenersWithNames()

if err != nil {
log.Fatal().
Err(err).
Msg("Failed to retrieve systemd sockets by name")
}

managementHandler := handlers.LoggingHandler(os.Stdout, util.GetManagementHandler())
for _, addr := range viper.GetStringSlice("management-addr") {
listener := util.GetTcpListenerOrDie(addr)
util.StartHttpHandler(startErrGroup, shutdownErrGroup, done, listener, managementHandler)
}

for _, socketPath := range viper.GetStringSlice("management-sock") {
listener := util.GetUnixSocketListenerOrDie(socketPath)
util.StartHttpHandler(startErrGroup, shutdownErrGroup, done, listener, managementHandler)
}

for _, socketFdName := range viper.GetStringSlice("management-name") {
listeners := util.GetSystemdListenersOrDie(socketFdName, systemdListenersByName)

for _, l := range listeners {
util.StartHttpHandler(startErrGroup, shutdownErrGroup, done, l, managementHandler)
}
}

go startConsumersAndHandleSignals(hashbashConsumers, viper.GetDuration("shutdown-timeout"), &waitGroup)
waitGroup.Wait()
util.WaitForSignalGracefulShutdown(cancel, startErrGroup, shutdownErrGroup)
}
38 changes: 7 additions & 31 deletions cmd/hashbash-webapp/webapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package main
import (
"context"
"github.com/coreos/go-systemd/activation"
"net"
"net/http"
"os"
"strings"

Expand Down Expand Up @@ -54,12 +52,6 @@ func walkRoutes(router *mux.Router) {
})
}

func startHttpHandler(startErrGroup *errgroup.Group, shutdownErrGroup *errgroup.Group, done context.Context, listener net.Listener, handler http.Handler) {
server := util.GetServerForHandler(handler)
startErrGroup.Go(func() error { return util.StartServer(server, listener) })
shutdownErrGroup.Go(func() error { return util.HandleServerShutdown(done, server, listener) })
}

func hashbashWebapp(_ *cobra.Command, _ []string) {
err := util.SetupLogging()
if err != nil {
Expand Down Expand Up @@ -112,56 +104,40 @@ func hashbashWebapp(_ *cobra.Command, _ []string) {

for _, addr := range viper.GetStringSlice("http-addr") {
listener := util.GetTcpListenerOrDie(addr)
startHttpHandler(startErrGroup, shutdownErrGroup, done, listener, loggedRouter)
util.StartHttpHandler(startErrGroup, shutdownErrGroup, done, listener, loggedRouter)
}

for _, socketPath := range viper.GetStringSlice("http-sock") {
listener := util.GetUnixSocketListenerOrDie(socketPath)
startHttpHandler(startErrGroup, shutdownErrGroup, done, listener, loggedRouter)
util.StartHttpHandler(startErrGroup, shutdownErrGroup, done, listener, loggedRouter)
}

for _, socketFdName := range viper.GetStringSlice("http-name") {
listeners := util.GetSystemdListenersOrDie(socketFdName, systemdListenersByName)

for _, l := range listeners {
startHttpHandler(startErrGroup, shutdownErrGroup, done, l, loggedRouter)
util.StartHttpHandler(startErrGroup, shutdownErrGroup, done, l, loggedRouter)
}
}

managementHandler := handlers.LoggingHandler(os.Stdout, util.GetManagementHandler())
for _, addr := range viper.GetStringSlice("management-addr") {
listener := util.GetTcpListenerOrDie(addr)
startHttpHandler(startErrGroup, shutdownErrGroup, done, listener, managementHandler)
util.StartHttpHandler(startErrGroup, shutdownErrGroup, done, listener, managementHandler)
}

for _, socketPath := range viper.GetStringSlice("management-sock") {
listener := util.GetUnixSocketListenerOrDie(socketPath)
startHttpHandler(startErrGroup, shutdownErrGroup, done, listener, managementHandler)
util.StartHttpHandler(startErrGroup, shutdownErrGroup, done, listener, managementHandler)
}

for _, socketFdName := range viper.GetStringSlice("management-name") {
listeners := util.GetSystemdListenersOrDie(socketFdName, systemdListenersByName)

for _, l := range listeners {
startHttpHandler(startErrGroup, shutdownErrGroup, done, l, managementHandler)
util.StartHttpHandler(startErrGroup, shutdownErrGroup, done, l, managementHandler)
}
}

go util.WaitForSignalGracefulShutdown(cancel)

go func() {
if err := startErrGroup.Wait(); err != nil {
log.Fatal().
Err(err).
Msg("Failed to start servers")
}
}()

if err := shutdownErrGroup.Wait(); err != nil {
log.Fatal().
Err(err).
Msg("Error shutting down servers")
}

log.Info().Msg("Shutdown successful")
util.WaitForSignalGracefulShutdown(cancel, startErrGroup, shutdownErrGroup)
}
40 changes: 33 additions & 7 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package util

import (
"context"
"golang.org/x/sync/errgroup"
"net"
"net/http"
"os"
Expand Down Expand Up @@ -82,14 +83,38 @@ func GetSystemdListenersOrDie(socketFdName string, listenersByName map[string][]
return listener
}

func WaitForSignalGracefulShutdown(cancel context.CancelFunc) {
gracefulStop := make(chan os.Signal, 1)
signal.Notify(gracefulStop, syscall.SIGTERM)
signal.Notify(gracefulStop, syscall.SIGINT)
func WaitForSignalGracefulShutdown(cancel context.CancelFunc, startErrGroup *errgroup.Group, shutdownErrGroup *errgroup.Group) {
go func() {
gracefulStop := make(chan os.Signal, 1)
signal.Notify(gracefulStop, syscall.SIGTERM)
signal.Notify(gracefulStop, syscall.SIGINT)

shutdownSignal := <-gracefulStop
log.Info().Msgf("Received signal %s, stopping servers...", shutdownSignal)
cancel()
}()

go func() {
if err := startErrGroup.Wait(); err != nil {
log.Fatal().
Err(err).
Msg("Failed to start servers")
}
}()

if err := shutdownErrGroup.Wait(); err != nil {
log.Fatal().
Err(err).
Msg("Error shutting down servers")
}

shutdownSignal := <-gracefulStop
log.Info().Msgf("Received signal %s, stopping servers...", shutdownSignal)
cancel()
log.Info().Msg("Shutdown successful")
}

func StartHttpHandler(startErrGroup *errgroup.Group, shutdownErrGroup *errgroup.Group, done context.Context, listener net.Listener, handler http.Handler) {
server := GetServerForHandler(handler)
startErrGroup.Go(func() error { return StartServer(server, listener) })
shutdownErrGroup.Go(func() error { return HandleServerShutdown(done, server, listener) })
}

func GetServerForHandler(handler http.Handler) http.Server {
Expand Down Expand Up @@ -133,6 +158,7 @@ func HandleServerShutdown(done context.Context, server http.Server, listener net
return nil
}


func GetManagementHandler() http.Handler {
healthcheckHandler := healthcheck.NewHandler()
prometheusHandler := promhttp.Handler()
Expand Down

0 comments on commit b3a0be8

Please sign in to comment.