Skip to content

Commit

Permalink
aws s3 bug fixes (#2200)
Browse files Browse the repository at this point in the history
  • Loading branch information
joe-ayoub-segment authored Jul 19, 2024
1 parent 7453408 commit f0e5ac2
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 34 deletions.
78 changes: 49 additions & 29 deletions packages/destination-actions/src/destinations/aws-s3/operations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,60 +25,60 @@ function generateFile(payloads: Payload[], audienceSettings: AudienceSettings):
const additionalColumns = payloads[0].additional_identifiers_and_traits_columns ?? []

Object.entries(columnsField).forEach(([_, value]) => {
if (value !== undefined) {
if (![undefined, null, ''].includes(value)) {
headers.push(value)
}
})

additionalColumns.forEach((additionalColumn) => {
headers.push(additionalColumn.value)
if (![undefined, null, ''].includes(additionalColumn.value)) {
headers.push(additionalColumn.value)
}
})

const headerString = `${headers.join(audienceSettings.delimiter === 'tab' ? '\t' : audienceSettings.delimiter)}\n`

const rows: string[] = [headerString]

payloads.forEach((payload, index, arr) => {
const action = payload.propertiesOrTraits[payload.audienceName]

const row: string[] = []
if (headers.includes('audience_name')) {
row.push(enquoteIdentifier(String(payload.audienceName ?? '')))
if (![undefined, null, ''].includes(columnsField.audience_name)) {
row.push(encodeString(String(payload.audienceName ?? '')))
}
if (headers.includes('audience_id')) {
row.push(enquoteIdentifier(String(payload.audienceId ?? '')))
if (![undefined, null, ''].includes(columnsField.audience_id)) {
row.push(encodeString(String(payload.audienceId ?? '')))
}
if (headers.includes('audience_action')) {
row.push(enquoteIdentifier(String(action ?? '')))
if (![undefined, null, ''].includes(columnsField.audience_action)) {
row.push(encodeString(String(action ?? '')))
}
if (headers.includes('email')) {
row.push(enquoteIdentifier(String(payload.email ?? '')))
if (![undefined, null, ''].includes(columnsField.email)) {
row.push(encodeString(String(payload.email ?? '')))
}
if (headers.includes('user_id')) {
row.push(enquoteIdentifier(String(payload.userId ?? '')))
if (![undefined, null, ''].includes(columnsField.user_id)) {
row.push(encodeString(String(payload.userId ?? '')))
}
if (headers.includes('anonymous_id')) {
row.push(enquoteIdentifier(String(payload.anonymousId ?? '')))
if (![undefined, null, ''].includes(columnsField.anonymous_id)) {
row.push(encodeString(String(payload.anonymousId ?? '')))
}
if (headers.includes('timestamp')) {
row.push(enquoteIdentifier(String(payload.timestamp ?? '')))
if (![undefined, null, ''].includes(columnsField.timestamp)) {
row.push(encodeString(String(payload.timestamp ?? '')))
}
if (headers.includes('message_id')) {
row.push(enquoteIdentifier(String(payload.messageId ?? '')))
if (![undefined, null, ''].includes(columnsField.message_id)) {
row.push(encodeString(String(payload.messageId ?? '')))
}
if (headers.includes('space_id')) {
row.push(enquoteIdentifier(String(payload.spaceId ?? '')))
if (![undefined, null, ''].includes(columnsField.space_id)) {
row.push(encodeString(String(payload.spaceId ?? '')))
}
if (headers.includes('integrations_object')) {
row.push(enquoteIdentifier(String(JSON.stringify(payload.integrationsObject) ?? '')))
if (![undefined, null, ''].includes(columnsField.integrations_object)) {
row.push(encodeString(String(JSON.stringify(payload.integrationsObject) ?? '')))
}
if (headers.includes('properties_or_traits')) {
row.push(enquoteIdentifier(String(JSON.stringify(payload.propertiesOrTraits) ?? '')))
if (![undefined, null, ''].includes(columnsField.properties_or_traits)) {
row.push(encodeString(String(JSON.stringify(payload.propertiesOrTraits) ?? '')))
}

additionalColumns.forEach((additionalColumn) => {
//row.push(enquoteIdentifier(String(JSON.stringify(payload.propertiesOrTraits[additionalColumn.key]) ?? '')))
row.push(enquoteIdentifier(String(JSON.stringify(additionalColumn.key) ?? '')))
row.push(encodeString(String(JSON.stringify(payload.propertiesOrTraits[additionalColumn.key]) ?? '')))
})

const isLastRow = arr.length === index + 1
Expand All @@ -92,8 +92,28 @@ function generateFile(payloads: Payload[], audienceSettings: AudienceSettings):
return rows.join('')
}

function enquoteIdentifier(str: string) {
function encodeString(str: string) {
return `"${String(str).replace(/"/g, '""')}"`
}

export { generateFile, enquoteIdentifier }
function validate(payloads: Payload[], audienceSettings: AudienceSettings) {
const delimiter = audienceSettings.delimiter
const columns = payloads[0].columns
const additionalIdentifierColumns = payloads[0].additional_identifiers_and_traits_columns

// ensure column names do not contain delimiter
Object.values(columns).forEach((columnName) => {
if (columnName.includes(delimiter)) {
throw new Error(`Column name ${columnName} cannot contain delimiter: ${delimiter}`)
}
})

// ensure additional identifier column names do not contain delimiter
additionalIdentifierColumns?.forEach((column) => {
if (column.value.includes(delimiter)) {
throw new Error(`Column name ${column.value} cannot contain delimiter: ${delimiter}`)
}
})
}

export { generateFile, validate }

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { ActionDefinition } from '@segment/actions-core'
import type { Settings, AudienceSettings } from '../generated-types'
import type { Payload } from './generated-types'
import { generateFile } from '../operations'
import { generateFile, validate } from '../operations'
import { S3CSVClient } from './s3'

const action: ActionDefinition<Settings, Payload, AudienceSettings> = {
Expand Down Expand Up @@ -52,7 +52,7 @@ const action: ActionDefinition<Settings, Payload, AudienceSettings> = {
description: 'Name of column for timestamp for when the user was added or removed from the Audience',
type: 'string'
},
messageId: {
message_id: {
label: 'Message ID',
description: 'Name of column for the unique identifier for the message.',
type: 'string'
Expand Down Expand Up @@ -241,10 +241,9 @@ const action: ActionDefinition<Settings, Payload, AudienceSettings> = {
}

async function processData(payloads: Payload[], settings: Settings, audienceSettings?: AudienceSettings) {
validate(payloads, audienceSettings as AudienceSettings)
const fileContent = generateFile(payloads, audienceSettings as AudienceSettings)

const s3Client = new S3CSVClient(settings.s3_aws_region, settings.iam_role_arn, settings.iam_external_id)

await s3Client.uploadS3(settings, audienceSettings as AudienceSettings, fileContent)
}

Expand Down

0 comments on commit f0e5ac2

Please sign in to comment.