diff --git a/src/main/java/org/dependencytrack/common/ConfigKey.java b/src/main/java/org/dependencytrack/common/ConfigKey.java index 5e42f2412..e32d48cdf 100644 --- a/src/main/java/org/dependencytrack/common/ConfigKey.java +++ b/src/main/java/org/dependencytrack/common/ConfigKey.java @@ -58,6 +58,7 @@ public enum ConfigKey implements Config.Key { DATABASE_RUN_MIGRATIONS("database.run.migrations", true), DATABASE_RUN_MIGRATIONS_ONLY("database.run.migrations.only", false), INIT_TASKS_ENABLED("init.tasks.enabled", true), + INIT_TASKS_KAFKA_TOPICS_ENABLED("init.tasks.kafka.topics.enabled", false), INIT_AND_EXIT("init.and.exit", false), DEV_SERVICES_ENABLED("dev.services.enabled", false), diff --git a/src/main/java/org/dependencytrack/dev/DevServicesInitializer.java b/src/main/java/org/dependencytrack/dev/DevServicesInitializer.java index c9c6e4f4f..93f3dee9d 100644 --- a/src/main/java/org/dependencytrack/dev/DevServicesInitializer.java +++ b/src/main/java/org/dependencytrack/dev/DevServicesInitializer.java @@ -20,33 +20,26 @@ import alpine.Config; import alpine.common.logging.Logger; + import jakarta.servlet.ServletContextEvent; import jakarta.servlet.ServletContextListener; -import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.admin.NewTopic; -import org.dependencytrack.event.kafka.KafkaTopics; - import java.lang.reflect.Constructor; import java.lang.reflect.Field; import java.lang.reflect.Method; -import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.Properties; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import static alpine.Config.AlpineKey.DATABASE_PASSWORD; import static alpine.Config.AlpineKey.DATABASE_URL; import static alpine.Config.AlpineKey.DATABASE_USERNAME; -import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_COMPACT; -import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_CONFIG; import static org.dependencytrack.common.ConfigKey.DEV_SERVICES_ENABLED; import static org.dependencytrack.common.ConfigKey.DEV_SERVICES_IMAGE_FRONTEND; import static org.dependencytrack.common.ConfigKey.DEV_SERVICES_IMAGE_KAFKA; import static org.dependencytrack.common.ConfigKey.DEV_SERVICES_IMAGE_POSTGRES; +import static org.dependencytrack.common.ConfigKey.INIT_TASKS_ENABLED; +import static org.dependencytrack.common.ConfigKey.INIT_TASKS_KAFKA_TOPICS_ENABLED; import static org.dependencytrack.common.ConfigKey.KAFKA_BOOTSTRAP_SERVERS; /** @@ -138,6 +131,8 @@ public void contextInitialized(final ServletContextEvent event) { configOverrides.put(DATABASE_USERNAME.getPropertyName(), postgresUsername); configOverrides.put(DATABASE_PASSWORD.getPropertyName(), postgresPassword); configOverrides.put(KAFKA_BOOTSTRAP_SERVERS.getPropertyName(), kafkaBootstrapServers); + configOverrides.put(INIT_TASKS_ENABLED.getPropertyName(), "true"); + configOverrides.put(INIT_TASKS_KAFKA_TOPICS_ENABLED.getPropertyName(), "true"); try { LOGGER.info("Applying config overrides: %s".formatted(configOverrides)); @@ -150,37 +145,6 @@ public void contextInitialized(final ServletContextEvent event) { throw new RuntimeException("Failed to update configuration", e); } - final var topicsToCreate = new ArrayList<>(List.of( - new NewTopic(KafkaTopics.NEW_EPSS.name(), 1, (short) 1).configs(Map.of(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT)), - new NewTopic(KafkaTopics.NEW_VULNERABILITY.name(), 1, (short) 1).configs(Map.of(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT)), - new NewTopic(KafkaTopics.NOTIFICATION_ANALYZER.name(), 1, (short) 1), - new NewTopic(KafkaTopics.NOTIFICATION_BOM.name(), 1, (short) 1), - new NewTopic(KafkaTopics.NOTIFICATION_CONFIGURATION.name(), 1, (short) 1), - new NewTopic(KafkaTopics.NOTIFICATION_DATASOURCE_MIRRORING.name(), 1, (short) 1), - new NewTopic(KafkaTopics.NOTIFICATION_FILE_SYSTEM.name(), 1, (short) 1), - new NewTopic(KafkaTopics.NOTIFICATION_INTEGRATION.name(), 1, (short) 1), - new NewTopic(KafkaTopics.NOTIFICATION_NEW_VULNERABILITY.name(), 1, (short) 1), - new NewTopic(KafkaTopics.NOTIFICATION_NEW_VULNERABLE_DEPENDENCY.name(), 1, (short) 1), - new NewTopic(KafkaTopics.NOTIFICATION_POLICY_VIOLATION.name(), 1, (short) 1), - new NewTopic(KafkaTopics.NOTIFICATION_PROJECT_AUDIT_CHANGE.name(), 1, (short) 1), - new NewTopic(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name(), 1, (short) 1), - new NewTopic(KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE.name(), 1, (short) 1), - new NewTopic(KafkaTopics.NOTIFICATION_REPOSITORY.name(), 1, (short) 1), - new NewTopic(KafkaTopics.NOTIFICATION_VEX.name(), 1, (short) 1), - new NewTopic(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name(), 1, (short) 1), - new NewTopic(KafkaTopics.REPO_META_ANALYSIS_RESULT.name(), 1, (short) 1), - new NewTopic(KafkaTopics.VULN_ANALYSIS_COMMAND.name(), 1, (short) 1), - new NewTopic(KafkaTopics.VULN_ANALYSIS_RESULT.name(), 1, (short) 1), - new NewTopic(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED.name(), 1, (short) 1) - )); - - try (final var adminClient = AdminClient.create(Map.of(BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers))) { - LOGGER.info("Creating topics: %s".formatted(topicsToCreate)); - adminClient.createTopics(topicsToCreate).all().get(); - } catch (ExecutionException | InterruptedException e) { - throw new RuntimeException("Failed to create topics", e); - } - LOGGER.info("PostgreSQL is listening at localhost:%d".formatted(postgresPort)); LOGGER.info("Kafka is listening at localhost:%d".formatted(kafkaPort)); LOGGER.info("Frontend is listening at http://localhost:%d".formatted(frontendPort)); diff --git a/src/main/java/org/dependencytrack/event/kafka/KafkaEventConverter.java b/src/main/java/org/dependencytrack/event/kafka/KafkaEventConverter.java index cae504507..30865ce8e 100644 --- a/src/main/java/org/dependencytrack/event/kafka/KafkaEventConverter.java +++ b/src/main/java/org/dependencytrack/event/kafka/KafkaEventConverter.java @@ -123,7 +123,7 @@ static KafkaEvent convert(final ComponentVulnerabilityAnal .build(); return new KafkaEvent<>( - KafkaTopics.VULN_ANALYSIS_COMMAND, + KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND, scanKey, scanCommand, Map.of(KafkaEventHeaders.VULN_ANALYSIS_LEVEL, event.level().name(), KafkaEventHeaders.IS_NEW_COMPONENT, String.valueOf(event.isNewComponent())) @@ -145,46 +145,46 @@ static KafkaEvent convert(final ComponentRepositoryMeta .setFetchMeta(event.fetchMeta()) .build(); - return new KafkaEvent<>(KafkaTopics.REPO_META_ANALYSIS_COMMAND, event.purlCoordinates(), analysisCommand, null); + return new KafkaEvent<>(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND, event.purlCoordinates(), analysisCommand, null); } static KafkaEvent convert(final GitHubAdvisoryMirrorEvent ignored) { final String key = Vulnerability.Source.GITHUB.name(); - return new KafkaEvent<>(KafkaTopics.VULNERABILITY_MIRROR_COMMAND, key, null); + return new KafkaEvent<>(KafkaTopics.TOPIC_VULNERABILITY_MIRROR_COMMAND, key, null); } static KafkaEvent convert(final NistMirrorEvent ignored) { final String key = Vulnerability.Source.NVD.name(); - return new KafkaEvent<>(KafkaTopics.VULNERABILITY_MIRROR_COMMAND, key, null); + return new KafkaEvent<>(KafkaTopics.TOPIC_VULNERABILITY_MIRROR_COMMAND, key, null); } static KafkaEvent convert(final OsvMirrorEvent event) { final String key = Vulnerability.Source.OSV.name(); final String value = event.ecosystem(); - return new KafkaEvent<>(KafkaTopics.VULNERABILITY_MIRROR_COMMAND, key, value); + return new KafkaEvent<>(KafkaTopics.TOPIC_VULNERABILITY_MIRROR_COMMAND, key, value); } static KafkaEvent convert(final EpssMirrorEvent ignored) { - return new KafkaEvent<>(KafkaTopics.VULNERABILITY_MIRROR_COMMAND, "EPSS", null); + return new KafkaEvent<>(KafkaTopics.TOPIC_VULNERABILITY_MIRROR_COMMAND, "EPSS", null); } private static Topic extractDestinationTopic(final Notification notification) { return switch (notification.getGroup()) { - case GROUP_ANALYZER -> KafkaTopics.NOTIFICATION_ANALYZER; - case GROUP_BOM_CONSUMED, GROUP_BOM_PROCESSED, GROUP_BOM_PROCESSING_FAILED, GROUP_BOM_VALIDATION_FAILED -> KafkaTopics.NOTIFICATION_BOM; - case GROUP_CONFIGURATION -> KafkaTopics.NOTIFICATION_CONFIGURATION; - case GROUP_DATASOURCE_MIRRORING -> KafkaTopics.NOTIFICATION_DATASOURCE_MIRRORING; - case GROUP_FILE_SYSTEM -> KafkaTopics.NOTIFICATION_FILE_SYSTEM; - case GROUP_INTEGRATION -> KafkaTopics.NOTIFICATION_INTEGRATION; - case GROUP_NEW_VULNERABILITY -> KafkaTopics.NOTIFICATION_NEW_VULNERABILITY; - case GROUP_NEW_VULNERABLE_DEPENDENCY -> KafkaTopics.NOTIFICATION_NEW_VULNERABLE_DEPENDENCY; - case GROUP_POLICY_VIOLATION -> KafkaTopics.NOTIFICATION_POLICY_VIOLATION; - case GROUP_PROJECT_AUDIT_CHANGE -> KafkaTopics.NOTIFICATION_PROJECT_AUDIT_CHANGE; - case GROUP_PROJECT_CREATED -> KafkaTopics.NOTIFICATION_PROJECT_CREATED; - case GROUP_PROJECT_VULN_ANALYSIS_COMPLETE -> KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE; - case GROUP_REPOSITORY -> KafkaTopics.NOTIFICATION_REPOSITORY; - case GROUP_VEX_CONSUMED, GROUP_VEX_PROCESSED -> KafkaTopics.NOTIFICATION_VEX; - case GROUP_USER_CREATED, GROUP_USER_DELETED -> KafkaTopics.NOTIFICATION_USER; + case GROUP_ANALYZER -> KafkaTopics.TOPIC_NOTIFICATION_ANALYZER; + case GROUP_BOM_CONSUMED, GROUP_BOM_PROCESSED, GROUP_BOM_PROCESSING_FAILED, GROUP_BOM_VALIDATION_FAILED -> KafkaTopics.TOPIC_NOTIFICATION_BOM; + case GROUP_CONFIGURATION -> KafkaTopics.TOPIC_NOTIFICATION_CONFIGURATION; + case GROUP_DATASOURCE_MIRRORING -> KafkaTopics.TOPIC_NOTIFICATION_DATASOURCE_MIRRORING; + case GROUP_FILE_SYSTEM -> KafkaTopics.TOPIC_NOTIFICATION_FILE_SYSTEM; + case GROUP_INTEGRATION -> KafkaTopics.TOPIC_NOTIFICATION_INTEGRATION; + case GROUP_NEW_VULNERABILITY -> KafkaTopics.TOPIC_NOTIFICATION_NEW_VULNERABILITY; + case GROUP_NEW_VULNERABLE_DEPENDENCY -> KafkaTopics.TOPIC_NOTIFICATION_NEW_VULNERABLE_DEPENDENCY; + case GROUP_POLICY_VIOLATION -> KafkaTopics.TOPIC_NOTIFICATION_POLICY_VIOLATION; + case GROUP_PROJECT_AUDIT_CHANGE -> KafkaTopics.TOPIC_NOTIFICATION_PROJECT_AUDIT_CHANGE; + case GROUP_PROJECT_CREATED -> KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED; + case GROUP_PROJECT_VULN_ANALYSIS_COMPLETE -> KafkaTopics.TOPIC_NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE; + case GROUP_REPOSITORY -> KafkaTopics.TOPIC_NOTIFICATION_REPOSITORY; + case GROUP_VEX_CONSUMED, GROUP_VEX_PROCESSED -> KafkaTopics.TOPIC_NOTIFICATION_VEX; + case GROUP_USER_CREATED, GROUP_USER_DELETED -> KafkaTopics.TOPIC_NOTIFICATION_USER; case GROUP_UNSPECIFIED, UNRECOGNIZED -> throw new IllegalArgumentException(""" Unable to determine destination topic because the notification does not \ specify a notification group: %s""".formatted(notification.getGroup())); diff --git a/src/main/java/org/dependencytrack/event/kafka/KafkaTopicInitializer.java b/src/main/java/org/dependencytrack/event/kafka/KafkaTopicInitializer.java new file mode 100644 index 000000000..04ebe8e17 --- /dev/null +++ b/src/main/java/org/dependencytrack/event/kafka/KafkaTopicInitializer.java @@ -0,0 +1,177 @@ +/* + * This file is part of Dependency-Track. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright (c) OWASP Foundation. All Rights Reserved. + */ +package org.dependencytrack.event.kafka; + +import alpine.Config; +import alpine.common.logging.Logger; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.CreateTopicsOptions; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.DescribeTopicsOptions; +import org.apache.kafka.clients.admin.DescribeTopicsResult; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; + +import jakarta.servlet.ServletContextEvent; +import jakarta.servlet.ServletContextListener; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG; +import static org.dependencytrack.common.ConfigKey.INIT_TASKS_ENABLED; +import static org.dependencytrack.common.ConfigKey.INIT_TASKS_KAFKA_TOPICS_ENABLED; +import static org.dependencytrack.common.ConfigKey.KAFKA_BOOTSTRAP_SERVERS; + +/** + * @since 5.6.0 + */ +public class KafkaTopicInitializer implements ServletContextListener { + + private static final Logger LOGGER = Logger.getLogger(KafkaTopicInitializer.class); + + private final Config config = Config.getInstance(); + + @Override + public void contextInitialized(final ServletContextEvent event) { + if (!config.getPropertyAsBoolean(INIT_TASKS_ENABLED)) { + LOGGER.debug("Not initializing Kafka topics because %s is disabled" + .formatted(INIT_TASKS_ENABLED.getPropertyName())); + return; + } + if (!config.getPropertyAsBoolean(INIT_TASKS_KAFKA_TOPICS_ENABLED)) { + LOGGER.debug("Not initializing Kafka topics because %s is disabled" + .formatted(INIT_TASKS_KAFKA_TOPICS_ENABLED.getPropertyName())); + return; + } + + LOGGER.warn("Auto-creating topics with default configuration is not recommended for production deployments"); + + try (final AdminClient adminClient = createAdminClient()) { + final List> topicsToCreate = determineTopicsToCreate(adminClient); + if (topicsToCreate.isEmpty()) { + LOGGER.info("All topics exist already; Nothing to do"); + return; + } + + createTopics(adminClient, topicsToCreate); + LOGGER.info("Successfully created %d topics".formatted(topicsToCreate.size())); + } + } + + private List> determineTopicsToCreate(final AdminClient adminClient) { + final Map> topicByName = KafkaTopics.ALL_TOPICS.stream() + .collect(Collectors.toMap(KafkaTopics.Topic::name, Function.identity())); + + final var topicsToCreate = new ArrayList>(topicByName.size()); + + final var describeTopicsOptions = new DescribeTopicsOptions().timeoutMs(3_000); + final DescribeTopicsResult topicsResult = adminClient.describeTopics(topicByName.keySet(), describeTopicsOptions); + + final var exceptionsByTopicName = new HashMap(); + for (final Map.Entry> entry : topicsResult.topicNameValues().entrySet()) { + final String topicName = entry.getKey(); + try { + entry.getValue().get(); + } catch (ExecutionException e) { + if (e.getCause() instanceof UnknownTopicOrPartitionException) { + LOGGER.debug("Topic %s does not exist and will need to be created".formatted(topicName)); + topicsToCreate.add(topicByName.get(topicName)); + } else { + exceptionsByTopicName.put(topicName, e.getCause()); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(""" + Thread was interrupted while waiting for broker response. \ + The existence of topic %s can not be determined.""".formatted(topicName), e); + } + } + + if (!exceptionsByTopicName.isEmpty()) { + final String exceptionSummary = exceptionsByTopicName.entrySet().stream() + .map(entry -> "{topic=%s, error=%s}".formatted(entry.getKey(), entry.getValue())) + .collect(Collectors.joining(", ", "[", "]")); + throw new IllegalStateException("Existence of %d topic(s) could not be verified: %s" + .formatted(exceptionsByTopicName.size(), exceptionSummary)); + } + + return topicsToCreate; + } + + private void createTopics(final AdminClient adminClient, final Collection> topics) { + final List newTopics = topics.stream() + .map(topic -> { + final var newTopic = new NewTopic( + topic.name(), + topic.defaultConfig().partitions(), + topic.defaultConfig().replicationFactor()); + if (topic.defaultConfig().configs() != null) { + return newTopic.configs(topic.defaultConfig().configs()); + } + + return newTopic; + }) + .toList(); + + final var createTopicsOptions = new CreateTopicsOptions().timeoutMs(3_000); + final CreateTopicsResult createTopicsResult = adminClient.createTopics(newTopics, createTopicsOptions); + + final var exceptionsByTopicName = new HashMap(); + for (final Map.Entry> entry : createTopicsResult.values().entrySet()) { + final String topicName = entry.getKey(); + try { + entry.getValue().get(); + } catch (ExecutionException e) { + exceptionsByTopicName.put(topicName, e.getCause()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(""" + Thread was interrupted while waiting for broker response. \ + Successful creation of topic %s can not be verified.""".formatted(topicName), e); + } + } + + if (!exceptionsByTopicName.isEmpty()) { + final String exceptionSummary = exceptionsByTopicName.entrySet().stream() + .map(entry -> "{topic=%s, error=%s}".formatted(entry.getKey(), entry.getValue())) + .collect(Collectors.joining(", ", "[", "]")); + throw new IllegalStateException("Failed to create %d topic(s): %s" + .formatted(exceptionsByTopicName.size(), exceptionSummary)); + } + } + + private AdminClient createAdminClient() { + final var adminClientConfig = new HashMap(); + adminClientConfig.put(BOOTSTRAP_SERVERS_CONFIG, config.getProperty(KAFKA_BOOTSTRAP_SERVERS)); + adminClientConfig.put(CLIENT_ID_CONFIG, "%s-admin-client".formatted("instanceId")); + + LOGGER.debug("Creating admin client with options %s".formatted(adminClientConfig)); + return AdminClient.create(adminClientConfig); + } + +} diff --git a/src/main/java/org/dependencytrack/event/kafka/KafkaTopics.java b/src/main/java/org/dependencytrack/event/kafka/KafkaTopics.java index cc5260445..18a20dcfa 100644 --- a/src/main/java/org/dependencytrack/event/kafka/KafkaTopics.java +++ b/src/main/java/org/dependencytrack/event/kafka/KafkaTopics.java @@ -18,7 +18,6 @@ */ package org.dependencytrack.event.kafka; -import alpine.Config; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.cyclonedx.proto.v1_6.Bom; @@ -31,68 +30,219 @@ import org.dependencytrack.proto.vulnanalysis.v1.ScanCommand; import org.dependencytrack.proto.vulnanalysis.v1.ScanKey; import org.dependencytrack.proto.vulnanalysis.v1.ScanResult; +import org.dependencytrack.proto.vulnanalysis.v1.ScannerResult; -public final class KafkaTopics { +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; - public static final Topic NOTIFICATION_ANALYZER; - public static final Topic NOTIFICATION_BOM; - public static final Topic NOTIFICATION_CONFIGURATION; - public static final Topic NOTIFICATION_DATASOURCE_MIRRORING; - public static final Topic NOTIFICATION_FILE_SYSTEM; - public static final Topic NOTIFICATION_INTEGRATION; - public static final Topic NOTIFICATION_NEW_VULNERABILITY; - public static final Topic NOTIFICATION_NEW_VULNERABLE_DEPENDENCY; - public static final Topic NOTIFICATION_POLICY_VIOLATION; - public static final Topic NOTIFICATION_PROJECT_AUDIT_CHANGE; - public static final Topic NOTIFICATION_PROJECT_CREATED; - public static final Topic NOTIFICATION_REPOSITORY; - public static final Topic NOTIFICATION_VEX; - public static final Topic NOTIFICATION_USER; - public static final Topic VULNERABILITY_MIRROR_COMMAND; - public static final Topic NEW_VULNERABILITY; - public static final Topic REPO_META_ANALYSIS_COMMAND; - public static final Topic REPO_META_ANALYSIS_RESULT; - public static final Topic VULN_ANALYSIS_COMMAND; - public static final Topic VULN_ANALYSIS_RESULT; - public static final Topic VULN_ANALYSIS_RESULT_PROCESSED; +import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_COMPACT; +import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_CONFIG; +import static org.apache.kafka.common.config.TopicConfig.RETENTION_MS_CONFIG; - public static final Topic NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE; - public static final Topic NEW_EPSS; - private static final Serde NOTIFICATION_SERDE = new KafkaProtobufSerde<>(Notification.parser()); +public final class KafkaTopics { - static { - NOTIFICATION_ANALYZER = new Topic<>("dtrack.notification.analyzer", Serdes.String(), NOTIFICATION_SERDE); - NOTIFICATION_BOM = new Topic<>("dtrack.notification.bom", Serdes.String(), NOTIFICATION_SERDE); - NOTIFICATION_CONFIGURATION = new Topic<>("dtrack.notification.configuration", Serdes.String(), NOTIFICATION_SERDE); - NOTIFICATION_DATASOURCE_MIRRORING = new Topic<>("dtrack.notification.datasource-mirroring", Serdes.String(), NOTIFICATION_SERDE); - NOTIFICATION_FILE_SYSTEM = new Topic<>("dtrack.notification.file-system", Serdes.String(), NOTIFICATION_SERDE); - NOTIFICATION_INTEGRATION = new Topic<>("dtrack.notification.integration", Serdes.String(), NOTIFICATION_SERDE); - NOTIFICATION_NEW_VULNERABILITY = new Topic<>("dtrack.notification.new-vulnerability", Serdes.String(), NOTIFICATION_SERDE); - NOTIFICATION_NEW_VULNERABLE_DEPENDENCY = new Topic<>("dtrack.notification.new-vulnerable-dependency", Serdes.String(), NOTIFICATION_SERDE); - NOTIFICATION_POLICY_VIOLATION = new Topic<>("dtrack.notification.policy-violation", Serdes.String(), NOTIFICATION_SERDE); - NOTIFICATION_PROJECT_AUDIT_CHANGE = new Topic<>("dtrack.notification.project-audit-change", Serdes.String(), NOTIFICATION_SERDE); - NOTIFICATION_PROJECT_CREATED = new Topic<>("dtrack.notification.project-created", Serdes.String(), NOTIFICATION_SERDE); - NOTIFICATION_REPOSITORY = new Topic<>("dtrack.notification.repository", Serdes.String(), NOTIFICATION_SERDE); - NOTIFICATION_VEX = new Topic<>("dtrack.notification.vex", Serdes.String(), NOTIFICATION_SERDE); - VULNERABILITY_MIRROR_COMMAND = new Topic<>("dtrack.vulnerability.mirror.command", Serdes.String(), Serdes.String()); - NEW_VULNERABILITY = new Topic<>("dtrack.vulnerability", Serdes.String(), new KafkaProtobufSerde<>(Bom.parser())); - REPO_META_ANALYSIS_COMMAND = new Topic<>("dtrack.repo-meta-analysis.component", Serdes.String(), new KafkaProtobufSerde<>(AnalysisCommand.parser())); - REPO_META_ANALYSIS_RESULT = new Topic<>("dtrack.repo-meta-analysis.result", Serdes.String(), new KafkaProtobufSerde<>(AnalysisResult.parser())); - VULN_ANALYSIS_COMMAND = new Topic<>("dtrack.vuln-analysis.component", new KafkaProtobufSerde<>(ScanKey.parser()), new KafkaProtobufSerde<>(ScanCommand.parser())); - VULN_ANALYSIS_RESULT = new Topic<>("dtrack.vuln-analysis.result", new KafkaProtobufSerde<>(ScanKey.parser()), new KafkaProtobufSerde<>(ScanResult.parser())); - VULN_ANALYSIS_RESULT_PROCESSED = new Topic<>("dtrack.vuln-analysis.result.processed", Serdes.String(), new KafkaProtobufSerde<>(ScanResult.parser())); - NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE = new Topic<>("dtrack.notification.project-vuln-analysis-complete", Serdes.String(), NOTIFICATION_SERDE); - NEW_EPSS = new Topic<>("dtrack.epss", Serdes.String(), new KafkaProtobufSerde<>(EpssItem.parser())); - NOTIFICATION_USER = new Topic<>("dtrack.notification.user", Serdes.String(), NOTIFICATION_SERDE); - } + public record Topic( + String name, + Serde keySerde, + Serde valueSerde, + Config defaultConfig) { - public record Topic(String name, Serde keySerde, Serde valueSerde) { + /** + * @since 5.6.0 + */ + public record Config( + int partitions, + short replicationFactor, + Map configs) { + } @Override public String name() { - return Config.getInstance().getProperty(ConfigKey.DT_KAFKA_TOPIC_PREFIX) + name; + return alpine.Config.getInstance().getProperty(ConfigKey.DT_KAFKA_TOPIC_PREFIX) + name; } } + public static final Topic TOPIC_EPSS; + public static final Topic TOPIC_NOTIFICATION_ANALYZER; + public static final Topic TOPIC_NOTIFICATION_BOM; + public static final Topic TOPIC_NOTIFICATION_CONFIGURATION; + public static final Topic TOPIC_NOTIFICATION_DATASOURCE_MIRRORING; + public static final Topic TOPIC_NOTIFICATION_FILE_SYSTEM; + public static final Topic TOPIC_NOTIFICATION_INTEGRATION; + public static final Topic TOPIC_NOTIFICATION_NEW_VULNERABILITY; + public static final Topic TOPIC_NOTIFICATION_NEW_VULNERABLE_DEPENDENCY; + public static final Topic TOPIC_NOTIFICATION_POLICY_VIOLATION; + public static final Topic TOPIC_NOTIFICATION_PROJECT_AUDIT_CHANGE; + public static final Topic TOPIC_NOTIFICATION_PROJECT_CREATED; + public static final Topic TOPIC_NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE; + public static final Topic TOPIC_NOTIFICATION_REPOSITORY; + public static final Topic TOPIC_NOTIFICATION_VEX; + public static final Topic TOPIC_NOTIFICATION_USER; + public static final Topic TOPIC_REPO_META_ANALYSIS_COMMAND; + public static final Topic TOPIC_REPO_META_ANALYSIS_RESULT; + public static final Topic TOPIC_VULN_ANALYSIS_COMMAND; + public static final Topic TOPIC_VULN_ANALYSIS_RESULT; + public static final Topic TOPIC_VULN_ANALYSIS_RESULT_PROCESSED; + public static final Topic TOPIC_VULN_ANALYSIS_SCANNER_RESULT; + public static final Topic TOPIC_VULNERABILITY; + public static final Topic TOPIC_VULNERABILITY_MIRROR_COMMAND; + public static final List> ALL_TOPICS; + + private static final String DEFAULT_RETENTION_MS = String.valueOf(TimeUnit.HOURS.toMillis(12)); + private static final Serde NOTIFICATION_PROTO_SERDE = new KafkaProtobufSerde<>(Notification.parser()); + + + static { + // TODO: Provide a way to (partially) overwrite the default configs. + + TOPIC_EPSS = new Topic<>( + "dtrack.epss", + Serdes.String(), + new KafkaProtobufSerde<>(EpssItem.parser()), + new Topic.Config(3, (short) 1, Map.of(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT))); + TOPIC_NOTIFICATION_ANALYZER = new Topic<>( + "dtrack.notification.analyzer", + Serdes.String(), + NOTIFICATION_PROTO_SERDE, + new Topic.Config(1, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_NOTIFICATION_BOM = new Topic<>( + "dtrack.notification.bom", + Serdes.String(), + NOTIFICATION_PROTO_SERDE, + new Topic.Config(1, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_NOTIFICATION_CONFIGURATION = new Topic<>( + "dtrack.notification.configuration", + Serdes.String(), + NOTIFICATION_PROTO_SERDE, + new Topic.Config(1, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_NOTIFICATION_DATASOURCE_MIRRORING = new Topic<>( + "dtrack.notification.datasource-mirroring", + Serdes.String(), + NOTIFICATION_PROTO_SERDE, + new Topic.Config(1, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_NOTIFICATION_FILE_SYSTEM = new Topic<>( + "dtrack.notification.file-system", + Serdes.String(), + NOTIFICATION_PROTO_SERDE, + new Topic.Config(1, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_NOTIFICATION_INTEGRATION = new Topic<>( + "dtrack.notification.integration", + Serdes.String(), + NOTIFICATION_PROTO_SERDE, + new Topic.Config(1, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_NOTIFICATION_NEW_VULNERABILITY = new Topic<>( + "dtrack.notification.new-vulnerability", + Serdes.String(), + NOTIFICATION_PROTO_SERDE, + new Topic.Config(1, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_NOTIFICATION_NEW_VULNERABLE_DEPENDENCY = new Topic<>( + "dtrack.notification.new-vulnerable-dependency", + Serdes.String(), + NOTIFICATION_PROTO_SERDE, + new Topic.Config(1, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_NOTIFICATION_POLICY_VIOLATION = new Topic<>( + "dtrack.notification.policy-violation", + Serdes.String(), + NOTIFICATION_PROTO_SERDE, + new Topic.Config(1, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_NOTIFICATION_PROJECT_AUDIT_CHANGE = new Topic<>( + "dtrack.notification.project-audit-change", + Serdes.String(), + NOTIFICATION_PROTO_SERDE, + new Topic.Config(1, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_NOTIFICATION_PROJECT_CREATED = new Topic<>( + "dtrack.notification.project-created", + Serdes.String(), + NOTIFICATION_PROTO_SERDE, + new Topic.Config(1, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE = new Topic<>( + "dtrack.notification.project-vuln-analysis-complete", + Serdes.String(), + NOTIFICATION_PROTO_SERDE, + new Topic.Config(1, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_NOTIFICATION_REPOSITORY = new Topic<>( + "dtrack.notification.repository", + Serdes.String(), + NOTIFICATION_PROTO_SERDE, + new Topic.Config(1, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_NOTIFICATION_USER = new Topic<>( + "dtrack.notification.user", + Serdes.String(), + NOTIFICATION_PROTO_SERDE, + new Topic.Config(1, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_NOTIFICATION_VEX = new Topic<>( + "dtrack.notification.vex", + Serdes.String(), + NOTIFICATION_PROTO_SERDE, + new Topic.Config(1, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_REPO_META_ANALYSIS_COMMAND = new Topic<>( + "dtrack.repo-meta-analysis.component", + Serdes.String(), + new KafkaProtobufSerde<>(AnalysisCommand.parser()), + new Topic.Config(3, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_REPO_META_ANALYSIS_RESULT = new Topic<>( + "dtrack.repo-meta-analysis.result", + Serdes.String(), + new KafkaProtobufSerde<>(AnalysisResult.parser()), + new Topic.Config(3, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_VULN_ANALYSIS_COMMAND = new Topic<>( + "dtrack.vuln-analysis.component", + new KafkaProtobufSerde<>(ScanKey.parser()), + new KafkaProtobufSerde<>(ScanCommand.parser()), + new Topic.Config(3, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_VULN_ANALYSIS_RESULT = new Topic<>( + "dtrack.vuln-analysis.result", + new KafkaProtobufSerde<>(ScanKey.parser()), + new KafkaProtobufSerde<>(ScanResult.parser()), + new Topic.Config(3, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_VULN_ANALYSIS_RESULT_PROCESSED = new Topic<>( + "dtrack.vuln-analysis.result.processed", + Serdes.String(), + new KafkaProtobufSerde<>(ScanResult.parser()), + new Topic.Config(3, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_VULN_ANALYSIS_SCANNER_RESULT = new Topic<>( + "dtrack.vuln-analysis.scanner.result", + new KafkaProtobufSerde<>(ScanKey.parser()), + new KafkaProtobufSerde<>(ScannerResult.parser()), + new Topic.Config(3, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + TOPIC_VULNERABILITY = new Topic<>( + "dtrack.vulnerability", + Serdes.String(), + new KafkaProtobufSerde<>(Bom.parser()), + new Topic.Config(1, (short) 1, Map.of(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT))); + TOPIC_VULNERABILITY_MIRROR_COMMAND = new Topic<>( + "dtrack.vulnerability.mirror.command", + Serdes.String(), + Serdes.String(), + new Topic.Config(1, (short) 1, Map.of(RETENTION_MS_CONFIG, DEFAULT_RETENTION_MS))); + + ALL_TOPICS = List.of( + TOPIC_EPSS, + TOPIC_NOTIFICATION_ANALYZER, + TOPIC_NOTIFICATION_BOM, + TOPIC_NOTIFICATION_CONFIGURATION, + TOPIC_NOTIFICATION_DATASOURCE_MIRRORING, + TOPIC_NOTIFICATION_FILE_SYSTEM, + TOPIC_NOTIFICATION_INTEGRATION, + TOPIC_NOTIFICATION_NEW_VULNERABILITY, + TOPIC_NOTIFICATION_NEW_VULNERABLE_DEPENDENCY, + TOPIC_NOTIFICATION_POLICY_VIOLATION, + TOPIC_NOTIFICATION_PROJECT_AUDIT_CHANGE, + TOPIC_NOTIFICATION_PROJECT_CREATED, + TOPIC_NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE, + TOPIC_NOTIFICATION_REPOSITORY, + TOPIC_NOTIFICATION_VEX, + TOPIC_NOTIFICATION_USER, + TOPIC_REPO_META_ANALYSIS_COMMAND, + TOPIC_REPO_META_ANALYSIS_RESULT, + TOPIC_VULN_ANALYSIS_COMMAND, + TOPIC_VULN_ANALYSIS_RESULT, + TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, + TOPIC_VULN_ANALYSIS_SCANNER_RESULT, + TOPIC_VULNERABILITY, + TOPIC_VULNERABILITY_MIRROR_COMMAND); + } + } diff --git a/src/main/java/org/dependencytrack/event/kafka/processor/ProcessorInitializer.java b/src/main/java/org/dependencytrack/event/kafka/processor/ProcessorInitializer.java index f96819fad..e84b45a44 100644 --- a/src/main/java/org/dependencytrack/event/kafka/processor/ProcessorInitializer.java +++ b/src/main/java/org/dependencytrack/event/kafka/processor/ProcessorInitializer.java @@ -36,15 +36,15 @@ public void contextInitialized(final ServletContextEvent event) { LOGGER.info("Initializing processors"); PROCESSOR_MANAGER.registerProcessor(VulnerabilityMirrorProcessor.PROCESSOR_NAME, - KafkaTopics.NEW_VULNERABILITY, new VulnerabilityMirrorProcessor()); + KafkaTopics.TOPIC_VULNERABILITY, new VulnerabilityMirrorProcessor()); PROCESSOR_MANAGER.registerProcessor(RepositoryMetaResultProcessor.PROCESSOR_NAME, - KafkaTopics.REPO_META_ANALYSIS_RESULT, new RepositoryMetaResultProcessor()); + KafkaTopics.TOPIC_REPO_META_ANALYSIS_RESULT, new RepositoryMetaResultProcessor()); PROCESSOR_MANAGER.registerBatchProcessor(EpssMirrorProcessor.PROCESSOR_NAME, - KafkaTopics.NEW_EPSS, new EpssMirrorProcessor()); + KafkaTopics.TOPIC_EPSS, new EpssMirrorProcessor()); PROCESSOR_MANAGER.registerProcessor(VulnerabilityScanResultProcessor.PROCESSOR_NAME, - KafkaTopics.VULN_ANALYSIS_RESULT, new VulnerabilityScanResultProcessor()); + KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT, new VulnerabilityScanResultProcessor()); PROCESSOR_MANAGER.registerBatchProcessor(ProcessedVulnerabilityScanResultProcessor.PROCESSOR_NAME, - KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, new ProcessedVulnerabilityScanResultProcessor()); + KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, new ProcessedVulnerabilityScanResultProcessor()); PROCESSOR_MANAGER.startAll(); } diff --git a/src/main/java/org/dependencytrack/event/kafka/processor/VulnerabilityScanResultProcessor.java b/src/main/java/org/dependencytrack/event/kafka/processor/VulnerabilityScanResultProcessor.java index e0a17ec63..573b2c4e7 100644 --- a/src/main/java/org/dependencytrack/event/kafka/processor/VulnerabilityScanResultProcessor.java +++ b/src/main/java/org/dependencytrack/event/kafka/processor/VulnerabilityScanResultProcessor.java @@ -686,7 +686,7 @@ private void maybeQueueResultProcessedEvent(final ScanKey scanKey, final ScanRes .toList()) .build(); - final var event = new KafkaEvent<>(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, scanKey.getScanToken(), strippedScanResult); + final var event = new KafkaEvent<>(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, scanKey.getScanToken(), strippedScanResult); eventsToDispatch.get().add(event); } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index a45e9cf85..1cb446856 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1277,12 +1277,22 @@ vulnerability.policy.s3.region= #
    #
  • Execution of database migrations
  • #
  • Populating the database with default objects (permissions, users, licenses, etc.)
  • +#
  • Creating Kafka topics with default configuration
  • #
# # @category: General # @type: boolean init.tasks.enabled=true +# Whether to initialize (i.e. create, with default configuration) Kafka topics on startup. +# This option is intended for testing and should not be used for production deployments, +# since production-grade topic configurations will differ vastly. +# Has no effect unless init.tasks.enabled is `true`. +# +# @category: General +# @type: boolean +init.tasks.kafka.topics.enabled=false + # Whether to only execute initialization tasks and exit. # # @category: General diff --git a/src/main/webapp/WEB-INF/web.xml b/src/main/webapp/WEB-INF/web.xml index 17e64e4bf..96d73b50b 100644 --- a/src/main/webapp/WEB-INF/web.xml +++ b/src/main/webapp/WEB-INF/web.xml @@ -35,6 +35,9 @@ org.dependencytrack.persistence.migration.MigrationInitializer + + org.dependencytrack.event.kafka.KafkaTopicInitializer + alpine.server.persistence.PersistenceManagerFactory diff --git a/src/test/java/org/dependencytrack/event/kafka/KafkaEventDispatcherTest.java b/src/test/java/org/dependencytrack/event/kafka/KafkaEventDispatcherTest.java index 0c2b3bf6b..167f8c39e 100644 --- a/src/test/java/org/dependencytrack/event/kafka/KafkaEventDispatcherTest.java +++ b/src/test/java/org/dependencytrack/event/kafka/KafkaEventDispatcherTest.java @@ -72,7 +72,7 @@ public void testDispatchEventWithComponentRepositoryMetaAnalysisEvent() { assertThat(future).isCompletedWithValueMatching(Objects::nonNull); assertThat(mockProducer.history()).satisfiesExactly(record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name()); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND.name()); assertThat(record.key()).asString().isEqualTo("pkg:maven/foo/bar@1.2.3"); assertThat(record.value()).isNotNull(); assertThat(record.headers()).isEmpty(); @@ -89,7 +89,7 @@ public void testDispatchEventWithComponentVulnerabilityAnalysisEvent() { assertThat(future).isCompletedWithValueMatching(Objects::nonNull); assertThat(mockProducer.history()).satisfiesExactly(record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name()); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name()); assertThat(record.key()).isNotNull(); assertThat(record.value()).isNotNull(); assertThat(record.headers()).satisfiesExactlyInAnyOrder( @@ -112,7 +112,7 @@ public void testDispatchEventWithGitHubAdvisoryMirrorEvent() { assertThat(future).isCompletedWithValueMatching(Objects::nonNull); assertThat(mockProducer.history()).satisfiesExactly(record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.VULNERABILITY_MIRROR_COMMAND.name()); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULNERABILITY_MIRROR_COMMAND.name()); assertThat(record.key()).asString().isEqualTo("GITHUB"); assertThat(record.value()).isNull(); assertThat(record.headers()).isEmpty(); @@ -126,7 +126,7 @@ public void testDispatchEventWithNistMirrorEvent() { assertThat(future).isCompletedWithValueMatching(Objects::nonNull); assertThat(mockProducer.history()).satisfiesExactly(record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.VULNERABILITY_MIRROR_COMMAND.name()); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULNERABILITY_MIRROR_COMMAND.name()); assertThat(record.key()).asString().isEqualTo("NVD"); assertThat(record.value()).isNull(); assertThat(record.headers()).isEmpty(); @@ -140,7 +140,7 @@ public void testDispatchEventWithOsvMirrorEvent() { assertThat(future).isCompletedWithValueMatching(Objects::nonNull); assertThat(mockProducer.history()).satisfiesExactly(record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.VULNERABILITY_MIRROR_COMMAND.name()); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULNERABILITY_MIRROR_COMMAND.name()); assertThat(record.key()).asString().isEqualTo("OSV"); assertThat(record.value()).asString().isEqualTo("Maven"); assertThat(record.headers()).isEmpty(); @@ -179,7 +179,7 @@ public void testDispatchNotification() { assertThat(future).isCompletedWithValueMatching(Objects::nonNull); assertThat(mockProducer.history()).satisfiesExactly(record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_ANALYZER.name()); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_ANALYZER.name()); assertThat(record.key()).isNull(); assertThat(record.value()).isNotNull(); assertThat(record.headers()).isEmpty(); diff --git a/src/test/java/org/dependencytrack/event/kafka/KafkaTopicsTest.java b/src/test/java/org/dependencytrack/event/kafka/KafkaTopicsTest.java index 3d211df55..85b5db0a5 100644 --- a/src/test/java/org/dependencytrack/event/kafka/KafkaTopicsTest.java +++ b/src/test/java/org/dependencytrack/event/kafka/KafkaTopicsTest.java @@ -32,12 +32,12 @@ public class KafkaTopicsTest { @Test public void testTopicNameWithPrefix() { environmentVariables.set("DT_KAFKA_TOPIC_PREFIX", "foo-bar.baz."); - assertThat(KafkaTopics.VULN_ANALYSIS_RESULT.name()).isEqualTo("foo-bar.baz.dtrack.vuln-analysis.result"); + assertThat(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT.name()).isEqualTo("foo-bar.baz.dtrack.vuln-analysis.result"); } @Test public void testTopicNameWithoutPrefix() { - assertThat(KafkaTopics.VULN_ANALYSIS_RESULT.name()).isEqualTo("dtrack.vuln-analysis.result"); + assertThat(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT.name()).isEqualTo("dtrack.vuln-analysis.result"); } } \ No newline at end of file diff --git a/src/test/java/org/dependencytrack/event/kafka/componentmeta/SupportedMetaHandlerTest.java b/src/test/java/org/dependencytrack/event/kafka/componentmeta/SupportedMetaHandlerTest.java index c888bfab5..d27b31612 100644 --- a/src/test/java/org/dependencytrack/event/kafka/componentmeta/SupportedMetaHandlerTest.java +++ b/src/test/java/org/dependencytrack/event/kafka/componentmeta/SupportedMetaHandlerTest.java @@ -53,8 +53,8 @@ public void testHandleIntegrityComponentNotInDB() throws MalformedPackageURLExce IntegrityMetaComponent result = handler.handle(); assertThat(kafkaMockProducer.history()).satisfiesExactly( record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name()); - final var command = deserializeValue(KafkaTopics.REPO_META_ANALYSIS_COMMAND, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND.name()); + final var command = deserializeValue(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND, record); assertThat(command.getComponent().getPurl()).isEqualTo("pkg:maven/org.http4s/blaze-core_2.12"); assertThat(command.getComponent().getUuid()).isEqualTo(uuid.toString()); assertThat(command.getComponent().getInternal()).isFalse(); @@ -81,8 +81,8 @@ public void testHandleIntegrityComponentInDBForMoreThanAnHour() throws Malformed IntegrityMetaComponent integrityMetaComponent = handler.handle(); assertThat(kafkaMockProducer.history()).satisfiesExactly( record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name()); - final var command = deserializeValue(KafkaTopics.REPO_META_ANALYSIS_COMMAND, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND.name()); + final var command = deserializeValue(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND, record); assertThat(command.getComponent().getPurl()).isEqualTo("pkg:maven/org.http4s/blaze-core_2.12"); assertThat(command.getComponent().getUuid()).isEqualTo(uuid.toString()); assertThat(command.getComponent().getInternal()).isFalse(); @@ -111,8 +111,8 @@ public void testHandleIntegrityWhenMetadataExists() throws MalformedPackageURLEx IntegrityMetaComponent integrityMetaComponent = handler.handle(); assertThat(kafkaMockProducer.history()).satisfiesExactly( record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name()); - final var command = deserializeValue(KafkaTopics.REPO_META_ANALYSIS_COMMAND, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND.name()); + final var command = deserializeValue(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND, record); assertThat(command.getComponent().getPurl()).isEqualTo("pkg:maven/org.http4s/blaze-core_2.12"); assertThat(command.getComponent().getUuid()).isEqualTo(uuid.toString()); assertThat(command.getComponent().getInternal()).isFalse(); diff --git a/src/test/java/org/dependencytrack/event/kafka/componentmeta/UnSupportedMetaHandlerTest.java b/src/test/java/org/dependencytrack/event/kafka/componentmeta/UnSupportedMetaHandlerTest.java index 8d54704e3..9b1f94a86 100644 --- a/src/test/java/org/dependencytrack/event/kafka/componentmeta/UnSupportedMetaHandlerTest.java +++ b/src/test/java/org/dependencytrack/event/kafka/componentmeta/UnSupportedMetaHandlerTest.java @@ -46,8 +46,8 @@ public void testHandleComponentInDb() throws MalformedPackageURLException { handler.handle(); assertThat(kafkaMockProducer.history()).satisfiesExactly( record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name()); - final var command = deserializeValue(KafkaTopics.REPO_META_ANALYSIS_COMMAND, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND.name()); + final var command = deserializeValue(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND, record); assertThat(command.getComponent().getPurl()).isEqualTo("pkg:golang/foo/bar@baz"); assertThat(command.getComponent().getInternal()).isFalse(); assertThat(command.getFetchMeta()).isEqualTo(FetchMeta.FETCH_META_LATEST_VERSION); diff --git a/src/test/java/org/dependencytrack/event/kafka/processor/ProcessedVulnerabilityScanResultProcessorTest.java b/src/test/java/org/dependencytrack/event/kafka/processor/ProcessedVulnerabilityScanResultProcessorTest.java index d92e7a97c..44440157d 100644 --- a/src/test/java/org/dependencytrack/event/kafka/processor/ProcessedVulnerabilityScanResultProcessorTest.java +++ b/src/test/java/org/dependencytrack/event/kafka/processor/ProcessedVulnerabilityScanResultProcessorTest.java @@ -153,12 +153,12 @@ public void testProcessWithFailureThresholdExceeded() throws Exception { ); assertThat(kafkaMockProducer.history()).satisfiesExactly(record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE.name()); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE.name()); - final String recordKey = deserializeKey(KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE, record); + final String recordKey = deserializeKey(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE, record); assertThat(recordKey).isEqualTo(project.getUuid().toString()); - final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE, record); + final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE, record); assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO); assertThat(notification.getGroup()).isEqualTo(GROUP_PROJECT_VULN_ANALYSIS_COMPLETE); assertThat(notification.getLevel()).isEqualTo(LEVEL_INFORMATIONAL); @@ -225,12 +225,12 @@ public void testProcessWithResultWithoutScannerResults() throws Exception { ); assertThat(kafkaMockProducer.history()).satisfiesExactly(record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE.name()); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE.name()); - final String recordKey = deserializeKey(KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE, record); + final String recordKey = deserializeKey(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE, record); assertThat(recordKey).isEqualTo(project.getUuid().toString()); - final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE, record); + final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE, record); assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO); assertThat(notification.getGroup()).isEqualTo(GROUP_PROJECT_VULN_ANALYSIS_COMPLETE); assertThat(notification.getLevel()).isEqualTo(LEVEL_INFORMATIONAL); @@ -295,14 +295,14 @@ public void testProcessWithDelayedBomProcessedNotification() throws Exception { processor.process(List.of(aConsumerRecord(vulnScan.getToken().toString(), scanResult).build())); assertThat(kafkaMockProducer.history()).satisfiesExactly( - record -> assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE.name()), + record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE.name()), record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()); - final String recordKey = deserializeKey(KafkaTopics.NOTIFICATION_BOM, record); + final String recordKey = deserializeKey(KafkaTopics.TOPIC_NOTIFICATION_BOM, record); assertThat(recordKey).isEqualTo(project.getUuid().toString()); - final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_BOM, record); + final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_BOM, record); assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO); assertThat(notification.getGroup()).isEqualTo(GROUP_BOM_PROCESSED); assertThat(notification.getLevel()).isEqualTo(LEVEL_INFORMATIONAL); @@ -358,14 +358,14 @@ public void testProcessWithDelayedBomProcessedNotificationWhenVulnerabilityScanF processor.process(List.of(aConsumerRecord(vulnScan.getToken().toString(), scanResult).build())); assertThat(kafkaMockProducer.history()).satisfiesExactly( - record -> assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE.name()), + record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE.name()), record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()); - final String recordKey = deserializeKey(KafkaTopics.NOTIFICATION_BOM, record); + final String recordKey = deserializeKey(KafkaTopics.TOPIC_NOTIFICATION_BOM, record); assertThat(recordKey).isEqualTo(project.getUuid().toString()); - final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_BOM, record); + final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_BOM, record); assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO); assertThat(notification.getGroup()).isEqualTo(GROUP_BOM_PROCESSED); assertThat(notification.getLevel()).isEqualTo(LEVEL_INFORMATIONAL); @@ -412,7 +412,7 @@ public void testProcessWithDelayedBomProcessedNotificationWithoutCompletedBomPro processor.process(List.of(aConsumerRecord(vulnScan.getToken().toString(), scanResult).build())); assertThat(kafkaMockProducer.history()).satisfiesExactly(record -> - assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE.name())); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE.name())); await("Internal event publish") .atMost(Duration.ofSeconds(1)) diff --git a/src/test/java/org/dependencytrack/event/kafka/processor/VulnerabilityScanResultProcessorTest.java b/src/test/java/org/dependencytrack/event/kafka/processor/VulnerabilityScanResultProcessorTest.java index 72d619691..c7b29abc7 100644 --- a/src/test/java/org/dependencytrack/event/kafka/processor/VulnerabilityScanResultProcessorTest.java +++ b/src/test/java/org/dependencytrack/event/kafka/processor/VulnerabilityScanResultProcessorTest.java @@ -146,8 +146,8 @@ public void dropFailedScanResultTest() { assertThat(kafkaMockProducer.history()).satisfiesExactly( record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_ANALYZER.name()); - final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_ANALYZER, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_ANALYZER.name()); + final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_ANALYZER, record); assertThat(notification.getScope()).isEqualTo(SCOPE_SYSTEM); assertThat(notification.getLevel()).isEqualTo(LEVEL_ERROR); assertThat(notification.getGroup()).isEqualTo(GROUP_ANALYZER); @@ -157,10 +157,10 @@ record -> { component.getUuid(), SCANNER_INTERNAL, scanToken + "/" + component.getUuid()); }, record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED.name()); - final String recordKey = deserializeKey(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED.name()); + final String recordKey = deserializeKey(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, record); assertThat(recordKey).isEqualTo(scanToken); - final ScanResult recordValue = deserializeValue(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, record); + final ScanResult recordValue = deserializeValue(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, record); assertThat(recordValue.getScannerResultsList()).noneMatch(ScannerResult::hasBom); } ); @@ -191,10 +191,10 @@ public void dropPendingScanResultTest() { processor.process(aConsumerRecord(scanKey, scanResult).build()); assertThat(kafkaMockProducer.history()).satisfiesExactly(record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED.name()); - final String recordKey = deserializeKey(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED.name()); + final String recordKey = deserializeKey(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, record); assertThat(recordKey).isEqualTo(scanToken); - final ScanResult recordValue = deserializeValue(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, record); + final ScanResult recordValue = deserializeValue(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, record); assertThat(recordValue.getScannerResultsList()).noneMatch(ScannerResult::hasBom); }); } @@ -218,10 +218,10 @@ public void processSuccessfulScanResultWhenComponentDoesNotExistTest() { processor.process(aConsumerRecord(scanKey, scanResult).build()); assertThat(kafkaMockProducer.history()).satisfiesExactly(record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED.name()); - final String recordKey = deserializeKey(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED.name()); + final String recordKey = deserializeKey(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, record); assertThat(recordKey).isEqualTo(scanToken); - final ScanResult recordValue = deserializeValue(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, record); + final ScanResult recordValue = deserializeValue(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, record); assertThat(recordValue.getScannerResultsList()).noneMatch(ScannerResult::hasBom); }); } @@ -300,15 +300,15 @@ public void processSuccessfulScanResult() { assertThat(kafkaMockProducer.history()).satisfiesExactly( record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED.name()); - final String recordKey = deserializeKey(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED.name()); + final String recordKey = deserializeKey(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, record); assertThat(recordKey).isEqualTo(scanToken); - final ScanResult recordValue = deserializeValue(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, record); + final ScanResult recordValue = deserializeValue(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, record); assertThat(recordValue.getScannerResultsList()).noneMatch(ScannerResult::hasBom); }, record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_NEW_VULNERABLE_DEPENDENCY.name()); - final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_NEW_VULNERABLE_DEPENDENCY, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_NEW_VULNERABLE_DEPENDENCY.name()); + final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_NEW_VULNERABLE_DEPENDENCY, record); assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO); assertThat(notification.getLevel()).isEqualTo(LEVEL_INFORMATIONAL); assertThat(notification.getGroup()).isEqualTo(GROUP_NEW_VULNERABLE_DEPENDENCY); @@ -329,8 +329,8 @@ record -> { ); }, record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_NEW_VULNERABILITY.name()); - final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_NEW_VULNERABILITY, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_NEW_VULNERABILITY.name()); + final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_NEW_VULNERABILITY, record); assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO); assertThat(notification.getLevel()).isEqualTo(LEVEL_INFORMATIONAL); assertThat(notification.getGroup()).isEqualTo(GROUP_NEW_VULNERABILITY); @@ -340,8 +340,8 @@ record -> { assertThat(subject.getVulnerabilityAnalysisLevel()).isEqualTo("BOM_UPLOAD_ANALYSIS"); }, record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_NEW_VULNERABILITY.name()); - final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_NEW_VULNERABILITY, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_NEW_VULNERABILITY.name()); + final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_NEW_VULNERABILITY, record); assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO); assertThat(notification.getLevel()).isEqualTo(LEVEL_INFORMATIONAL); assertThat(notification.getGroup()).isEqualTo(GROUP_NEW_VULNERABILITY); @@ -400,10 +400,10 @@ public void processSuccessfulScanResultWithExistingFindingTest() { // Because the vulnerability was reported already, no notification must be sent. assertThat(kafkaMockProducer.history()).satisfiesExactly(record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED.name()); - final String recordKey = deserializeKey(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED.name()); + final String recordKey = deserializeKey(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, record); assertThat(recordKey).isEqualTo(scanToken); - final ScanResult recordValue = deserializeValue(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, record); + final ScanResult recordValue = deserializeValue(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, record); assertThat(recordValue.getScannerResultsList()).noneMatch(ScannerResult::hasBom); }); } @@ -712,15 +712,15 @@ public void analysisThroughPolicyNewAnalysisTest() { // TODO: There should be PROJECT_AUDIT_CHANGE notifications. assertThat(kafkaMockProducer.history()).satisfiesExactly( record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED.name()); - final String recordKey = deserializeKey(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED.name()); + final String recordKey = deserializeKey(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, record); assertThat(recordKey).isEqualTo(scanToken); - final ScanResult recordValue = deserializeValue(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, record); + final ScanResult recordValue = deserializeValue(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, record); assertThat(recordValue.getScannerResultsList()).noneMatch(ScannerResult::hasBom); }, record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_NEW_VULNERABILITY.name()); - final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_NEW_VULNERABILITY, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_NEW_VULNERABILITY.name()); + final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_NEW_VULNERABILITY, record); assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO); assertThat(notification.getLevel()).isEqualTo(LEVEL_INFORMATIONAL); assertThat(notification.getGroup()).isEqualTo(GROUP_NEW_VULNERABILITY); @@ -810,10 +810,10 @@ public void analysisThroughPolicyNewAnalysisSuppressionTest() { // The vulnerability was suppressed, so no notifications to be expected. // TODO: There should be PROJECT_AUDIT_CHANGE notifications. assertThat(kafkaMockProducer.history()).satisfiesExactly(record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED.name()); - final String recordKey = deserializeKey(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED.name()); + final String recordKey = deserializeKey(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, record); assertThat(recordKey).isEqualTo(scanToken); - final ScanResult recordValue = deserializeValue(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, record); + final ScanResult recordValue = deserializeValue(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, record); assertThat(recordValue.getScannerResultsList()).noneMatch(ScannerResult::hasBom); }); } @@ -925,15 +925,15 @@ public void analysisThroughPolicyExistingDifferentAnalysisTest() { // There should be PROJECT_AUDIT_CHANGE notification. assertThat(kafkaMockProducer.history()).satisfiesExactly( record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED.name()); - final String recordKey = deserializeKey(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED.name()); + final String recordKey = deserializeKey(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, record); assertThat(recordKey).isEqualTo(scanToken); - final ScanResult recordValue = deserializeValue(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, record); + final ScanResult recordValue = deserializeValue(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, record); assertThat(recordValue.getScannerResultsList()).noneMatch(ScannerResult::hasBom); }, record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_AUDIT_CHANGE.name()); - final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_AUDIT_CHANGE, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_AUDIT_CHANGE.name()); + final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_AUDIT_CHANGE, record); assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO); assertThat(notification.getLevel()).isEqualTo(LEVEL_INFORMATIONAL); assertThat(notification.getGroup()).isEqualTo(GROUP_PROJECT_AUDIT_CHANGE); @@ -1030,10 +1030,10 @@ public void analysisThroughPolicyExistingEqualAnalysisTest() { // The vulnerability already existed, so no notifications to be expected. assertThat(kafkaMockProducer.history()).satisfiesExactly(record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED.name()); - final String recordKey = deserializeKey(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED.name()); + final String recordKey = deserializeKey(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, record); assertThat(recordKey).isEqualTo(scanToken); - final ScanResult recordValue = deserializeValue(KafkaTopics.VULN_ANALYSIS_RESULT_PROCESSED, record); + final ScanResult recordValue = deserializeValue(KafkaTopics.TOPIC_VULN_ANALYSIS_RESULT_PROCESSED, record); assertThat(recordValue.getScannerResultsList()).noneMatch(ScannerResult::hasBom); }); } @@ -1388,7 +1388,7 @@ public void analysisThroughPolicyWithAnalysisUpdateNotOnStateOrSuppressionTest() assertThat(analysis.getAnalysisDetails()).isEqualTo("newDetails"); assertThat(kafkaMockProducer.history()).noneSatisfy( - record -> assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_AUDIT_CHANGE.name())); + record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_AUDIT_CHANGE.name())); } @Test diff --git a/src/test/java/org/dependencytrack/event/kafka/processor/api/ProcessorManagerTest.java b/src/test/java/org/dependencytrack/event/kafka/processor/api/ProcessorManagerTest.java index c1fa0eb76..347117ee5 100644 --- a/src/test/java/org/dependencytrack/event/kafka/processor/api/ProcessorManagerTest.java +++ b/src/test/java/org/dependencytrack/event/kafka/processor/api/ProcessorManagerTest.java @@ -87,7 +87,7 @@ public void tearDown() { @Test public void testSingleRecordProcessor() throws Exception { - final var inputTopic = new Topic<>("input", Serdes.String(), Serdes.String()); + final var inputTopic = new Topic<>("input", Serdes.String(), Serdes.String(), /* defaultConfig */ null); adminClient.createTopics(List.of(new NewTopic(inputTopic.name(), 3, (short) 1))).all().get(); final var recordsProcessed = new AtomicInteger(0); @@ -116,7 +116,7 @@ record -> recordsProcessed.incrementAndGet(); @Test public void testSingleRecordProcessorRetry() throws Exception { - final var inputTopic = new Topic<>("input", Serdes.String(), Serdes.String()); + final var inputTopic = new Topic<>("input", Serdes.String(), Serdes.String(), /* defaultConfig */ null); adminClient.createTopics(List.of(new NewTopic(inputTopic.name(), 3, (short) 1))).all().get(); final var attemptsCounter = new AtomicInteger(0); @@ -153,7 +153,7 @@ public void testSingleRecordProcessorRetry() throws Exception { @Test public void testBatchProcessor() throws Exception { - final var inputTopic = new Topic<>("input", Serdes.String(), Serdes.String()); + final var inputTopic = new Topic<>("input", Serdes.String(), Serdes.String(), /* defaultConfig */ null); adminClient.createTopics(List.of(new NewTopic(inputTopic.name(), 3, (short) 1))).all().get(); final var recordsProcessed = new AtomicInteger(0); @@ -187,7 +187,7 @@ public void testBatchProcessor() throws Exception { @Test public void testWithMaxConcurrencyMatchingPartitionCount() throws Exception { - final var inputTopic = new Topic<>("input", Serdes.String(), Serdes.String()); + final var inputTopic = new Topic<>("input", Serdes.String(), Serdes.String(), /* defaultConfig */ null); adminClient.createTopics(List.of(new NewTopic(inputTopic.name(), 12, (short) 1))).all().get(); environmentVariables.set("KAFKA_PROCESSOR_FOO_PROCESSING_ORDER", "partition"); @@ -218,8 +218,8 @@ public void testWithMaxConcurrencyMatchingPartitionCount() throws Exception { @Test public void testStartAllWithMissingTopics() throws Exception { - final var inputTopicA = new Topic<>("input-a", Serdes.String(), Serdes.String()); - final var inputTopicB = new Topic<>("input-b", Serdes.String(), Serdes.String()); + final var inputTopicA = new Topic<>("input-a", Serdes.String(), Serdes.String(), /* defaultConfig */ null); + final var inputTopicB = new Topic<>("input-b", Serdes.String(), Serdes.String(), /* defaultConfig */ null); adminClient.createTopics(List.of(new NewTopic(inputTopicA.name(), 3, (short) 1))).all().get(); final Processor processor = record -> { @@ -240,7 +240,7 @@ public void testStartAllWithMissingTopics() throws Exception { @Test public void testProbeHealth() throws Exception { - final var inputTopic = new Topic<>("input", Serdes.String(), Serdes.String()); + final var inputTopic = new Topic<>("input", Serdes.String(), Serdes.String(), /* defaultConfig */ null); adminClient.createTopics(List.of(new NewTopic(inputTopic.name(), 3, (short) 1))).all().get(); final Processor processor = record -> { diff --git a/src/test/java/org/dependencytrack/resources/v1/AnalysisResourceTest.java b/src/test/java/org/dependencytrack/resources/v1/AnalysisResourceTest.java index 112e18421..52f8b4f60 100644 --- a/src/test/java/org/dependencytrack/resources/v1/AnalysisResourceTest.java +++ b/src/test/java/org/dependencytrack/resources/v1/AnalysisResourceTest.java @@ -335,9 +335,9 @@ public void updateAnalysisCreateNewTest() throws Exception { assertThat(responseJson.getBoolean("isSuppressed")).isTrue(); assertConditionWithTimeout(() -> kafkaMockProducer.history().size() == 2, Duration.ofSeconds(5)); - final Notification projectNotification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0)); + final Notification projectNotification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0)); assertThat(projectNotification).isNotNull(); - final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1)); + final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1)); assertThat(notification).isNotNull(); assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO); assertThat(notification.getGroup()).isEqualTo(GROUP_PROJECT_AUDIT_CHANGE); @@ -396,9 +396,9 @@ public void updateAnalysisCreateNewWithUserTest() throws Exception { assertThat(responseJson.getBoolean("isSuppressed")).isTrue(); assertConditionWithTimeout(() -> kafkaMockProducer.history().size() == 2, Duration.ofSeconds(5)); - final Notification projectNotification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0)); + final Notification projectNotification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0)); assertThat(projectNotification).isNotNull(); - final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1)); + final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1)); assertThat(notification).isNotNull(); assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO); assertThat(notification.getGroup()).isEqualTo(GROUP_PROJECT_AUDIT_CHANGE); @@ -446,9 +446,9 @@ public void updateAnalysisCreateNewWithEmptyRequestTest() throws Exception { assertThat(responseJson.getBoolean("isSuppressed")).isFalse(); assertConditionWithTimeout(() -> kafkaMockProducer.history().size() == 2, Duration.ofSeconds(5)); - final Notification projectNotification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0)); + final Notification projectNotification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0)); assertThat(projectNotification).isNotNull(); - final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1)); + final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1)); assertThat(notification).isNotNull(); assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO); assertThat(notification.getGroup()).isEqualTo(GROUP_PROJECT_AUDIT_CHANGE); @@ -524,9 +524,9 @@ public void updateAnalysisUpdateExistingTest() throws Exception { assertThat(responseJson.getBoolean("isSuppressed")).isFalse(); assertConditionWithTimeout(() -> kafkaMockProducer.history().size() == 2, Duration.ofSeconds(5)); - final Notification projectNotification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0)); + final Notification projectNotification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0)); assertThat(projectNotification).isNotNull(); - final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1)); + final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1)); assertThat(notification).isNotNull(); assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO); assertThat(notification.getGroup()).isEqualTo(GROUP_PROJECT_AUDIT_CHANGE); @@ -641,9 +641,9 @@ public void updateAnalysisUpdateExistingWithEmptyRequestTest() throws Exception .hasFieldOrPropertyWithValue("commenter", Json.createValue("Test Users")); assertConditionWithTimeout(() -> kafkaMockProducer.history().size() == 2, Duration.ofSeconds(5)); - final Notification projectNotification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0)); + final Notification projectNotification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0)); assertThat(projectNotification).isNotNull(); - final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1)); + final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1)); assertThat(notification).isNotNull(); assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO); assertThat(notification.getGroup()).isEqualTo(GROUP_PROJECT_AUDIT_CHANGE); @@ -811,9 +811,9 @@ public void updateAnalysisIssue1409Test() throws InterruptedException { assertThat(responseJson.getBoolean("isSuppressed")).isFalse(); assertConditionWithTimeout(() -> kafkaMockProducer.history().size() == 2, Duration.ofSeconds(5)); - final Notification projectNotification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0)); + final Notification projectNotification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0)); assertThat(projectNotification).isNotNull(); - final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1)); + final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1)); assertThat(notification).isNotNull(); assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO); assertThat(notification.getGroup()).isEqualTo(GROUP_PROJECT_AUDIT_CHANGE); diff --git a/src/test/java/org/dependencytrack/resources/v1/BomResourceTest.java b/src/test/java/org/dependencytrack/resources/v1/BomResourceTest.java index d5f323e90..0f1501647 100644 --- a/src/test/java/org/dependencytrack/resources/v1/BomResourceTest.java +++ b/src/test/java/org/dependencytrack/resources/v1/BomResourceTest.java @@ -1196,7 +1196,7 @@ public void uploadBomInvalidJsonTest() throws InterruptedException { """); assertThat(kafkaMockProducer.history()).hasSize(1); - final org.dependencytrack.proto.notification.v1.Notification userNotification = deserializeValue(KafkaTopics.NOTIFICATION_USER, kafkaMockProducer.history().get(0)); + final org.dependencytrack.proto.notification.v1.Notification userNotification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_USER, kafkaMockProducer.history().get(0)); AssertionsForClassTypes.assertThat(userNotification).isNotNull(); AssertionsForClassTypes.assertThat(userNotification.getScope()).isEqualTo(SCOPE_PORTFOLIO); AssertionsForClassTypes.assertThat(userNotification.getGroup()).isEqualTo(GROUP_BOM_VALIDATION_FAILED); @@ -1250,7 +1250,7 @@ public void uploadBomInvalidXmlTest() throws InterruptedException { """); assertThat(kafkaMockProducer.history()).hasSize(1); - final org.dependencytrack.proto.notification.v1.Notification userNotification = deserializeValue(KafkaTopics.NOTIFICATION_USER, kafkaMockProducer.history().get(0)); + final org.dependencytrack.proto.notification.v1.Notification userNotification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_USER, kafkaMockProducer.history().get(0)); AssertionsForClassTypes.assertThat(userNotification).isNotNull(); AssertionsForClassTypes.assertThat(userNotification.getScope()).isEqualTo(SCOPE_PORTFOLIO); AssertionsForClassTypes.assertThat(userNotification.getGroup()).isEqualTo(GROUP_BOM_VALIDATION_FAILED); diff --git a/src/test/java/org/dependencytrack/resources/v1/ComponentResourceTest.java b/src/test/java/org/dependencytrack/resources/v1/ComponentResourceTest.java index d0863a1b9..aec0e110d 100644 --- a/src/test/java/org/dependencytrack/resources/v1/ComponentResourceTest.java +++ b/src/test/java/org/dependencytrack/resources/v1/ComponentResourceTest.java @@ -644,15 +644,15 @@ public void createComponentTest() { Assert.assertEquals("SampleAuthor" ,json.getJsonArray("authors").getJsonObject(0).getString("name")); Assert.assertTrue(UuidUtil.isValidUUID(json.getString("uuid"))); assertThat(kafkaMockProducer.history()).satisfiesExactlyInAnyOrder( - record -> assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()), + record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()), record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name()); - final var command = KafkaTestUtil.deserializeValue(KafkaTopics.REPO_META_ANALYSIS_COMMAND, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND.name()); + final var command = KafkaTestUtil.deserializeValue(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND, record); assertThat(command.getComponent().getPurl()).isEqualTo(json.getString("purl")); }, record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name()); - final var command = KafkaTestUtil.deserializeValue(KafkaTopics.VULN_ANALYSIS_COMMAND, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name()); + final var command = KafkaTestUtil.deserializeValue(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND, record); assertThat(command.getComponent().getUuid()).isEqualTo(json.getString("uuid")); } ); @@ -725,15 +725,15 @@ public void updateComponentTest() { Assert.assertEquals("Test component", json.getString("description")); Assert.assertEquals(1, json.getJsonArray("externalReferences").size()); assertThat(kafkaMockProducer.history()).satisfiesExactlyInAnyOrder( - record -> assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()), + record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()), record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name()); - final var command = KafkaTestUtil.deserializeValue(KafkaTopics.REPO_META_ANALYSIS_COMMAND, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND.name()); + final var command = KafkaTestUtil.deserializeValue(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND, record); assertThat(command.getComponent().getPurl()).isEqualTo(json.getString("purl")); }, record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name()); - final var command = KafkaTestUtil.deserializeValue(KafkaTopics.VULN_ANALYSIS_COMMAND, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name()); + final var command = KafkaTestUtil.deserializeValue(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND, record); assertThat(command.getComponent().getUuid()).isEqualTo(json.getString("uuid")); } ); diff --git a/src/test/java/org/dependencytrack/resources/v1/ProjectResourceTest.java b/src/test/java/org/dependencytrack/resources/v1/ProjectResourceTest.java index afcc702bf..49f87cba0 100644 --- a/src/test/java/org/dependencytrack/resources/v1/ProjectResourceTest.java +++ b/src/test/java/org/dependencytrack/resources/v1/ProjectResourceTest.java @@ -1482,7 +1482,7 @@ public void createProjectTest() throws Exception { Assert.assertTrue(UuidUtil.isValidUUID(json.getString("uuid"))); assertConditionWithTimeout(() -> kafkaMockProducer.history().size() == 1, Duration.ofSeconds(5)); - final org.dependencytrack.proto.notification.v1.Notification projectNotification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0)); + final org.dependencytrack.proto.notification.v1.Notification projectNotification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0)); assertThat(projectNotification).isNotNull(); assertThat(projectNotification.getScope()).isEqualTo(SCOPE_PORTFOLIO); assertThat(projectNotification.getGroup()).isEqualTo(GROUP_PROJECT_CREATED); diff --git a/src/test/java/org/dependencytrack/resources/v1/UserResourceAuthenticatedTest.java b/src/test/java/org/dependencytrack/resources/v1/UserResourceAuthenticatedTest.java index 19b844be2..271b28a3c 100644 --- a/src/test/java/org/dependencytrack/resources/v1/UserResourceAuthenticatedTest.java +++ b/src/test/java/org/dependencytrack/resources/v1/UserResourceAuthenticatedTest.java @@ -229,7 +229,7 @@ public void createLdapUserTest() throws InterruptedException { Assert.assertEquals("blackbeard", json.getString("username")); assertConditionWithTimeout(() -> kafkaMockProducer.history().size() == 1, Duration.ofSeconds(5)); - final org.dependencytrack.proto.notification.v1.Notification userNotification = deserializeValue(KafkaTopics.NOTIFICATION_USER, kafkaMockProducer.history().get(0)); + final org.dependencytrack.proto.notification.v1.Notification userNotification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_USER, kafkaMockProducer.history().get(0)); assertThat(userNotification).isNotNull(); assertThat(userNotification.getScope()).isEqualTo(SCOPE_SYSTEM); assertThat(userNotification.getGroup()).isEqualTo(GROUP_USER_CREATED); @@ -277,7 +277,7 @@ public void deleteLdapUserTest() throws InterruptedException { Assert.assertEquals(204, response.getStatus(), 0); assertConditionWithTimeout(() -> kafkaMockProducer.history().size() == 1, Duration.ofSeconds(5)); - final org.dependencytrack.proto.notification.v1.Notification userNotification = deserializeValue(KafkaTopics.NOTIFICATION_USER, kafkaMockProducer.history().get(0)); + final org.dependencytrack.proto.notification.v1.Notification userNotification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_USER, kafkaMockProducer.history().get(0)); assertThat(userNotification).isNotNull(); assertThat(userNotification.getScope()).isEqualTo(SCOPE_SYSTEM); assertThat(userNotification.getGroup()).isEqualTo(GROUP_USER_DELETED); @@ -306,7 +306,7 @@ public void createManagedUserTest() throws InterruptedException { Assert.assertEquals("blackbeard", json.getString("username")); assertConditionWithTimeout(() -> kafkaMockProducer.history().size() == 1, Duration.ofSeconds(5)); - final org.dependencytrack.proto.notification.v1.Notification userNotification = deserializeValue(KafkaTopics.NOTIFICATION_USER, kafkaMockProducer.history().get(0)); + final org.dependencytrack.proto.notification.v1.Notification userNotification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_USER, kafkaMockProducer.history().get(0)); assertThat(userNotification).isNotNull(); assertThat(userNotification.getScope()).isEqualTo(SCOPE_SYSTEM); assertThat(userNotification.getGroup()).isEqualTo(GROUP_USER_CREATED); @@ -512,7 +512,7 @@ public void deleteManagedUserTest() throws InterruptedException { Assert.assertEquals(204, response.getStatus(), 0); assertConditionWithTimeout(() -> kafkaMockProducer.history().size() == 1, Duration.ofSeconds(5)); - final org.dependencytrack.proto.notification.v1.Notification userNotification = deserializeValue(KafkaTopics.NOTIFICATION_USER, kafkaMockProducer.history().get(0)); + final org.dependencytrack.proto.notification.v1.Notification userNotification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_USER, kafkaMockProducer.history().get(0)); assertThat(userNotification).isNotNull(); assertThat(userNotification.getScope()).isEqualTo(SCOPE_SYSTEM); assertThat(userNotification.getGroup()).isEqualTo(GROUP_USER_DELETED); @@ -535,7 +535,7 @@ public void createOidcUserTest() throws InterruptedException { Assert.assertEquals("blackbeard", json.getString("username")); assertConditionWithTimeout(() -> kafkaMockProducer.history().size() == 1, Duration.ofSeconds(5)); - final org.dependencytrack.proto.notification.v1.Notification userNotification = deserializeValue(KafkaTopics.NOTIFICATION_USER, kafkaMockProducer.history().get(0)); + final org.dependencytrack.proto.notification.v1.Notification userNotification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_USER, kafkaMockProducer.history().get(0)); assertThat(userNotification).isNotNull(); assertThat(userNotification.getScope()).isEqualTo(SCOPE_SYSTEM); assertThat(userNotification.getGroup()).isEqualTo(GROUP_USER_CREATED); diff --git a/src/test/java/org/dependencytrack/resources/v1/ViolationAnalysisResourceTest.java b/src/test/java/org/dependencytrack/resources/v1/ViolationAnalysisResourceTest.java index 2c35255d5..c5d759bd5 100644 --- a/src/test/java/org/dependencytrack/resources/v1/ViolationAnalysisResourceTest.java +++ b/src/test/java/org/dependencytrack/resources/v1/ViolationAnalysisResourceTest.java @@ -215,9 +215,9 @@ public void updateAnalysisCreateNewTest() throws Exception { .doesNotContainKey("commenter"); // Not set when authenticating via API key; assertConditionWithTimeout(() -> kafkaMockProducer.history().size() == 2, Duration.ofSeconds(5)); - final Notification projectNotification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0)); + final Notification projectNotification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0)); assertThat(projectNotification).isNotNull(); - final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1)); + final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1)); assertThat(notification).isNotNull(); assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO); assertThat(notification.getGroup()).isEqualTo(GROUP_PROJECT_AUDIT_CHANGE); @@ -265,9 +265,9 @@ public void updateAnalysisCreateNewWithEmptyRequestTest() throws Exception { assertThat(jsonObject.getJsonArray("analysisComments")).isEmpty(); assertConditionWithTimeout(() -> kafkaMockProducer.history().size() == 2, Duration.ofSeconds(5)); - final Notification projectNotification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0)); + final Notification projectNotification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0)); assertThat(projectNotification).isNotNull(); - final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1)); + final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1)); assertThat(notification).isNotNull(); assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO); assertThat(notification.getGroup()).isEqualTo(GROUP_PROJECT_AUDIT_CHANGE); @@ -333,9 +333,9 @@ public void updateAnalysisUpdateExistingTest() throws Exception { .doesNotContainKey("commenter"); // Not set when authenticating via API key assertConditionWithTimeout(() -> kafkaMockProducer.history().size() == 2, Duration.ofSeconds(5)); - final Notification projectNotification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0)); + final Notification projectNotification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0)); assertThat(projectNotification).isNotNull(); - final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1)); + final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1)); assertThat(notification).isNotNull(); assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO); assertThat(notification.getGroup()).isEqualTo(GROUP_PROJECT_AUDIT_CHANGE); @@ -441,9 +441,9 @@ public void updateAnalysisUpdateExistingWithEmptyRequestTest() throws Exception .doesNotContainKey("commenter"); // Not set when authenticating via API key assertConditionWithTimeout(() -> kafkaMockProducer.history().size() == 2, Duration.ofSeconds(5)); - final Notification projectNotification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0)); + final Notification projectNotification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED, kafkaMockProducer.history().get(0)); assertThat(projectNotification).isNotNull(); - final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1)); + final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_AUDIT_CHANGE, kafkaMockProducer.history().get(1)); assertThat(notification).isNotNull(); assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO); assertThat(notification.getGroup()).isEqualTo(GROUP_PROJECT_AUDIT_CHANGE); diff --git a/src/test/java/org/dependencytrack/tasks/BomUploadProcessingTaskTest.java b/src/test/java/org/dependencytrack/tasks/BomUploadProcessingTaskTest.java index b02edd317..990fb9643 100644 --- a/src/test/java/org/dependencytrack/tasks/BomUploadProcessingTaskTest.java +++ b/src/test/java/org/dependencytrack/tasks/BomUploadProcessingTaskTest.java @@ -113,11 +113,11 @@ public void informTest() throws Exception { new BomUploadProcessingTask().inform(bomUploadEvent); assertBomProcessedNotification(); assertThat(kafkaMockProducer.history()).satisfiesExactly( - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()), - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()), - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()), - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name()), - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name()) + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()), + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()), + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()), + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name()), + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND.name()) ); qm.getPersistenceManager().refresh(project); qm.getPersistenceManager().refreshAll(qm.getAllWorkflowStatesForAToken(bomUploadEvent.getChainIdentifier())); @@ -271,11 +271,11 @@ public void informTestWithComponentAlreadyExistsForIntegrityCheck() throws Excep new BomUploadProcessingTask().inform(bomUploadEvent); assertBomProcessedNotification(); assertThat(kafkaMockProducer.history()).satisfiesExactly( - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()), - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()), - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()), - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name()), - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name()) + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()), + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()), + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()), + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name()), + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND.name()) ); qm.getPersistenceManager().refresh(project); qm.getPersistenceManager().refreshAll(qm.getAllWorkflowStatesForAToken(bomUploadEvent.getChainIdentifier())); @@ -354,9 +354,9 @@ public void informWithEmptyBomTest() throws Exception { new BomUploadProcessingTask().inform(bomUploadEvent); assertBomProcessedNotification(); assertThat(kafkaMockProducer.history()).satisfiesExactly( - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()), - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()), - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()) + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()), + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()), + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()) ); qm.getPersistenceManager().refreshAll(qm.getAllWorkflowStatesForAToken(bomUploadEvent.getChainIdentifier())); qm.getPersistenceManager().refresh(project); @@ -412,10 +412,10 @@ public void informWithInvalidBomTest() throws Exception { new BomUploadProcessingTask().inform(bomUploadEvent); assertThat(kafkaMockProducer.history()).satisfiesExactly( - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()), + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()), event -> { - assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()); - final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_BOM, event); + assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()); + final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_BOM, event); assertThat(notification.getScope()).isEqualTo(SCOPE_PORTFOLIO); assertThat(notification.getGroup()).isEqualTo(GROUP_BOM_PROCESSING_FAILED); assertThat(notification.getLevel()).isEqualTo(LEVEL_ERROR); @@ -537,20 +537,20 @@ public void informWithBloatedBomTest() throws Exception { assertThat(kafkaMockProducer.history()) .anySatisfy(record -> { - assertThat(deserializeKey(KafkaTopics.NOTIFICATION_BOM, record)).isEqualTo(project.getUuid().toString()); - assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()); - final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_BOM, record); + assertThat(deserializeKey(KafkaTopics.TOPIC_NOTIFICATION_BOM, record)).isEqualTo(project.getUuid().toString()); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()); + final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_BOM, record); assertThat(notification.getGroup()).isEqualTo(Group.GROUP_BOM_CONSUMED); }) .anySatisfy(record -> { - assertThat(deserializeKey(KafkaTopics.NOTIFICATION_BOM, record)).isEqualTo(project.getUuid().toString()); - assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()); - final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_BOM, record); + assertThat(deserializeKey(KafkaTopics.TOPIC_NOTIFICATION_BOM, record)).isEqualTo(project.getUuid().toString()); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()); + final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_BOM, record); assertThat(notification.getGroup()).isEqualTo(Group.GROUP_BOM_PROCESSED); }) .noneSatisfy(record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()); - final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_BOM, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()); + final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_BOM, record); assertThat(notification.getGroup()).isEqualTo(GROUP_BOM_PROCESSING_FAILED); }); @@ -606,14 +606,14 @@ public void informWithBloatedBomTest() throws Exception { // Verify that all vulnerability analysis commands have been sent. final long vulnAnalysisCommandsSent = kafkaMockProducer.history().stream() .map(ProducerRecord::topic) - .filter(KafkaTopics.VULN_ANALYSIS_COMMAND.name()::equals) + .filter(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name()::equals) .count(); assertThat(vulnAnalysisCommandsSent).isEqualTo(9056); // Verify that all repository meta analysis commands have been sent. final long repoMetaAnalysisCommandsSent = kafkaMockProducer.history().stream() .map(ProducerRecord::topic) - .filter(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name()::equals) + .filter(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND.name()::equals) .count(); assertThat(repoMetaAnalysisCommandsSent).isEqualTo(9056); } @@ -764,20 +764,20 @@ public void informWithComponentsUnderMetadataBomTest() throws Exception { assertThat(kafkaMockProducer.history()) .anySatisfy(record -> { - assertThat(deserializeKey(KafkaTopics.NOTIFICATION_BOM, record)).isEqualTo(project.getUuid().toString()); - assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()); - final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_BOM, record); + assertThat(deserializeKey(KafkaTopics.TOPIC_NOTIFICATION_BOM, record)).isEqualTo(project.getUuid().toString()); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()); + final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_BOM, record); assertThat(notification.getGroup()).isEqualTo(Group.GROUP_BOM_CONSUMED); }) .anySatisfy(record -> { - assertThat(deserializeKey(KafkaTopics.NOTIFICATION_BOM, record)).isEqualTo(project.getUuid().toString()); - assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()); - final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_BOM, record); + assertThat(deserializeKey(KafkaTopics.TOPIC_NOTIFICATION_BOM, record)).isEqualTo(project.getUuid().toString()); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()); + final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_BOM, record); assertThat(notification.getGroup()).isEqualTo(Group.GROUP_BOM_PROCESSED); }) .noneSatisfy(record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()); - final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_BOM, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()); + final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_BOM, record); assertThat(notification.getGroup()).isEqualTo(GROUP_BOM_PROCESSING_FAILED); }); @@ -812,14 +812,14 @@ public void informWithDelayedBomProcessedNotification() throws Exception { new BomUploadProcessingTask(new KafkaEventDispatcher(), /* delayBomProcessedNotification */ true).inform(bomUploadEvent); assertThat(kafkaMockProducer.history()).satisfiesExactly( - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()), + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()), event -> { - assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()); - final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_BOM, event); + assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()); + final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_BOM, event); assertThat(notification.getGroup()).isEqualTo(Group.GROUP_BOM_CONSUMED); }, - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name()), - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name()) + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name()), + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND.name()) // BOM_PROCESSED notification should not have been sent. ); } @@ -834,15 +834,15 @@ public void informWithDelayedBomProcessedNotificationAndNoComponents() throws Ex new BomUploadProcessingTask(new KafkaEventDispatcher(), /* delayBomProcessedNotification */ true).inform(bomUploadEvent); assertBomProcessedNotification(); assertThat(kafkaMockProducer.history()).satisfiesExactly( - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()), + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()), event -> { - assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()); - final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_BOM, event); + assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()); + final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_BOM, event); assertThat(notification.getGroup()).isEqualTo(Group.GROUP_BOM_CONSUMED); }, event -> { - assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()); - final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_BOM, event); + assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()); + final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_BOM, event); assertThat(notification.getGroup()).isEqualTo(Group.GROUP_BOM_PROCESSED); } ); @@ -858,10 +858,10 @@ public void informWithComponentWithoutPurl() throws Exception { assertBomProcessedNotification(); assertThat(kafkaMockProducer.history()).satisfiesExactly( - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()), - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()), - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()), - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name()) + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()), + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()), + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()), + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name()) // (No REPO_META_ANALYSIS_COMMAND event because the component doesn't have a PURL) ); @@ -883,12 +883,12 @@ public void informWithCustomLicenseResolutionTest() throws Exception { assertBomProcessedNotification(); assertThat(kafkaMockProducer.history()).satisfiesExactly( - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()), - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()), - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()), - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name()), - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name()), - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name()) + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()), + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()), + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()), + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name()), + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name()), + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name()) ); assertThat(qm.getAllComponents(project)).satisfiesExactly( @@ -921,10 +921,10 @@ public void informWithBomContainingLicenseExpressionTest() throws Exception { assertBomProcessedNotification(); assertThat(kafkaMockProducer.history()).satisfiesExactly( - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()), - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()), - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()), - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name()) + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()), + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()), + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()), + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name()) ); assertThat(qm.getAllComponents(project)).satisfiesExactly(component -> { @@ -949,10 +949,10 @@ public void informWithBomContainingLicenseExpressionWithSingleIdTest() throws Ex assertBomProcessedNotification(); assertThat(kafkaMockProducer.history()).satisfiesExactly( - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()), - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()), - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()), - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name()) + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()), + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()), + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()), + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name()) ); assertThat(qm.getAllComponents(project)).satisfiesExactly(component -> { @@ -973,10 +973,10 @@ public void informWithBomContainingInvalidLicenseExpressionTest() throws Excepti assertBomProcessedNotification(); assertThat(kafkaMockProducer.history()).satisfiesExactly( - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()), - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()), - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()), - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name()) + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()), + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()), + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()), + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name()) ); assertThat(qm.getAllComponents(project)).satisfiesExactly(component -> { @@ -1210,10 +1210,10 @@ public void informWithBomContainingServiceTest() throws Exception { assertBomProcessedNotification(); assertThat(kafkaMockProducer.history()).satisfiesExactly( - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()), - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()), - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()), - event -> assertThat(event.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name()) + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()), + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()), + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()), + event -> assertThat(event.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name()) ); assertThat(qm.getAllComponents(project)).isNotEmpty(); @@ -1669,14 +1669,14 @@ public void informWithEmptyComponentAndServiceNameTest() throws Exception { private void assertBomProcessedNotification() throws Exception { try { assertThat(kafkaMockProducer.history()).anySatisfy(record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_BOM.name()); - final Notification notification = deserializeValue(KafkaTopics.NOTIFICATION_BOM, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_BOM.name()); + final Notification notification = deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_BOM, record); assertThat(notification.getGroup()).isEqualTo(GROUP_BOM_PROCESSED); }); } catch (AssertionError e) { final Optional optionalNotification = kafkaMockProducer.history().stream() - .filter(record -> record.topic().equals(KafkaTopics.NOTIFICATION_BOM.name())) - .map(record -> deserializeValue(KafkaTopics.NOTIFICATION_BOM, record)) + .filter(record -> record.topic().equals(KafkaTopics.TOPIC_NOTIFICATION_BOM.name())) + .map(record -> deserializeValue(KafkaTopics.TOPIC_NOTIFICATION_BOM, record)) .filter(notification -> notification.getGroup() == GROUP_BOM_PROCESSING_FAILED) .findAny(); if (optionalNotification.isEmpty()) { diff --git a/src/test/java/org/dependencytrack/tasks/IntegrityMetaInitializerTaskTest.java b/src/test/java/org/dependencytrack/tasks/IntegrityMetaInitializerTaskTest.java index a9da28052..35814551e 100644 --- a/src/test/java/org/dependencytrack/tasks/IntegrityMetaInitializerTaskTest.java +++ b/src/test/java/org/dependencytrack/tasks/IntegrityMetaInitializerTaskTest.java @@ -64,7 +64,7 @@ public void testIntegrityMetaInitializer() { new IntegrityMetaInitializerTask().inform(new IntegrityMetaInitializerEvent()); assertThat(qm.getIntegrityMetaComponentCount()).isEqualTo(1); assertThat(kafkaMockProducer.history()).satisfiesExactly( - record -> assertThat(record.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name()) + record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND.name()) ); } diff --git a/src/test/java/org/dependencytrack/tasks/RepositoryMetaAnalysisTaskTest.java b/src/test/java/org/dependencytrack/tasks/RepositoryMetaAnalysisTaskTest.java index cd52caaac..3b567877a 100644 --- a/src/test/java/org/dependencytrack/tasks/RepositoryMetaAnalysisTaskTest.java +++ b/src/test/java/org/dependencytrack/tasks/RepositoryMetaAnalysisTaskTest.java @@ -88,14 +88,14 @@ public void testPortfolioRepositoryMetaAnalysis() { new RepositoryMetaAnalysisTask().inform(new PortfolioRepositoryMetaAnalysisEvent()); assertThat(kafkaMockProducer.history()).satisfiesExactlyInAnyOrder( - record -> assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()), // projectA - record -> assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()), // projectB - record -> assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()), // projectC - record -> assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()), // projectD - record -> assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()), // projectE + record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()), // projectA + record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()), // projectB + record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()), // projectC + record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()), // projectD + record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()), // projectE record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name()); - final var command = deserializeValue(KafkaTopics.REPO_META_ANALYSIS_COMMAND, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND.name()); + final var command = deserializeValue(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND, record); assertThat(command.getComponent().getPurl()).isEqualTo("pkg:maven/acme/acme-lib-a@1.0.1"); assertThat(command.getComponent().getInternal()).isFalse(); }, @@ -103,8 +103,8 @@ record -> { // componentProjectC must not have been submitted, because it belongs to an inactive project // componentProjectD has the same PURL coordinates as componentProjectA and is not submitted again record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name()); - final var command = deserializeValue(KafkaTopics.REPO_META_ANALYSIS_COMMAND, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND.name()); + final var command = deserializeValue(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND, record); assertThat(command.getComponent().getPurl()).isEqualTo("pkg:maven/acme/acme-lib-a@1.0.1"); assertThat(command.getComponent().getInternal()).isTrue(); } @@ -159,23 +159,23 @@ public void testProjectRepositoryMetaAnalysis() { new RepositoryMetaAnalysisTask().inform(new ProjectRepositoryMetaAnalysisEvent(project.getUuid())); assertThat(kafkaMockProducer.history()).satisfiesExactlyInAnyOrder( - record -> assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()), + record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()), record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name()); - final var command = deserializeValue(KafkaTopics.REPO_META_ANALYSIS_COMMAND, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND.name()); + final var command = deserializeValue(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND, record); assertThat(command.getComponent().getPurl()).isEqualTo("pkg:maven/acme/acme-lib-a@1.0.1"); assertThat(command.getComponent().getInternal()).isFalse(); }, record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name()); - final var command = deserializeValue(KafkaTopics.REPO_META_ANALYSIS_COMMAND, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND.name()); + final var command = deserializeValue(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND, record); assertThat(command.getComponent().getPurl()).isEqualTo("pkg:maven/acme/acme-lib-a@1.0.1"); assertThat(command.getComponent().getInternal()).isTrue(); }, // componentB must not have been submitted, because it does not have a PURL record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.REPO_META_ANALYSIS_COMMAND.name()); - final var command = deserializeValue(KafkaTopics.REPO_META_ANALYSIS_COMMAND, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND.name()); + final var command = deserializeValue(KafkaTopics.TOPIC_REPO_META_ANALYSIS_COMMAND, record); assertThat(command.getComponent().getPurl()).isEqualTo("pkg:maven/acme/acme-lib-c@3.0.1"); assertThat(command.getComponent().getInternal()).isFalse(); } @@ -195,7 +195,7 @@ public void testProjectRepositoryMetaAnalysisWithInactiveProject() { new RepositoryMetaAnalysisTask().inform(new ProjectRepositoryMetaAnalysisEvent(project.getUuid())); assertThat(kafkaMockProducer.history()).satisfiesExactly( - record -> assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()) + record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()) // Component of inactive project must not have been submitted for analysis ); } diff --git a/src/test/java/org/dependencytrack/tasks/VulnerabilityAnalysisTaskTest.java b/src/test/java/org/dependencytrack/tasks/VulnerabilityAnalysisTaskTest.java index 97995e419..98ec39618 100644 --- a/src/test/java/org/dependencytrack/tasks/VulnerabilityAnalysisTaskTest.java +++ b/src/test/java/org/dependencytrack/tasks/VulnerabilityAnalysisTaskTest.java @@ -69,12 +69,12 @@ public void testPortfolioVulnerabilityAnalysis() { new VulnerabilityAnalysisTask().inform(new PortfolioVulnerabilityAnalysisEvent()); assertThat(kafkaMockProducer.history()).satisfiesExactlyInAnyOrder( - record -> assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()), - record -> assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()), - record -> assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()), + record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()), + record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()), + record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()), record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name()); - final var command = deserializeValue(KafkaTopics.VULN_ANALYSIS_COMMAND, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name()); + final var command = deserializeValue(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND, record); assertThat(command.getComponent().getUuid()).isEqualTo(componentProjectB.getUuid().toString()); assertThat(command.getComponent().getCpe()).isEqualTo(componentProjectB.getCpe()); assertThat(command.getComponent().hasPurl()).isFalse(); @@ -82,8 +82,8 @@ record -> { assertThat(command.getComponent().getInternal()).isTrue(); }, record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name()); - final var command = deserializeValue(KafkaTopics.VULN_ANALYSIS_COMMAND, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name()); + final var command = deserializeValue(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND, record); assertThat(command.getComponent().getUuid()).isEqualTo(componentProjectA.getUuid().toString()); assertThat(command.getComponent().hasCpe()).isFalse(); assertThat(command.getComponent().getPurl()).isEqualTo(componentProjectA.getPurl().toString()); @@ -134,10 +134,10 @@ public void testProjectVulnerabilityAnalysis() { assertThat(scan.getUpdatedAt()).isNotNull(); assertThat(kafkaMockProducer.history()).satisfiesExactlyInAnyOrder( - record -> assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()), + record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()), record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name()); - final var command = deserializeValue(KafkaTopics.VULN_ANALYSIS_COMMAND, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name()); + final var command = deserializeValue(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND, record); assertThat(command.getComponent().getUuid()).isEqualTo(componentC.getUuid().toString()); assertThat(command.getComponent().getCpe()).isEqualTo(componentC.getCpe()); assertThat(command.getComponent().getPurl()).isEqualTo(componentC.getPurl().toString()); @@ -145,8 +145,8 @@ record -> { assertThat(command.getComponent().getInternal()).isFalse(); }, record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name()); - final var command = deserializeValue(KafkaTopics.VULN_ANALYSIS_COMMAND, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name()); + final var command = deserializeValue(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND, record); assertThat(command.getComponent().getUuid()).isEqualTo(componentB.getUuid().toString()); assertThat(command.getComponent().getCpe()).isEqualTo(componentB.getCpe()); assertThat(command.getComponent().hasPurl()).isFalse(); @@ -154,8 +154,8 @@ record -> { assertThat(command.getComponent().getInternal()).isFalse(); }, record -> { - assertThat(record.topic()).isEqualTo(KafkaTopics.VULN_ANALYSIS_COMMAND.name()); - final var command = deserializeValue(KafkaTopics.VULN_ANALYSIS_COMMAND, record); + assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND.name()); + final var command = deserializeValue(KafkaTopics.TOPIC_VULN_ANALYSIS_COMMAND, record); assertThat(command.getComponent().getUuid()).isEqualTo(componentA.getUuid().toString()); assertThat(command.getComponent().hasCpe()).isFalse(); assertThat(command.getComponent().getPurl()).isEqualTo(componentA.getPurl().toString()); @@ -176,7 +176,7 @@ public void testProjectVulnerabilityAnalysisWithNoComponents() { assertThat(scan).isNull(); assertThat(kafkaMockProducer.history()).satisfiesExactly( - record -> assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()) + record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()) // Project does not have any components so nothing should've been submitted for analysis ); } @@ -193,7 +193,7 @@ public void testProjectVulnerabilityAnalysisWithInactiveProject() { new VulnerabilityAnalysisTask().inform(new ProjectVulnerabilityAnalysisEvent(project.getUuid())); assertThat(kafkaMockProducer.history()).satisfiesExactly( - record -> assertThat(record.topic()).isEqualTo(KafkaTopics.NOTIFICATION_PROJECT_CREATED.name()) + record -> assertThat(record.topic()).isEqualTo(KafkaTopics.TOPIC_NOTIFICATION_PROJECT_CREATED.name()) // Component of inactive project must not have been submitted for analysis ); }