Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Notify producer and consumer clients when schemas are uploaded on topic #2060

Merged
merged 5 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/src/main/java/io/aiven/klaw/model/enums/MailType.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ public enum MailType {
REGISTER_USER_REQUEST,
TOPIC_UPDATE_REQUESTED,
SCHEMA_PROMOTION_REQUESTED,
SCHEMA_APPROVED_NOTIFY_SUBSCRIBERS,
NEW_USER_ADDED,
RESET_CONSUMER_OFFSET_REQUESTED,
RESET_CONSUMER_OFFSET_APPROVED,
RESET_CONSUMER_OFFSET_DENIED
RESET_CONSUMER_OFFSET_DENIED,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should there be a trailing comma here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it was by mistake.

}
38 changes: 37 additions & 1 deletion core/src/main/java/io/aiven/klaw/service/MailUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.aiven.klaw.model.enums.MailType;
import io.aiven.klaw.model.enums.PermissionType;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -126,7 +127,7 @@ void sendMail(
getUserName(SecurityContextHolder.getContext().getAuthentication().getPrincipal()))
.getTenantId();
loadKwProps(tenantId);
Boolean requiresApproval = false;
boolean requiresApproval = false;
switch (mailType) {
case TOPIC_CREATE_REQUESTED -> {
formattedStr = String.format(topicRequestMail, "'" + topicName + "'");
Expand Down Expand Up @@ -277,6 +278,41 @@ void sendMail(
loginUrl);
}

void notifySubscribersOnSchemaChange(
MailType mailType,
String topicName,
String envName,
String teamName,
List<String> toMailIds,
String ccOwnerTeamMailId,
int tenantId,
String loginUrl) {
String subject, formattedStr;
if (mailType == MailType.SCHEMA_APPROVED_NOTIFY_SUBSCRIBERS) {
subject = "New schema on a topic";
formattedStr =
"A schema has been uploaded on Topic :"
+ topicName
+ " Environment : "
+ envName
+ " by team : "
+ teamName;
String finalSubject = subject;
String finalFormattedStr = formattedStr;
CompletableFuture.runAsync(
() -> {
emailService.sendSimpleMessage(
toMailIds,
Collections.singletonList(ccOwnerTeamMailId),
Collections.emptyList(),
finalSubject,
finalFormattedStr,
tenantId,
loginUrl);
});
}
}

void sendMail(String username, String pwd, HandleDbRequests dbHandle, String loginUrl) {
String formattedStr, subject;
int tenantId =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.aiven.klaw.config.ManageDatabase;
import io.aiven.klaw.dao.Acl;
import io.aiven.klaw.dao.Env;
import io.aiven.klaw.dao.KwClusters;
import io.aiven.klaw.dao.SchemaRequest;
import io.aiven.klaw.dao.Team;
import io.aiven.klaw.dao.Topic;
import io.aiven.klaw.dao.UserInfo;
import io.aiven.klaw.error.KlawException;
Expand Down Expand Up @@ -267,6 +269,10 @@ public ApiResponse execSchemaRequests(String avroSchemaId) throws KlawException
if (responseDb.equals(ApiResultStatus.SUCCESS.value)) {
saveToTopicHistory(userDetails, tenantId, schemaRequest);
}

// send mail to producers and consumers
notifySubscribers(schemaRequest, tenantId);

mailService.sendMail(
schemaRequest.getTopicname(),
null,
Expand Down Expand Up @@ -294,6 +300,59 @@ public ApiResponse execSchemaRequests(String avroSchemaId) throws KlawException
}
}

public void notifySubscribers(SchemaRequest schemaRequest, int tenantId) {
String topic = schemaRequest.getTopicname();
String schemaRequestEnvironment = schemaRequest.getEnvironment();
Optional<Env> optionalKafkaEnv =
manageDatabase.getKafkaEnvList(tenantId).stream()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
manageDatabase.getKafkaEnvList(tenantId).stream()
Optional<Env> optSchemaEnv = manageDatabase.getEnv(tenantId,schemaRequestEnvironment);
//add null pointer checks.
Optional<Env> optKafkaEnv = manageDatabase.getEnv(tenantId,optSchemaEnv .get().getAssociatedEnv().getId);

.filter(
kafkaEnv -> {
if (kafkaEnv.getAssociatedEnv() != null) {
return kafkaEnv.getAssociatedEnv().getId().equals(schemaRequestEnvironment);
}
return false;
})
.findFirst();

// get all producer and consumer acls for topic, schemaRequestEnvironment
List<Acl> acls = new ArrayList<>();
Optional<Env> optionalEnv = Optional.empty();
if (optionalKafkaEnv.isPresent()) {
acls =
manageDatabase
.getHandleDbRequests()
.getSyncAcls(optionalKafkaEnv.get().getId(), topic, tenantId);
optionalEnv =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these OptionalEnv and the OptionalKafkaEnv the same object?

manageDatabase.getEnv(tenantId, Integer.valueOf(optionalKafkaEnv.get().getId()));
}

// get all teams to be notified based on above acls
List<Integer> teamIdsToBeNotified = acls.stream().map(Acl::getTeamId).toList();
List<String> teamsToBeNotified =
manageDatabase.getTeamObjForTenant(tenantId).stream()
.filter(team -> teamIdsToBeNotified.contains(team.getTeamId()))
.map(Team::getTeammail)
.toList();

Optional<Team> optionalOwnerTeam =
manageDatabase.getTeamObjForTenant(tenantId).stream()
.filter(team -> Objects.equals(team.getTeamId(), schemaRequest.getTeamId()))
.findFirst();

// send notifications
if (optionalOwnerTeam.isPresent() && optionalEnv.isPresent()) {
mailService.notifySubscribersOnSchemaChange(
SCHEMA_APPROVED_NOTIFY_SUBSCRIBERS,
topic,
optionalEnv.get().getName(),
optionalOwnerTeam.get().getTeamname(),
teamsToBeNotified,
optionalOwnerTeam.get().getTeammail(),
tenantId,
commonUtilsService.getLoginUrl());
}
}

private void saveToTopicHistory(String userDetails, int tenantId, SchemaRequest schemaRequest) {
manageDatabase.getKafkaEnvList(tenantId).stream()
.filter(
Expand Down
63 changes: 63 additions & 0 deletions core/src/test/java/io/aiven/klaw/UtilMethods.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.aiven.klaw.dao.AclRequests;
import io.aiven.klaw.dao.ActivityLog;
import io.aiven.klaw.dao.Env;
import io.aiven.klaw.dao.EnvTag;
import io.aiven.klaw.dao.KwClusters;
import io.aiven.klaw.dao.KwKafkaConnector;
import io.aiven.klaw.dao.KwTenants;
Expand Down Expand Up @@ -271,6 +272,23 @@ public List<Acl> getAcls() {
return allTopicReqs;
}

public List<Acl> getSyncAcls() {
List<Acl> allTopicReqs = new ArrayList<>();
Acl acl1 = new Acl();
acl1.setTeamId(102);
acl1.setAclType(AclType.PRODUCER.value);
acl1.setTenantId(101);
allTopicReqs.add(acl1);

Acl acl2 = new Acl();
acl2.setTeamId(103);
acl2.setAclType(AclType.CONSUMER.value);
acl2.setTenantId(101);
allTopicReqs.add(acl2);

return allTopicReqs;
}

public List<AclInfo> getAclInfoList() {
List<AclInfo> allTopicReqs = new ArrayList<>();
AclInfo topicRequest = new AclInfo();
Expand Down Expand Up @@ -469,6 +487,39 @@ public List<Team> getTeams() {
return allTopicReqs;
}

public List<Team> getTeamsForTenant() {
List<Team> teams = new ArrayList<>();
Team team1 = new Team();
team1.setTeamname("Seahorses");
team1.setTeamId(101);
team1.setContactperson("Contact Person1");
team1.setTenantId(101);
team1.setTeamphone("3142342343242");
team1.setTeammail("[email protected]");

Team team2 = new Team();
team2.setTeamname("Octopus");
team2.setTeamId(102);
team2.setContactperson("Contact Person2");
team2.setTenantId(101);
team2.setTeamphone("3142342343242");
team2.setTeammail("[email protected]");

Team team3 = new Team();
team3.setTeamname("Dragons");
team3.setTeamId(103);
team3.setContactperson("Contact Person3");
team3.setTenantId(101);
team3.setTeamphone("3142342343242");
team3.setTeammail("[email protected]");

teams.add(team1);
teams.add(team2);
teams.add(team3);

return teams;
}

public List<TeamModelResponse> getTeamsModel() {
List<TeamModelResponse> allTopicReqs = new ArrayList<>();
TeamModelResponse team = new TeamModelResponse();
Expand Down Expand Up @@ -703,6 +754,18 @@ public List<Env> getEnvLists() {
return envList;
}

public List<Env> getKafkaEnvs() {
List<Env> envList = new ArrayList<>();
Env env = new Env();
env.setId("1");
env.setName("DEV");
envList.add(env);
env.setClusterId(1);
env.setTenantId(101);
env.setAssociatedEnv(new EnvTag("3", "DEV"));
return envList;
}

public List<Env> getEnvListsIncorrect1() {
List<Env> envList = new ArrayList<>();
Env env = new Env();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.aiven.klaw.service;

import static io.aiven.klaw.model.enums.MailType.SCHEMA_APPROVED_NOTIFY_SUBSCRIBERS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
Expand All @@ -14,6 +15,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.aiven.klaw.UtilMethods;
import io.aiven.klaw.config.ManageDatabase;
import io.aiven.klaw.dao.Env;
import io.aiven.klaw.dao.EnvTag;
Expand Down Expand Up @@ -84,6 +86,7 @@ public class SchemaRegistryControllerServiceTest {
private ObjectMapper mapper = new ObjectMapper();

private Env env;
private UtilMethods utilMethods;

@Captor private ArgumentCaptor<SchemaRequest> schemaRequestCaptor;

Expand All @@ -109,6 +112,7 @@ public void setUp() throws Exception {
Boolean validateOnSave = true;
ReflectionTestUtils.setField(
schemaRegistryControllerService, "validateCompatiblityOnSave", validateOnSave);
utilMethods = new UtilMethods();
}

private void loginMock() {
Expand Down Expand Up @@ -751,6 +755,33 @@ public void getListofSchemaRequestsIn_OLDEST_FIRST_ORDER() {
}
}

@Test
public void notifySubscribers() {
int tenantId = 101;
SchemaRequest schemaRequest = getSchemasReq();
when(manageDatabase.getKafkaEnvList(tenantId)).thenReturn(utilMethods.getKafkaEnvs());
when(handleDbRequests.getSyncAcls(
utilMethods.getKafkaEnvs().get(0).getId(), schemaRequest.getTopicname(), tenantId))
.thenReturn(utilMethods.getSyncAcls());
when(manageDatabase.getTeamObjForTenant(tenantId)).thenReturn(utilMethods.getTeamsForTenant());
when(manageDatabase.getEnv(
tenantId, Integer.valueOf(utilMethods.getKafkaEnvs().get(0).getId())))
.thenReturn(Optional.of(env));
schemaRegistryControllerService.notifySubscribers(schemaRequest, tenantId);
verify(mailService, times(1))
.notifySubscribersOnSchemaChange(
SCHEMA_APPROVED_NOTIFY_SUBSCRIBERS,
schemaRequest.getTopicname(),
env.getName(),
utilMethods.getTeamsForTenant().get(0).getTeamname(),
Arrays.asList(
utilMethods.getTeamsForTenant().get(1).getTeammail(),
utilMethods.getTeamsForTenant().get(2).getTeammail()),
utilMethods.getTeamsForTenant().get(0).getTeammail(),
tenantId,
commonUtilsService.getLoginUrl());
}

private static SchemaRequestModel createDefaultSchemaRequestModel() {
SchemaRequestModel schemaRequest = new SchemaRequestModel();
schemaRequest.setSchemafull("{}");
Expand Down Expand Up @@ -885,13 +916,25 @@ private SchemaPromotion buildPromoteSchemaRequest(boolean isForceRegister, Strin
return schema;
}

private SchemaRequest getSchemasReq() {
SchemaRequest schReq = new SchemaRequest();
schReq.setEnvironment("3");
schReq.setRequestStatus(RequestStatus.CREATED.value);
schReq.setTeamId(101);
schReq.setRequesttime(new Timestamp(System.currentTimeMillis()));
schReq.setTopicname("testtopic");

return schReq;
}

private List<SchemaRequest> getSchemasReqs() {
List<SchemaRequest> schList = new ArrayList<>();
SchemaRequest schReq = new SchemaRequest();
schReq.setEnvironment("1");
schReq.setRequestStatus(RequestStatus.CREATED.value);
schReq.setTeamId(101);
schReq.setRequesttime(new Timestamp(System.currentTimeMillis()));
schReq.setTopicname("testtopic");
schList.add(schReq);

schReq = new SchemaRequest();
Expand Down