Skip to content

Commit

Permalink
Email notifications working
Browse files Browse the repository at this point in the history
  • Loading branch information
cloudify committed Jul 3, 2019
1 parent fe3988d commit 4326f80
Show file tree
Hide file tree
Showing 15 changed files with 793 additions and 33 deletions.
7 changes: 2 additions & 5 deletions CreateNotificationActivity/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@ import {
} from "io-functions-commons/dist/src/models/sender_service";
import { ulidGenerator } from "io-functions-commons/dist/src/utils/strings";

import {
getStoreMessageContentActivityHandler,
ISuccessfulStoreMessageContentActivityResult
} from "../StoreMessageContentActivity/handler";
import { ISuccessfulStoreMessageContentActivityResult } from "../StoreMessageContentActivity/handler";

/**
* Attempt to resolve an email address from
Expand Down Expand Up @@ -95,7 +92,7 @@ type ICreateNotificationActivityResult =
| ICreateNotificationActivityNoneResult;

/**
* Returns a function for handling emailNotificationActivity
* Returns a function for handling createNotificationActivity
*/
export const getCreateNotificationActivityHandler = (
lSenderServiceModel: SenderServiceModel,
Expand Down
2 changes: 1 addition & 1 deletion CreateNotificationActivity/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* function app in Kudu
*/

import { AzureFunction, Context } from "@azure/functions";
import { AzureFunction } from "@azure/functions";
import { DocumentClient as DocumentDBClient } from "documentdb";

import { HttpsUrl } from "io-functions-commons/dist/generated/definitions/HttpsUrl";
Expand Down
132 changes: 109 additions & 23 deletions CreatedMessageOrchestrator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,32 @@ import { IFunctionContext } from "durable-functions/lib/src/classes";
import { ReadableReporter } from "italia-ts-commons/lib/reporters";
import { PromiseType } from "italia-ts-commons/lib/types";

import { NotificationChannelEnum } from "io-functions-commons/dist/generated/definitions/NotificationChannel";
import { NotificationChannelStatusValueEnum } from "io-functions-commons/dist/generated/definitions/NotificationChannelStatusValue";
import { CreatedMessageEvent } from "io-functions-commons/dist/src/models/created_message_event";

import { getStoreMessageContentActivityHandler } from "../StoreMessageContentActivity/handler";
import { getCreateNotificationActivityHandler } from "../CreateNotificationActivity/handler";
import { getEmailNotificationActivityHandler } from "../EmailNotificationActivity/handler";
import { NotificationStatusUpdaterActivityInput } from "../NotificationStatusUpdaterActivity/handler";
import { getStoreMessageContentActivityHandler } from "../StoreMessageContentActivity/handler";

import { HandlerInputType } from "./utils";

/**
* Durable Functions Orchestrator that handles CreatedMessage events
*
* Note that this handler may be executed multiple times for a single job.
* See https://docs.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-checkpointing-and-replay
*
*/
function* handler(context: IFunctionContext): IterableIterator<unknown> {
// decode input CreatedMessageEvent
const input = context.df.getInput();

// decode input CreatedMessageEvent
const errorOrCreatedMessageEvent = CreatedMessageEvent.decode(input);
if (errorOrCreatedMessageEvent.isLeft()) {
context.log.error(
`Invalid CreatedMessageEvent received by orchestrator|ORCHESTRATOR_ID=${
`CreatedMessageOrchestrator|Invalid CreatedMessageEvent received|ORCHESTRATOR_ID=${
context.df.instanceId
}|ERRORS=${ReadableReporter.report(errorOrCreatedMessageEvent).join(
" / "
Expand All @@ -39,39 +53,51 @@ function* handler(context: IFunctionContext): IterableIterator<unknown> {
const createdMessageEvent = errorOrCreatedMessageEvent.value;
const newMessageWithContent = createdMessageEvent.message;

context.log.verbose(
`CreatedMessageOrchestrator|CreatedMessageEvent received|ORCHESTRATOR_ID=${context.df.instanceId}|MESSAGE_ID=${newMessageWithContent.id}|RECIPIENT=${newMessageWithContent.fiscalCode}`
);
if (!context.df.isReplaying) {
context.log.verbose(
`CreatedMessageOrchestrator|CreatedMessageEvent received|ORCHESTRATOR_ID=${context.df.instanceId}|MESSAGE_ID=${newMessageWithContent.id}|RECIPIENT=${newMessageWithContent.fiscalCode}`
);
}

// TODO: customize + backoff
// see https://docs.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-error-handling#javascript-functions-2x-only-1
const retryOptions = new df.RetryOptions(5000, 10);

try {
// first we store the content of the message in the database
const storeMessageContentActivityResult: PromiseType<
ReturnType<ReturnType<typeof getStoreMessageContentActivityHandler>>
> = yield context.df.callActivityWithRetry(
"StoreMessageContentActivity",
retryOptions,
createdMessageEvent
// The cast is here for making TypeScript check that we're indeed passing
// the right parameters
// tslint:disable-next-line: no-useless-cast
createdMessageEvent as HandlerInputType<
ReturnType<typeof getStoreMessageContentActivityHandler>
>
);

context.log.verbose(
`CreatedMessageOrchestrator|StoreMessageContentActivity completed|ORCHESTRATOR_ID=${
context.df.instanceId
}|MESSAGE_ID=${newMessageWithContent.id}|RESULT=${
storeMessageContentActivityResult.kind === "SUCCESS"
? "SUCCESS"
: "FAILURE/" + storeMessageContentActivityResult.reason
}`
);
if (!context.df.isReplaying) {
context.log.verbose(
`CreatedMessageOrchestrator|StoreMessageContentActivity completed|ORCHESTRATOR_ID=${
context.df.instanceId
}|MESSAGE_ID=${newMessageWithContent.id}|RESULT=${
storeMessageContentActivityResult.kind === "SUCCESS"
? "SUCCESS"
: "FAILURE/" + storeMessageContentActivityResult.reason
}`
);
}

if (storeMessageContentActivityResult.kind !== "SUCCESS") {
// StoreMessageContentActivity failed permanently, we can't proceed with
// delivering the notifications
return [];
}

// then we create a NotificationActivity in the database that will store
// the status of the notification on each channel
const createNotificationActivityResult: PromiseType<
ReturnType<ReturnType<typeof getCreateNotificationActivityHandler>>
> = yield context.df.callActivityWithRetry(
Expand All @@ -80,18 +106,78 @@ function* handler(context: IFunctionContext): IterableIterator<unknown> {
{
createdMessageEvent,
storeMessageContentActivityResult
}
} as HandlerInputType<
ReturnType<typeof getCreateNotificationActivityHandler>
>
);

context.log.verbose(
`createNotificationActivityResult: ${JSON.stringify(
createNotificationActivityResult
)}`
);
if (createNotificationActivityResult.kind === "none") {
// no channel configured, no notifications need to be delivered
context.log.verbose(
`CreatedMessageOrchestrator|No notifications will be delivered|MESSAGE_ID=${newMessageWithContent.id}`
);
return [];
}

// TODO: run all notifications in parallel

if (createNotificationActivityResult.hasEmail) {
// send the email notification
try {
const emailNotificationActivityResult: PromiseType<
ReturnType<ReturnType<typeof getEmailNotificationActivityHandler>>
> = yield context.df.callActivityWithRetry(
"EmailNotificationActivity",
retryOptions,
{
emailNotificationEventJson:
createNotificationActivityResult.notificationEvent
} as HandlerInputType<
ReturnType<typeof getEmailNotificationActivityHandler>
>
);

if (!context.df.isReplaying) {
context.log.verbose(
`CreatedMessageOrchestrator|EmailNotificationActivity result: ${JSON.stringify(
emailNotificationActivityResult
)}`
);
}

// update the notification status
const emailNotificationStatusUpdaterActivityInput = NotificationStatusUpdaterActivityInput.encode(
{
channel: NotificationChannelEnum.EMAIL,
messageId: createdMessageEvent.message.id,
notificationId:
createNotificationActivityResult.notificationEvent.notificationId,
status: NotificationChannelStatusValueEnum.SENT
}
);
try {
yield context.df.callActivityWithRetry(
"NotificationStatusUpdaterActivity",
retryOptions,
emailNotificationStatusUpdaterActivityInput
);
} catch (e) {
// too many failures
context.log.error(
`CreatedMessageOrchestrator|NotificationStatusUpdaterActivity failed too many times|MESSAGE_ID=${createdMessageEvent.message.id}|CHANNEL=email|ERROR=${e}`
);
}
} catch (e) {
// too many failures
context.log.error(
`CreatedMessageOrchestrator|EmailNotificationActivity failed too many times|MESSAGE_ID=${createdMessageEvent.message.id}|ERROR=${e}`
);
}
}
} catch (e) {
// too many retries
context.log.error(
`Fatal error, StoreMessageContentActivity or createNotificationActivity exceeded the max retries|MESSAGE_ID=${createdMessageEvent.message.id}`
`CreatedMessageOrchestrator|Fatal error, StoreMessageContentActivity or CreateNotificationActivity exceeded the max retries|MESSAGE_ID=${createdMessageEvent.message.id}|ERROR=${e}`
);
}

Expand Down
9 changes: 9 additions & 0 deletions CreatedMessageOrchestrator/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { Function2 } from "fp-ts/lib/function";

/**
* Extracts the input type of an activity handler
*/
// tslint:disable-next-line: no-any
export type HandlerInputType<T> = T extends Function2<any, infer A, any>
? A
: never;
10 changes: 10 additions & 0 deletions EmailNotificationActivity/function.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"bindings": [
{
"name": "name",
"type": "activityTrigger",
"direction": "in"
}
],
"scriptFile": "../dist/EmailNotificationActivity/index.js"
}
Loading

0 comments on commit 4326f80

Please sign in to comment.