Skip to content

Commit

Permalink
Notify producer and consumer clients when schemas are uploaded on top…
Browse files Browse the repository at this point in the history
…ic (#2060)

* Notify acl owners on schema changes

Signed-off-by: muralibasani <[email protected]>

* Add test

Signed-off-by: muralibasani <[email protected]>

* Optimize calls

Signed-off-by: muralibasani <[email protected]>

* Remove an unused var

Signed-off-by: muralibasani <[email protected]>

---------

Signed-off-by: muralibasani <[email protected]>
Co-authored-by: muralibasani <[email protected]>
  • Loading branch information
muralibasani and muralibasani authored Nov 28, 2023
1 parent a824571 commit 14a5068
Show file tree
Hide file tree
Showing 5 changed files with 212 additions and 1 deletion.
1 change: 1 addition & 0 deletions core/src/main/java/io/aiven/klaw/model/enums/MailType.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public enum MailType {
REGISTER_USER_REQUEST,
TOPIC_UPDATE_REQUESTED,
SCHEMA_PROMOTION_REQUESTED,
SCHEMA_APPROVED_NOTIFY_SUBSCRIBERS,
NEW_USER_ADDED,
RESET_CONSUMER_OFFSET_REQUESTED,
RESET_CONSUMER_OFFSET_APPROVED,
Expand Down
38 changes: 37 additions & 1 deletion core/src/main/java/io/aiven/klaw/service/MailUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.aiven.klaw.model.enums.MailType;
import io.aiven.klaw.model.enums.PermissionType;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -126,7 +127,7 @@ void sendMail(
getUserName(SecurityContextHolder.getContext().getAuthentication().getPrincipal()))
.getTenantId();
loadKwProps(tenantId);
Boolean requiresApproval = false;
boolean requiresApproval = false;
switch (mailType) {
case TOPIC_CREATE_REQUESTED -> {
formattedStr = String.format(topicRequestMail, "'" + topicName + "'");
Expand Down Expand Up @@ -277,6 +278,41 @@ void sendMail(
loginUrl);
}

void notifySubscribersOnSchemaChange(
MailType mailType,
String topicName,
String envName,
String teamName,
List<String> toMailIds,
String ccOwnerTeamMailId,
int tenantId,
String loginUrl) {
String subject, formattedStr;
if (mailType == MailType.SCHEMA_APPROVED_NOTIFY_SUBSCRIBERS) {
subject = "New schema on a topic";
formattedStr =
"A schema has been uploaded on Topic :"
+ topicName
+ " Environment : "
+ envName
+ " by team : "
+ teamName;
String finalSubject = subject;
String finalFormattedStr = formattedStr;
CompletableFuture.runAsync(
() -> {
emailService.sendSimpleMessage(
toMailIds,
Collections.singletonList(ccOwnerTeamMailId),
Collections.emptyList(),
finalSubject,
finalFormattedStr,
tenantId,
loginUrl);
});
}
}

void sendMail(String username, String pwd, HandleDbRequests dbHandle, String loginUrl) {
String formattedStr, subject;
int tenantId =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.aiven.klaw.config.ManageDatabase;
import io.aiven.klaw.dao.Acl;
import io.aiven.klaw.dao.Env;
import io.aiven.klaw.dao.KwClusters;
import io.aiven.klaw.dao.SchemaRequest;
import io.aiven.klaw.dao.Team;
import io.aiven.klaw.dao.Topic;
import io.aiven.klaw.dao.UserInfo;
import io.aiven.klaw.error.KlawException;
Expand Down Expand Up @@ -267,6 +269,10 @@ public ApiResponse execSchemaRequests(String avroSchemaId) throws KlawException
if (responseDb.equals(ApiResultStatus.SUCCESS.value)) {
saveToTopicHistory(userDetails, tenantId, schemaRequest);
}

// send mail to producers and consumers
notifySubscribers(schemaRequest, tenantId);

mailService.sendMail(
schemaRequest.getTopicname(),
null,
Expand Down Expand Up @@ -294,6 +300,56 @@ public ApiResponse execSchemaRequests(String avroSchemaId) throws KlawException
}
}

public void notifySubscribers(SchemaRequest schemaRequest, int tenantId) {
String topic = schemaRequest.getTopicname();
String schemaRequestEnvironment = schemaRequest.getEnvironment();

Optional<Env> optSchemaEnv =
manageDatabase.getEnv(tenantId, Integer.valueOf(schemaRequestEnvironment));
Optional<Env> optionalKafkaEnv = Optional.empty();
// add null pointer checks.
if (optSchemaEnv.isPresent()) {
optionalKafkaEnv =
manageDatabase.getEnv(
tenantId, Integer.valueOf(optSchemaEnv.get().getAssociatedEnv().getId()));
}

// get all producer and consumer acls for topic, schemaRequestEnvironment
List<Acl> acls = new ArrayList<>();
if (optionalKafkaEnv.isPresent()) {
acls =
manageDatabase
.getHandleDbRequests()
.getSyncAcls(optionalKafkaEnv.get().getId(), topic, tenantId);
}

// get all teams to be notified based on above acls
List<Integer> teamIdsToBeNotified = acls.stream().map(Acl::getTeamId).toList();
List<String> teamsToBeNotified =
manageDatabase.getTeamObjForTenant(tenantId).stream()
.filter(team -> teamIdsToBeNotified.contains(team.getTeamId()))
.map(Team::getTeammail)
.toList();

Optional<Team> optionalOwnerTeam =
manageDatabase.getTeamObjForTenant(tenantId).stream()
.filter(team -> Objects.equals(team.getTeamId(), schemaRequest.getTeamId()))
.findFirst();

// send notifications
if (optionalOwnerTeam.isPresent() && optionalKafkaEnv.isPresent()) {
mailService.notifySubscribersOnSchemaChange(
SCHEMA_APPROVED_NOTIFY_SUBSCRIBERS,
topic,
optionalKafkaEnv.get().getName(),
optionalOwnerTeam.get().getTeamname(),
teamsToBeNotified,
optionalOwnerTeam.get().getTeammail(),
tenantId,
commonUtilsService.getLoginUrl());
}
}

private void saveToTopicHistory(String userDetails, int tenantId, SchemaRequest schemaRequest) {
manageDatabase.getKafkaEnvList(tenantId).stream()
.filter(
Expand Down
74 changes: 74 additions & 0 deletions core/src/test/java/io/aiven/klaw/UtilMethods.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.aiven.klaw.dao.AclRequests;
import io.aiven.klaw.dao.ActivityLog;
import io.aiven.klaw.dao.Env;
import io.aiven.klaw.dao.EnvTag;
import io.aiven.klaw.dao.KwClusters;
import io.aiven.klaw.dao.KwKafkaConnector;
import io.aiven.klaw.dao.KwTenants;
Expand Down Expand Up @@ -271,6 +272,23 @@ public List<Acl> getAcls() {
return allTopicReqs;
}

public List<Acl> getSyncAcls() {
List<Acl> allTopicReqs = new ArrayList<>();
Acl acl1 = new Acl();
acl1.setTeamId(102);
acl1.setAclType(AclType.PRODUCER.value);
acl1.setTenantId(101);
allTopicReqs.add(acl1);

Acl acl2 = new Acl();
acl2.setTeamId(103);
acl2.setAclType(AclType.CONSUMER.value);
acl2.setTenantId(101);
allTopicReqs.add(acl2);

return allTopicReqs;
}

public List<AclInfo> getAclInfoList() {
List<AclInfo> allTopicReqs = new ArrayList<>();
AclInfo topicRequest = new AclInfo();
Expand Down Expand Up @@ -469,6 +487,39 @@ public List<Team> getTeams() {
return allTopicReqs;
}

public List<Team> getTeamsForTenant() {
List<Team> teams = new ArrayList<>();
Team team1 = new Team();
team1.setTeamname("Seahorses");
team1.setTeamId(101);
team1.setContactperson("Contact Person1");
team1.setTenantId(101);
team1.setTeamphone("3142342343242");
team1.setTeammail("[email protected]");

Team team2 = new Team();
team2.setTeamname("Octopus");
team2.setTeamId(102);
team2.setContactperson("Contact Person2");
team2.setTenantId(101);
team2.setTeamphone("3142342343242");
team2.setTeammail("[email protected]");

Team team3 = new Team();
team3.setTeamname("Dragons");
team3.setTeamId(103);
team3.setContactperson("Contact Person3");
team3.setTenantId(101);
team3.setTeamphone("3142342343242");
team3.setTeammail("[email protected]");

teams.add(team1);
teams.add(team2);
teams.add(team3);

return teams;
}

public List<TeamModelResponse> getTeamsModel() {
List<TeamModelResponse> allTopicReqs = new ArrayList<>();
TeamModelResponse team = new TeamModelResponse();
Expand Down Expand Up @@ -703,6 +754,29 @@ public List<Env> getEnvLists() {
return envList;
}

public List<Env> getKafkaEnvs() {
List<Env> envList = new ArrayList<>();
Env env = new Env();
env.setId("1");
env.setName("DEV");
envList.add(env);
env.setClusterId(1);
env.setTenantId(101);
env.setAssociatedEnv(new EnvTag("3", "DEV"));
return envList;
}

public Env getSchemaEnv() {
Env env = new Env();
env.setId("3");
env.setName("DEV");
env.setClusterId(3);
env.setTenantId(101);
env.setAssociatedEnv(new EnvTag("1", "DEV"));

return env;
}

public List<Env> getEnvListsIncorrect1() {
List<Env> envList = new ArrayList<>();
Env env = new Env();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.aiven.klaw.service;

import static io.aiven.klaw.model.enums.MailType.SCHEMA_APPROVED_NOTIFY_SUBSCRIBERS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
Expand All @@ -14,6 +15,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.aiven.klaw.UtilMethods;
import io.aiven.klaw.config.ManageDatabase;
import io.aiven.klaw.dao.Env;
import io.aiven.klaw.dao.EnvTag;
Expand Down Expand Up @@ -84,6 +86,7 @@ public class SchemaRegistryControllerServiceTest {
private ObjectMapper mapper = new ObjectMapper();

private Env env;
private UtilMethods utilMethods;

@Captor private ArgumentCaptor<SchemaRequest> schemaRequestCaptor;

Expand All @@ -109,6 +112,7 @@ public void setUp() throws Exception {
Boolean validateOnSave = true;
ReflectionTestUtils.setField(
schemaRegistryControllerService, "validateCompatiblityOnSave", validateOnSave);
utilMethods = new UtilMethods();
}

private void loginMock() {
Expand Down Expand Up @@ -751,6 +755,34 @@ public void getListofSchemaRequestsIn_OLDEST_FIRST_ORDER() {
}
}

@Test
public void notifySubscribers() {
int tenantId = 101;
SchemaRequest schemaRequest = getSchemasReq();
when(manageDatabase.getEnv(tenantId, Integer.valueOf(schemaRequest.getEnvironment())))
.thenReturn(Optional.of(utilMethods.getSchemaEnv()));
when(manageDatabase.getEnv(tenantId, Integer.valueOf(utilMethods.getSchemaEnv().getId())))
.thenReturn(Optional.of(utilMethods.getKafkaEnvs().get(0)));

when(handleDbRequests.getSyncAcls(
utilMethods.getKafkaEnvs().get(0).getId(), schemaRequest.getTopicname(), tenantId))
.thenReturn(utilMethods.getSyncAcls());
when(manageDatabase.getTeamObjForTenant(tenantId)).thenReturn(utilMethods.getTeamsForTenant());
schemaRegistryControllerService.notifySubscribers(schemaRequest, tenantId);
verify(mailService, times(1))
.notifySubscribersOnSchemaChange(
SCHEMA_APPROVED_NOTIFY_SUBSCRIBERS,
schemaRequest.getTopicname(),
env.getName(),
utilMethods.getTeamsForTenant().get(0).getTeamname(),
Arrays.asList(
utilMethods.getTeamsForTenant().get(1).getTeammail(),
utilMethods.getTeamsForTenant().get(2).getTeammail()),
utilMethods.getTeamsForTenant().get(0).getTeammail(),
tenantId,
commonUtilsService.getLoginUrl());
}

private static SchemaRequestModel createDefaultSchemaRequestModel() {
SchemaRequestModel schemaRequest = new SchemaRequestModel();
schemaRequest.setSchemafull("{}");
Expand Down Expand Up @@ -885,13 +917,25 @@ private SchemaPromotion buildPromoteSchemaRequest(boolean isForceRegister, Strin
return schema;
}

private SchemaRequest getSchemasReq() {
SchemaRequest schReq = new SchemaRequest();
schReq.setEnvironment("3");
schReq.setRequestStatus(RequestStatus.CREATED.value);
schReq.setTeamId(101);
schReq.setRequesttime(new Timestamp(System.currentTimeMillis()));
schReq.setTopicname("testtopic");

return schReq;
}

private List<SchemaRequest> getSchemasReqs() {
List<SchemaRequest> schList = new ArrayList<>();
SchemaRequest schReq = new SchemaRequest();
schReq.setEnvironment("1");
schReq.setRequestStatus(RequestStatus.CREATED.value);
schReq.setTeamId(101);
schReq.setRequesttime(new Timestamp(System.currentTimeMillis()));
schReq.setTopicname("testtopic");
schList.add(schReq);

schReq = new SchemaRequest();
Expand Down

0 comments on commit 14a5068

Please sign in to comment.