Skip to content

Commit

Permalink
fix: Recreate Analytics clients when credentials change (#12789)
Browse files Browse the repository at this point in the history
  • Loading branch information
jimblanc authored Jan 4, 2024
1 parent ef15561 commit 1bcbae4
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

import { EventBuffer, groupBy, IAnalyticsClient } from '../../../utils';
import {
AWSCredentials,
haveCredentialsChanged
} from '@aws-amplify/core/internals/utils';
import {
FirehoseClient,
PutRecordBatchCommand,
} from '@aws-sdk/client-firehose';
import {
EventBuffer,
groupBy,
IAnalyticsClient
} from '../../../utils';
import {
KinesisFirehoseBufferEvent,
KinesisFirehoseEventBufferConfig,
Expand All @@ -23,7 +31,7 @@ const eventBufferMap: Record<
string,
EventBuffer<KinesisFirehoseBufferEvent>
> = {};
const cachedClients: Record<string, FirehoseClient> = {};
const cachedClients: Record<string, [FirehoseClient, AWSCredentials]> = {};

const createPutRecordsBatchCommand = (
streamName: string,
Expand Down Expand Up @@ -76,18 +84,29 @@ export const getEventBuffer = ({
userAgentValue,
}: KinesisFirehoseEventBufferConfig): EventBuffer<KinesisFirehoseBufferEvent> => {
const sessionIdentityKey = [region, identityId].filter(id => !!id).join('-');
const [ cachedClient, cachedCredentials ] = cachedClients[sessionIdentityKey] ?? [];
let credentialsHaveChanged = false;

if (!eventBufferMap[sessionIdentityKey]) {
// Check if credentials have changed for the cached client
if (cachedClient) {
credentialsHaveChanged = haveCredentialsChanged(cachedCredentials, credentials);
}

if (!eventBufferMap[sessionIdentityKey] || credentialsHaveChanged) {
const getClient = (): IAnalyticsClient<KinesisFirehoseBufferEvent> => {
if (!cachedClients[sessionIdentityKey]) {
cachedClients[sessionIdentityKey] = new FirehoseClient({
region,
credentials,
customUserAgent: userAgentValue,
});
if (!cachedClient || credentialsHaveChanged) {
cachedClients[sessionIdentityKey] = [
new FirehoseClient({
region,
credentials,
customUserAgent: userAgentValue,
}),
credentials
];
}

const firehoseClient = cachedClients[sessionIdentityKey];
const [ firehoseClient ] = cachedClients[sessionIdentityKey];

return events => submitEvents(events, firehoseClient, resendLimit);
};

Expand Down
42 changes: 31 additions & 11 deletions packages/analytics/src/providers/kinesis/utils/getEventBuffer.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

import { KinesisBufferEvent, KinesisEventBufferConfig } from '../types';
import { EventBuffer, groupBy, IAnalyticsClient } from '../../../utils';
import {
AWSCredentials,
haveCredentialsChanged
} from '@aws-amplify/core/internals/utils';
import { KinesisClient, PutRecordsCommand } from '@aws-sdk/client-kinesis';
import { KinesisBufferEvent, KinesisEventBufferConfig } from '../types';
import {
EventBuffer,
groupBy,
IAnalyticsClient
} from '../../../utils';

/**
* These Records hold cached event buffers and AWS clients.
Expand All @@ -14,7 +22,7 @@ import { KinesisClient, PutRecordsCommand } from '@aws-sdk/client-kinesis';
* When a new session is initiated, the previous ones should be released.
* */
const eventBufferMap: Record<string, EventBuffer<KinesisBufferEvent>> = {};
const cachedClients: Record<string, KinesisClient> = {};
const cachedClients: Record<string, [KinesisClient, AWSCredentials]> = {};

const createKinesisPutRecordsCommand = (
streamName: string,
Expand Down Expand Up @@ -67,19 +75,31 @@ export const getEventBuffer = ({
userAgentValue,
}: KinesisEventBufferConfig): EventBuffer<KinesisBufferEvent> => {
const sessionIdentityKey = [region, identityId].filter(x => !!x).join('-');
const [ cachedClient, cachedCredentials ] = cachedClients[sessionIdentityKey] ?? [];
let credentialsHaveChanged = false;

if (!eventBufferMap[sessionIdentityKey]) {
// Check if credentials have changed for the cached client
if (cachedClient) {
credentialsHaveChanged = haveCredentialsChanged(cachedCredentials, credentials);
}

if (!eventBufferMap[sessionIdentityKey] || credentialsHaveChanged) {
const getKinesisClient = (): IAnalyticsClient<KinesisBufferEvent> => {
if (!cachedClients[sessionIdentityKey]) {
cachedClients[sessionIdentityKey] = new KinesisClient({
credentials,
region,
customUserAgent: userAgentValue,
});
if (!cachedClient || credentialsHaveChanged) {
cachedClients[sessionIdentityKey] = [
new KinesisClient({
credentials,
region,
customUserAgent: userAgentValue,
}),
credentials
];
}

const [ kinesisClient ] = cachedClients[sessionIdentityKey];

return events =>
submitEvents(events, cachedClients[sessionIdentityKey], resendLimit);
submitEvents(events, kinesisClient, resendLimit);
};

// create new session
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

import { EventBuffer, groupBy, IAnalyticsClient } from '../../../utils';
import { PersonalizeBufferConfig, PersonalizeBufferEvent } from '../types';
import {
AWSCredentials,
haveCredentialsChanged
} from '@aws-amplify/core/internals/utils';
import {
PersonalizeEventsClient,
PutEventsCommand,
} from '@aws-sdk/client-personalize-events';
import {
EventBuffer,
groupBy,
IAnalyticsClient
} from '../../../utils';
import { PersonalizeBufferConfig, PersonalizeBufferEvent } from '../types';

/**
* These Records hold cached event buffers and AWS clients.
Expand All @@ -17,7 +25,7 @@ import {
* When a new session is initiated, the previous ones should be released.
* */
const eventBufferMap: Record<string, EventBuffer<PersonalizeBufferEvent>> = {};
const cachedClients: Record<string, PersonalizeEventsClient> = {};
const cachedClients: Record<string, [PersonalizeEventsClient, AWSCredentials]> = {};

const DELIMITER = '#';

Expand Down Expand Up @@ -71,17 +79,30 @@ export const getEventBuffer = ({
userAgentValue,
}: PersonalizeBufferConfig): EventBuffer<PersonalizeBufferEvent> => {
const sessionIdentityKey = [region, identityId].filter(x => !!x).join('-');
const [ cachedClient, cachedCredentials ] = cachedClients[sessionIdentityKey] ?? [];
let credentialsHaveChanged = false;

// Check if credentials have changed for the cached client
if (cachedClient) {
credentialsHaveChanged = haveCredentialsChanged(cachedCredentials, credentials);
}

if (!eventBufferMap[sessionIdentityKey]) {
if (!eventBufferMap[sessionIdentityKey] || credentialsHaveChanged) {
const getClient = (): IAnalyticsClient<PersonalizeBufferEvent> => {
if (!cachedClients[sessionIdentityKey]) {
cachedClients[sessionIdentityKey] = new PersonalizeEventsClient({
region,
credentials,
customUserAgent: userAgentValue,
});
if (!cachedClient || credentialsHaveChanged) {
cachedClients[sessionIdentityKey] = [
new PersonalizeEventsClient({
region,
credentials,
customUserAgent: userAgentValue,
}),
credentials
];
}
return events => submitEvents(events, cachedClients[sessionIdentityKey]);

const [ personalizeClient ] = cachedClients[sessionIdentityKey];

return events => submitEvents(events, personalizeClient);
};

eventBufferMap[sessionIdentityKey] =
Expand Down
2 changes: 1 addition & 1 deletion packages/analytics/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@
"strict": true,
"noImplicitAny": true
},
"include": ["./src"]
"include": ["./src", "../core/src/utils/haveCredentialsChanged.ts"]
}
2 changes: 1 addition & 1 deletion packages/aws-amplify/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@
"name": "[Analytics] record (Kinesis)",
"path": "./dist/esm/analytics/kinesis/index.mjs",
"import": "{ record }",
"limit": "44.40 kB"
"limit": "44.47 kB"
},
{
"name": "[Analytics] record (Kinesis Firehose)",
Expand Down
42 changes: 42 additions & 0 deletions packages/core/__tests__/utils/haveCredentialsChanged.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { haveCredentialsChanged } from '../../src/utils/haveCredentialsChanged';

const MOCK_AWS_CREDS = {
accessKeyId: 'mock-access-key',
secretAccessKey: 'mock-secret-key',
sessionToken: 'mock-session-token'
};

describe('haveCredentialsChanged', () => {
it('returns true if access key has changed', () => {
const credentialsHaveChanged = haveCredentialsChanged(MOCK_AWS_CREDS, {
...MOCK_AWS_CREDS,
secretAccessKey: 'mock-secret-key-alt',
});

expect(credentialsHaveChanged).toBe(true);
});

it('returns true if access key has changed', () => {
const credentialsHaveChanged = haveCredentialsChanged(MOCK_AWS_CREDS, {
...MOCK_AWS_CREDS,
accessKeyId: 'mock-access-key-alt',
});

expect(credentialsHaveChanged).toBe(true);
});

it('returns true if session token has changed', () => {
const credentialsHaveChanged = haveCredentialsChanged(MOCK_AWS_CREDS, {
...MOCK_AWS_CREDS,
sessionToken: 'mock-session-token-alt',
});

expect(credentialsHaveChanged).toBe(true);
});

it('returns false if credentials have not changed', () => {
const credentialsHaveChanged = haveCredentialsChanged(MOCK_AWS_CREDS, MOCK_AWS_CREDS);

expect(credentialsHaveChanged).toBe(false);
});
})
1 change: 1 addition & 0 deletions packages/core/src/libraryUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ export {
AuthVerifiableAttributeKey,
AWSCredentials,
} from './singleton/Auth/types';
export { haveCredentialsChanged } from './utils/haveCredentialsChanged';

// Platform & user-agent utilities
export {
Expand Down
17 changes: 17 additions & 0 deletions packages/core/src/utils/haveCredentialsChanged.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

import { AWSCredentials } from '../libraryUtils';

/**
* Utility for determining if credentials have changed.
*
* @internal
*/
export const haveCredentialsChanged = (cachedCredentials: AWSCredentials, credentials: AWSCredentials) => {
return (
cachedCredentials.accessKeyId !== credentials.accessKeyId ||
cachedCredentials.sessionToken !== credentials.sessionToken ||
cachedCredentials.secretAccessKey !== credentials.secretAccessKey
);
};

0 comments on commit 1bcbae4

Please sign in to comment.