From 4021669b1208427bba7167d4d082b270067e42e1 Mon Sep 17 00:00:00 2001 From: muralibasani Date: Tue, 28 Nov 2023 12:52:33 +0100 Subject: [PATCH 1/4] Notify acl owners on schema changes Signed-off-by: muralibasani --- .../io/aiven/klaw/model/enums/MailType.java | 3 +- .../java/io/aiven/klaw/service/MailUtils.java | 38 +++++++++++++++++- .../SchemaRegistryControllerService.java | 40 +++++++++++++++++++ 3 files changed, 79 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/io/aiven/klaw/model/enums/MailType.java b/core/src/main/java/io/aiven/klaw/model/enums/MailType.java index 0dfdaaf178..6f95271aa2 100644 --- a/core/src/main/java/io/aiven/klaw/model/enums/MailType.java +++ b/core/src/main/java/io/aiven/klaw/model/enums/MailType.java @@ -23,8 +23,9 @@ public enum MailType { REGISTER_USER_REQUEST, TOPIC_UPDATE_REQUESTED, SCHEMA_PROMOTION_REQUESTED, + SCHEMA_APPROVED_NOTIFY_SUBSCRIBERS, NEW_USER_ADDED, RESET_CONSUMER_OFFSET_REQUESTED, RESET_CONSUMER_OFFSET_APPROVED, - RESET_CONSUMER_OFFSET_DENIED + RESET_CONSUMER_OFFSET_DENIED, } diff --git a/core/src/main/java/io/aiven/klaw/service/MailUtils.java b/core/src/main/java/io/aiven/klaw/service/MailUtils.java index 0fd52a5805..dc3fb31e69 100644 --- a/core/src/main/java/io/aiven/klaw/service/MailUtils.java +++ b/core/src/main/java/io/aiven/klaw/service/MailUtils.java @@ -11,6 +11,7 @@ import io.aiven.klaw.model.enums.MailType; import io.aiven.klaw.model.enums.PermissionType; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -126,7 +127,7 @@ void sendMail( getUserName(SecurityContextHolder.getContext().getAuthentication().getPrincipal())) .getTenantId(); loadKwProps(tenantId); - Boolean requiresApproval = false; + boolean requiresApproval = false; switch (mailType) { case TOPIC_CREATE_REQUESTED -> { formattedStr = String.format(topicRequestMail, "'" + topicName + "'"); @@ -277,6 +278,41 @@ void sendMail( loginUrl); } + void notifySubscribersOnSchemaChange( + MailType mailType, + String topicName, + String envName, + String teamName, + List toMailIds, + String ccOwnerTeamMailId, + int tenantId, + String loginUrl) { + String subject, formattedStr; + if (mailType == MailType.SCHEMA_APPROVED_NOTIFY_SUBSCRIBERS) { + subject = "New schema on a topic"; + formattedStr = + "A schema has been uploaded on Topic :" + + topicName + + " Environment : " + + envName + + " by team : " + + teamName; + String finalSubject = subject; + String finalFormattedStr = formattedStr; + CompletableFuture.runAsync( + () -> { + emailService.sendSimpleMessage( + toMailIds, + Collections.singletonList(ccOwnerTeamMailId), + Collections.singletonList(null), + finalSubject, + finalFormattedStr, + tenantId, + loginUrl); + }); + } + } + void sendMail(String username, String pwd, HandleDbRequests dbHandle, String loginUrl) { String formattedStr, subject; int tenantId = diff --git a/core/src/main/java/io/aiven/klaw/service/SchemaRegistryControllerService.java b/core/src/main/java/io/aiven/klaw/service/SchemaRegistryControllerService.java index 2ceffb7e9c..4f4866576c 100644 --- a/core/src/main/java/io/aiven/klaw/service/SchemaRegistryControllerService.java +++ b/core/src/main/java/io/aiven/klaw/service/SchemaRegistryControllerService.java @@ -18,9 +18,11 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.aiven.klaw.config.ManageDatabase; +import io.aiven.klaw.dao.Acl; import io.aiven.klaw.dao.Env; import io.aiven.klaw.dao.KwClusters; import io.aiven.klaw.dao.SchemaRequest; +import io.aiven.klaw.dao.Team; import io.aiven.klaw.dao.Topic; import io.aiven.klaw.dao.UserInfo; import io.aiven.klaw.error.KlawException; @@ -267,6 +269,10 @@ public ApiResponse execSchemaRequests(String avroSchemaId) throws KlawException if (responseDb.equals(ApiResultStatus.SUCCESS.value)) { saveToTopicHistory(userDetails, tenantId, schemaRequest); } + + // send mail to producers and consumers + notifySubscribers(schemaRequest, tenantId); + mailService.sendMail( schemaRequest.getTopicname(), null, @@ -294,6 +300,40 @@ public ApiResponse execSchemaRequests(String avroSchemaId) throws KlawException } } + private void notifySubscribers(SchemaRequest schemaRequest, int tenantId) { + String topic = schemaRequest.getTopicname(); + String environment = schemaRequest.getEnvironment(); + // get all producer and consumer acls for topic, environment + List acls = manageDatabase.getHandleDbRequests().getSyncAcls(environment, topic, tenantId); + + // get all teams to be notified based on above acls + List teamIdsToBeNotified = acls.stream().map(Acl::getTeamId).toList(); + List teamsToBeNotified = + manageDatabase.getTeamObjForTenant(tenantId).stream() + .filter(team -> teamIdsToBeNotified.contains(team.getTeamId())) + .map(Team::getTeammail) + .toList(); + + Optional optionalOwnerTeam = + manageDatabase.getTeamObjForTenant(tenantId).stream() + .filter(team -> Objects.equals(team.getTeamId(), schemaRequest.getTeamId())) + .findFirst(); + Optional optionalEnv = manageDatabase.getEnv(tenantId, Integer.valueOf(environment)); + + // send notifications + if (optionalOwnerTeam.isPresent() && optionalEnv.isPresent()) { + mailService.notifySubscribersOnSchemaChange( + SCHEMA_APPROVED_NOTIFY_SUBSCRIBERS, + topic, + optionalEnv.get().getName(), + optionalOwnerTeam.get().getTeamname(), + teamsToBeNotified, + optionalOwnerTeam.get().getTeamname(), + tenantId, + commonUtilsService.getLoginUrl()); + } + } + private void saveToTopicHistory(String userDetails, int tenantId, SchemaRequest schemaRequest) { manageDatabase.getKafkaEnvList(tenantId).stream() .filter( From 4f88f8d507751c67fa8a2d761ad39bdc3cad0ef6 Mon Sep 17 00:00:00 2001 From: muralibasani Date: Tue, 28 Nov 2023 15:48:49 +0100 Subject: [PATCH 2/4] Add test Signed-off-by: muralibasani --- .../java/io/aiven/klaw/service/MailUtils.java | 2 +- .../SchemaRegistryControllerService.java | 31 +++++++-- .../test/java/io/aiven/klaw/UtilMethods.java | 63 +++++++++++++++++++ .../SchemaRegistryControllerServiceTest.java | 43 +++++++++++++ 4 files changed, 132 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/io/aiven/klaw/service/MailUtils.java b/core/src/main/java/io/aiven/klaw/service/MailUtils.java index dc3fb31e69..f2447f308e 100644 --- a/core/src/main/java/io/aiven/klaw/service/MailUtils.java +++ b/core/src/main/java/io/aiven/klaw/service/MailUtils.java @@ -304,7 +304,7 @@ void notifySubscribersOnSchemaChange( emailService.sendSimpleMessage( toMailIds, Collections.singletonList(ccOwnerTeamMailId), - Collections.singletonList(null), + Collections.emptyList(), finalSubject, finalFormattedStr, tenantId, diff --git a/core/src/main/java/io/aiven/klaw/service/SchemaRegistryControllerService.java b/core/src/main/java/io/aiven/klaw/service/SchemaRegistryControllerService.java index 4f4866576c..8b89cc90aa 100644 --- a/core/src/main/java/io/aiven/klaw/service/SchemaRegistryControllerService.java +++ b/core/src/main/java/io/aiven/klaw/service/SchemaRegistryControllerService.java @@ -300,11 +300,31 @@ public ApiResponse execSchemaRequests(String avroSchemaId) throws KlawException } } - private void notifySubscribers(SchemaRequest schemaRequest, int tenantId) { + public void notifySubscribers(SchemaRequest schemaRequest, int tenantId) { String topic = schemaRequest.getTopicname(); - String environment = schemaRequest.getEnvironment(); - // get all producer and consumer acls for topic, environment - List acls = manageDatabase.getHandleDbRequests().getSyncAcls(environment, topic, tenantId); + String schemaRequestEnvironment = schemaRequest.getEnvironment(); + Optional optionalKafkaEnv = + manageDatabase.getKafkaEnvList(tenantId).stream() + .filter( + kafkaEnv -> { + if (kafkaEnv.getAssociatedEnv() != null) { + return kafkaEnv.getAssociatedEnv().getId().equals(schemaRequestEnvironment); + } + return false; + }) + .findFirst(); + + // get all producer and consumer acls for topic, schemaRequestEnvironment + List acls = new ArrayList<>(); + Optional optionalEnv = Optional.empty(); + if (optionalKafkaEnv.isPresent()) { + acls = + manageDatabase + .getHandleDbRequests() + .getSyncAcls(optionalKafkaEnv.get().getId(), topic, tenantId); + optionalEnv = + manageDatabase.getEnv(tenantId, Integer.valueOf(optionalKafkaEnv.get().getId())); + } // get all teams to be notified based on above acls List teamIdsToBeNotified = acls.stream().map(Acl::getTeamId).toList(); @@ -318,7 +338,6 @@ private void notifySubscribers(SchemaRequest schemaRequest, int tenantId) { manageDatabase.getTeamObjForTenant(tenantId).stream() .filter(team -> Objects.equals(team.getTeamId(), schemaRequest.getTeamId())) .findFirst(); - Optional optionalEnv = manageDatabase.getEnv(tenantId, Integer.valueOf(environment)); // send notifications if (optionalOwnerTeam.isPresent() && optionalEnv.isPresent()) { @@ -328,7 +347,7 @@ private void notifySubscribers(SchemaRequest schemaRequest, int tenantId) { optionalEnv.get().getName(), optionalOwnerTeam.get().getTeamname(), teamsToBeNotified, - optionalOwnerTeam.get().getTeamname(), + optionalOwnerTeam.get().getTeammail(), tenantId, commonUtilsService.getLoginUrl()); } diff --git a/core/src/test/java/io/aiven/klaw/UtilMethods.java b/core/src/test/java/io/aiven/klaw/UtilMethods.java index c12f8e0ed8..88a92e9784 100644 --- a/core/src/test/java/io/aiven/klaw/UtilMethods.java +++ b/core/src/test/java/io/aiven/klaw/UtilMethods.java @@ -7,6 +7,7 @@ import io.aiven.klaw.dao.AclRequests; import io.aiven.klaw.dao.ActivityLog; import io.aiven.klaw.dao.Env; +import io.aiven.klaw.dao.EnvTag; import io.aiven.klaw.dao.KwClusters; import io.aiven.klaw.dao.KwKafkaConnector; import io.aiven.klaw.dao.KwTenants; @@ -271,6 +272,23 @@ public List getAcls() { return allTopicReqs; } + public List getSyncAcls() { + List allTopicReqs = new ArrayList<>(); + Acl acl1 = new Acl(); + acl1.setTeamId(102); + acl1.setAclType(AclType.PRODUCER.value); + acl1.setTenantId(101); + allTopicReqs.add(acl1); + + Acl acl2 = new Acl(); + acl2.setTeamId(103); + acl2.setAclType(AclType.CONSUMER.value); + acl2.setTenantId(101); + allTopicReqs.add(acl2); + + return allTopicReqs; + } + public List getAclInfoList() { List allTopicReqs = new ArrayList<>(); AclInfo topicRequest = new AclInfo(); @@ -469,6 +487,39 @@ public List getTeams() { return allTopicReqs; } + public List getTeamsForTenant() { + List teams = new ArrayList<>(); + Team team1 = new Team(); + team1.setTeamname("Seahorses"); + team1.setTeamId(101); + team1.setContactperson("Contact Person1"); + team1.setTenantId(101); + team1.setTeamphone("3142342343242"); + team1.setTeammail("test1@test.com"); + + Team team2 = new Team(); + team2.setTeamname("Octopus"); + team2.setTeamId(102); + team2.setContactperson("Contact Person2"); + team2.setTenantId(101); + team2.setTeamphone("3142342343242"); + team2.setTeammail("test2@test.com"); + + Team team3 = new Team(); + team3.setTeamname("Dragons"); + team3.setTeamId(103); + team3.setContactperson("Contact Person3"); + team3.setTenantId(101); + team3.setTeamphone("3142342343242"); + team3.setTeammail("test3@test.com"); + + teams.add(team1); + teams.add(team2); + teams.add(team3); + + return teams; + } + public List getTeamsModel() { List allTopicReqs = new ArrayList<>(); TeamModelResponse team = new TeamModelResponse(); @@ -703,6 +754,18 @@ public List getEnvLists() { return envList; } + public List getKafkaEnvs() { + List envList = new ArrayList<>(); + Env env = new Env(); + env.setId("1"); + env.setName("DEV"); + envList.add(env); + env.setClusterId(1); + env.setTenantId(101); + env.setAssociatedEnv(new EnvTag("3", "DEV")); + return envList; + } + public List getEnvListsIncorrect1() { List envList = new ArrayList<>(); Env env = new Env(); diff --git a/core/src/test/java/io/aiven/klaw/service/SchemaRegistryControllerServiceTest.java b/core/src/test/java/io/aiven/klaw/service/SchemaRegistryControllerServiceTest.java index aefb35e864..01c64df50f 100644 --- a/core/src/test/java/io/aiven/klaw/service/SchemaRegistryControllerServiceTest.java +++ b/core/src/test/java/io/aiven/klaw/service/SchemaRegistryControllerServiceTest.java @@ -1,5 +1,6 @@ package io.aiven.klaw.service; +import static io.aiven.klaw.model.enums.MailType.SCHEMA_APPROVED_NOTIFY_SUBSCRIBERS; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; @@ -14,6 +15,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import io.aiven.klaw.UtilMethods; import io.aiven.klaw.config.ManageDatabase; import io.aiven.klaw.dao.Env; import io.aiven.klaw.dao.EnvTag; @@ -84,6 +86,7 @@ public class SchemaRegistryControllerServiceTest { private ObjectMapper mapper = new ObjectMapper(); private Env env; + private UtilMethods utilMethods; @Captor private ArgumentCaptor schemaRequestCaptor; @@ -109,6 +112,7 @@ public void setUp() throws Exception { Boolean validateOnSave = true; ReflectionTestUtils.setField( schemaRegistryControllerService, "validateCompatiblityOnSave", validateOnSave); + utilMethods = new UtilMethods(); } private void loginMock() { @@ -751,6 +755,33 @@ public void getListofSchemaRequestsIn_OLDEST_FIRST_ORDER() { } } + @Test + public void notifySubscribers() { + int tenantId = 101; + SchemaRequest schemaRequest = getSchemasReq(); + when(manageDatabase.getKafkaEnvList(tenantId)).thenReturn(utilMethods.getKafkaEnvs()); + when(handleDbRequests.getSyncAcls( + utilMethods.getKafkaEnvs().get(0).getId(), schemaRequest.getTopicname(), tenantId)) + .thenReturn(utilMethods.getSyncAcls()); + when(manageDatabase.getTeamObjForTenant(tenantId)).thenReturn(utilMethods.getTeamsForTenant()); + when(manageDatabase.getEnv( + tenantId, Integer.valueOf(utilMethods.getKafkaEnvs().get(0).getId()))) + .thenReturn(Optional.of(env)); + schemaRegistryControllerService.notifySubscribers(schemaRequest, tenantId); + verify(mailService, times(1)) + .notifySubscribersOnSchemaChange( + SCHEMA_APPROVED_NOTIFY_SUBSCRIBERS, + schemaRequest.getTopicname(), + env.getName(), + utilMethods.getTeamsForTenant().get(0).getTeamname(), + Arrays.asList( + utilMethods.getTeamsForTenant().get(1).getTeammail(), + utilMethods.getTeamsForTenant().get(2).getTeammail()), + utilMethods.getTeamsForTenant().get(0).getTeammail(), + tenantId, + commonUtilsService.getLoginUrl()); + } + private static SchemaRequestModel createDefaultSchemaRequestModel() { SchemaRequestModel schemaRequest = new SchemaRequestModel(); schemaRequest.setSchemafull("{}"); @@ -885,6 +916,17 @@ private SchemaPromotion buildPromoteSchemaRequest(boolean isForceRegister, Strin return schema; } + private SchemaRequest getSchemasReq() { + SchemaRequest schReq = new SchemaRequest(); + schReq.setEnvironment("3"); + schReq.setRequestStatus(RequestStatus.CREATED.value); + schReq.setTeamId(101); + schReq.setRequesttime(new Timestamp(System.currentTimeMillis())); + schReq.setTopicname("testtopic"); + + return schReq; + } + private List getSchemasReqs() { List schList = new ArrayList<>(); SchemaRequest schReq = new SchemaRequest(); @@ -892,6 +934,7 @@ private List getSchemasReqs() { schReq.setRequestStatus(RequestStatus.CREATED.value); schReq.setTeamId(101); schReq.setRequesttime(new Timestamp(System.currentTimeMillis())); + schReq.setTopicname("testtopic"); schList.add(schReq); schReq = new SchemaRequest(); From 93396158ab538dad87e7f8e1d9c9098d10ddba9c Mon Sep 17 00:00:00 2001 From: muralibasani Date: Tue, 28 Nov 2023 16:34:37 +0100 Subject: [PATCH 3/4] Optimize calls Signed-off-by: muralibasani --- .../io/aiven/klaw/model/enums/MailType.java | 2 +- .../SchemaRegistryControllerService.java | 20 +++++++++---------- .../test/java/io/aiven/klaw/UtilMethods.java | 11 ++++++++++ .../SchemaRegistryControllerServiceTest.java | 6 +++++- 4 files changed, 27 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/io/aiven/klaw/model/enums/MailType.java b/core/src/main/java/io/aiven/klaw/model/enums/MailType.java index 6f95271aa2..2dd27ce94f 100644 --- a/core/src/main/java/io/aiven/klaw/model/enums/MailType.java +++ b/core/src/main/java/io/aiven/klaw/model/enums/MailType.java @@ -27,5 +27,5 @@ public enum MailType { NEW_USER_ADDED, RESET_CONSUMER_OFFSET_REQUESTED, RESET_CONSUMER_OFFSET_APPROVED, - RESET_CONSUMER_OFFSET_DENIED, + RESET_CONSUMER_OFFSET_DENIED } diff --git a/core/src/main/java/io/aiven/klaw/service/SchemaRegistryControllerService.java b/core/src/main/java/io/aiven/klaw/service/SchemaRegistryControllerService.java index 8b89cc90aa..71fa9c7486 100644 --- a/core/src/main/java/io/aiven/klaw/service/SchemaRegistryControllerService.java +++ b/core/src/main/java/io/aiven/klaw/service/SchemaRegistryControllerService.java @@ -303,16 +303,16 @@ public ApiResponse execSchemaRequests(String avroSchemaId) throws KlawException public void notifySubscribers(SchemaRequest schemaRequest, int tenantId) { String topic = schemaRequest.getTopicname(); String schemaRequestEnvironment = schemaRequest.getEnvironment(); - Optional optionalKafkaEnv = - manageDatabase.getKafkaEnvList(tenantId).stream() - .filter( - kafkaEnv -> { - if (kafkaEnv.getAssociatedEnv() != null) { - return kafkaEnv.getAssociatedEnv().getId().equals(schemaRequestEnvironment); - } - return false; - }) - .findFirst(); + + Optional optSchemaEnv = + manageDatabase.getEnv(tenantId, Integer.valueOf(schemaRequestEnvironment)); + Optional optionalKafkaEnv = Optional.empty(); + // add null pointer checks. + if (optSchemaEnv.isPresent()) { + optionalKafkaEnv = + manageDatabase.getEnv( + tenantId, Integer.valueOf(optSchemaEnv.get().getAssociatedEnv().getId())); + } // get all producer and consumer acls for topic, schemaRequestEnvironment List acls = new ArrayList<>(); diff --git a/core/src/test/java/io/aiven/klaw/UtilMethods.java b/core/src/test/java/io/aiven/klaw/UtilMethods.java index 88a92e9784..20aeb38849 100644 --- a/core/src/test/java/io/aiven/klaw/UtilMethods.java +++ b/core/src/test/java/io/aiven/klaw/UtilMethods.java @@ -766,6 +766,17 @@ public List getKafkaEnvs() { return envList; } + public Env getSchemaEnv() { + Env env = new Env(); + env.setId("3"); + env.setName("DEV"); + env.setClusterId(3); + env.setTenantId(101); + env.setAssociatedEnv(new EnvTag("1", "DEV")); + + return env; + } + public List getEnvListsIncorrect1() { List envList = new ArrayList<>(); Env env = new Env(); diff --git a/core/src/test/java/io/aiven/klaw/service/SchemaRegistryControllerServiceTest.java b/core/src/test/java/io/aiven/klaw/service/SchemaRegistryControllerServiceTest.java index 01c64df50f..fc7e4a1f88 100644 --- a/core/src/test/java/io/aiven/klaw/service/SchemaRegistryControllerServiceTest.java +++ b/core/src/test/java/io/aiven/klaw/service/SchemaRegistryControllerServiceTest.java @@ -759,7 +759,11 @@ public void getListofSchemaRequestsIn_OLDEST_FIRST_ORDER() { public void notifySubscribers() { int tenantId = 101; SchemaRequest schemaRequest = getSchemasReq(); - when(manageDatabase.getKafkaEnvList(tenantId)).thenReturn(utilMethods.getKafkaEnvs()); + when(manageDatabase.getEnv(tenantId, Integer.valueOf(schemaRequest.getEnvironment()))) + .thenReturn(Optional.of(utilMethods.getSchemaEnv())); + when(manageDatabase.getEnv(tenantId, Integer.valueOf(utilMethods.getSchemaEnv().getId()))) + .thenReturn(Optional.of(utilMethods.getKafkaEnvs().get(0))); + when(handleDbRequests.getSyncAcls( utilMethods.getKafkaEnvs().get(0).getId(), schemaRequest.getTopicname(), tenantId)) .thenReturn(utilMethods.getSyncAcls()); From e6fbdb00fe631f1d07505c606db07f70eef7aadd Mon Sep 17 00:00:00 2001 From: muralibasani Date: Tue, 28 Nov 2023 17:27:05 +0100 Subject: [PATCH 4/4] Remove an unused var Signed-off-by: muralibasani --- .../klaw/service/SchemaRegistryControllerService.java | 7 ++----- .../klaw/service/SchemaRegistryControllerServiceTest.java | 3 --- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/io/aiven/klaw/service/SchemaRegistryControllerService.java b/core/src/main/java/io/aiven/klaw/service/SchemaRegistryControllerService.java index 71fa9c7486..20558627ed 100644 --- a/core/src/main/java/io/aiven/klaw/service/SchemaRegistryControllerService.java +++ b/core/src/main/java/io/aiven/klaw/service/SchemaRegistryControllerService.java @@ -316,14 +316,11 @@ public void notifySubscribers(SchemaRequest schemaRequest, int tenantId) { // get all producer and consumer acls for topic, schemaRequestEnvironment List acls = new ArrayList<>(); - Optional optionalEnv = Optional.empty(); if (optionalKafkaEnv.isPresent()) { acls = manageDatabase .getHandleDbRequests() .getSyncAcls(optionalKafkaEnv.get().getId(), topic, tenantId); - optionalEnv = - manageDatabase.getEnv(tenantId, Integer.valueOf(optionalKafkaEnv.get().getId())); } // get all teams to be notified based on above acls @@ -340,11 +337,11 @@ public void notifySubscribers(SchemaRequest schemaRequest, int tenantId) { .findFirst(); // send notifications - if (optionalOwnerTeam.isPresent() && optionalEnv.isPresent()) { + if (optionalOwnerTeam.isPresent() && optionalKafkaEnv.isPresent()) { mailService.notifySubscribersOnSchemaChange( SCHEMA_APPROVED_NOTIFY_SUBSCRIBERS, topic, - optionalEnv.get().getName(), + optionalKafkaEnv.get().getName(), optionalOwnerTeam.get().getTeamname(), teamsToBeNotified, optionalOwnerTeam.get().getTeammail(), diff --git a/core/src/test/java/io/aiven/klaw/service/SchemaRegistryControllerServiceTest.java b/core/src/test/java/io/aiven/klaw/service/SchemaRegistryControllerServiceTest.java index fc7e4a1f88..155928eabe 100644 --- a/core/src/test/java/io/aiven/klaw/service/SchemaRegistryControllerServiceTest.java +++ b/core/src/test/java/io/aiven/klaw/service/SchemaRegistryControllerServiceTest.java @@ -768,9 +768,6 @@ public void notifySubscribers() { utilMethods.getKafkaEnvs().get(0).getId(), schemaRequest.getTopicname(), tenantId)) .thenReturn(utilMethods.getSyncAcls()); when(manageDatabase.getTeamObjForTenant(tenantId)).thenReturn(utilMethods.getTeamsForTenant()); - when(manageDatabase.getEnv( - tenantId, Integer.valueOf(utilMethods.getKafkaEnvs().get(0).getId()))) - .thenReturn(Optional.of(env)); schemaRegistryControllerService.notifySubscribers(schemaRequest, tenantId); verify(mailService, times(1)) .notifySubscribersOnSchemaChange(