Skip to content

Commit

Permalink
Merge pull request #1985 from keboola/feat-add-http-source-server-res…
Browse files Browse the repository at this point in the history
…ponse-header

feat: Add server response header for HTTP source node
  • Loading branch information
Matovidlo authored Sep 3, 2024
2 parents a8810c7 + 58ff0a5 commit 4ae8247
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 0 deletions.
2 changes: 2 additions & 0 deletions internal/pkg/service/stream/source/httpsource/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ func newErrorHandler(cfg Config, logger log.Logger) func(c *fasthttp.RequestCtx,
// Error handler
errorWriter := newErrorWriter(logger)
return func(c *fasthttp.RequestCtx, err error) {
c.Response.Header.Set("Server", ServerHeader)

var smallBufferErr *fasthttp.ErrSmallBuffer

switch {
Expand Down
5 changes: 5 additions & 0 deletions internal/pkg/service/stream/source/httpsource/httpsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
)

const (
ServerHeader = "Keboola/Stream/HTTPSource"
gracefulShutdownTimeout = 30 * time.Second
)

Expand Down Expand Up @@ -53,6 +54,10 @@ func Start(ctx context.Context, d dependencies, cfg Config) error {

// Static routes
router := routing.New()
router.Use(func(c *routing.Context) error {
c.Response.Header.Set("Server", ServerHeader)
return nil
})
router.NotFound(routing.MethodNotAllowedHandler, func(c *routing.Context) error {
errorHandler(c.RequestCtx, svcErrors.NewRouteNotFound(errors.New("not found, please send data using POST /stream/<projectID>/<sourceID>/<secret>")))
return nil
Expand Down
17 changes: 17 additions & 0 deletions internal/pkg/service/stream/source/httpsource/httpsource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/dependencies"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/pipeline"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/source/httpsource"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/test"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/test/dummy"
"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
Expand Down Expand Up @@ -166,13 +167,15 @@ func testCases(t *testing.T, ts *testState) []TestCase {
Method: http.MethodGet,
Path: "/health-check",
ExpectedStatusCode: http.StatusOK,
ExpectedHeaders: map[string]string{"Server": httpsource.ServerHeader},
ExpectedBody: "OK\n",
},
{
Name: "not found",
Method: http.MethodGet,
Path: "/foo",
ExpectedStatusCode: http.StatusNotFound,
ExpectedHeaders: map[string]string{"Server": httpsource.ServerHeader},
ExpectedLogs: `{"level":"info","message":"not found, please send data using POST /stream/<projectID>/<sourceID>/<secret>"}`,
ExpectedBody: `
{
Expand All @@ -189,6 +192,7 @@ func testCases(t *testing.T, ts *testState) []TestCase {
ExpectedHeaders: map[string]string{
"Allow": "OPTIONS, POST",
"Content-Length": "0",
"Server": httpsource.ServerHeader,
},
},
{
Expand All @@ -197,6 +201,7 @@ func testCases(t *testing.T, ts *testState) []TestCase {
Path: "/stream/foo/my-source/my-secret",
Body: strings.NewReader(strings.Repeat(".", ts.maxBodySize)),
ExpectedStatusCode: http.StatusBadRequest,
ExpectedHeaders: map[string]string{"Server": httpsource.ServerHeader},
ExpectedBody: `
{
"statusCode": 400,
Expand All @@ -210,6 +215,7 @@ func testCases(t *testing.T, ts *testState) []TestCase {
Path: "/stream/1111/my-source/my-secret",
Body: strings.NewReader(strings.Repeat(".", ts.maxBodySize)),
ExpectedStatusCode: http.StatusNotFound,
ExpectedHeaders: map[string]string{"Server": httpsource.ServerHeader},
ExpectedBody: `
{
"statusCode": 404,
Expand All @@ -223,6 +229,7 @@ func testCases(t *testing.T, ts *testState) []TestCase {
Path: "/stream/123/my-source-1/" + ts.invalidSecret,
Body: strings.NewReader(strings.Repeat(".", ts.maxBodySize)),
ExpectedStatusCode: http.StatusNotFound,
ExpectedHeaders: map[string]string{"Server": httpsource.ServerHeader},
ExpectedBody: `
{
"statusCode": 404,
Expand All @@ -236,6 +243,7 @@ func testCases(t *testing.T, ts *testState) []TestCase {
Path: "/stream/123/my-source-2/" + ts.validSecret,
Body: strings.NewReader(strings.Repeat(".", ts.maxBodySize)),
ExpectedStatusCode: http.StatusNotFound,
ExpectedHeaders: map[string]string{"Server": httpsource.ServerHeader},
ExpectedBody: `
{
"statusCode": 404,
Expand All @@ -257,6 +265,7 @@ func testCases(t *testing.T, ts *testState) []TestCase {
ExpectedStatusCode: http.StatusInternalServerError,
ExpectedHeaders: map[string]string{
"Content-Type": "application/json",
"Server": httpsource.ServerHeader,
},
ExpectedLogs: `
{"level":"error","message":"write record error: cannot open sink pipeline: some open error, next attempt after %s","component":"sink.router"}
Expand Down Expand Up @@ -318,6 +327,7 @@ func testCases(t *testing.T, ts *testState) []TestCase {
ExpectedStatusCode: http.StatusInternalServerError,
ExpectedHeaders: map[string]string{
"Content-Type": "application/json",
"Server": httpsource.ServerHeader,
},
ExpectedLogs: `
{"level":"error","message":"write record error: some write error","component":"sink.router"}
Expand Down Expand Up @@ -378,6 +388,7 @@ func testCases(t *testing.T, ts *testState) []TestCase {
ExpectedStatusCode: http.StatusAccepted,
ExpectedHeaders: map[string]string{
"Content-Type": "text/plain",
"Server": httpsource.ServerHeader,
},
ExpectedBody: "OK",
},
Expand All @@ -395,6 +406,7 @@ func testCases(t *testing.T, ts *testState) []TestCase {
ExpectedStatusCode: http.StatusOK,
ExpectedHeaders: map[string]string{
"Content-Type": "text/plain",
"Server": httpsource.ServerHeader,
},
ExpectedBody: "OK",
},
Expand All @@ -411,6 +423,7 @@ func testCases(t *testing.T, ts *testState) []TestCase {
Query: "verbose=true",
Body: strings.NewReader(strings.Repeat(".", ts.maxBodySize)),
ExpectedStatusCode: http.StatusAccepted,
ExpectedHeaders: map[string]string{"Server": httpsource.ServerHeader},
ExpectedBody: `
{
"statusCode": 202,
Expand Down Expand Up @@ -460,6 +473,7 @@ func testCases(t *testing.T, ts *testState) []TestCase {
Query: "verbose=true",
Body: strings.NewReader(strings.Repeat(".", ts.maxBodySize)),
ExpectedStatusCode: http.StatusOK,
ExpectedHeaders: map[string]string{"Server": httpsource.ServerHeader},
ExpectedBody: `
{
"statusCode": 200,
Expand Down Expand Up @@ -502,6 +516,7 @@ func testCases(t *testing.T, ts *testState) []TestCase {
Path: "/stream/123/my-source-1/" + ts.validSecret,
Headers: map[string]string{"foo": strings.Repeat(".", ts.maxHeaderSize+1)},
ExpectedStatusCode: http.StatusRequestEntityTooLarge,
ExpectedHeaders: map[string]string{"Server": httpsource.ServerHeader},
ExpectedLogs: `{"level":"info","message":"request header size is over the maximum \"2000B\"","error.type":"%s/errors.HeaderTooLargeError"}`,
ExpectedBody: `
{
Expand All @@ -516,6 +531,7 @@ func testCases(t *testing.T, ts *testState) []TestCase {
Path: "/stream/123/my-source/" + ts.validSecret,
Body: strings.NewReader(strings.Repeat(".", ts.maxBodySize+1)),
ExpectedStatusCode: http.StatusRequestEntityTooLarge,
ExpectedHeaders: map[string]string{"Server": httpsource.ServerHeader},
ExpectedLogs: `{"level":"info","message":"request body size is over the maximum \"8000B\"","error.type":"%s/errors.BodyTooLargeError"}`,
ExpectedBody: `
{
Expand Down Expand Up @@ -544,6 +560,7 @@ func testCases(t *testing.T, ts *testState) []TestCase {
Query: "verbose=true",
Body: strings.NewReader("foo"),
ExpectedStatusCode: http.StatusOK,
ExpectedHeaders: map[string]string{"Server": httpsource.ServerHeader},
ExpectedBody: `
{
"statusCode": 200,
Expand Down

0 comments on commit 4ae8247

Please sign in to comment.