Skip to content

Commit

Permalink
SES: Support templates with if/else (#6867)
Browse files Browse the repository at this point in the history
  • Loading branch information
bblommers authored Sep 30, 2023
1 parent 504387d commit 6fe4999
Show file tree
Hide file tree
Showing 8 changed files with 279 additions and 50 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests_real_aws.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,4 @@ jobs:
env:
MOTO_TEST_ALLOW_AWS_REQUEST: ${{ true }}
run: |
pytest -sv tests/test_ec2/ tests/test_s3 -m aws_verified
pytest -sv tests/test_ec2/ tests/test_ses/ tests/test_s3 -m aws_verified
3 changes: 3 additions & 0 deletions moto/ses/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,9 @@ def render_template(self, render_data: Dict[str, Any]) -> str:
)
return rendered_template

def delete_template(self, name: str) -> None:
self.templates.pop(name)

def create_receipt_rule_set(self, rule_set_name: str) -> None:
if self.receipt_rule_set.get(rule_set_name) is not None:
raise RuleSetNameAlreadyExists("Duplicate Receipt Rule Set Name.")
Expand Down
13 changes: 13 additions & 0 deletions moto/ses/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,11 @@ def test_render_template(self) -> str:
template = self.response_template(RENDER_TEMPLATE)
return template.render(template=rendered_template)

def delete_template(self) -> str:
name = self._get_param("TemplateName")
self.backend.delete_template(name)
return self.response_template(DELETE_TEMPLATE).render()

def create_receipt_rule_set(self) -> str:
rule_set_name = self._get_param("RuleSetName")
self.backend.create_receipt_rule_set(rule_set_name)
Expand Down Expand Up @@ -627,6 +632,14 @@ def get_identity_verification_attributes(self) -> str:
</TestRenderTemplateResponse>
"""

DELETE_TEMPLATE = """<DeleteTemplateResponse xmlns="http://ses.amazonaws.com/doc/2010-12-01/">
<DeleteTemplateResult>
</DeleteTemplateResult>
<ResponseMetadata>
<RequestId>47e0ef1a-9bf2-11e1-9279-0100e8cf12ba</RequestId>
</ResponseMetadata>
</DeleteTemplateResponse>"""

CREATE_RECEIPT_RULE_SET = """<CreateReceiptRuleSetResponse xmlns="http://ses.amazonaws.com/doc/2010-12-01/">
<CreateReceiptRuleSetResult/>
<ResponseMetadata>
Expand Down
193 changes: 160 additions & 33 deletions moto/ses/template.py
Original file line number Diff line number Diff line change
@@ -1,53 +1,180 @@
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, Type

from .exceptions import MissingRenderingAttributeException
from moto.utilities.tokenizer import GenericTokenizer


class BlockProcessor:
def __init__(
self, template: str, template_data: Dict[str, Any], tokenizer: GenericTokenizer
):
self.template = template
self.template_data = template_data
self.tokenizer = tokenizer

def parse(self) -> str:
# Added to make MyPy happy
# Not all implementations have this method
# It's up to the caller to know whether to call this method
raise NotImplementedError


class EachBlockProcessor(BlockProcessor):
def __init__(
self, template: str, template_data: Dict[str, Any], tokenizer: GenericTokenizer
):
self.template = template
self.tokenizer = tokenizer

self.tokenizer.skip_characters("#each")
self.tokenizer.skip_white_space()
var_name = self.tokenizer.read_until("}}").strip()
self.tokenizer.skip_characters("}}")
self.template_data = template_data.get(var_name, [])

def parse(self) -> str:
parsed = ""
current_pos = self.tokenizer.token_pos

for template_data in self.template_data:
self.tokenizer.token_pos = current_pos
for char in self.tokenizer:
if char == "{" and self.tokenizer.peek() == "{":
self.tokenizer.skip_characters("{")
self.tokenizer.skip_white_space()

_processor = get_processor(self.tokenizer)(
self.template, template_data, self.tokenizer # type: ignore
)
# If we've reached the end, we should stop processing
# Our parent will continue with whatever comes after {{/each}}
if type(_processor) == EachEndBlockProcessor:
break
# If we've encountered another processor, they can continue
parsed += _processor.parse()

continue

parsed += char

return parsed


class EachEndBlockProcessor(BlockProcessor):
def __init__(
self, template: str, template_data: Dict[str, Any], tokenizer: GenericTokenizer
):
super().__init__(template, template_data, tokenizer)

self.tokenizer.skip_characters("/each")
self.tokenizer.skip_white_space()
self.tokenizer.skip_characters("}}")


class IfBlockProcessor(BlockProcessor):
def __init__(
self, template: str, template_data: Dict[str, Any], tokenizer: GenericTokenizer
):
super().__init__(template, template_data, tokenizer)

self.tokenizer.skip_characters("#if")
self.tokenizer.skip_white_space()
condition = self.tokenizer.read_until("}}").strip()
self.tokenizer.skip_characters("}}")
self.parse_contents = template_data.get(condition)

def parse(self) -> str:
parsed = ""

for char in self.tokenizer:
if char == "{" and self.tokenizer.peek() == "{":
self.tokenizer.skip_characters("{")
self.tokenizer.skip_white_space()

_processor = get_processor(self.tokenizer)(
self.template, self.template_data, self.tokenizer
)
if type(_processor) == IfEndBlockProcessor:
break
elif type(_processor) == ElseBlockProcessor:
self.parse_contents = not self.parse_contents
continue
if self.parse_contents:
parsed += _processor.parse()

continue

if self.parse_contents:
parsed += char

return parsed


class IfEndBlockProcessor(BlockProcessor):
def __init__(
self, template: str, template_data: Dict[str, Any], tokenizer: GenericTokenizer
):
super().__init__(template, template_data, tokenizer)

self.tokenizer.skip_characters("/if")
self.tokenizer.skip_white_space()
self.tokenizer.skip_characters("}}")


class ElseBlockProcessor(BlockProcessor):
def __init__(
self, template: str, template_data: Dict[str, Any], tokenizer: GenericTokenizer
):
super().__init__(template, template_data, tokenizer)

self.tokenizer.skip_characters("else")
self.tokenizer.skip_white_space()
self.tokenizer.skip_characters("}}")


class VarBlockProcessor(BlockProcessor):
def parse(self) -> str:
var_name = self.tokenizer.read_until("}}").strip()
if self.template_data.get(var_name) is None:
raise MissingRenderingAttributeException(var_name)
data: str = self.template_data.get(var_name) # type: ignore
self.tokenizer.skip_white_space()
self.tokenizer.skip_characters("}}")
return data


def get_processor(tokenizer: GenericTokenizer) -> Type[BlockProcessor]:
if tokenizer.peek(5) == "#each":
return EachBlockProcessor
if tokenizer.peek(5) == "/each":
return EachEndBlockProcessor
if tokenizer.peek(3) == "#if":
return IfBlockProcessor
if tokenizer.peek(3) == "/if":
return IfEndBlockProcessor
if tokenizer.peek(4) == "else":
return ElseBlockProcessor
return VarBlockProcessor


def parse_template(
template: str,
template_data: Dict[str, Any],
tokenizer: Optional[GenericTokenizer] = None,
until: Optional[str] = None,
) -> str:
tokenizer = tokenizer or GenericTokenizer(template)
state = ""

parsed = ""
var_name = ""
for char in tokenizer:
if until is not None and (char + tokenizer.peek(len(until) - 1)) == until:
return parsed
if char == "{" and tokenizer.peek() == "{":
# Two braces next to each other indicate a variable/language construct such as for-each
# We have different processors handling different constructs
tokenizer.skip_characters("{")
tokenizer.skip_white_space()
if tokenizer.peek() == "#":
state = "LOOP"
tokenizer.skip_characters("#each")
tokenizer.skip_white_space()
else:
state = "VAR"
continue
if char == "}":
if state in ["LOOP", "VAR"]:
tokenizer.skip_characters("}")
if state == "VAR":
if template_data.get(var_name) is None:
raise MissingRenderingAttributeException(var_name)
parsed += template_data.get(var_name) # type: ignore
else:
current_position = tokenizer.token_pos
for item in template_data.get(var_name, []):
tokenizer.token_pos = current_position
parsed += parse_template(
template, item, tokenizer, until="{{/each}}"
)
tokenizer.skip_characters("{/each}}")
var_name = ""
state = ""
continue
if state in ["LOOP", "VAR"]:
var_name += char

_processor = get_processor(tokenizer)(template, template_data, tokenizer)
parsed += _processor.parse()
continue

parsed += char
return parsed
8 changes: 8 additions & 0 deletions moto/utilities/tokenizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ def __next__(self) -> str:
return ""
raise StopIteration

def read_until(self, phrase: str) -> str:
chars_read = ""
for char in self:
chars_read += char
if self.peek(len(phrase)) == phrase:
return chars_read
return chars_read

def skip_characters(self, phrase: str, case_sensitive: bool = False) -> None:
"""
Skip the characters in the supplied phrase.
Expand Down
29 changes: 28 additions & 1 deletion tests/test_ses/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,28 @@
# This file is intentionally left blank.
import os
from functools import wraps
from moto import mock_ses


def ses_aws_verified(func):
"""
Function that is verified to work against AWS.
Can be run against AWS at any time by setting:
MOTO_TEST_ALLOW_AWS_REQUEST=true
If this environment variable is not set, the function runs in a `mock_ses` context.
"""

@wraps(func)
def pagination_wrapper():
allow_aws_request = (
os.environ.get("MOTO_TEST_ALLOW_AWS_REQUEST", "false").lower() == "true"
)

if allow_aws_request:
resp = func()
else:
with mock_ses():
resp = func()
return resp

return pagination_wrapper
34 changes: 19 additions & 15 deletions tests/test_ses/test_ses_boto3.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from botocore.exceptions import ParamValidationError
import pytest
from moto import mock_ses
from . import ses_aws_verified


@mock_ses
Expand Down Expand Up @@ -1296,33 +1297,36 @@ def test_render_template():
)


@mock_ses
def test_render_template__with_foreach():
@ses_aws_verified
def test_render_template__advanced():
conn = boto3.client("ses", region_name="us-east-1")

kwargs = {
"TemplateName": "MTT",
"TemplateData": json.dumps(
{
"items": [
{"type": "dog", "name": "bobby"},
{"type": "cat", "name": "pedro"},
{"type": "dog", "name": "bobby", "best": True},
{"type": "cat", "name": "pedro", "best": False},
]
}
),
}

conn.create_template(
Template={
"TemplateName": "MTT",
"SubjectPart": "..",
"TextPart": "..",
"HtmlPart": "{{#each items}} {{name}} is a {{type}}, {{/each}}",
}
)
result = conn.test_render_template(**kwargs)
assert "bobby is a dog" in result["RenderedTemplate"]
assert "pedro is a cat" in result["RenderedTemplate"]
try:
conn.create_template(
Template={
"TemplateName": "MTT",
"SubjectPart": "..",
"TextPart": "..",
"HtmlPart": "{{#each items}} {{name}} is {{#if best}}the best{{else}}a {{type}}{{/if}}, {{/each}}",
}
)
result = conn.test_render_template(**kwargs)
assert "bobby is the best" in result["RenderedTemplate"]
assert "pedro is a cat" in result["RenderedTemplate"]
finally:
conn.delete_template(TemplateName="MTT")


@mock_ses
Expand Down
Loading

0 comments on commit 6fe4999

Please sign in to comment.