diff --git a/connectors-e2e-test/connectors-e2e-test-mail/src/test/java/io/camunda/connector/e2e/BaseEmailTest.java b/connectors-e2e-test/connectors-e2e-test-mail/src/test/java/io/camunda/connector/e2e/BaseEmailTest.java
index c7c79e5df2..22b13a35c2 100644
--- a/connectors-e2e-test/connectors-e2e-test-mail/src/test/java/io/camunda/connector/e2e/BaseEmailTest.java
+++ b/connectors-e2e-test/connectors-e2e-test-mail/src/test/java/io/camunda/connector/e2e/BaseEmailTest.java
@@ -38,7 +38,6 @@ public class BaseEmailTest {
private static final GreenMail greenMail = new GreenMail();
@TempDir File tempDir;
private GreenMailUser greenMailUser = greenMail.setUser("test@camunda.com", "password");
- ;
@BeforeAll
static void setup() {
diff --git a/connectors-e2e-test/connectors-e2e-test-mail/src/test/java/io/camunda/connector/e2e/InboundEmailTest.java b/connectors-e2e-test/connectors-e2e-test-mail/src/test/java/io/camunda/connector/e2e/InboundEmailTest.java
new file mode 100644
index 0000000000..6431a69cd1
--- /dev/null
+++ b/connectors-e2e-test/connectors-e2e-test-mail/src/test/java/io/camunda/connector/e2e/InboundEmailTest.java
@@ -0,0 +1,252 @@
+/*
+ * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
+ * under one or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. Camunda licenses this file to you under the Apache License,
+ * Version 2.0; you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.camunda.connector.e2e;
+
+import static io.camunda.connector.e2e.BpmnFile.replace;
+import static org.mockito.Mockito.when;
+
+import io.camunda.connector.e2e.app.TestConnectorRuntimeApplication;
+import io.camunda.connector.runtime.inbound.state.ProcessImportResult;
+import io.camunda.connector.runtime.inbound.state.ProcessStateStore;
+import io.camunda.operate.CamundaOperateClient;
+import io.camunda.operate.exception.OperateException;
+import io.camunda.operate.model.ProcessDefinition;
+import io.camunda.zeebe.client.ZeebeClient;
+import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
+import io.camunda.zeebe.model.bpmn.instance.Process;
+import io.camunda.zeebe.process.test.assertions.BpmnAssert;
+import io.camunda.zeebe.spring.test.ZeebeSpringTest;
+import jakarta.mail.Flags;
+import jakarta.mail.MessagingException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.awaitility.core.ConditionTimeoutException;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+
+@SpringBootTest(
+ classes = {TestConnectorRuntimeApplication.class},
+ properties = {
+ "spring.main.allow-bean-definition-overriding=true",
+ },
+ webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
+@ZeebeSpringTest
+@ExtendWith(MockitoExtension.class)
+public class InboundEmailTest extends BaseEmailTest {
+
+ private static final ScheduledExecutorService scheduler =
+ Executors.newSingleThreadScheduledExecutor();
+ private static AtomicLong counter = new AtomicLong(1);
+ @Autowired ProcessStateStore processStateStore;
+ @Autowired CamundaOperateClient camundaOperateClient;
+ @Mock private ProcessDefinition processDef;
+ @Autowired private ZeebeClient zeebeClient;
+
+ @BeforeEach
+ public void beforeEach() {
+ super.reset();
+ }
+
+ @Test
+ public void shouldReceiveEmailAndSetAsSeen() throws OperateException, MessagingException {
+ var model =
+ replace(
+ "email-inbound-connector-intermediate_unseen_read.bpmn",
+ BpmnFile.Replace.replace("55555", super.getUnsecureImapPort()));
+
+ mockProcessDefinition(model);
+
+ scheduler.schedule(
+ () -> super.sendEmail("test@camunda.com", "test", "hey"), 2, TimeUnit.SECONDS);
+
+ processStateStore.update(
+ new ProcessImportResult(
+ Map.of(
+ new ProcessImportResult.ProcessDefinitionIdentifier(
+ processDef.getBpmnProcessId(), processDef.getTenantId()),
+ new ProcessImportResult.ProcessDefinitionVersion(
+ processDef.getKey(), processDef.getVersion().intValue()))));
+ var bpmnTest = ZeebeTest.with(zeebeClient).deploy(model).createInstance();
+
+ bpmnTest = bpmnTest.waitForProcessCompletion();
+
+ Assertions.assertTrue(
+ Arrays.stream(super.getLastReceivedEmails())
+ .findFirst()
+ .get()
+ .getFlags()
+ .contains(Flags.Flag.SEEN));
+ BpmnAssert.assertThat(bpmnTest.getProcessInstanceEvent())
+ .hasVariableWithValue("subject", "test");
+ BpmnAssert.assertThat(bpmnTest.getProcessInstanceEvent())
+ .hasVariableWithValue("plainTextBody", "hey");
+ }
+
+ @Test
+ public void shouldThrowWhenAllMessageAreSeen() throws OperateException, MessagingException {
+ var model =
+ replace(
+ "email-inbound-connector-intermediate_unseen_read.bpmn",
+ BpmnFile.Replace.replace("55555", super.getUnsecureImapPort()));
+
+ mockProcessDefinition(model);
+
+ super.sendEmail("test@camunda.com", "test", "hey");
+
+ Arrays.stream(getLastReceivedEmails()).findFirst().get().setFlag(Flags.Flag.SEEN, true);
+
+ processStateStore.update(
+ new ProcessImportResult(
+ Map.of(
+ new ProcessImportResult.ProcessDefinitionIdentifier(
+ processDef.getBpmnProcessId(), processDef.getTenantId()),
+ new ProcessImportResult.ProcessDefinitionVersion(
+ processDef.getKey(), processDef.getVersion().intValue()))));
+ var bpmnTest = ZeebeTest.with(zeebeClient).deploy(model).createInstance();
+
+ Assertions.assertThrows(ConditionTimeoutException.class, bpmnTest::waitForProcessCompletion);
+ }
+
+ @Test
+ public void shouldReceiveEmailAndDelete() throws OperateException, MessagingException {
+ var model =
+ replace(
+ "email-inbound-connector-intermediate_unseen_delete.bpmn",
+ BpmnFile.Replace.replace("55555", super.getUnsecureImapPort()));
+
+ mockProcessDefinition(model);
+
+ scheduler.schedule(
+ () -> super.sendEmail("test@camunda.com", "test", "hey"), 2, TimeUnit.SECONDS);
+
+ processStateStore.update(
+ new ProcessImportResult(
+ Map.of(
+ new ProcessImportResult.ProcessDefinitionIdentifier(
+ processDef.getBpmnProcessId(), processDef.getTenantId()),
+ new ProcessImportResult.ProcessDefinitionVersion(
+ processDef.getKey(), processDef.getVersion().intValue()))));
+
+ var bpmnTest = ZeebeTest.with(zeebeClient).deploy(model).createInstance();
+
+ bpmnTest = bpmnTest.waitForProcessCompletion();
+
+ Assertions.assertTrue(
+ Arrays.stream(super.getLastReceivedEmails())
+ .findFirst()
+ .get()
+ .getFlags()
+ .contains(Flags.Flag.DELETED));
+ BpmnAssert.assertThat(bpmnTest.getProcessInstanceEvent())
+ .hasVariableWithValue("subject", "test");
+ BpmnAssert.assertThat(bpmnTest.getProcessInstanceEvent())
+ .hasVariableWithValue("plainTextBody", "hey");
+ }
+
+ @Test
+ public void shouldReceiveEmailAndMove() throws OperateException, MessagingException {
+ var model =
+ replace(
+ "email-inbound-connector-intermediate_unseen_move.bpmn",
+ BpmnFile.Replace.replace("55555", super.getUnsecureImapPort()));
+
+ mockProcessDefinition(model);
+
+ scheduler.schedule(
+ () -> super.sendEmail("test@camunda.com", "test", "hey"), 2, TimeUnit.SECONDS);
+
+ processStateStore.update(
+ new ProcessImportResult(
+ Map.of(
+ new ProcessImportResult.ProcessDefinitionIdentifier(
+ processDef.getBpmnProcessId(), processDef.getTenantId()),
+ new ProcessImportResult.ProcessDefinitionVersion(
+ processDef.getKey(), processDef.getVersion().intValue()))));
+
+ var bpmnTest = ZeebeTest.with(zeebeClient).deploy(model).createInstance();
+
+ bpmnTest = bpmnTest.waitForProcessCompletion();
+
+ Assertions.assertEquals(2, getLastReceivedEmails().length);
+ Assertions.assertTrue(
+ Arrays.stream(getLastReceivedEmails())
+ .findFirst()
+ .get()
+ .getFlags()
+ .contains(Flags.Flag.DELETED));
+ BpmnAssert.assertThat(bpmnTest.getProcessInstanceEvent())
+ .hasVariableWithValue("subject", "test");
+ BpmnAssert.assertThat(bpmnTest.getProcessInstanceEvent())
+ .hasVariableWithValue("plainTextBody", "hey");
+ }
+
+ private void mockProcessDefinition(BpmnModelInstance model) throws OperateException {
+ when(camundaOperateClient.getProcessDefinitionModel(1L)).thenReturn(model);
+ when(processDef.getKey()).thenReturn(1L);
+ when(processDef.getTenantId()).thenReturn(zeebeClient.getConfiguration().getDefaultTenantId());
+ when(processDef.getBpmnProcessId())
+ .thenReturn(model.getModelElementsByType(Process.class).stream().findFirst().get().getId());
+ when(processDef.getVersion()).thenReturn(counter.getAndIncrement());
+ }
+
+ @Test
+ public void shouldPollEmailAndMove() throws OperateException, MessagingException {
+ var model =
+ replace(
+ "email-inbound-connector-intermediate_all_delete.bpmn",
+ BpmnFile.Replace.replace("55555", super.getUnsecureImapPort()));
+
+ mockProcessDefinition(model);
+
+ super.sendEmail("test@camunda.com", "test", "hey");
+
+ Arrays.stream(super.getLastReceivedEmails()).findFirst().get().setFlag(Flags.Flag.SEEN, true);
+
+ processStateStore.update(
+ new ProcessImportResult(
+ Map.of(
+ new ProcessImportResult.ProcessDefinitionIdentifier(
+ processDef.getBpmnProcessId(), processDef.getTenantId()),
+ new ProcessImportResult.ProcessDefinitionVersion(
+ processDef.getKey(), processDef.getVersion().intValue()))));
+
+ var bpmnTest = ZeebeTest.with(zeebeClient).deploy(model).createInstance();
+
+ bpmnTest = bpmnTest.waitForProcessCompletion();
+
+ Assertions.assertTrue(
+ Arrays.stream(super.getLastReceivedEmails())
+ .findFirst()
+ .get()
+ .getFlags()
+ .contains(Flags.Flag.DELETED));
+ BpmnAssert.assertThat(bpmnTest.getProcessInstanceEvent())
+ .hasVariableWithValue("subject", "test");
+ BpmnAssert.assertThat(bpmnTest.getProcessInstanceEvent())
+ .hasVariableWithValue("plainTextBody", "hey");
+ }
+}
diff --git a/connectors-e2e-test/connectors-e2e-test-mail/src/test/java/io/camunda/connector/e2e/EmailTests.java b/connectors-e2e-test/connectors-e2e-test-mail/src/test/java/io/camunda/connector/e2e/OutboundEmailTests.java
similarity index 99%
rename from connectors-e2e-test/connectors-e2e-test-mail/src/test/java/io/camunda/connector/e2e/EmailTests.java
rename to connectors-e2e-test/connectors-e2e-test-mail/src/test/java/io/camunda/connector/e2e/OutboundEmailTests.java
index d8326142fa..36cad8a345 100644
--- a/connectors-e2e-test/connectors-e2e-test-mail/src/test/java/io/camunda/connector/e2e/EmailTests.java
+++ b/connectors-e2e-test/connectors-e2e-test-mail/src/test/java/io/camunda/connector/e2e/OutboundEmailTests.java
@@ -47,7 +47,7 @@
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@ZeebeSpringTest
@ExtendWith(MockitoExtension.class)
-public class EmailTests extends BaseEmailTest {
+public class OutboundEmailTests extends BaseEmailTest {
private static final ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor();
diff --git a/connectors-e2e-test/connectors-e2e-test-mail/src/test/resources/email-inbound-connector-intermediate_all_delete.bpmn b/connectors-e2e-test/connectors-e2e-test-mail/src/test/resources/email-inbound-connector-intermediate_all_delete.bpmn
new file mode 100644
index 0000000000..76a9b9c9f7
--- /dev/null
+++ b/connectors-e2e-test/connectors-e2e-test-mail/src/test/resources/email-inbound-connector-intermediate_all_delete.bpmn
@@ -0,0 +1,64 @@
+
+
+
+
+ Flow_1vp6srd
+
+
+ Flow_0lofu08
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Flow_0lofu08
+ Flow_1vp6srd
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/connectors-e2e-test/connectors-e2e-test-mail/src/test/resources/email-inbound-connector-intermediate_all_move.bpmn b/connectors-e2e-test/connectors-e2e-test-mail/src/test/resources/email-inbound-connector-intermediate_all_move.bpmn
new file mode 100644
index 0000000000..27d4d11d4e
--- /dev/null
+++ b/connectors-e2e-test/connectors-e2e-test-mail/src/test/resources/email-inbound-connector-intermediate_all_move.bpmn
@@ -0,0 +1,65 @@
+
+
+
+
+ Flow_1vp6srd
+
+
+ Flow_0lofu08
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Flow_0lofu08
+ Flow_1vp6srd
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/connectors-e2e-test/connectors-e2e-test-mail/src/test/resources/email-inbound-connector-intermediate_unseen_delete.bpmn b/connectors-e2e-test/connectors-e2e-test-mail/src/test/resources/email-inbound-connector-intermediate_unseen_delete.bpmn
new file mode 100644
index 0000000000..6ae220a00f
--- /dev/null
+++ b/connectors-e2e-test/connectors-e2e-test-mail/src/test/resources/email-inbound-connector-intermediate_unseen_delete.bpmn
@@ -0,0 +1,64 @@
+
+
+
+
+ Flow_1vp6srd
+
+
+ Flow_0lofu08
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Flow_0lofu08
+ Flow_1vp6srd
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/connectors-e2e-test/connectors-e2e-test-mail/src/test/resources/email-inbound-connector-intermediate_unseen_move.bpmn b/connectors-e2e-test/connectors-e2e-test-mail/src/test/resources/email-inbound-connector-intermediate_unseen_move.bpmn
new file mode 100644
index 0000000000..1b88dff42c
--- /dev/null
+++ b/connectors-e2e-test/connectors-e2e-test-mail/src/test/resources/email-inbound-connector-intermediate_unseen_move.bpmn
@@ -0,0 +1,65 @@
+
+
+
+
+ Flow_1vp6srd
+
+
+ Flow_0lofu08
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Flow_0lofu08
+ Flow_1vp6srd
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/connectors-e2e-test/connectors-e2e-test-mail/src/test/resources/email-inbound-connector-intermediate_unseen_read.bpmn b/connectors-e2e-test/connectors-e2e-test-mail/src/test/resources/email-inbound-connector-intermediate_unseen_read.bpmn
new file mode 100644
index 0000000000..ce2472e664
--- /dev/null
+++ b/connectors-e2e-test/connectors-e2e-test-mail/src/test/resources/email-inbound-connector-intermediate_unseen_read.bpmn
@@ -0,0 +1,64 @@
+
+
+
+
+ Flow_1vp6srd
+
+
+ Flow_0lofu08
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Flow_0lofu08
+ Flow_1vp6srd
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/connectors/email/src/main/java/io/camunda/connector/email/client/jakarta/inbound/JakartaEmailListener.java b/connectors/email/src/main/java/io/camunda/connector/email/client/jakarta/inbound/JakartaEmailListener.java
index 78b4958ce0..452ca52799 100644
--- a/connectors/email/src/main/java/io/camunda/connector/email/client/jakarta/inbound/JakartaEmailListener.java
+++ b/connectors/email/src/main/java/io/camunda/connector/email/client/jakarta/inbound/JakartaEmailListener.java
@@ -11,12 +11,9 @@
import io.camunda.connector.email.client.jakarta.utils.JakartaUtils;
import java.util.Objects;
import java.util.concurrent.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class JakartaEmailListener implements EmailListener {
- private static final Logger log = LoggerFactory.getLogger(JakartaEmailListener.class);
private ScheduledExecutorService scheduledExecutorService;
private PollingManager pollingManager;
diff --git a/connectors/email/src/main/java/io/camunda/connector/email/client/jakarta/inbound/PollingManager.java b/connectors/email/src/main/java/io/camunda/connector/email/client/jakarta/inbound/PollingManager.java
index ea7cf8cc16..b604379f31 100644
--- a/connectors/email/src/main/java/io/camunda/connector/email/client/jakarta/inbound/PollingManager.java
+++ b/connectors/email/src/main/java/io/camunda/connector/email/client/jakarta/inbound/PollingManager.java
@@ -17,23 +17,29 @@
import java.util.Arrays;
import java.util.Objects;
import org.eclipse.angus.mail.imap.IMAPMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class PollingManager {
+ private static final Logger log = LoggerFactory.getLogger(PollingManager.class);
private final InboundConnectorContext connectorContext;
private final EmailListenerConfig emailListenerConfig;
private final JakartaUtils jakartaUtils;
private final Folder folder;
private final Store store;
+ private final Authentication authentication;
public PollingManager(
InboundConnectorContext connectorContext,
EmailListenerConfig emailListenerConfig,
+ Authentication authentication,
JakartaUtils jakartaUtils,
Folder folder,
Store store) {
this.connectorContext = connectorContext;
this.emailListenerConfig = emailListenerConfig;
+ this.authentication = authentication;
this.jakartaUtils = jakartaUtils;
this.folder = folder;
this.store = store;
@@ -52,16 +58,15 @@ public static PollingManager create(
jakartaUtils.createSession(emailInboundConnectorProperties.data().imapConfig());
store = session.getStore();
jakartaUtils.connectStore(store, authentication);
- folder =
- jakartaUtils.findImapFolder(
- store.getDefaultFolder(), emailListenerConfig.folderToListen());
+ folder = jakartaUtils.findImapFolder(store, emailListenerConfig.folderToListen());
folder.open(Folder.READ_WRITE);
if (emailListenerConfig.pollingConfig().handlingStrategy().equals(HandlingStrategy.MOVE)
&& (Objects.isNull(emailListenerConfig.pollingConfig().targetFolder())
|| emailListenerConfig.pollingConfig().targetFolder().isBlank()))
throw new RuntimeException(
"If the post process action is `MOVE`, a target folder must be specified");
- return new PollingManager(connectorContext, emailListenerConfig, jakartaUtils, folder, store);
+ return new PollingManager(
+ connectorContext, emailListenerConfig, authentication, jakartaUtils, folder, store);
} catch (MessagingException e) {
try {
if (folder != null && folder.isOpen()) {
@@ -78,12 +83,32 @@ public static PollingManager create(
}
public void poll() {
+ this.prepareForPolling();
switch (this.emailListenerConfig.pollingConfig()) {
case PollAll pollAll -> pollAllAndProcess(pollAll);
case PollUnseen pollUnseen -> pollUnseenAndProcess(pollUnseen);
}
}
+ private void prepareForPolling() {
+ if (!this.store.isConnected()) {
+ try {
+ this.jakartaUtils.connectStore(store, authentication);
+ } catch (MessagingException e) {
+ log.error("Could not reconnect to store", e);
+ throw new RuntimeException("Could not reconnect to store");
+ }
+ }
+ if (!this.folder.isOpen()) {
+ try {
+ this.folder.open(Folder.READ_WRITE);
+ } catch (MessagingException e) {
+ log.error("Could not reopen folder", e);
+ throw new RuntimeException("Could not reopen folder");
+ }
+ }
+ }
+
private void pollAllAndProcess(PollAll pollAll) {
try {
Message[] messages = this.folder.getMessages();
diff --git a/connectors/email/src/main/java/io/camunda/connector/email/client/jakarta/outbound/JakartaEmailActionExecutor.java b/connectors/email/src/main/java/io/camunda/connector/email/client/jakarta/outbound/JakartaEmailActionExecutor.java
index c0040c4d8a..69e2fa4340 100644
--- a/connectors/email/src/main/java/io/camunda/connector/email/client/jakarta/outbound/JakartaEmailActionExecutor.java
+++ b/connectors/email/src/main/java/io/camunda/connector/email/client/jakarta/outbound/JakartaEmailActionExecutor.java
@@ -64,9 +64,8 @@ private List imapSearchEmails(
ImapSearchEmails imapSearchEmails, Authentication authentication, Session session) {
try (Store store = session.getStore()) {
this.jakartaUtils.connectStore(store, authentication);
- Folder defaultFolder = store.getDefaultFolder();
String targetFolder = imapSearchEmails.searchEmailFolder();
- try (Folder imapFolder = this.jakartaUtils.findImapFolder(defaultFolder, targetFolder)) {
+ try (Folder imapFolder = this.jakartaUtils.findImapFolder(store, targetFolder)) {
return searchEmails(imapFolder, imapSearchEmails.criteria());
}
} catch (MessagingException e) {
@@ -78,9 +77,8 @@ private ReadEmailResponse imapReadEmail(
ImapReadEmail imapReadEmail, Authentication authentication, Session session) {
try (Store store = session.getStore()) {
this.jakartaUtils.connectStore(store, authentication);
- Folder defaultFolder = store.getDefaultFolder();
String targetFolder = imapReadEmail.readEmailFolder();
- try (Folder imapFolder = this.jakartaUtils.findImapFolder(defaultFolder, targetFolder)) {
+ try (Folder imapFolder = this.jakartaUtils.findImapFolder(store, targetFolder)) {
imapFolder.open(Folder.READ_ONLY);
Message[] messages = imapFolder.search(new MessageIDTerm(imapReadEmail.messageId()));
return Arrays.stream(messages)
@@ -108,9 +106,8 @@ private DeleteEmailResponse imapDeleteEmail(
ImapDeleteEmail imapDeleteEmail, Authentication authentication, Session session) {
try (Store store = session.getStore()) {
this.jakartaUtils.connectStore(store, authentication);
- Folder defaultFolder = store.getDefaultFolder();
String targetFolder = imapDeleteEmail.deleteEmailFolder();
- try (Folder folder = this.jakartaUtils.findImapFolder(defaultFolder, targetFolder)) {
+ try (Folder folder = this.jakartaUtils.findImapFolder(store, targetFolder)) {
return deleteEmail(folder, imapDeleteEmail.messageId());
}
} catch (MessagingException e) {
@@ -122,18 +119,9 @@ private MoveEmailResponse imapMoveEmails(
ImapMoveEmail imapMoveEmail, Authentication authentication, Session session) {
try (Store store = session.getStore()) {
this.jakartaUtils.connectStore(store, authentication);
- Folder rootFolder = store.getDefaultFolder();
String fromFolder = imapMoveEmail.fromFolder();
- String toFolder = imapMoveEmail.toFolder();
- Folder sourceImapFolder = this.jakartaUtils.findImapFolder(rootFolder, fromFolder);
- if (!sourceImapFolder.exists()) throw new MessagingException("Source folder does not exist");
+ Folder sourceImapFolder = this.jakartaUtils.findImapFolder(store, fromFolder);
sourceImapFolder.open(Folder.READ_WRITE);
- Folder targetImapFolder =
- store.getFolder(
- String.join(String.valueOf(rootFolder.getSeparator()), toFolder.split("\\.")));
- if (!targetImapFolder.exists()) targetImapFolder.create(Folder.HOLDS_MESSAGES);
- targetImapFolder.open(Folder.READ_WRITE);
-
Message[] messages = sourceImapFolder.search(new MessageIDTerm(imapMoveEmail.messageId()));
Message message =
Arrays.stream(messages)
@@ -143,10 +131,8 @@ private MoveEmailResponse imapMoveEmails(
new MessagingException(
"Email with messageId %s does not exist"
.formatted(imapMoveEmail.messageId())));
- sourceImapFolder.copyMessages(new Message[] {message}, targetImapFolder);
- this.jakartaUtils.markAsDeleted(message);
+ this.jakartaUtils.moveMessage(store, message, imapMoveEmail.toFolder());
sourceImapFolder.close();
- targetImapFolder.close();
return new MoveEmailResponse(
imapMoveEmail.messageId(), imapMoveEmail.fromFolder(), imapMoveEmail.toFolder());
} catch (MessagingException e) {
@@ -158,9 +144,8 @@ private List imapListEmails(
ImapListEmails imapListEmails, Authentication authentication, Session session) {
try (Store store = session.getStore()) {
this.jakartaUtils.connectStore(store, authentication);
- Folder rootFolder = store.getDefaultFolder();
String targetFolder = imapListEmails.listEmailsFolder();
- try (Folder imapFolder = this.jakartaUtils.findImapFolder(rootFolder, targetFolder)) {
+ try (Folder imapFolder = this.jakartaUtils.findImapFolder(store, targetFolder)) {
imapFolder.open(Folder.READ_ONLY);
return Arrays.stream(imapFolder.getMessages())
.map(this.jakartaUtils::createBodylessEmail)
diff --git a/connectors/email/src/main/java/io/camunda/connector/email/client/jakarta/utils/JakartaUtils.java b/connectors/email/src/main/java/io/camunda/connector/email/client/jakarta/utils/JakartaUtils.java
index 0157bc5196..a4a227d815 100644
--- a/connectors/email/src/main/java/io/camunda/connector/email/client/jakarta/utils/JakartaUtils.java
+++ b/connectors/email/src/main/java/io/camunda/connector/email/client/jakarta/utils/JakartaUtils.java
@@ -33,6 +33,7 @@
public class JakartaUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(JakartaUtils.class);
+ private static final String REGEX_PATH_SPLITTER = "[./]";
public Session createSession(Configuration configuration) {
return Session.getInstance(
@@ -165,31 +166,19 @@ public Comparator retrieveEmailComparator(
};
}
- private Folder findFolderRecursively(Folder rootFolder, String targetFolder)
- throws MessagingException {
- if (targetFolder == null || targetFolder.isEmpty() || "INBOX".equals(targetFolder)) {
- return rootFolder.getFolder("INBOX");
- }
- Folder[] folders = rootFolder.list();
- for (Folder folder : folders) {
- if (folder.getName().equals(targetFolder)) {
- return folder;
- } else {
- Folder folderReturned = findFolderRecursively(folder, targetFolder);
- if (folderReturned != null) {
- return folderReturned;
- }
- }
- }
- return null;
- }
-
- public Folder findImapFolder(Folder rootFolder, String targetFolder) throws MessagingException {
- Folder folder = findFolderRecursively(rootFolder, targetFolder);
- if (folder != null) {
- return folder;
+ public Folder findImapFolder(Store store, String folderPath) throws MessagingException {
+ if (folderPath == null || folderPath.isEmpty() || "INBOX".equalsIgnoreCase(folderPath)) {
+ return store.getFolder("INBOX");
}
- throw new MessagingException("Unable to find IMAP folder");
+ char separator = store.getDefaultFolder().getSeparator();
+ String formattedPath =
+ Optional.of(folderPath)
+ .map(string -> string.split(REGEX_PATH_SPLITTER))
+ .map(strings -> String.join(String.valueOf(separator), strings))
+ .orElseThrow(() -> new RuntimeException("No folder has been set"));
+ Folder folder = store.getFolder(formattedPath);
+ if (!folder.exists()) throw new RuntimeException("Folder " + formattedPath + " does not exist");
+ return folder;
}
public Email createBodylessEmail(Message message) {
@@ -321,9 +310,9 @@ public void moveMessage(Store store, Message message, String targetFolder) {
char separator = imapFolder.getSeparator();
String targetFolderFormatted =
Optional.ofNullable(targetFolder)
- .map(string -> string.split("\\."))
+ .map(string -> string.split(REGEX_PATH_SPLITTER))
.map(strings -> String.join(String.valueOf(separator), strings))
- .orElse("temp");
+ .orElseThrow(() -> new RuntimeException("No folder has been set"));
Folder targetImapFolder = store.getFolder(targetFolderFormatted);
if (!targetImapFolder.exists()) targetImapFolder.create(Folder.HOLDS_MESSAGES);
targetImapFolder.open(Folder.READ_WRITE);
diff --git a/connectors/email/src/test/java/io/camunda/connector/email/client/jakarta/JakartaExecutorTest.java b/connectors/email/src/test/java/io/camunda/connector/email/client/jakarta/JakartaExecutorTest.java
index 5250fda8f8..cd6d95fc79 100644
--- a/connectors/email/src/test/java/io/camunda/connector/email/client/jakarta/JakartaExecutorTest.java
+++ b/connectors/email/src/test/java/io/camunda/connector/email/client/jakarta/JakartaExecutorTest.java
@@ -21,10 +21,7 @@
import io.camunda.connector.email.outbound.protocols.Protocol;
import io.camunda.connector.email.outbound.protocols.Smtp;
import io.camunda.connector.email.outbound.protocols.actions.*;
-import io.camunda.connector.email.response.DeleteEmailResponse;
-import io.camunda.connector.email.response.ListEmailsResponse;
-import io.camunda.connector.email.response.ReadEmailResponse;
-import io.camunda.connector.email.response.SearchEmailsResponse;
+import io.camunda.connector.email.response.*;
import jakarta.mail.*;
import java.io.IOException;
import java.nio.file.Files;
@@ -513,6 +510,68 @@ void executeImapSearchEmails() throws MessagingException, IOException {
Assertions.assertInstanceOf(List.class, object);
}
+ @Test
+ void executeImapMoveEmail() throws MessagingException, IOException {
+ JakartaUtils sessionFactory = mock(JakartaUtils.class);
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ JakartaEmailActionExecutor actionExecutor =
+ JakartaEmailActionExecutor.create(sessionFactory, objectMapper);
+
+ EmailRequest emailRequest = mock(EmailRequest.class);
+ ImapMoveEmail imapMoveEmail = mock(ImapMoveEmail.class);
+ SimpleAuthentication simpleAuthentication = mock(SimpleAuthentication.class);
+ Protocol protocol = mock(Imap.class);
+ Session session = mock(Session.class);
+ Store store = mock(Store.class);
+ Folder folder = mock(Folder.class);
+ Folder defaultFolder = mock(Folder.class);
+ Folder targetFolder = mock(Folder.class);
+ Message message = mock(Message.class);
+
+ when(sessionFactory.createSession(any())).thenReturn(session);
+
+ // Authentication
+ when(simpleAuthentication.username()).thenReturn("user");
+ when(simpleAuthentication.password()).thenReturn("secret");
+ doNothing().when(store).connect(any(), any());
+
+ when(sessionFactory.findImapFolder(any(), any())).thenReturn(folder);
+ when(folder.search(any())).thenReturn(new Message[] {message});
+ when(store.getDefaultFolder()).thenReturn(defaultFolder);
+ when(defaultFolder.getSeparator()).thenReturn('|');
+ when(folder.exists()).thenReturn(Boolean.TRUE);
+ when(message.getContent()).thenReturn("string");
+ when(message.isMimeType("text/plain")).thenReturn(true);
+ when(message.getHeader(any())).thenReturn(new String[] {"1"});
+ when(emailRequest.authentication()).thenReturn(simpleAuthentication);
+ when(session.getProperties()).thenReturn(new Properties());
+ when(session.getStore()).thenReturn(store);
+ when(emailRequest.data()).thenReturn(protocol);
+ when(protocol.getProtocolAction()).thenReturn(imapMoveEmail);
+ when(imapMoveEmail.fromFolder()).thenReturn("");
+ when(imapMoveEmail.toFolder()).thenReturn("test.to/folder");
+ when(store.getFolder("test|to|folder")).thenReturn(targetFolder);
+ when(sessionFactory.createBodylessEmail(any()))
+ .thenReturn(
+ new Email(
+ null,
+ "1",
+ "",
+ List.of(),
+ "",
+ List.of(""),
+ List.of(""),
+ OffsetDateTime.now(),
+ OffsetDateTime.now(),
+ 1));
+ doNothing().when(store).connect(any(), any());
+
+ Object object = actionExecutor.execute(emailRequest);
+
+ Assertions.assertInstanceOf(MoveEmailResponse.class, object);
+ }
+
@Test
void executeImapSearchEmailsCriteriaSpecification() throws MessagingException {
JakartaUtils sessionFactory = mock(JakartaUtils.class);
diff --git a/connectors/idp-extraction/element-templates/hybrid/hybrid-idp-extraction-outbound-connector-hybrid.json b/connectors/idp-extraction/element-templates/hybrid/hybrid-idp-extraction-outbound-connector-hybrid.json
index b8536f62ab..3db1d2c91a 100644
--- a/connectors/idp-extraction/element-templates/hybrid/hybrid-idp-extraction-outbound-connector-hybrid.json
+++ b/connectors/idp-extraction/element-templates/hybrid/hybrid-idp-extraction-outbound-connector-hybrid.json
@@ -170,7 +170,7 @@
},
"type" : "String"
} ],
- "icon": {
- "contents": ""
+ "icon" : {
+ "contents" : ""
}
}
\ No newline at end of file
diff --git a/connectors/idp-extraction/element-templates/idp-extraction-outbound-connector.json b/connectors/idp-extraction/element-templates/idp-extraction-outbound-connector.json
index 6edcaa712f..43b67c216b 100644
--- a/connectors/idp-extraction/element-templates/idp-extraction-outbound-connector.json
+++ b/connectors/idp-extraction/element-templates/idp-extraction-outbound-connector.json
@@ -165,7 +165,7 @@
},
"type" : "String"
} ],
- "icon": {
- "contents": ""
+ "icon" : {
+ "contents" : ""
}
}
\ No newline at end of file