diff --git a/mocks/router/klaviyobulkupload/klaviyobulkupload_mock.go b/mocks/router/klaviyobulkupload/klaviyobulkupload_mock.go index a4fac5decc..f21c1c8386 100644 --- a/mocks/router/klaviyobulkupload/klaviyobulkupload_mock.go +++ b/mocks/router/klaviyobulkupload/klaviyobulkupload_mock.go @@ -1,205 +1,85 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/klaviyobulkupload (interfaces: Uploader,HttpClient,Poller,ProfileExtractor,UploadStats) +// Source: github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/klaviyobulkupload (interfaces: KlaviyoAPIService) // // Generated by this command: // -// mockgen -destination=../../../../mocks/router/klaviyobulkupload/klaviyobulkupload_mock.go -package=mocks github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/klaviyobulkupload Uploader,HttpClient,Poller,ProfileExtractor,UploadStats +// mockgen -destination=../../../../mocks/router/klaviyobulkupload/klaviyobulkupload_mock.go -package=mocks github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/klaviyobulkupload KlaviyoAPIService // // Package mocks is a generated GoMock package. package mocks import ( - http "net/http" reflect "reflect" - common "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common" klaviyobulkupload "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/klaviyobulkupload" gomock "go.uber.org/mock/gomock" ) -// MockUploader is a mock of Uploader interface. -type MockUploader struct { +// MockKlaviyoAPIService is a mock of KlaviyoAPIService interface. +type MockKlaviyoAPIService struct { ctrl *gomock.Controller - recorder *MockUploaderMockRecorder + recorder *MockKlaviyoAPIServiceMockRecorder } -// MockUploaderMockRecorder is the mock recorder for MockUploader. -type MockUploaderMockRecorder struct { - mock *MockUploader +// MockKlaviyoAPIServiceMockRecorder is the mock recorder for MockKlaviyoAPIService. +type MockKlaviyoAPIServiceMockRecorder struct { + mock *MockKlaviyoAPIService } -// NewMockUploader creates a new mock instance. -func NewMockUploader(ctrl *gomock.Controller) *MockUploader { - mock := &MockUploader{ctrl: ctrl} - mock.recorder = &MockUploaderMockRecorder{mock} +// NewMockKlaviyoAPIService creates a new mock instance. +func NewMockKlaviyoAPIService(ctrl *gomock.Controller) *MockKlaviyoAPIService { + mock := &MockKlaviyoAPIService{ctrl: ctrl} + mock.recorder = &MockKlaviyoAPIServiceMockRecorder{mock} return mock } // EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockUploader) EXPECT() *MockUploaderMockRecorder { +func (m *MockKlaviyoAPIService) EXPECT() *MockKlaviyoAPIServiceMockRecorder { return m.recorder } -// Upload mocks base method. -func (m *MockUploader) Upload(arg0 *common.AsyncDestinationStruct) common.AsyncUploadOutput { +// GetUploadErrors mocks base method. +func (m *MockKlaviyoAPIService) GetUploadErrors(arg0 string) (*klaviyobulkupload.UploadStatusResp, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Upload", arg0) - ret0, _ := ret[0].(common.AsyncUploadOutput) - return ret0 -} - -// Upload indicates an expected call of Upload. -func (mr *MockUploaderMockRecorder) Upload(arg0 any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Upload", reflect.TypeOf((*MockUploader)(nil).Upload), arg0) -} - -// MockHttpClient is a mock of HttpClient interface. -type MockHttpClient struct { - ctrl *gomock.Controller - recorder *MockHttpClientMockRecorder -} - -// MockHttpClientMockRecorder is the mock recorder for MockHttpClient. -type MockHttpClientMockRecorder struct { - mock *MockHttpClient -} - -// NewMockHttpClient creates a new mock instance. -func NewMockHttpClient(ctrl *gomock.Controller) *MockHttpClient { - mock := &MockHttpClient{ctrl: ctrl} - mock.recorder = &MockHttpClientMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockHttpClient) EXPECT() *MockHttpClientMockRecorder { - return m.recorder -} - -// Do mocks base method. -func (m *MockHttpClient) Do(arg0 *http.Request) (*http.Response, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Do", arg0) - ret0, _ := ret[0].(*http.Response) + ret := m.ctrl.Call(m, "GetUploadErrors", arg0) + ret0, _ := ret[0].(*klaviyobulkupload.UploadStatusResp) ret1, _ := ret[1].(error) return ret0, ret1 } -// Do indicates an expected call of Do. -func (mr *MockHttpClientMockRecorder) Do(arg0 any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Do", reflect.TypeOf((*MockHttpClient)(nil).Do), arg0) -} - -// MockPoller is a mock of Poller interface. -type MockPoller struct { - ctrl *gomock.Controller - recorder *MockPollerMockRecorder -} - -// MockPollerMockRecorder is the mock recorder for MockPoller. -type MockPollerMockRecorder struct { - mock *MockPoller -} - -// NewMockPoller creates a new mock instance. -func NewMockPoller(ctrl *gomock.Controller) *MockPoller { - mock := &MockPoller{ctrl: ctrl} - mock.recorder = &MockPollerMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockPoller) EXPECT() *MockPollerMockRecorder { - return m.recorder -} - -// Poll mocks base method. -func (m *MockPoller) Poll(arg0 common.AsyncPoll) common.PollStatusResponse { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Poll", arg0) - ret0, _ := ret[0].(common.PollStatusResponse) - return ret0 -} - -// Poll indicates an expected call of Poll. -func (mr *MockPollerMockRecorder) Poll(arg0 any) *gomock.Call { +// GetUploadErrors indicates an expected call of GetUploadErrors. +func (mr *MockKlaviyoAPIServiceMockRecorder) GetUploadErrors(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Poll", reflect.TypeOf((*MockPoller)(nil).Poll), arg0) -} - -// MockProfileExtractor is a mock of ProfileExtractor interface. -type MockProfileExtractor struct { - ctrl *gomock.Controller - recorder *MockProfileExtractorMockRecorder -} - -// MockProfileExtractorMockRecorder is the mock recorder for MockProfileExtractor. -type MockProfileExtractorMockRecorder struct { - mock *MockProfileExtractor -} - -// NewMockProfileExtractor creates a new mock instance. -func NewMockProfileExtractor(ctrl *gomock.Controller) *MockProfileExtractor { - mock := &MockProfileExtractor{ctrl: ctrl} - mock.recorder = &MockProfileExtractorMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockProfileExtractor) EXPECT() *MockProfileExtractorMockRecorder { - return m.recorder + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUploadErrors", reflect.TypeOf((*MockKlaviyoAPIService)(nil).GetUploadErrors), arg0) } -// ExtractProfiles mocks base method. -func (m *MockProfileExtractor) ExtractProfiles(arg0 klaviyobulkupload.Input) klaviyobulkupload.Profile { +// GetUploadStatus mocks base method. +func (m *MockKlaviyoAPIService) GetUploadStatus(arg0 string) (*klaviyobulkupload.PollResp, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ExtractProfiles", arg0) - ret0, _ := ret[0].(klaviyobulkupload.Profile) - return ret0 + ret := m.ctrl.Call(m, "GetUploadStatus", arg0) + ret0, _ := ret[0].(*klaviyobulkupload.PollResp) + ret1, _ := ret[1].(error) + return ret0, ret1 } -// ExtractProfiles indicates an expected call of ExtractProfiles. -func (mr *MockProfileExtractorMockRecorder) ExtractProfiles(arg0 any) *gomock.Call { +// GetUploadStatus indicates an expected call of GetUploadStatus. +func (mr *MockKlaviyoAPIServiceMockRecorder) GetUploadStatus(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExtractProfiles", reflect.TypeOf((*MockProfileExtractor)(nil).ExtractProfiles), arg0) -} - -// MockUploadStats is a mock of UploadStats interface. -type MockUploadStats struct { - ctrl *gomock.Controller - recorder *MockUploadStatsMockRecorder -} - -// MockUploadStatsMockRecorder is the mock recorder for MockUploadStats. -type MockUploadStatsMockRecorder struct { - mock *MockUploadStats -} - -// NewMockUploadStats creates a new mock instance. -func NewMockUploadStats(ctrl *gomock.Controller) *MockUploadStats { - mock := &MockUploadStats{ctrl: ctrl} - mock.recorder = &MockUploadStatsMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockUploadStats) EXPECT() *MockUploadStatsMockRecorder { - return m.recorder + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUploadStatus", reflect.TypeOf((*MockKlaviyoAPIService)(nil).GetUploadStatus), arg0) } -// GetUploadStats mocks base method. -func (m *MockUploadStats) GetUploadStats(arg0 common.GetUploadStatsInput) common.GetUploadStatsResponse { +// UploadProfiles mocks base method. +func (m *MockKlaviyoAPIService) UploadProfiles(arg0 klaviyobulkupload.Payload) (*klaviyobulkupload.UploadResp, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetUploadStats", arg0) - ret0, _ := ret[0].(common.GetUploadStatsResponse) - return ret0 + ret := m.ctrl.Call(m, "UploadProfiles", arg0) + ret0, _ := ret[0].(*klaviyobulkupload.UploadResp) + ret1, _ := ret[1].(error) + return ret0, ret1 } -// GetUploadStats indicates an expected call of GetUploadStats. -func (mr *MockUploadStatsMockRecorder) GetUploadStats(arg0 any) *gomock.Call { +// UploadProfiles indicates an expected call of UploadProfiles. +func (mr *MockKlaviyoAPIServiceMockRecorder) UploadProfiles(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUploadStats", reflect.TypeOf((*MockUploadStats)(nil).GetUploadStats), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UploadProfiles", reflect.TypeOf((*MockKlaviyoAPIService)(nil).UploadProfiles), arg0) } diff --git a/router/batchrouter/asyncdestinationmanager/bing-ads/offline-conversions/bingads_test.go b/router/batchrouter/asyncdestinationmanager/bing-ads/offline-conversions/bingads_test.go index 98985d454d..588beaf2b2 100644 --- a/router/batchrouter/asyncdestinationmanager/bing-ads/offline-conversions/bingads_test.go +++ b/router/batchrouter/asyncdestinationmanager/bing-ads/offline-conversions/bingads_test.go @@ -539,14 +539,14 @@ var _ = Describe("Bing ads Offline Conversions", func() { Expect(err).To(BeNil()) }) - It("Transform() Test -> conversionAdjustedTime not available", func() { + It("Transform() Test -> adjustedConversionTime not available", func() { job := &jobsdb.JobT{ EventPayload: []byte("{\"type\": \"record\", \"action\": \"update\", \"fields\": {\"conversionName\": \"Test-Integration\", \"conversionTime\": \"5/22/2023 6:27:54 AM\", \"conversionValue\": \"100\", \"microsoftClickId\": \"click_id\", \"conversionCurrencyCode\": \"USD\"}}"), } uploader := &BingAdsBulkUploader{} // Execute _, err := uploader.Transform(job) - expectedResult := fmt.Errorf(" conversionAdjustedTime field not defined") + expectedResult := fmt.Errorf(" adjustedConversionTime field not defined") Expect(err.Error()).To(Equal(expectedResult.Error())) }) }) diff --git a/router/batchrouter/asyncdestinationmanager/bing-ads/offline-conversions/bulk_uploader.go b/router/batchrouter/asyncdestinationmanager/bing-ads/offline-conversions/bulk_uploader.go index 7ea421d839..e7cb07f010 100644 --- a/router/batchrouter/asyncdestinationmanager/bing-ads/offline-conversions/bulk_uploader.go +++ b/router/batchrouter/asyncdestinationmanager/bing-ads/offline-conversions/bulk_uploader.go @@ -69,7 +69,7 @@ func (b *BingAdsBulkUploader) Transform(job *jobsdb.JobT) (string, error) { } if event.Action != "insert" { // validate for adjusted time - err := validateField(fields, "conversionAdjustedTime") + err := validateField(fields, "adjustedConversionTime") if err != nil { return payload, err } diff --git a/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/apiService.go b/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/apiService.go new file mode 100644 index 0000000000..addf95acf6 --- /dev/null +++ b/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/apiService.go @@ -0,0 +1,131 @@ +package klaviyobulkupload + +import ( + "bytes" + "fmt" + "io" + "net/http" + "time" + + "github.com/rudderlabs/rudder-go-kit/logger" + "github.com/rudderlabs/rudder-go-kit/stats" + backendconfig "github.com/rudderlabs/rudder-server/backend-config" +) + +const ( + KlaviyoAPIURL = "https://a.klaviyo.com/api/profile-bulk-import-jobs/" +) + +type KlaviyoAPIServiceImpl struct { + client *http.Client + PrivateAPIKey string + logger logger.Logger + statsFactory stats.Stats + statLabels stats.Tags +} + +func setRequestHeaders(req *http.Request, apiKey string) { + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Klaviyo-API-Key "+apiKey) + req.Header.Set("revision", "2024-05-15") +} + +func (k *KlaviyoAPIServiceImpl) UploadProfiles(profiles Payload) (*UploadResp, error) { + payloadJSON, err := json.Marshal(profiles) + if err != nil { + return nil, err + } + payloadSizeStat := k.statsFactory.NewTaggedStat("payload_size", stats.HistogramType, k.statLabels) + payloadSizeStat.Observe(float64(len(payloadJSON))) + + startTime := time.Now() + req, err := http.NewRequest("POST", KlaviyoAPIURL, bytes.NewBuffer(payloadJSON)) + if err != nil { + return nil, err + } + setRequestHeaders(req, k.PrivateAPIKey) + resp, err := k.client.Do(req) + if err != nil { + return nil, err + } + + var uploadResp UploadResp + uploadBodyBytes, _ := io.ReadAll(resp.Body) + defer func() { _ = resp.Body.Close() }() + uploadRespErr := json.Unmarshal(uploadBodyBytes, &uploadResp) + if uploadRespErr != nil { + return nil, uploadRespErr + } + if len(uploadResp.Errors) > 0 { + return &uploadResp, fmt.Errorf("upload failed with errors: %+v", uploadResp.Errors) + } + uploadTimeStat := k.statsFactory.NewTaggedStat("async_upload_time", stats.TimerType, k.statLabels) + uploadTimeStat.Since(startTime) + + return &uploadResp, uploadRespErr +} + +func (k *KlaviyoAPIServiceImpl) GetUploadStatus(importId string) (*PollResp, error) { + pollUrl := KlaviyoAPIURL + importId + req, err := http.NewRequest("GET", pollUrl, nil) + if err != nil { + return nil, err + } + setRequestHeaders(req, k.PrivateAPIKey) + resp, err := k.client.Do(req) + if err != nil { + return nil, err + } + var pollBodyBytes []byte + var pollresp PollResp + pollBodyBytes, _ = io.ReadAll(resp.Body) + defer func() { _ = resp.Body.Close() }() + + pollRespErr := json.Unmarshal(pollBodyBytes, &pollresp) + if pollRespErr != nil { + return nil, pollRespErr + } + if len(pollresp.Errors) > 0 { + return &pollresp, fmt.Errorf("GetUploadStatus failed with errors: %+v", pollresp.Errors) + } + return &pollresp, pollRespErr +} + +func (k *KlaviyoAPIServiceImpl) GetUploadErrors(importId string) (*UploadStatusResp, error) { + importErrorUrl := KlaviyoAPIURL + importId + "/import-errors" + req, err := http.NewRequest("GET", importErrorUrl, nil) + if err != nil { + return nil, err + } + setRequestHeaders(req, k.PrivateAPIKey) + resp, err := k.client.Do(req) + if err != nil { + return nil, err + } + var importErrorBodyBytes []byte + var importErrorResp UploadStatusResp + importErrorBodyBytes, _ = io.ReadAll(resp.Body) + defer func() { _ = resp.Body.Close() }() + importErrorRespErr := json.Unmarshal(importErrorBodyBytes, &importErrorResp) + if importErrorRespErr != nil { + return nil, importErrorRespErr + } + if len(importErrorResp.Errors) > 0 { + return &importErrorResp, fmt.Errorf("GetUploadErrors failed with errors: %+v", importErrorResp.Errors) + } + return &importErrorResp, importErrorRespErr +} + +func NewKlaviyoAPIService(destination *backendconfig.DestinationT, logger logger.Logger, statsFactory stats.Stats) KlaviyoAPIService { + return &KlaviyoAPIServiceImpl{ + client: http.DefaultClient, + PrivateAPIKey: destination.Config["privateApiKey"].(string), + logger: logger, + statsFactory: statsFactory, + statLabels: stats.Tags{ + "module": "batch_router", + "destType": destination.Name, + "destID": destination.ID, + }, + } +} diff --git a/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go b/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go index 6c98da9337..7209eb9954 100644 --- a/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go +++ b/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go @@ -2,18 +2,14 @@ package klaviyobulkupload import ( "bufio" - "bytes" - "errors" "fmt" - "io" - "net/http" "os" "strconv" "strings" - "time" jsoniter "github.com/json-iterator/go" "github.com/samber/lo" + "github.com/tidwall/gjson" "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-go-kit/stats" @@ -24,10 +20,10 @@ import ( ) const ( - KlaviyoAPIURL = "https://a.klaviyo.com/api/profile-bulk-import-jobs/" - BATCHSIZE = 10000 - MAXPAYLOADSIZE = 4900000 - IMPORT_ID_SEPARATOR = ":" + BATCHSIZE = 10000 + MAXALLOWEDPROFILESIZE = 512000 + MAXPAYLOADSIZE = 4900000 + IMPORT_ID_SEPARATOR = ":" ) var json = jsoniter.ConfigCompatibleWithStandardLibrary @@ -36,7 +32,7 @@ func createFinalPayload(combinedProfiles []Profile, listId string) Payload { payload := Payload{ Data: Data{ Type: "profile-bulk-import-job", - Attributes: Attributes{ + Attributes: PayloadAttributes{ Profiles: Profiles{ Data: combinedProfiles, }, @@ -60,12 +56,14 @@ func createFinalPayload(combinedProfiles []Profile, listId string) Payload { return payload } -func NewManager(logger logger.Logger, statsFactory stats.Stats, destination *backendconfig.DestinationT) (*KlaviyoBulkUploader, error) { +func NewManager(logger logger.Logger, StatsFactory stats.Stats, destination *backendconfig.DestinationT) (*KlaviyoBulkUploader, error) { + klaviyoLogger := logger.Child("KlaviyoBulkUpload").Child("KlaviyoBulkUploader") return &KlaviyoBulkUploader{ - destName: destination.DestinationDefinition.Name, - destinationConfig: destination.Config, - logger: logger.Child("KlaviyoBulkUpload").Child("KlaviyoBulkUploader"), - statsFactory: statsFactory, + DestName: destination.DestinationDefinition.Name, + DestinationConfig: destination.Config, + Logger: klaviyoLogger, + StatsFactory: StatsFactory, + KlaviyoAPIService: NewKlaviyoAPIService(destination, klaviyoLogger, StatsFactory), }, nil } @@ -80,7 +78,7 @@ func chunkBySizeAndElements(combinedProfiles []Profile, maxBytes, maxElements in return nil, fmt.Errorf("failed to marshal profile: %w", err) } - profileSize := len(profileJSON) + profileSize := len(profileJSON) + 1 // +1 for comma character if (chunkSize+profileSize >= maxBytes || len(chunk) == maxElements) && len(chunk) > 0 { chunks = append(chunks, chunk) @@ -100,9 +98,6 @@ func chunkBySizeAndElements(combinedProfiles []Profile, maxBytes, maxElements in } func (kbu *KlaviyoBulkUploader) Poll(pollInput common.AsyncPoll) common.PollStatusResponse { - client := &http.Client{} - destConfig := kbu.destinationConfig - privateApiKey, _ := destConfig["privateApiKey"].(string) importIds := strings.Split(pollInput.ImportId, IMPORT_ID_SEPARATOR) importStatuses := make(map[string]string) failedImports := make([]string, 0) @@ -115,47 +110,15 @@ func (kbu *KlaviyoBulkUploader) Poll(pollInput common.AsyncPoll) common.PollStat for importId, status := range importStatuses { if status != "complete" { allComplete = false - pollUrl := KlaviyoAPIURL + importId - req, err := http.NewRequest("GET", pollUrl, nil) + pollresp, err := kbu.KlaviyoAPIService.GetUploadStatus(importId) if err != nil { return common.PollStatusResponse{ - Complete: true, - InProgress: false, - HasFailed: true, - Error: err.Error(), - } - } - req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", "Klaviyo-API-Key "+privateApiKey) - req.Header.Set("revision", "2024-05-15") - resp, err := client.Do(req) - if err != nil { - return common.PollStatusResponse{ - Complete: true, - InProgress: false, - StatusCode: 0, - HasFailed: true, - Error: err.Error(), - HasWarning: false, + Complete: true, + HasFailed: true, + Error: err.Error(), } } - var pollBodyBytes []byte - var pollresp PollResp - pollBodyBytes, _ = io.ReadAll(resp.Body) - defer func() { _ = resp.Body.Close() }() - - pollRespErr := json.Unmarshal(pollBodyBytes, &pollresp) - if pollRespErr != nil { - return common.PollStatusResponse{ - Complete: true, - InProgress: false, - StatusCode: 0, - HasFailed: true, - Error: pollRespErr.Error(), - HasWarning: false, - } - } // Update the status in the map importStatuses[importId] = pollresp.Data.Attributes.Status @@ -190,9 +153,6 @@ func (kbu *KlaviyoBulkUploader) Poll(pollInput common.AsyncPoll) common.PollStat } func (kbu *KlaviyoBulkUploader) GetUploadStats(UploadStatsInput common.GetUploadStatsInput) common.GetUploadStatsResponse { - client := &http.Client{} - destConfig := kbu.destinationConfig - privateApiKey, _ := destConfig["privateApiKey"].(string) pollResultImportIds := strings.Split(UploadStatsInput.FailedJobURLs, IMPORT_ID_SEPARATOR) // make a map of jobId to error reason @@ -204,21 +164,12 @@ func (kbu *KlaviyoBulkUploader) GetUploadStats(UploadStatsInput common.GetUpload jobIDs = append(jobIDs, job.JobID) } - ErrorMap := kbu.jobIdToIdentifierMap + ErrorMap := kbu.JobIdToIdentifierMap var successKeys []int64 var failedJobIds []int64 for _, pollResultImportId := range pollResultImportIds { - importErrorUrl := KlaviyoAPIURL + pollResultImportId + "/import-errors" - req, err := http.NewRequest("GET", importErrorUrl, nil) - if err != nil { - return common.GetUploadStatsResponse{} - } - req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", - "Klaviyo-API-Key "+privateApiKey) - req.Header.Set("revision", "2024-05-15") - resp, err := client.Do(req) + uploadStatsResp, err := kbu.KlaviyoAPIService.GetUploadErrors(pollResultImportId) if err != nil { return common.GetUploadStatsResponse{ StatusCode: 400, @@ -226,18 +177,6 @@ func (kbu *KlaviyoBulkUploader) GetUploadStats(UploadStatsInput common.GetUpload } } - var uploadStatsBodyBytes []byte - var uploadStatsResp UploadStatusResp - uploadStatsBodyBytes, _ = io.ReadAll(resp.Body) - defer func() { _ = resp.Body.Close() }() - - uploadStatsBodyBytesErr := json.Unmarshal(uploadStatsBodyBytes, &uploadStatsResp) - if uploadStatsBodyBytesErr != nil { - return common.GetUploadStatsResponse{ - StatusCode: 400, - Error: uploadStatsBodyBytesErr.Error(), - } - } // Iterate over the Data array and get the jobId and error detail and store in jobIdToErrorMap for _, item := range uploadStatsResp.Data { orgPayload := item.Attributes.OriginalPayload @@ -266,7 +205,7 @@ func (kbu *KlaviyoBulkUploader) GetUploadStats(UploadStatsInput common.GetUpload } func (kbu *KlaviyoBulkUploader) generateKlaviyoErrorOutput(errorString string, err error, importingJobIds []int64, destinationID string) common.AsyncUploadOutput { - eventsAbortedStat := kbu.statsFactory.NewTaggedStat("failed_job_count", stats.CountType, map[string]string{ + eventsAbortedStat := kbu.StatsFactory.NewTaggedStat("failed_job_count", stats.CountType, map[string]string{ "module": "batch_router", "destType": "KLAVIYO_BULK_UPLOAD", }) @@ -279,11 +218,7 @@ func (kbu *KlaviyoBulkUploader) generateKlaviyoErrorOutput(errorString string, e } } -func (kbu *KlaviyoBulkUploader) ExtractProfile(input Input) Profile { - Message := input.Message - Body := Message.Body - Json := Body.JSON - Data := Json.Data +func (kbu *KlaviyoBulkUploader) ExtractProfile(Data Data) Profile { Attributes := Data.Attributes if len(Attributes.Profiles.Data) == 0 { return Profile{} @@ -293,10 +228,10 @@ func (kbu *KlaviyoBulkUploader) ExtractProfile(input Input) Profile { jobIdentifier := profileObject.Attributes.JobIdentifier jobIdentifierArray := strings.Split(jobIdentifier, ":") jobIdentifierValue, _ := strconv.ParseInt(jobIdentifierArray[1], 10, 64) - if kbu.jobIdToIdentifierMap == nil { - kbu.jobIdToIdentifierMap = make(map[string]int64) + if kbu.JobIdToIdentifierMap == nil { + kbu.JobIdToIdentifierMap = make(map[string]int64) } - kbu.jobIdToIdentifierMap[jobIdentifierArray[0]] = jobIdentifierValue + kbu.JobIdToIdentifierMap[jobIdentifierArray[0]] = jobIdentifierValue // delete jobIdentifier from the attributes map as it is not required in the final payload profileObject.Attributes.JobIdentifier = "" @@ -305,9 +240,10 @@ func (kbu *KlaviyoBulkUploader) ExtractProfile(input Input) Profile { } func (kbu *KlaviyoBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationStruct) common.AsyncUploadOutput { - startTime := time.Now() destination := asyncDestStruct.Destination var failedJobs []int64 + var abortedJobs []int64 + var abortReason string var successJobs []int64 filePath := asyncDestStruct.FileName importingJobIDs := asyncDestStruct.ImportingJobIDs @@ -317,6 +253,7 @@ func (kbu *KlaviyoBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationS statLabels := stats.Tags{ "module": "batch_router", "destType": destType, + "destID": destinationID, } file, err := os.Open(filePath) if err != nil { @@ -325,68 +262,49 @@ func (kbu *KlaviyoBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationS defer file.Close() var combinedProfiles []Profile scanner := bufio.NewScanner(file) + profileSizeStat := kbu.StatsFactory.NewTaggedStat("profile_size", stats.HistogramType, statLabels) for scanner.Scan() { - var input Input + var data Data + var metadata Metadata line := scanner.Text() - err := json.Unmarshal([]byte(line), &input) + + err := json.Unmarshal([]byte(gjson.Get(line, "message.body.JSON.data").String()), &data) if err != nil { - return kbu.generateKlaviyoErrorOutput("Error while parsing JSON.", err, importingJobIDs, destinationID) + return kbu.generateKlaviyoErrorOutput("Error while parsing JSON Data.", err, importingJobIDs, destinationID) + } + err = json.Unmarshal([]byte(gjson.Get(line, "metadata").String()), &metadata) + if err != nil { + return kbu.generateKlaviyoErrorOutput("Error while parsing JSON Metadata.", err, importingJobIDs, destinationID) + } + profileStructure := kbu.ExtractProfile(data) + // if profileStructure length is more than 500 kB, throw an error + profileStructureJSON, _ := json.Marshal(profileStructure) + profileSize := float64(len(profileStructureJSON)) + profileSizeStat.Observe(float64(profileSize)) // Record the size in the histogram + if float64(len(profileStructureJSON)) >= MAXALLOWEDPROFILESIZE { + abortReason = "Error while marshaling profiles. The profile size exceeds Klaviyo's limit of 500 kB for a single profile." + abortedJobs = append(abortedJobs, int64(metadata.JobID)) + continue } - profileStructure := kbu.ExtractProfile(input) combinedProfiles = append(combinedProfiles, profileStructure) } chunks, _ := chunkBySizeAndElements(combinedProfiles, MAXPAYLOADSIZE, BATCHSIZE) - eventsSuccessStat := kbu.statsFactory.NewTaggedStat("success_job_count", stats.CountType, statLabels) + eventsSuccessStat := kbu.StatsFactory.NewTaggedStat("success_job_count", stats.CountType, statLabels) var importIds []string // DelimitedImportIds is : separated importIds for idx, chunk := range chunks { combinedPayload := createFinalPayload(chunk, listId) - - // Convert combined payload to JSON - outputJSON, err := json.Marshal(combinedPayload) - if err != nil { - return kbu.generateKlaviyoErrorOutput("Error while marshaling combined JSON.", err, importingJobIDs, destinationID) - } - uploadURL := KlaviyoAPIURL - client := &http.Client{} - req, err := http.NewRequest("POST", uploadURL, bytes.NewBuffer(outputJSON)) - if err != nil { - return kbu.generateKlaviyoErrorOutput("Error while creating request.", err, importingJobIDs, destinationID) - } - - req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", "Klaviyo-API-Key "+destination.Config["privateApiKey"].(string)) - req.Header.Set("revision", "2024-05-15") - - uploadTimeStat := kbu.statsFactory.NewTaggedStat("async_upload_time", stats.TimerType, statLabels) - payloadSizeStat := kbu.statsFactory.NewTaggedStat("payload_size", stats.HistogramType, statLabels) - payloadSizeStat.Observe(float64(len(outputJSON))) - - resp, err := client.Do(req) + uploadResp, err := kbu.KlaviyoAPIService.UploadProfiles(combinedPayload) if err != nil { failedJobs = append(failedJobs, importingJobIDs[idx]) - kbu.logger.Error("Error while sending request.", err) + kbu.Logger.Error("Error while uploading profiles", err, uploadResp.Errors) + continue } - var bodyBytes []byte - bodyBytes, _ = io.ReadAll(resp.Body) - defer func() { _ = resp.Body.Close() }() - uploadTimeStat.Since(startTime) - - if resp.StatusCode != 202 { - failedJobs = append(failedJobs, importingJobIDs[idx]) - kbu.logger.Error("Got non 202 as statusCode.", errors.New(string(bodyBytes))) - } - var uploadresp UploadResp - uploadRespErr := json.Unmarshal((bodyBytes), &uploadresp) - if uploadRespErr != nil { - failedJobs = append(failedJobs, importingJobIDs[idx]) - kbu.logger.Error("Error while unmarshaling response.", uploadRespErr) - } - importIds = append(importIds, uploadresp.Data.Id) + importIds = append(importIds, uploadResp.Data.Id) } importParameters, err := json.Marshal(common.ImportParameters{ ImportId: strings.Join(importIds, IMPORT_ID_SEPARATOR), @@ -400,6 +318,8 @@ func (kbu *KlaviyoBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationS return common.AsyncUploadOutput{ ImportingParameters: importParameters, FailedJobIDs: failedJobs, + AbortJobIDs: abortedJobs, + AbortReason: abortReason, FailedCount: len(failedJobs), ImportingJobIDs: successJobs, DestinationID: destination.ID, diff --git a/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload_test.go b/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload_test.go index 330dbcc937..ecc38caaea 100644 --- a/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload_test.go +++ b/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload_test.go @@ -2,24 +2,28 @@ package klaviyobulkupload_test import ( "encoding/json" - "reflect" + "fmt" + "net/http" + "os" + "path/filepath" "testing" "github.com/stretchr/testify/assert" + "go.uber.org/mock/gomock" "github.com/rudderlabs/rudder-go-kit/stats" "github.com/rudderlabs/rudder-go-kit/logger" - "go.uber.org/mock/gomock" - backendconfig "github.com/rudderlabs/rudder-server/backend-config" "github.com/rudderlabs/rudder-server/jobsdb" - mocks "github.com/rudderlabs/rudder-server/mocks/router/klaviyobulkupload" + mockAPIService "github.com/rudderlabs/rudder-server/mocks/router/klaviyobulkupload" "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common" - "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/klaviyobulkupload" + klaviyobulkupload "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/klaviyobulkupload" ) +var currentDir, _ = os.Getwd() + var destination = &backendconfig.DestinationT{ ID: "1", Name: "KLAVIYO_BULK_UPLOAD", @@ -27,7 +31,7 @@ var destination = &backendconfig.DestinationT{ Name: "KLAVIYO_BULK_UPLOAD", }, Config: map[string]interface{}{ - "privateApiKey": "1234", + "privateApiKey": "1223", }, Enabled: true, WorkspaceID: "1", @@ -44,132 +48,377 @@ func TestUpload(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - mockUploader := mocks.NewMockUploader(ctrl) - - expectedOutput := common.AsyncUploadOutput{ - ImportingJobIDs: []int64{1, 2, 3}, + mockKlaviyoAPIService := mockAPIService.NewMockKlaviyoAPIService(ctrl) + testLogger := logger.NewLogger().Child("klaviyo-bulk-upload-test") + + uploader := klaviyobulkupload.KlaviyoBulkUploader{ + DestName: "Klaviyo Bulk Upload", + DestinationConfig: destination.Config, + Logger: testLogger, + StatsFactory: stats.NOP, + KlaviyoAPIService: mockKlaviyoAPIService, + JobIdToIdentifierMap: map[string]int64{ + "111222334": 1, + "222333445": 2, + }, } - mockUploader.EXPECT().Upload(gomock.Any()).Return(expectedOutput).Times(1) + // Create a temporary file with test data + tempFile, err := os.CreateTemp("", "test_upload_*.jsonl") + if err != nil { + t.Fatal(err) + } + defer os.Remove(tempFile.Name()) - output := mockUploader.Upload(&common.AsyncDestinationStruct{ - ImportingJobIDs: []int64{1, 2, 3}, + testData := []byte(`{"message":{"body":{"JSON":{"data":{"type":"profile-bulk-import-job","attributes":{"profiles":{"data":[{"type":"profile","attributes":{"email":"qwe22@mail.com","first_name":"Testqwe0022","last_name":"user","phone_number":"+919902330123","location":{"address1":"dallas street","address2":"oppenheimer market","city":"delhi","country":"India","ip":"213.5.6.41"},"anonymous_id":"user1","jobIdentifier":"user1:1"}}]}},"relationships":{"lists":{"data":[{"type":"list","id":"list101"}]}}}}}},"metadata":{"jobId":1}}`) + _, err = tempFile.Write(testData) + if err != nil { + t.Fatal(err) + } + tempFile.Close() + + t.Run("Successful Upload", func(t *testing.T) { + mockKlaviyoAPIService.EXPECT(). + UploadProfiles(gomock.Any()). + Return(&klaviyobulkupload.UploadResp{ + Data: struct { + Id string "json:\"id\"" + }{ + Id: "importId1", + }, + Errors: nil, + }, nil) + + asyncDestStruct := &common.AsyncDestinationStruct{ + Destination: destination, + FileName: tempFile.Name(), + ImportingJobIDs: []int64{1}, + } + + output := uploader.Upload(asyncDestStruct) + assert.NotNil(t, output) + assert.Equal(t, destination.ID, output.DestinationID) + assert.Empty(t, output.FailedJobIDs) + assert.Empty(t, output.AbortJobIDs) + assert.Empty(t, output.AbortReason) + assert.NotEmpty(t, output.ImportingJobIDs) }) - if !reflect.DeepEqual(output, expectedOutput) { - t.Errorf("Expected %v but got %v", expectedOutput, output) - } + t.Run("Unsuccessful Upload", func(t *testing.T) { + mockKlaviyoAPIService.EXPECT(). + UploadProfiles(gomock.Any()). + Return(&klaviyobulkupload.UploadResp{ + Errors: []klaviyobulkupload.ErrorDetail{ + {Detail: "upload failed"}, + }, + }, fmt.Errorf("upload failed with errors: %+v", []klaviyobulkupload.ErrorDetail{ + {Detail: "upload failed"}, + })) + + asyncDestStruct := &common.AsyncDestinationStruct{ + Destination: destination, + FileName: tempFile.Name(), + ImportingJobIDs: []int64{1}, + } + + output := uploader.Upload(asyncDestStruct) + assert.NotNil(t, output) + assert.Equal(t, destination.ID, output.DestinationID) + assert.NotEmpty(t, output.FailedJobIDs) + assert.Empty(t, output.ImportingJobIDs) + }) } -// File is successfully opened and read line by line -func TestFileReadSuccess(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - mockUploader := mocks.NewMockUploader(ctrl) +func TestExtractProfileValidInput(t *testing.T) { + kbu := klaviyobulkupload.KlaviyoBulkUploader{} - destination := &backendconfig.DestinationT{ - DestinationDefinition: backendconfig.DestinationDefinitionT{ - Name: "Klaviyo", + dataPayloadJSON := `{ + "attributes": { + "profiles": { + "data": [ + { + "attributes": { + "anonymous_id": 111222334, + "email": "qwe122@mail.com", + "first_name": "Testqwe0122", + "jobIdentifier": "111222334:1", + "last_name": "user0122", + "location": { + "city": "delhi", + "country": "India", + "ip": "213.5.6.41" + }, + "phone_number": "+919912000123" + }, + "id": "111222334", + "type": "profile" + } + ] + } }, - Config: map[string]interface{}{ - "listId": "123", - "privateApiKey": "test-api-key", + "relationships": { + "lists": { + "data": [ + { + "id": "UKth4J", + "type": "list" + } + ] + } }, - ID: "dest-123", + "type": "profile-bulk-import-job" + }` + var data klaviyobulkupload.Data + err := json.Unmarshal([]byte(dataPayloadJSON), &data) + if err != nil { + t.Errorf("json.Unmarshal failed: %v", err) } + expectedProfile := `{"attributes":{"email":"qwe122@mail.com","phone_number":"+919912000123","first_name":"Testqwe0122","last_name":"user0122","location":{"city":"delhi","country":"India","ip":"213.5.6.41"}},"id":"111222334","type":"profile"}` + result := kbu.ExtractProfile(data) + profileJson, _ := json.Marshal(result) + assert.JSONEq(t, expectedProfile, string(profileJson)) +} + +// Test case for doing integration test of Upload method +func TestUploadIntegration(t *testing.T) { + t.Skip("Skipping this integ test for now.") + kbu, err := klaviyobulkupload.NewManager(logger.NOP, stats.NOP, destination) + assert.NoError(t, err) + assert.NotNil(t, kbu) + asyncDestStruct := &common.AsyncDestinationStruct{ Destination: destination, - FileName: "testfile.txt", + FileName: filepath.Join(currentDir, "testdata/uploadData.jsonl"), ImportingJobIDs: []int64{1, 2, 3}, } - expectedOutput := common.AsyncUploadOutput{ - ImportingJobIDs: []int64{1, 2, 3}, - } - - mockUploader.EXPECT().Upload(asyncDestStruct).Return(expectedOutput).Times(1) - - output := mockUploader.Upload(asyncDestStruct) - - if !reflect.DeepEqual(output, expectedOutput) { - t.Errorf("Expected %v but got %v", expectedOutput, output) - } + output := kbu.Upload(asyncDestStruct) + assert.NotNil(t, output) + assert.Equal(t, destination.ID, output.DestinationID) + assert.Empty(t, output.FailedJobIDs) + assert.Empty(t, output.AbortJobIDs) + assert.Empty(t, output.AbortReason) + assert.NotEmpty(t, output.ImportingJobIDs) } func TestPoll(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - mockUploader := mocks.NewMockPoller(ctrl) - - pollInput := common.AsyncPoll{ - ImportId: "123", - } - - expectedOutput := common.PollStatusResponse{ - Complete: true, - InProgress: false, - StatusCode: 200, - HasFailed: false, - HasWarning: false, + mockKlaviyoAPIService := mockAPIService.NewMockKlaviyoAPIService(ctrl) + testLogger := logger.NewLogger().Child("klaviyo-bulk-upload-test") + + uploader := klaviyobulkupload.KlaviyoBulkUploader{ + DestName: "Klaviyo Bulk Upload", + DestinationConfig: destination.Config, + Logger: testLogger, + StatsFactory: stats.NOP, + KlaviyoAPIService: mockKlaviyoAPIService, + JobIdToIdentifierMap: map[string]int64{ + "111222334": 1, + "222333445": 2, + }, } - mockUploader.EXPECT().Poll(pollInput).Return(expectedOutput).Times(1) - - output := mockUploader.Poll(pollInput) + t.Run("Successful Poll", func(t *testing.T) { + pollStatusResp := &klaviyobulkupload.PollResp{ + Data: struct { + Id string `json:"id"` + Attributes struct { + Total_count int `json:"total_count"` + Completed_count int `json:"completed_count"` + Failed_count int `json:"failed_count"` + Status string `json:"status"` + } `json:"attributes"` + }{ + Id: "importId1", + Attributes: struct { + Total_count int `json:"total_count"` + Completed_count int `json:"completed_count"` + Failed_count int `json:"failed_count"` + Status string `json:"status"` + }{ + Total_count: 1, + Completed_count: 1, + Failed_count: 0, + Status: "complete", + }, + }, + } + + mockKlaviyoAPIService.EXPECT(). + GetUploadStatus("importId1"). + Return(pollStatusResp, nil) + + pollInput := common.AsyncPoll{ + ImportId: "importId1", + } + + jobStatus := uploader.Poll(pollInput) + assert.NotNil(t, jobStatus) + assert.Equal(t, true, jobStatus.Complete) + assert.Equal(t, http.StatusOK, jobStatus.StatusCode) + assert.Equal(t, false, jobStatus.HasFailed) + assert.Equal(t, false, jobStatus.HasWarning) + assert.Empty(t, jobStatus.FailedJobURLs) + assert.Empty(t, jobStatus.WarningJobURLs) + assert.Empty(t, jobStatus.Error) + }) - if !reflect.DeepEqual(output, expectedOutput) { - t.Errorf("Expected %v but got %v", expectedOutput, output) - } + t.Run("Poll with Errors", func(t *testing.T) { + pollStatusFailedResp := &klaviyobulkupload.PollResp{ + Data: struct { + Id string `json:"id"` + Attributes struct { + Total_count int `json:"total_count"` + Completed_count int `json:"completed_count"` + Failed_count int `json:"failed_count"` + Status string `json:"status"` + } `json:"attributes"` + }{ + Id: "importId2", + Attributes: struct { + Total_count int `json:"total_count"` + Completed_count int `json:"completed_count"` + Failed_count int `json:"failed_count"` + Status string `json:"status"` + }{ + Total_count: 1, + Completed_count: 0, + Failed_count: 1, + Status: "complete", + }, + }, + } + + mockKlaviyoAPIService.EXPECT(). + GetUploadStatus("importId2"). + Return(pollStatusFailedResp, fmt.Errorf("The import job failed")) + + pollInput := common.AsyncPoll{ + ImportId: "importId2", + } + + jobStatus := uploader.Poll(pollInput) + assert.NotNil(t, jobStatus) + assert.Equal(t, true, jobStatus.Complete) + assert.Equal(t, true, jobStatus.HasFailed) + assert.Equal(t, false, jobStatus.HasWarning) + assert.Empty(t, jobStatus.WarningJobURLs) + assert.Equal(t, "The import job failed", jobStatus.Error) + }) } func TestGetUploadStats(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - mockUploader := mocks.NewMockUploadStats(ctrl) - - importedJobSlice := []int64{1, 2, 3, 4, 5, 6} - - jobs := make([]*jobsdb.JobT, len(importedJobSlice)) - for i := range importedJobSlice { - job := jobsdb.JobT{} - jobs[i] = &job - } - - statsInput := common.GetUploadStatsInput{ - ImportingList: jobs, - } - - expectedOutput := common.GetUploadStatsResponse{ - StatusCode: 200, - Metadata: common.EventStatMeta{ - FailedKeys: []int64{1, 2, 3}, - SucceededKeys: []int64{4, 5, 6}, + mockKlaviyoAPIService := mockAPIService.NewMockKlaviyoAPIService(ctrl) + testLogger := logger.NewLogger().Child("klaviyo-bulk-upload-test") + + uploader := klaviyobulkupload.KlaviyoBulkUploader{ + DestName: "Klaviyo Bulk Upload", + DestinationConfig: destination.Config, + Logger: testLogger, + StatsFactory: stats.NOP, + KlaviyoAPIService: mockKlaviyoAPIService, + JobIdToIdentifierMap: map[string]int64{ + "111222334": 1, + "222333445": 2, }, } - mockUploader.EXPECT().GetUploadStats(statsInput).Return(expectedOutput).Times(1) - - output := mockUploader.GetUploadStats(statsInput) - - if !reflect.DeepEqual(output, expectedOutput) { - t.Errorf("Expected %v but got %v", expectedOutput, output) - } -} - -func TestExtractProfileValidInput(t *testing.T) { - kbu := klaviyobulkupload.KlaviyoBulkUploader{} + t.Run("Failure GetUploadStats: Import Job Failed", func(t *testing.T) { + uploadStatsResp := &klaviyobulkupload.UploadStatusResp{ + Data: []struct { + Type string `json:"type"` + ID string `json:"id"` + Attributes struct { + Code string `json:"code"` + Title string `json:"title"` + Detail string `json:"detail"` + Source struct { + Pointer string `json:"pointer"` + } `json:"source"` + OriginalPayload struct { + Id string `json:"id"` + AnonymousId string `json:"anonymous_id"` + } `json:"original_payload"` + } `json:"attributes"` + Links struct { + Self string `json:"self"` + } `json:"links"` + }{ + { + Type: "error", + ID: "1", + Attributes: struct { + Code string `json:"code"` + Title string `json:"title"` + Detail string `json:"detail"` + Source struct { + Pointer string `json:"pointer"` + } `json:"source"` + OriginalPayload struct { + Id string `json:"id"` + AnonymousId string `json:"anonymous_id"` + } `json:"original_payload"` + }{ + Code: "400", + Title: "Bad Request", + Detail: "The import job failed", + Source: struct { + Pointer string `json:"pointer"` + }{Pointer: "importId1"}, + OriginalPayload: struct { + Id string `json:"id"` + AnonymousId string `json:"anonymous_id"` + }{Id: "1", AnonymousId: "111222334"}, + }, + Links: struct { + Self string `json:"self"` + }{Self: "selfLink"}, + }, + }, + } + + mockKlaviyoAPIService.EXPECT(). + GetUploadErrors("importId1"). + Return(uploadStatsResp, nil) + + uploadStatsInput := common.GetUploadStatsInput{ + FailedJobURLs: "importId1", + ImportingList: []*jobsdb.JobT{ + {JobID: 1}, + {JobID: 2}, + }, + } + + statsResponse := uploader.GetUploadStats(uploadStatsInput) + assert.NotNil(t, statsResponse) + assert.Equal(t, http.StatusOK, statsResponse.StatusCode) + // assert.Equal(t, "The import job failed", statsResponse.Error) + assert.NotEmpty(t, statsResponse.Metadata.FailedKeys) + assert.NotEmpty(t, statsResponse.Metadata.FailedReasons) + assert.NotEmpty(t, statsResponse.Metadata.SucceededKeys) + }) - inputPayloadJSON := `{"message":{"body":{"FORM":{},"JSON":{"data":{"attributes":{"profiles":{"data":[{"attributes":{"anonymous_id":111222334,"email":"qwe122@mail.com","first_name":"Testqwe0122","jobIdentifier":"111222334:1","last_name":"user0122","location":{"city":"delhi","country":"India","ip":"213.5.6.41"},"phone_number":"+919912000123"},"id":"111222334","type":"profile"}]}},"relationships":{"lists":{"data":[{"id":"UKth4J","type":"list"}]}},"type":"profile-bulk-import-job"}},"JSON_ARRAY":{},"XML":{}},"endpoint":"","files":{},"headers":{},"method":"POST","params":{},"type":"REST","userId":"","version":"1"},"metadata":{"job_id":1}}` - var inputPayload klaviyobulkupload.Input - err := json.Unmarshal([]byte(inputPayloadJSON), &inputPayload) - if err != nil { - t.Errorf("json.Unmarshal failed: %v", err) - } - expectedProfile := `{"attributes":{"email":"qwe122@mail.com","phone_number":"+919912000123","first_name":"Testqwe0122","last_name":"user0122","location":{"city":"delhi","country":"India","ip":"213.5.6.41"}},"id":"111222334","type":"profile"}` - result := kbu.ExtractProfile(inputPayload) - profileJson, _ := json.Marshal(result) - assert.JSONEq(t, expectedProfile, string(profileJson)) + t.Run("GetUploadStats with Errors", func(t *testing.T) { + mockKlaviyoAPIService.EXPECT(). + GetUploadErrors("importId1"). + Return(nil, fmt.Errorf("some error")) + + uploadStatsInput := common.GetUploadStatsInput{ + FailedJobURLs: "importId1", + ImportingList: []*jobsdb.JobT{ + {JobID: 1}, + {JobID: 2}, + }, + } + + statsResponse := uploader.GetUploadStats(uploadStatsInput) + assert.NotNil(t, statsResponse) + assert.Equal(t, http.StatusBadRequest, statsResponse.StatusCode) + assert.Equal(t, "some error", statsResponse.Error) + }) } diff --git a/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/testdata/uploadData.jsonl b/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/testdata/uploadData.jsonl new file mode 100644 index 0000000000..ac190e297f --- /dev/null +++ b/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/testdata/uploadData.jsonl @@ -0,0 +1,5 @@ +{"message":{"body":{"JSON":{"data":{"type":"profile-bulk-import-job","attributes":{"profiles":{"data":[{"type":"profile","attributes":{"email":"qwe22@mail.com","first_name":"Testqwe0022","last_name":"user","phone_number":"+919902330123","location":{"address1":"dallas street","address2":"oppenheimer market","city":"delhi","country":"India","ip":"213.5.6.41"},"anonymous_id":"user1","jobIdentifier":"user1:1"}}]}},"relationships":{"lists":{"data":[{"type":"list","id":"list101"}]}}}}}},"metadata":{"jobId":1}} +{"message":{"body":{"JSON":{"data":{"type":"profile-bulk-import-job","attributes":{"profiles":{"data":[{"type":"profile","attributes":{"email":"example@mail.com","first_name":"Example","last_name":"User","phone_number":"+919902330124","location":{"address1":"main street","address2":"central market","city":"mumbai","country":"India","ip":"213.5.6.42"},"anonymous_id":"user2","jobIdentifier":"user2:2"}}]}},"relationships":{"lists":{"data":[{"type":"list","id":"list102"}]}}}}}},"metadata":{"jobId":2}} +{"message":{"body":{"JSON":{"data":{"type":"profile-bulk-import-job","attributes":{"profiles":{"data":[{"type":"profile","attributes":{"email":"sample@mail.com","first_name":"Sample","last_name":"User","phone_number":"+919902330125","location":{"address1":"high street","address2":"local market","city":"bangalore","country":"India","ip":"213.5.6.43"},"anonymous_id":"user3","jobIdentifier":"user3:3"}}]}},"relationships":{"lists":{"data":[{"type":"list","id":"list103"}]}}}}}},"metadata":{"jobId":3}} +{"message":{"body":{"JSON":{"data":{"type":"profile-bulk-import-job","attributes":{"profiles":{"data":[{"type":"profile","attributes":{"email":"test@mail.com","first_name":"Test","last_name":"User","phone_number":"+919902330126","location":{"address1":"park street","address2":"mall market","city":"chennai","country":"India","ip":"213.5.6.44"},"anonymous_id":"user4","jobIdentifier":"user4:4"}}]}},"relationships":{"lists":{"data":[{"type":"list","id":"list104"}]}}}}}},"metadata":{"jobId":4}} +{"message":{"body":{"JSON":{"data":{"type":"profile-bulk-import-job","attributes":{"profiles":{"data":[{"type":"profile","attributes":{"email":"demo@mail.com","first_name":"Demo","last_name":"User","phone_number":"+919902330127","location":{"address1":"lake street","address2":"supermarket","city":"kolkata","country":"India","ip":"213.5.6.45"},"anonymous_id":"user5","jobIdentifier":"user5:5"}}]}},"relationships":{"lists":{"data":[{"type":"list","id":"list105"}]}}}}}},"metadata":{"jobId":5}} \ No newline at end of file diff --git a/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/types.go b/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/types.go index 9cf55edd88..560eb8d787 100644 --- a/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/types.go +++ b/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/types.go @@ -1,49 +1,45 @@ package klaviyobulkupload -//go:generate mockgen -destination=../../../../mocks/router/klaviyobulkupload/klaviyobulkupload_mock.go -package=mocks github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/klaviyobulkupload Uploader,HttpClient,Poller,ProfileExtractor,UploadStats +//go:generate mockgen -destination=../../../../mocks/router/klaviyobulkupload/klaviyobulkupload_mock.go -package=mocks github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/klaviyobulkupload KlaviyoAPIService import ( - "net/http" - "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-go-kit/stats" - - "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common" ) -type Uploader interface { - Upload(*common.AsyncDestinationStruct) common.AsyncUploadOutput -} - -type HttpClient interface { - Do(req *http.Request) (*http.Response, error) +type KlaviyoAPIService interface { + UploadProfiles(profiles Payload) (*UploadResp, error) + GetUploadStatus(importId string) (*PollResp, error) + GetUploadErrors(importId string) (*UploadStatusResp, error) } -type Poller interface { - Poll(input common.AsyncPoll) common.PollStatusResponse +type KlaviyoBulkUploader struct { + DestName string + DestinationConfig map[string]interface{} + Logger logger.Logger + StatsFactory stats.Stats + KlaviyoAPIService KlaviyoAPIService + JobIdToIdentifierMap map[string]int64 } -type ProfileExtractor interface { - ExtractProfiles(input Input) Profile +type ErrorDetail struct { + ID string `json:"id"` + Code string `json:"code"` + Title string `json:"title"` + Detail string `json:"detail"` + Source ErrorSource `json:"source"` } -type UploadStats interface { - GetUploadStats(common.GetUploadStatsInput) common.GetUploadStatsResponse -} - -type KlaviyoBulkUploader struct { - destName string - destinationConfig map[string]interface{} - logger logger.Logger - statsFactory stats.Stats - Client *http.Client - jobIdToIdentifierMap map[string]int64 +type ErrorSource struct { + Pointer string `json:"pointer"` + Parameter string `json:"parameter"` } type UploadResp struct { Data struct { Id string `json:"id"` } `json:"data"` + Errors []ErrorDetail `json:"errors"` } type PollResp struct { @@ -56,6 +52,7 @@ type PollResp struct { Status string `json:"status"` } `json:"attributes"` } `json:"data"` + Errors []ErrorDetail `json:"errors"` } type UploadStatusResp struct { @@ -85,6 +82,7 @@ type UploadStatusResp struct { Prev string `json:"prev"` Next string `json:"next"` } `json:"links"` + Errors []ErrorDetail `json:"errors"` } type Payload struct { @@ -92,15 +90,40 @@ type Payload struct { } type Data struct { - Type string `json:"type"` - Attributes Attributes `json:"attributes"` - Relationships *Relationships `json:"relationships,omitempty"` + Type string `json:"type"` + Attributes PayloadAttributes `json:"attributes"` + Relationships *Relationships `json:"relationships,omitempty"` } -type Attributes struct { +type PayloadAttributes struct { Profiles Profiles `json:"profiles"` } +type ProfileAttributes struct { + Email string `json:"email,omitempty"` + Phone string `json:"phone_number,omitempty"` + ExternalId string `json:"external_id,omitempty"` + FirstName string `json:"first_name,omitempty"` + JobIdentifier string `json:"jobIdentifier,omitempty"` + LastName string `json:"last_name,omitempty"` + Organization string `json:"organization,omitempty"` + Title string `json:"title,omitempty"` + Image string `json:"image,omitempty"` + Location struct { + Address1 string `json:"address1,omitempty"` + Address2 string `json:"address2,omitempty"` + City string `json:"city,omitempty"` + Country string `json:"country,omitempty"` + Latitude string `json:"latitude,omitempty"` + Longitude string `json:"longitude,omitempty"` + Region string `json:"region,omitempty"` + Zip string `json:"zip,omitempty"` + Timezone string `json:"timezone,omitempty"` + IP string `json:"ip,omitempty"` + } `json:"location,omitempty"` + Properties map[string]interface{} `json:"properties,omitempty"` +} + type Profiles struct { Data []Profile `json:"data"` } @@ -117,60 +140,12 @@ type List struct { Type string `json:"type,omitempty"` ID string `json:"id,omitempty"` } - -type Input struct { - Message struct { - Body struct { - Form struct{} `json:"FORM,omitempty"` - JSON struct { - Data struct { - Attributes Attributes `json:"attributes"` - Relationships Relationships `json:"relationships,omitempty"` - Type string `json:"type,omitempty"` - } `json:"data,omitempty"` - } `json:"JSON,omitempty"` - JSONArray struct{} `json:"JSON_ARRAY,omitempty"` - XML struct{} `json:"XML,omitempty"` - } `json:"body,omitempty"` - Endpoint string `json:"endpoint,omitempty"` - Files struct{} `json:"files,omitempty"` - Headers struct{} `json:"headers,omitempty"` - Method string `json:"method,omitempty"` - Params struct{} `json:"params,omitempty"` - Type string `json:"type,omitempty"` - UserID string `json:"userId,omitempty"` - Version string `json:"version,omitempty"` - } `json:"message,omitempty"` - Metadata struct { - JobID int `json:"job_id,omitempty"` - } `json:"metadata,omitempty"` +type Metadata struct { + JobID int `json:"jobId,omitempty"` } type Profile struct { - Attributes struct { - Email string `json:"email,omitempty"` - Phone string `json:"phone_number,omitempty"` - ExternalId string `json:"external_id,omitempty"` - FirstName string `json:"first_name,omitempty"` - JobIdentifier string `json:"jobIdentifier,omitempty"` - LastName string `json:"last_name,omitempty"` - Organization string `json:"organization,omitempty"` - Title string `json:"title,omitempty"` - Image string `json:"image,omitempty"` - Location struct { - Address1 string `json:"address1,omitempty"` - Address2 string `json:"address2,omitempty"` - City string `json:"city,omitempty"` - Country string `json:"country,omitempty"` - Latitude string `json:"latitude,omitempty"` - Longitude string `json:"longitude,omitempty"` - Region string `json:"region,omitempty"` - Zip string `json:"zip,omitempty"` - Timezone string `json:"timezone,omitempty"` - IP string `json:"ip,omitempty"` - } `json:"location,omitempty"` - Properties map[string]interface{} `json:"properties,omitempty"` - } `json:"attributes,omitempty"` - ID string `json:"id,omitempty"` - Type string `json:"type,omitempty"` + Attributes ProfileAttributes `json:"attributes,omitempty"` + ID string `json:"id,omitempty"` + Type string `json:"type,omitempty"` }