Skip to content

Commit

Permalink
Rename output and stop in sigint and sigterm properly
Browse files Browse the repository at this point in the history
  • Loading branch information
marc-gr committed May 26, 2021
1 parent e0a8999 commit 8884c52
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 23 deletions.
11 changes: 6 additions & 5 deletions command/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

// Register outputs.
_ "github.com/elastic/stream/pkg/output/gcppubsub"
_ "github.com/elastic/stream/pkg/output/httpserver"
_ "github.com/elastic/stream/pkg/output/tcp"
_ "github.com/elastic/stream/pkg/output/tls"
_ "github.com/elastic/stream/pkg/output/udp"
Expand Down Expand Up @@ -74,11 +75,11 @@ func ExecuteContext(ctx context.Context) error {
rootCmd.PersistentFlags().BoolVar(&opts.GCPPubsubOptions.Clear, "gcppubsub-clear", true, "GCP Pubsub clear flag")

// HTTP output flags.
rootCmd.PersistentFlags().DurationVar(&opts.HTTPSrvOptions.ReadTimeout, "httpsrv-read-timeout", 5*time.Second, "HTTP Server read timeout")
rootCmd.PersistentFlags().DurationVar(&opts.HTTPSrvOptions.WriteTimeout, "httpsrv-write-timeout", 5*time.Second, "HTTP Server write timeout")
rootCmd.PersistentFlags().StringVar(&opts.HTTPSrvOptions.TLSCertificate, "httpsrv-tls-cert", "", "Path to the TLS certificate")
rootCmd.PersistentFlags().StringVar(&opts.HTTPSrvOptions.TLSKey, "httpsrv-tls-key", "", "Path to the TLS key file")
rootCmd.PersistentFlags().StringArrayVar(&opts.HTTPSrvOptions.ResponseHeaders, "httpsrv-response-headers", []string{"content-type", "application/json"}, "List of headers key-values")
rootCmd.PersistentFlags().DurationVar(&opts.HTTPServerOptions.ReadTimeout, "http-server-read-timeout", 5*time.Second, "HTTP Server read timeout")
rootCmd.PersistentFlags().DurationVar(&opts.HTTPServerOptions.WriteTimeout, "http-server-write-timeout", 5*time.Second, "HTTP Server write timeout")
rootCmd.PersistentFlags().StringVar(&opts.HTTPServerOptions.TLSCertificate, "http-server-tls-cert", "", "Path to the TLS certificate")
rootCmd.PersistentFlags().StringVar(&opts.HTTPServerOptions.TLSKey, "http-server-tls-key", "", "Path to the TLS key file")
rootCmd.PersistentFlags().StringArrayVar(&opts.HTTPServerOptions.ResponseHeaders, "http-server-response-headers", []string{"content-type", "application/json"}, "List of headers key-values")

// Sub-commands.
rootCmd.AddCommand(newLogRunner(&opts, logger))
Expand Down
41 changes: 25 additions & 16 deletions pkg/output/http/http.go → pkg/output/httpserver/httpserver.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Licensed to Elasticsearch B.V. under one or more agreements.
// Elasticsearch B.V. licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.
package http
package httpserver

import (
"context"
Expand All @@ -10,36 +10,39 @@ import (
"io/ioutil"
"net/http"
"strings"
"syscall"
"time"

"github.com/elastic/go-concert/ctxtool/osctx"
"go.uber.org/zap"

"github.com/elastic/stream/pkg/log"
"github.com/elastic/stream/pkg/output"
)

func init() {
output.Register("httpsrv", New)
output.Register("http-server", New)
}

type Output struct {
logger *zap.SugaredLogger
opts *output.Options
server *http.Server
logChan chan []byte
ctx context.Context
}

func New(opts *output.Options) (output.Output, error) {
if opts.Addr == "" {
return nil, errors.New("a listen address is required")
}

if !(opts.HTTPSrvOptions.TLSCertificate == "" && opts.HTTPSrvOptions.TLSKey == "") &&
!(opts.HTTPSrvOptions.TLSCertificate != "" && opts.HTTPSrvOptions.TLSKey != "") {
if !(opts.HTTPServerOptions.TLSCertificate == "" && opts.HTTPServerOptions.TLSKey == "") &&
!(opts.HTTPServerOptions.TLSCertificate != "" && opts.HTTPServerOptions.TLSKey != "") {
return nil, errors.New("both TLS certificate and key files must be defined")
}

if len(opts.HTTPSrvOptions.ResponseHeaders)%2 != 0 {
if len(opts.HTTPServerOptions.ResponseHeaders)%2 != 0 {
return nil, errors.New("response headers must be a list of pairs")
}

Expand All @@ -52,8 +55,8 @@ func New(opts *output.Options) (output.Output, error) {
logChan := make(chan []byte)
server := &http.Server{
Addr: opts.Addr,
ReadTimeout: opts.HTTPSrvOptions.ReadTimeout,
WriteTimeout: opts.HTTPSrvOptions.WriteTimeout,
ReadTimeout: opts.HTTPServerOptions.ReadTimeout,
WriteTimeout: opts.HTTPServerOptions.WriteTimeout,
MaxHeaderBytes: 1 << 20,
Handler: newHandler(opts, logChan, slogger),
}
Expand All @@ -63,6 +66,7 @@ func New(opts *output.Options) (output.Output, error) {
opts: opts,
server: server,
logChan: logChan,
ctx: newContext(),
}, nil
}

Expand All @@ -78,19 +82,19 @@ func (o *Output) DialContext(ctx context.Context) error {
func (o *Output) Close() error {
defer close(o.logChan)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
o.logger.Infow("shutting down http_server...")

ctx, cancel := context.WithTimeout(o.ctx, time.Second)
defer cancel()

_ = o.server.Shutdown(ctx)
return nil
return o.server.Shutdown(ctx)
}

func (o *Output) Write(b []byte) (int, error) {
timer := time.NewTimer(time.Minute)
defer timer.Stop()
select {
case <-timer.C:
return 0, errors.New("waiting to write for too long")
case <-o.ctx.Done():
o.logger.Infow("the output has been closed")
return 0, nil
case o.logChan <- b:
return len(b), nil
}
Expand All @@ -103,8 +107,8 @@ func newHandler(opts *output.Options, logChan <-chan []byte, logger *zap.Sugared
defer r.Body.Close()
logger.Debug(strRequest(r))

for i := 0; i < len(opts.HTTPSrvOptions.ResponseHeaders); i += 2 {
w.Header().Add(opts.HTTPSrvOptions.ResponseHeaders[i], opts.HTTPSrvOptions.ResponseHeaders[i+1])
for i := 0; i < len(opts.HTTPServerOptions.ResponseHeaders); i += 2 {
w.Header().Add(opts.HTTPServerOptions.ResponseHeaders[i], opts.HTTPServerOptions.ResponseHeaders[i+1])
}

_, _ = w.Write(b)
Expand All @@ -124,3 +128,8 @@ func strRequest(r *http.Request) string {
b.Write(body)
return b.String()
}

func newContext() context.Context {
startCtx, _ := osctx.WithSignal(context.Background(), syscall.SIGINT, syscall.SIGTERM)
return startCtx
}
4 changes: 2 additions & 2 deletions pkg/output/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type Options struct {

WebhookOptions
GCPPubsubOptions
HTTPSrvOptions
HTTPServerOptions
}

type WebhookOptions struct {
Expand All @@ -34,7 +34,7 @@ type GCPPubsubOptions struct {
Clear bool // Clear will clear all topics and subscriptions before running.
}

type HTTPSrvOptions struct {
type HTTPServerOptions struct {
TLSCertificate string // TLS certificate file path.
TLSKey string // TLS key file path.
ResponseHeaders []string // KV list of response headers.
Expand Down

0 comments on commit 8884c52

Please sign in to comment.