Skip to content

Commit

Permalink
Merge pull request #2 from G-Research/master
Browse files Browse the repository at this point in the history
Merge recent changes in master
  • Loading branch information
ericjohnsohnisc authored and Eric Jonsohn committed Dec 23, 2020
2 parents 7ff8518 + 33ac218 commit cd43d4a
Show file tree
Hide file tree
Showing 41 changed files with 958 additions and 226 deletions.
1 change: 1 addition & 0 deletions cmd/armada/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func main() {
shutdownGateway := grpc.ServeGateway(
config.HttpPort,
config.GrpcPort,
config.CorsAllowedOrigins,
api.SwaggerJsonTemplate(),
api.RegisterSubmitHandler,
api.RegisterEventHandler,
Expand Down
1 change: 1 addition & 0 deletions cmd/lookout/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func main() {
mux, shutdownGateway := grpc.CreateGatewayHandler(
config.GrpcPort,
"/api/",
[]string{},
lookoutApi.SwaggerJsonTemplate(),
lookoutApi.RegisterLookoutHandler)

Expand Down
1 change: 1 addition & 0 deletions config/armada/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
grpcPort: 50051
httpPort: 8080
metricsPort: 9000
corsAllowedOrigins: []
priorityHalfTime: 20m
redis:
addrs:
Expand Down
1 change: 1 addition & 0 deletions e2e/setup/kafka/armada-config.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
corsAllowedOrigins: ["http://localhost:3000"]
eventsKafka:
brokers:
- "localhost:9092"
Expand Down
1 change: 1 addition & 0 deletions e2e/setup/nats/armada-config.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
corsAllowedOrigins: ["http://localhost:3000"]
eventsNats:
Servers:
- "nats://localhost:4223"
Expand Down
3 changes: 3 additions & 0 deletions internal/armada/authorization/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/auth"
grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

Expand Down Expand Up @@ -88,6 +89,8 @@ func CreateMiddlewareAuthFunction(authServices []AuthService) grpc_auth.AuthFunc
if err != nil {
return nil, err
}
// record user name for request logging
grpc_ctxtags.Extract(ctx).Set("user", principal.GetName())
return WithPrincipal(ctx, principal), nil
}
return nil, status.Errorf(codes.Unauthenticated, "Request in not authenticated with any of the supported schemes.")
Expand Down
1 change: 1 addition & 0 deletions internal/armada/configuration/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type ArmadaConfig struct {
GrpcPort uint16
HttpPort uint16
MetricsPort uint16
CorsAllowedOrigins []string
PriorityHalfTime time.Duration
Redis redis.UniversalOptions
EventsKafka KafkaConfig
Expand Down
28 changes: 26 additions & 2 deletions internal/common/grpc/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@ import (

protoutil "github.com/G-Research/armada/internal/armada/protoutils"
"github.com/G-Research/armada/internal/common"
"github.com/G-Research/armada/internal/common/util"
)

func ServeGateway(
port uint16,
grpcPort uint16,
corsAllowedOrigins []string,
spec string,
handlers ...func(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error) (shutdown func()) {

mux, shutdownGateway := CreateGatewayHandler(grpcPort, "/", spec, handlers...)
mux, shutdownGateway := CreateGatewayHandler(grpcPort, "/", corsAllowedOrigins, spec, handlers...)
cancel := common.ServeHttp(port, mux)

return func() {
Expand All @@ -34,6 +36,7 @@ func ServeGateway(
func CreateGatewayHandler(
grpcPort uint16,
apiBasePath string,
corsAllowedOrigins []string,
spec string,
handlers ...func(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error) (handler *http.ServeMux, shutdown func()) {

Expand Down Expand Up @@ -66,7 +69,7 @@ func CreateGatewayHandler(
}
}

mux.Handle(apiBasePath, gw)
mux.Handle(apiBasePath, allowCORS(gw, corsAllowedOrigins))
mux.Handle(path.Join(apiBasePath, "swagger.json"), middleware.Spec(apiBasePath, []byte(spec), nil))

return mux, func() {
Expand All @@ -75,6 +78,27 @@ func CreateGatewayHandler(
}
}

func allowCORS(h http.Handler, corsAllowedOrigins []string) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if origin := r.Header.Get("Origin"); origin != "" && util.ContainsString(corsAllowedOrigins, origin) {
w.Header().Set("Access-Control-Allow-Origin", origin)
w.Header().Set("Access-Control-Allow-Credentials", "true")
if r.Method == "OPTIONS" && r.Header.Get("Access-Control-Request-Method") != "" {
preflightHandler(w)
return
}
}
h.ServeHTTP(w, r)
})
}

func preflightHandler(w http.ResponseWriter) {
headers := []string{"Content-Type", "Accept", "Authorization"}
w.Header().Set("Access-Control-Allow-Headers", strings.Join(headers, ","))
methods := []string{"GET", "HEAD", "POST", "PUT", "DELETE"}
w.Header().Set("Access-Control-Allow-Methods", strings.Join(methods, ","))
}

func health(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}
11 changes: 11 additions & 0 deletions internal/common/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (

grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/auth"
grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
Expand All @@ -21,6 +23,15 @@ func CreateGrpcServer(authServices []authorization.AuthService) *grpc.Server {
unaryInterceptors := []grpc.UnaryServerInterceptor{}
streamInterceptors := []grpc.StreamServerInterceptor{}

messageDefault := log.NewEntry(log.StandardLogger())
tagsExtractor := grpc_ctxtags.WithFieldExtractor(grpc_ctxtags.CodeGenRequestFieldExtractor)
unaryInterceptors = append(unaryInterceptors,
grpc_ctxtags.UnaryServerInterceptor(tagsExtractor),
grpc_logrus.UnaryServerInterceptor(messageDefault))
streamInterceptors = append(streamInterceptors,
grpc_ctxtags.StreamServerInterceptor(tagsExtractor),
grpc_logrus.StreamServerInterceptor(messageDefault))

authFunction := authorization.CreateMiddlewareAuthFunction(authServices)
unaryInterceptors = append(unaryInterceptors, grpc_auth.UnaryServerInterceptor(authFunction))
streamInterceptors = append(streamInterceptors, grpc_auth.StreamServerInterceptor(authFunction))
Expand Down
12 changes: 12 additions & 0 deletions internal/common/startup.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,22 @@ func ConfigureCommandLineLogging() {
}

func ConfigureLogging() {
log.SetLevel(readEnvironmentLogLevel())
log.SetFormatter(&log.TextFormatter{ForceColors: true, FullTimestamp: true})
log.SetOutput(os.Stdout)
}

func readEnvironmentLogLevel() log.Level {
level, ok := os.LookupEnv("LOG_LEVEL")
if ok {
logLevel, err := log.ParseLevel(level)
if err == nil {
return logLevel
}
}
return log.InfoLevel
}

func ServeMetrics(port uint16) (shutdown func()) {
hook := promrus.MustNewPrometheusHook()
log.AddHook(hook)
Expand Down
9 changes: 9 additions & 0 deletions internal/common/util/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,12 @@ func StringListToSet(list []string) map[string]bool {
}
return set
}

func ContainsString(list []string, val string) bool {
for _, elem := range list {
if elem == val {
return true
}
}
return false
}
16 changes: 14 additions & 2 deletions internal/lookout/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"strings"
"sync"

"github.com/doug-martin/goqu/v9"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
_ "github.com/lib/pq"
log "github.com/sirupsen/logrus"

Expand All @@ -21,6 +23,12 @@ import (
"github.com/G-Research/armada/pkg/api/lookout"
)

type LogRusLogger struct{}

func (l LogRusLogger) Printf(format string, v ...interface{}) {
log.Debugf(format, v...)
}

func StartUp(config configuration.LookoutConfiguration) (func(), *sync.WaitGroup) {

wg := &sync.WaitGroup{}
Expand All @@ -32,9 +40,11 @@ func StartUp(config configuration.LookoutConfiguration) (func(), *sync.WaitGroup
if err != nil {
panic(err)
}
goquDb := goqu.New("postgres", db)
goquDb.Logger(&LogRusLogger{})

jobStore := repository.NewSQLJobStore(db)
jobRepository := repository.NewSQLJobRepository(db)
jobStore := repository.NewSQLJobStore(goquDb)
jobRepository := repository.NewSQLJobRepository(goquDb)

conn, err := stanUtil.DurableConnect(
config.Nats.ClusterID,
Expand All @@ -51,6 +61,8 @@ func StartUp(config configuration.LookoutConfiguration) (func(), *sync.WaitGroup
lookoutServer := server.NewLookoutServer(jobRepository)
lookout.RegisterLookoutServer(grpcServer, lookoutServer)

grpc_prometheus.Register(grpcServer)

lis, err := net.Listen("tcp", fmt.Sprintf(":%d", config.GrpcPort))
if err != nil {
log.Fatalf("failed to listen: %v", err)
Expand Down
5 changes: 2 additions & 3 deletions internal/lookout/repository/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,8 @@ type SQLJobRepository struct {
goquDb *goqu.Database
}

func NewSQLJobRepository(db *sql.DB) *SQLJobRepository {
goquDb := goqu.New("postgres", db)
return &SQLJobRepository{goquDb: goquDb}
func NewSQLJobRepository(db *goqu.Database) *SQLJobRepository {
return &SQLJobRepository{goquDb: db}
}

var (
Expand Down
Loading

0 comments on commit cd43d4a

Please sign in to comment.