Skip to content

Commit

Permalink
Merge branch 'master' into fix.sendLiveEventsRouterTransformFailures
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 committed Nov 3, 2022
2 parents 83c17d0 + c9bd5d4 commit 7ecd780
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 72 deletions.
22 changes: 18 additions & 4 deletions router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,10 +662,24 @@ func (worker *workerT) processDestinationJobs() {
if worker.rt.transformerProxy {
jobID := destinationJob.JobMetadataArray[0].JobID
pkgLogger.Debugf(`[TransformerProxy] (Dest-%[1]v) {Job - %[2]v} Request started`, worker.rt.destName, jobID)

// setting metadata
firstJobMetadata := destinationJob.JobMetadataArray[0]
proxyReqparams := &transformer.ProxyRequestParams{
DestName: worker.rt.destName,
JobID: jobID,
ResponseData: val,
DestName: worker.rt.destName,
JobID: jobID,
ResponseData: transformer.ProxyRequestPayload{
PostParametersT: val,
Metadata: transformer.ProxyRequestMetadata{
SourceID: firstJobMetadata.SourceID,
DestinationID: firstJobMetadata.DestinationID,
WorkspaceID: firstJobMetadata.WorkspaceID,
JobID: firstJobMetadata.JobID,
AttemptNum: firstJobMetadata.AttemptNum,
DestInfo: firstJobMetadata.DestInfo,
Secret: firstJobMetadata.Secret,
},
},
}
rtlTime := time.Now()
respStatusCode, respBodyTemp, respContentType = worker.rt.transformer.ProxyRequest(ctx, proxyReqparams)
Expand All @@ -681,7 +695,7 @@ func (worker *workerT) processDestinationJobs() {
workerID: worker.workerID,
trRespStCd: respStatusCode,
trRespBody: respBodyTemp,
secret: destinationJob.JobMetadataArray[0].Secret,
secret: firstJobMetadata.Secret,
})
}
} else {
Expand Down
3 changes: 2 additions & 1 deletion router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/tidwall/gjson"
"math"
"testing"
"time"

"github.com/tidwall/gjson"

"github.com/rudderlabs/rudder-server/enterprise/reporting"

jsoniter "github.com/json-iterator/go"
Expand Down
18 changes: 17 additions & 1 deletion router/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package transformer
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -48,8 +49,23 @@ type handle struct {
logger logger.Logger
}

type ProxyRequestMetadata struct {
JobID int64 `json:"jobId"`
AttemptNum int `json:"attemptNum"`
UserID string `json:"userId"`
SourceID string `json:"sourceId"`
DestinationID string `json:"destinationId"`
WorkspaceID string `json:"workspaceId"`
Secret json.RawMessage `json:"secret"`
DestInfo json.RawMessage `json:"destInfo,omitempty"`
}

type ProxyRequestPayload struct {
integrations.PostParametersT
Metadata ProxyRequestMetadata `json:"metadata,omitempty"`
}
type ProxyRequestParams struct {
ResponseData integrations.PostParametersT
ResponseData ProxyRequestPayload
DestName string
JobID int64
BaseUrl string
Expand Down
142 changes: 76 additions & 66 deletions router/transformer/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestProxyRequest(t *testing.T) {
// For http client timeout scenarios, we need to have a proxyTimeout which is > rtTimeout + <timeout_at_router_transform>
rtTimeout time.Duration
// Transformed response that needs to be sent to destination
postParameters integrations.PostParametersT
postParameters ProxyRequestPayload
context proxyContext
}

Expand All @@ -93,21 +93,23 @@ func TestProxyRequest(t *testing.T) {
timeout: 0,
},
rtTimeout: 10 * time.Millisecond,
postParameters: integrations.PostParametersT{
Type: "REST",
URL: "http://www.good_dest.domain.com",
RequestMethod: http.MethodPost,
QueryParams: map[string]interface{}{},
Body: map[string]interface{}{
"JSON": map[string]interface{}{
"key_1": "val_1",
"key_2": "val_2",
postParameters: ProxyRequestPayload{
PostParametersT: integrations.PostParametersT{
Type: "REST",
URL: "http://www.good_dest.domain.com",
RequestMethod: http.MethodPost,
QueryParams: map[string]interface{}{},
Body: map[string]interface{}{
"JSON": map[string]interface{}{
"key_1": "val_1",
"key_2": "val_2",
},
"FORM": map[string]interface{}{},
"JSON_ARRAY": map[string]interface{}{},
"XML": map[string]interface{}{},
},
"FORM": map[string]interface{}{},
"JSON_ARRAY": map[string]interface{}{},
"XML": map[string]interface{}{},
Files: map[string]interface{}{},
},
Files: map[string]interface{}{},
},
},
{
Expand All @@ -125,21 +127,23 @@ func TestProxyRequest(t *testing.T) {
timeout: time.Duration(1.2 * 1e9),
},
rtTimeout: 8 * time.Millisecond,
postParameters: integrations.PostParametersT{
Type: "REST",
URL: "http://www.good_dest_1.domain.com",
RequestMethod: http.MethodPost,
QueryParams: map[string]interface{}{},
Body: map[string]interface{}{
"JSON": map[string]interface{}{
"key_1": "val_1",
"key_2": "val_2",
postParameters: ProxyRequestPayload{
PostParametersT: integrations.PostParametersT{
Type: "REST",
URL: "http://www.good_dest_1.domain.com",
RequestMethod: http.MethodPost,
QueryParams: map[string]interface{}{},
Body: map[string]interface{}{
"JSON": map[string]interface{}{
"key_1": "val_1",
"key_2": "val_2",
},
"FORM": map[string]interface{}{},
"JSON_ARRAY": map[string]interface{}{},
"XML": map[string]interface{}{},
},
"FORM": map[string]interface{}{},
"JSON_ARRAY": map[string]interface{}{},
"XML": map[string]interface{}{},
Files: map[string]interface{}{},
},
Files: map[string]interface{}{},
},
},
{
Expand All @@ -159,21 +163,23 @@ func TestProxyRequest(t *testing.T) {
context: proxyContext{
timeout: 2 * time.Millisecond,
},
postParameters: integrations.PostParametersT{
Type: "REST",
URL: "http://www.ctx_timeout_dest.domain.com",
RequestMethod: http.MethodPost,
QueryParams: map[string]interface{}{},
Body: map[string]interface{}{
"JSON": map[string]interface{}{
"key_1": "val_1",
"key_2": "val_2",
postParameters: ProxyRequestPayload{
PostParametersT: integrations.PostParametersT{
Type: "REST",
URL: "http://www.ctx_timeout_dest.domain.com",
RequestMethod: http.MethodPost,
QueryParams: map[string]interface{}{},
Body: map[string]interface{}{
"JSON": map[string]interface{}{
"key_1": "val_1",
"key_2": "val_2",
},
"FORM": map[string]interface{}{},
"JSON_ARRAY": map[string]interface{}{},
"XML": map[string]interface{}{},
},
"FORM": map[string]interface{}{},
"JSON_ARRAY": map[string]interface{}{},
"XML": map[string]interface{}{},
Files: map[string]interface{}{},
},
Files: map[string]interface{}{},
},
},
{
Expand All @@ -192,21 +198,23 @@ func TestProxyRequest(t *testing.T) {
context: proxyContext{
cancel: true,
},
postParameters: integrations.PostParametersT{
Type: "REST",
URL: "http://www.ctx_timeout_dest.domain.com",
RequestMethod: http.MethodPost,
QueryParams: map[string]interface{}{},
Body: map[string]interface{}{
"JSON": map[string]interface{}{
"key_1": "val_1",
"key_2": "val_2",
postParameters: ProxyRequestPayload{
PostParametersT: integrations.PostParametersT{
Type: "REST",
URL: "http://www.ctx_timeout_dest.domain.com",
RequestMethod: http.MethodPost,
QueryParams: map[string]interface{}{},
Body: map[string]interface{}{
"JSON": map[string]interface{}{
"key_1": "val_1",
"key_2": "val_2",
},
"FORM": map[string]interface{}{},
"JSON_ARRAY": map[string]interface{}{},
"XML": map[string]interface{}{},
},
"FORM": map[string]interface{}{},
"JSON_ARRAY": map[string]interface{}{},
"XML": map[string]interface{}{},
Files: map[string]interface{}{},
},
Files: map[string]interface{}{},
},
},
{
Expand All @@ -223,21 +231,23 @@ func TestProxyRequest(t *testing.T) {
response: `Not Found`,
},
rtTimeout: 10 * time.Millisecond,
postParameters: integrations.PostParametersT{
Type: "REST",
URL: "http://www.not_found_dest.domain.com",
RequestMethod: http.MethodPost,
QueryParams: map[string]interface{}{},
Body: map[string]interface{}{
"JSON": map[string]interface{}{
"key_1": "val_1",
"key_2": "val_2",
postParameters: ProxyRequestPayload{
PostParametersT: integrations.PostParametersT{
Type: "REST",
URL: "http://www.not_found_dest.domain.com",
RequestMethod: http.MethodPost,
QueryParams: map[string]interface{}{},
Body: map[string]interface{}{
"JSON": map[string]interface{}{
"key_1": "val_1",
"key_2": "val_2",
},
"FORM": map[string]interface{}{},
"JSON_ARRAY": map[string]interface{}{},
"XML": map[string]interface{}{},
},
"FORM": map[string]interface{}{},
"JSON_ARRAY": map[string]interface{}{},
"XML": map[string]interface{}{},
Files: map[string]interface{}{},
},
Files: map[string]interface{}{},
},
},
}
Expand Down
1 change: 1 addition & 0 deletions router/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type JobMetadataT struct {
Secret json.RawMessage `json:"secret"`
JobT *jobsdb.JobT `json:"jobsT"`
WorkerAssignedTime time.Time `json:"workerAssignedTime"`
DestInfo json.RawMessage `json:"destInfo,omitempty"`
}

// TransformMessageT is used to pass message to the transformer workers
Expand Down

0 comments on commit 7ecd780

Please sign in to comment.