diff --git a/core/src/main/java/io/aiven/klaw/model/enums/MailType.java b/core/src/main/java/io/aiven/klaw/model/enums/MailType.java index 0dfdaaf178..2dd27ce94f 100644 --- a/core/src/main/java/io/aiven/klaw/model/enums/MailType.java +++ b/core/src/main/java/io/aiven/klaw/model/enums/MailType.java @@ -23,6 +23,7 @@ public enum MailType { REGISTER_USER_REQUEST, TOPIC_UPDATE_REQUESTED, SCHEMA_PROMOTION_REQUESTED, + SCHEMA_APPROVED_NOTIFY_SUBSCRIBERS, NEW_USER_ADDED, RESET_CONSUMER_OFFSET_REQUESTED, RESET_CONSUMER_OFFSET_APPROVED, diff --git a/core/src/main/java/io/aiven/klaw/service/MailUtils.java b/core/src/main/java/io/aiven/klaw/service/MailUtils.java index 0fd52a5805..f2447f308e 100644 --- a/core/src/main/java/io/aiven/klaw/service/MailUtils.java +++ b/core/src/main/java/io/aiven/klaw/service/MailUtils.java @@ -11,6 +11,7 @@ import io.aiven.klaw.model.enums.MailType; import io.aiven.klaw.model.enums.PermissionType; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -126,7 +127,7 @@ void sendMail( getUserName(SecurityContextHolder.getContext().getAuthentication().getPrincipal())) .getTenantId(); loadKwProps(tenantId); - Boolean requiresApproval = false; + boolean requiresApproval = false; switch (mailType) { case TOPIC_CREATE_REQUESTED -> { formattedStr = String.format(topicRequestMail, "'" + topicName + "'"); @@ -277,6 +278,41 @@ void sendMail( loginUrl); } + void notifySubscribersOnSchemaChange( + MailType mailType, + String topicName, + String envName, + String teamName, + List toMailIds, + String ccOwnerTeamMailId, + int tenantId, + String loginUrl) { + String subject, formattedStr; + if (mailType == MailType.SCHEMA_APPROVED_NOTIFY_SUBSCRIBERS) { + subject = "New schema on a topic"; + formattedStr = + "A schema has been uploaded on Topic :" + + topicName + + " Environment : " + + envName + + " by team : " + + teamName; + String finalSubject = subject; + String finalFormattedStr = formattedStr; + CompletableFuture.runAsync( + () -> { + emailService.sendSimpleMessage( + toMailIds, + Collections.singletonList(ccOwnerTeamMailId), + Collections.emptyList(), + finalSubject, + finalFormattedStr, + tenantId, + loginUrl); + }); + } + } + void sendMail(String username, String pwd, HandleDbRequests dbHandle, String loginUrl) { String formattedStr, subject; int tenantId = diff --git a/core/src/main/java/io/aiven/klaw/service/SchemaRegistryControllerService.java b/core/src/main/java/io/aiven/klaw/service/SchemaRegistryControllerService.java index 2ceffb7e9c..20558627ed 100644 --- a/core/src/main/java/io/aiven/klaw/service/SchemaRegistryControllerService.java +++ b/core/src/main/java/io/aiven/klaw/service/SchemaRegistryControllerService.java @@ -18,9 +18,11 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.aiven.klaw.config.ManageDatabase; +import io.aiven.klaw.dao.Acl; import io.aiven.klaw.dao.Env; import io.aiven.klaw.dao.KwClusters; import io.aiven.klaw.dao.SchemaRequest; +import io.aiven.klaw.dao.Team; import io.aiven.klaw.dao.Topic; import io.aiven.klaw.dao.UserInfo; import io.aiven.klaw.error.KlawException; @@ -267,6 +269,10 @@ public ApiResponse execSchemaRequests(String avroSchemaId) throws KlawException if (responseDb.equals(ApiResultStatus.SUCCESS.value)) { saveToTopicHistory(userDetails, tenantId, schemaRequest); } + + // send mail to producers and consumers + notifySubscribers(schemaRequest, tenantId); + mailService.sendMail( schemaRequest.getTopicname(), null, @@ -294,6 +300,56 @@ public ApiResponse execSchemaRequests(String avroSchemaId) throws KlawException } } + public void notifySubscribers(SchemaRequest schemaRequest, int tenantId) { + String topic = schemaRequest.getTopicname(); + String schemaRequestEnvironment = schemaRequest.getEnvironment(); + + Optional optSchemaEnv = + manageDatabase.getEnv(tenantId, Integer.valueOf(schemaRequestEnvironment)); + Optional optionalKafkaEnv = Optional.empty(); + // add null pointer checks. + if (optSchemaEnv.isPresent()) { + optionalKafkaEnv = + manageDatabase.getEnv( + tenantId, Integer.valueOf(optSchemaEnv.get().getAssociatedEnv().getId())); + } + + // get all producer and consumer acls for topic, schemaRequestEnvironment + List acls = new ArrayList<>(); + if (optionalKafkaEnv.isPresent()) { + acls = + manageDatabase + .getHandleDbRequests() + .getSyncAcls(optionalKafkaEnv.get().getId(), topic, tenantId); + } + + // get all teams to be notified based on above acls + List teamIdsToBeNotified = acls.stream().map(Acl::getTeamId).toList(); + List teamsToBeNotified = + manageDatabase.getTeamObjForTenant(tenantId).stream() + .filter(team -> teamIdsToBeNotified.contains(team.getTeamId())) + .map(Team::getTeammail) + .toList(); + + Optional optionalOwnerTeam = + manageDatabase.getTeamObjForTenant(tenantId).stream() + .filter(team -> Objects.equals(team.getTeamId(), schemaRequest.getTeamId())) + .findFirst(); + + // send notifications + if (optionalOwnerTeam.isPresent() && optionalKafkaEnv.isPresent()) { + mailService.notifySubscribersOnSchemaChange( + SCHEMA_APPROVED_NOTIFY_SUBSCRIBERS, + topic, + optionalKafkaEnv.get().getName(), + optionalOwnerTeam.get().getTeamname(), + teamsToBeNotified, + optionalOwnerTeam.get().getTeammail(), + tenantId, + commonUtilsService.getLoginUrl()); + } + } + private void saveToTopicHistory(String userDetails, int tenantId, SchemaRequest schemaRequest) { manageDatabase.getKafkaEnvList(tenantId).stream() .filter( diff --git a/core/src/test/java/io/aiven/klaw/UtilMethods.java b/core/src/test/java/io/aiven/klaw/UtilMethods.java index c12f8e0ed8..20aeb38849 100644 --- a/core/src/test/java/io/aiven/klaw/UtilMethods.java +++ b/core/src/test/java/io/aiven/klaw/UtilMethods.java @@ -7,6 +7,7 @@ import io.aiven.klaw.dao.AclRequests; import io.aiven.klaw.dao.ActivityLog; import io.aiven.klaw.dao.Env; +import io.aiven.klaw.dao.EnvTag; import io.aiven.klaw.dao.KwClusters; import io.aiven.klaw.dao.KwKafkaConnector; import io.aiven.klaw.dao.KwTenants; @@ -271,6 +272,23 @@ public List getAcls() { return allTopicReqs; } + public List getSyncAcls() { + List allTopicReqs = new ArrayList<>(); + Acl acl1 = new Acl(); + acl1.setTeamId(102); + acl1.setAclType(AclType.PRODUCER.value); + acl1.setTenantId(101); + allTopicReqs.add(acl1); + + Acl acl2 = new Acl(); + acl2.setTeamId(103); + acl2.setAclType(AclType.CONSUMER.value); + acl2.setTenantId(101); + allTopicReqs.add(acl2); + + return allTopicReqs; + } + public List getAclInfoList() { List allTopicReqs = new ArrayList<>(); AclInfo topicRequest = new AclInfo(); @@ -469,6 +487,39 @@ public List getTeams() { return allTopicReqs; } + public List getTeamsForTenant() { + List teams = new ArrayList<>(); + Team team1 = new Team(); + team1.setTeamname("Seahorses"); + team1.setTeamId(101); + team1.setContactperson("Contact Person1"); + team1.setTenantId(101); + team1.setTeamphone("3142342343242"); + team1.setTeammail("test1@test.com"); + + Team team2 = new Team(); + team2.setTeamname("Octopus"); + team2.setTeamId(102); + team2.setContactperson("Contact Person2"); + team2.setTenantId(101); + team2.setTeamphone("3142342343242"); + team2.setTeammail("test2@test.com"); + + Team team3 = new Team(); + team3.setTeamname("Dragons"); + team3.setTeamId(103); + team3.setContactperson("Contact Person3"); + team3.setTenantId(101); + team3.setTeamphone("3142342343242"); + team3.setTeammail("test3@test.com"); + + teams.add(team1); + teams.add(team2); + teams.add(team3); + + return teams; + } + public List getTeamsModel() { List allTopicReqs = new ArrayList<>(); TeamModelResponse team = new TeamModelResponse(); @@ -703,6 +754,29 @@ public List getEnvLists() { return envList; } + public List getKafkaEnvs() { + List envList = new ArrayList<>(); + Env env = new Env(); + env.setId("1"); + env.setName("DEV"); + envList.add(env); + env.setClusterId(1); + env.setTenantId(101); + env.setAssociatedEnv(new EnvTag("3", "DEV")); + return envList; + } + + public Env getSchemaEnv() { + Env env = new Env(); + env.setId("3"); + env.setName("DEV"); + env.setClusterId(3); + env.setTenantId(101); + env.setAssociatedEnv(new EnvTag("1", "DEV")); + + return env; + } + public List getEnvListsIncorrect1() { List envList = new ArrayList<>(); Env env = new Env(); diff --git a/core/src/test/java/io/aiven/klaw/service/SchemaRegistryControllerServiceTest.java b/core/src/test/java/io/aiven/klaw/service/SchemaRegistryControllerServiceTest.java index aefb35e864..155928eabe 100644 --- a/core/src/test/java/io/aiven/klaw/service/SchemaRegistryControllerServiceTest.java +++ b/core/src/test/java/io/aiven/klaw/service/SchemaRegistryControllerServiceTest.java @@ -1,5 +1,6 @@ package io.aiven.klaw.service; +import static io.aiven.klaw.model.enums.MailType.SCHEMA_APPROVED_NOTIFY_SUBSCRIBERS; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; @@ -14,6 +15,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import io.aiven.klaw.UtilMethods; import io.aiven.klaw.config.ManageDatabase; import io.aiven.klaw.dao.Env; import io.aiven.klaw.dao.EnvTag; @@ -84,6 +86,7 @@ public class SchemaRegistryControllerServiceTest { private ObjectMapper mapper = new ObjectMapper(); private Env env; + private UtilMethods utilMethods; @Captor private ArgumentCaptor schemaRequestCaptor; @@ -109,6 +112,7 @@ public void setUp() throws Exception { Boolean validateOnSave = true; ReflectionTestUtils.setField( schemaRegistryControllerService, "validateCompatiblityOnSave", validateOnSave); + utilMethods = new UtilMethods(); } private void loginMock() { @@ -751,6 +755,34 @@ public void getListofSchemaRequestsIn_OLDEST_FIRST_ORDER() { } } + @Test + public void notifySubscribers() { + int tenantId = 101; + SchemaRequest schemaRequest = getSchemasReq(); + when(manageDatabase.getEnv(tenantId, Integer.valueOf(schemaRequest.getEnvironment()))) + .thenReturn(Optional.of(utilMethods.getSchemaEnv())); + when(manageDatabase.getEnv(tenantId, Integer.valueOf(utilMethods.getSchemaEnv().getId()))) + .thenReturn(Optional.of(utilMethods.getKafkaEnvs().get(0))); + + when(handleDbRequests.getSyncAcls( + utilMethods.getKafkaEnvs().get(0).getId(), schemaRequest.getTopicname(), tenantId)) + .thenReturn(utilMethods.getSyncAcls()); + when(manageDatabase.getTeamObjForTenant(tenantId)).thenReturn(utilMethods.getTeamsForTenant()); + schemaRegistryControllerService.notifySubscribers(schemaRequest, tenantId); + verify(mailService, times(1)) + .notifySubscribersOnSchemaChange( + SCHEMA_APPROVED_NOTIFY_SUBSCRIBERS, + schemaRequest.getTopicname(), + env.getName(), + utilMethods.getTeamsForTenant().get(0).getTeamname(), + Arrays.asList( + utilMethods.getTeamsForTenant().get(1).getTeammail(), + utilMethods.getTeamsForTenant().get(2).getTeammail()), + utilMethods.getTeamsForTenant().get(0).getTeammail(), + tenantId, + commonUtilsService.getLoginUrl()); + } + private static SchemaRequestModel createDefaultSchemaRequestModel() { SchemaRequestModel schemaRequest = new SchemaRequestModel(); schemaRequest.setSchemafull("{}"); @@ -885,6 +917,17 @@ private SchemaPromotion buildPromoteSchemaRequest(boolean isForceRegister, Strin return schema; } + private SchemaRequest getSchemasReq() { + SchemaRequest schReq = new SchemaRequest(); + schReq.setEnvironment("3"); + schReq.setRequestStatus(RequestStatus.CREATED.value); + schReq.setTeamId(101); + schReq.setRequesttime(new Timestamp(System.currentTimeMillis())); + schReq.setTopicname("testtopic"); + + return schReq; + } + private List getSchemasReqs() { List schList = new ArrayList<>(); SchemaRequest schReq = new SchemaRequest(); @@ -892,6 +935,7 @@ private List getSchemasReqs() { schReq.setRequestStatus(RequestStatus.CREATED.value); schReq.setTeamId(101); schReq.setRequesttime(new Timestamp(System.currentTimeMillis())); + schReq.setTopicname("testtopic"); schList.add(schReq); schReq = new SchemaRequest();