Skip to content

Commit

Permalink
feat: fetch params for universe
Browse files Browse the repository at this point in the history
  • Loading branch information
rishuyadavbv committed Oct 22, 2024
1 parent 6714ec8 commit 6c5d3aa
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 58 deletions.
5 changes: 0 additions & 5 deletions queue/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,6 @@
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.29</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.bazaarvoice.emodb.queue.api.Names;
import com.bazaarvoice.emodb.queue.api.UnknownMoveException;
import com.bazaarvoice.emodb.queue.core.kafka.KafkaAdminService;
import com.bazaarvoice.emodb.queue.core.kafka.KafkaConfig;
import com.bazaarvoice.emodb.queue.core.kafka.KafkaConsumerService;
import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.queue.core.ssm.ParameterStoreUtil;
Expand All @@ -35,6 +36,7 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;

import java.io.FileInputStream;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.time.Clock;
Expand All @@ -49,6 +51,7 @@
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;


abstract class AbstractQueueService implements BaseQueueService {
private final Logger _log = LoggerFactory.getLogger(AbstractQueueService.class);
private final BaseEventStore _eventStore;
Expand All @@ -65,7 +68,7 @@ abstract class AbstractQueueService implements BaseQueueService {
private final StepFunctionService stepFunctionService;
private final ParameterStoreUtil parameterStoreUtil;

private static final String UNIVERSE = "cert"; //System.getenv("UNIVERSE");
private static final String UNIVERSE = KafkaConfig.getUniverseFromEnv();

protected AbstractQueueService(BaseEventStore eventStore, JobService jobService,
JobHandlerRegistry jobHandlerRegistry,
Expand Down Expand Up @@ -121,6 +124,8 @@ public MoveQueueResult run(MoveQueueRequest request)

@Override
public void send(String queue, Object message) {
_log.info("Environment variable UNIVERSE: {}",System.getenv("UNIVERSE" ));

_log.info("Starting send operation. Queue: {}, Message: {}", queue, message);

List<String> allowedQueues = fetchAllowedQueues();
Expand Down Expand Up @@ -217,7 +222,6 @@ public void sendAll(Map<String, ? extends Collection<?>> messagesByQueue) {
Multimap<String, String> eventsByChannel = builder.build();

String queueType = determineQueueType();
fetchUniverse();
for (Map.Entry<String, Collection<String>> topicEntry : eventsByChannel.asMap().entrySet()) {
String queueName= topicEntry.getKey();
String topic = "dsq-" + (("dedup".equals(queueType)) ? "dedup-" + queueName : queueName);
Expand All @@ -227,12 +231,6 @@ public void sendAll(Map<String, ? extends Collection<?>> messagesByQueue) {
// Execute Step Function after topic creation
startStepFunctionExecution(parameters, queueType,queueName, topic);
}
String universe1 = System.getenv("UNIVERSE");
if (universe1 == null) {
universe1 = "NOTFOUND";
}

_log.info("Fetched environment variable UNIVERSE: {}", universe1);
producerService.sendMessages(topic, topicEntry.getValue(), queueType);
KafkaConsumerService kafkaConsumerService = new KafkaConsumerService();
kafkaConsumerService.listTopicData();
Expand All @@ -241,37 +239,7 @@ public void sendAll(Map<String, ? extends Collection<?>> messagesByQueue) {
_log.info("All messages have been sent to their respective queues.");
}

public void fetchUniverse(){
Yaml yaml = new Yaml();
try (InputStream inputStream = AbstractQueueService.class.getClassLoader().getResourceAsStream("web-local/config-dc2.yaml")) {
if (inputStream == null) {
_log.info("File not found.");
return;
}

// Parse the YAML into a map
Map<String, Object> yamlData = yaml.load(inputStream);

// Navigate to the zooKeeper section and get the namespace value
Map<String, Object> zooKeeperConfig = (Map<String, Object>) yamlData.get("zooKeeper");
String namespace = (String) zooKeeperConfig.get("namespace");

if (namespace != null) {
// Split the namespace value by "/"
String[] parts = namespace.split("/");

// Print each part
for (String part : parts) {
_log.info("fetch unverse yaml : " + part);
}
} else {
_log.info("Namespace not found.");
}

} catch (Exception e) {
e.printStackTrace();
}
}

@Override
public void sendAll(String queue, Collection<?> messages, boolean fromKafka) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Optional;
import java.util.stream.Collectors;

public class KafkaConfig {
Expand All @@ -38,7 +40,6 @@ public class KafkaConfig {
"b-2.qaemodbpocmsk.q4panq.c10.kafka.us-east-1.amazonaws.com:9092";



static {
try {
// Fetch the UNIVERSE environment variable
Expand All @@ -48,13 +49,7 @@ public class KafkaConfig {
// logger.warn("Environment variable UNIVERSE is not set.");
// throw new IllegalArgumentException("Environment variable UNIVERSE is not set.");
// });
final String UNIVERSE = "cert";
String universe1 = System.getenv("UNIVERSE");
if (universe1 == null) {
universe1 = "NOTFOUND";
}

logger.info("Fetched environment variable UNIVERSE: {}", universe1);
final String UNIVERSE = getUniverseFromEnv();
// Load configurations from SSM during static initialization
Map<String, String> parameterValues = getParameterValues(
Arrays.asList(
Expand All @@ -67,29 +62,65 @@ public class KafkaConfig {

// Set configurations with fallback to defaults if not present
// Sets the batch size for Kafka producer, which controls the amount of data to batch before sending.
batchSizeConfig = parameterValues.getOrDefault("/"+UNIVERSE+"/emodb/kafka/batchSize", "16384");
batchSizeConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/batchSize", "16384");

// Sets the number of retry attempts for failed Kafka message sends.
retriesConfig = parameterValues.getOrDefault("/"+UNIVERSE+"/emodb/kafka/retries", "3");
retriesConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/retries", "3");

// Sets the number of milliseconds a producer is willing to wait before sending a batch out
lingerMsConfig = parameterValues.getOrDefault("/"+UNIVERSE+"/emodb/kafka/lingerMs", "1");
lingerMsConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/lingerMs", "1");

// Configures the Kafka broker addresses for producer connections.
bootstrapServersConfig = parameterValues.getOrDefault("/"+UNIVERSE+"/emodb/kafka/bootstrapServers", DEFAULT_BOOTSTRAP_SERVERS);
bootstrapServersConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/bootstrapServers", DEFAULT_BOOTSTRAP_SERVERS);

logger.info("Kafka configurations loaded successfully from SSM.");
} catch (AmazonServiceException e) {
logger.error("Failed to load configurations from SSM. Using default values.", e);
throw e;
}
catch (Exception e) {
} catch (Exception e) {
logger.error("Unexpected error occurred while loading configurations from SSM. Using default values.", e);
throw e;
}
}

public static String getUniverseFromEnv() {
String filePath = "/etc/environment";
logger.info("Reading environment file: " + filePath);
Properties environmentProps = new Properties();

try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
String line;
while ((line = reader.readLine()) != null) {
// Skip empty lines or comments
if (line.trim().isEmpty() || line.trim().startsWith("#")) {
continue;
}

// Split the line into key-value pair
String[] parts = line.split("=", 2);
logger.info("parts: " + Arrays.toString(parts));
if (parts.length == 2) {
String key = parts[0].trim();
String value = parts[1].trim();
// Remove any surrounding quotes from value
value = value.replace("\"", "");
environmentProps.put(key, value);
}
}

// Access the environment variables
String universe = environmentProps.getProperty("UNIVERSE");
String region = environmentProps.getProperty("REGION");

// Print the values
logger.info("from etc file UNIVERSE: " + universe);
logger.info(" from etc file REGION: " + region);
return universe;
} catch (IOException e) {
logger.error("Error reading environment file: " + e.getMessage());
throw new RuntimeException("Error reading environment file: " + e.getMessage());
}
}



Expand Down

0 comments on commit 6c5d3aa

Please sign in to comment.