Skip to content

Commit

Permalink
Atomic Writer (#136)
Browse files Browse the repository at this point in the history
* upgrades gradle
upgrades spring security
upgrades SecurityConfig
fixes docker image in compose to run on latest mysql

* upgrade Spring

* fixes integration tests

* optimize imports

* fixes LOB mapping to tinytext

* Atomic Writer

* fixes integration tests

* fixes owasp

* Update README.md
  • Loading branch information
ndc-dxc authored Aug 26, 2024
1 parent 795c222 commit 9643664
Show file tree
Hide file tree
Showing 46 changed files with 592 additions and 241 deletions.
6 changes: 4 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,13 @@ jacocoTestCoverageVerification {
limit {
counter = 'LINE'
value = 'COVEREDRATIO'
minimum = 0.9
minimum = 0.8
}
excludes = [
'it.gov.innovazione.ndc.config.*',
'it.gov.innovazione.ndc.Application',
'it.gov.innovazione.ndc.harvester.util.*',
'it.gov.innovazione.ndc.model.profiles.*',
'it.gov.innovazione.ndc.model.*',
'it.gov.innovazione.ndc.gen.*',
'it.gov.innovazione.ndc.validator.BaseSemanticAssetValidator',
'it.gov.innovazione.ndc.controller.*',
Expand All @@ -189,6 +189,8 @@ jacocoTestCoverageVerification {
'it.gov.innovazione.ndc.harvester.harvesters.utils.PathUtils',
'it.gov.innovazione.ndc.harvester.service.OnceLogger',
'it.gov.innovazione.ndc.harvester.service.ConfigReaderService',
'it.gov.innovazione.ndc.service.DefaultInstanceManager',
'it.gov.innovazione.ndc.repository.TripleStoreRepository',
'it.gov.innovazione.ndc.service.EventCleaner',
'it.gov.innovazione.ndc.service.TemplateService',
'it.gov.innovazione.ndc.alerter.*'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Set;

import static it.gov.innovazione.ndc.harvester.service.RepositoryUtils.asRepo;
import static it.gov.innovazione.ndc.integration.Containers.ELASTICSEARCH_PORT;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.doNothing;
Expand Down Expand Up @@ -82,10 +83,9 @@ public void beforeEach() throws IOException {

static void updateTestcontainersProperties(DynamicPropertyRegistry registry) {
String url = "http://localhost:" + virtuoso.getMappedPort(Containers.VIRTUOSO_PORT);
String elasticSearchAddress = elasticsearchContainer.getHttpHostAddress();
registry.add("virtuoso.sparql", () -> url + "/sparql");
registry.add("virtuoso.sparql-graph-store", () -> url + "/sparql-graph-crud/");
registry.add("spring.elasticsearch.rest.uris", () -> elasticSearchAddress);
registry.add("elastic.test.exposed-port", () -> elasticsearchContainer.getMappedPort(ELASTICSEARCH_PORT));
}

private void dataIsHarvested() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package it.gov.innovazione.ndc.integration;

import it.gov.innovazione.ndc.harvester.model.Instance;
import it.gov.innovazione.ndc.model.harvester.Repository;
import it.gov.innovazione.ndc.service.InstanceManager;
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@Service
@Primary
@RequiredArgsConstructor
public class InMemoryInstanceManager implements InstanceManager {

private final Map<String, Instance> instances = new HashMap<>();

private Instance allInstances = Instance.PRIMARY;

public void setAllInstances(Instance allInstances) {
this.allInstances = allInstances;
instances.replaceAll((k, v) -> allInstances);
}

@Override
public Instance getNextOnlineInstance(String repoUrl) {
if (instances.containsKey(repoUrl)) {
return instances.get(repoUrl).switchInstance();
}
Repository fakeRepo = Repository.builder()
.url(repoUrl)
.build();
return getCurrentInstance(fakeRepo).switchInstance();
}

@Override
public Instance getNextOnlineInstance(Repository repository) {
return getCurrentInstance(repository).switchInstance();
}

@Override
public Instance getCurrentInstance(Repository repository) {
if (!instances.containsKey(repository.getUrl())) {
instances.put(repository.getUrl(), allInstances);
}
return instances.get(repository.getUrl());
}

@Override
public void switchInstances(Repository repository) {
Instance toPut = getCurrentInstance(repository).switchInstance();
instances.put(repository.getUrl(), toPut);
}

@Override
public List<RepositoryInstance> getCurrentInstances() {
return instances.entrySet().stream()
.map(entry -> RepositoryInstance.of(entry.getKey(), entry.getValue()))
.collect(Collectors.toList());
}

public void switchAllInstances() {
instances.replaceAll((k, v) -> v.switchInstance());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@
import it.gov.innovazione.ndc.harvester.SemanticAssetType;
import it.gov.innovazione.ndc.model.profiles.NDC;
import it.gov.innovazione.ndc.service.GithubService;
import it.gov.innovazione.ndc.service.InstanceManager;
import junit.framework.AssertionFailedError;
import org.apache.jena.query.QuerySolution;
import org.apache.jena.query.ResultSet;
import org.apache.jena.rdf.model.RDFNode;
import org.apache.jena.rdfconnection.RDFConnection;
import org.apache.jena.rdfconnection.RDFConnectionFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.http.HttpStatus;
import org.springframework.test.context.DynamicPropertyRegistry;
Expand All @@ -41,12 +45,25 @@ public class RestApiIntegrationTests extends BaseIntegrationTest {
private GithubService githubService;
@MockBean
private NdcEventPublisher ndcEventPublisher;
@Autowired
private InstanceManager instanceManager;

@DynamicPropertySource
static void updateDynamicPropertySource(DynamicPropertyRegistry registry) {
updateTestcontainersProperties(registry);
}

@BeforeEach
public void beforeSetup() {
((InMemoryInstanceManager) instanceManager).switchAllInstances();
}

@AfterEach
public void afterSetup() {
((InMemoryInstanceManager) instanceManager).switchAllInstances();
}


@Test
void shouldBeAbleToHarvestAndSearchControlledVocabularySuccessfully() {
final String assetIri = "https://w3id.org/italia/controlled-vocabulary/licences";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import it.gov.innovazione.ndc.config.ElasticConfigurator;
import it.gov.innovazione.ndc.harvester.SemanticAssetType;
import it.gov.innovazione.ndc.harvester.model.Instance;
import it.gov.innovazione.ndc.harvester.model.index.SemanticAssetMetadata;
import it.gov.innovazione.ndc.repository.SemanticAssetMetadataRepository;
import org.elasticsearch.client.RestClient;
Expand Down Expand Up @@ -32,7 +33,9 @@ public class SemanticAssetMetadataRepositoryIntegrationTest {
public static void beforeAll() {
elastic.start();
elasticsearchOperations = buildElasticsearchOps();
repository = new SemanticAssetMetadataRepository(elasticsearchOperations);
InMemoryInstanceManager instanceManager = new InMemoryInstanceManager();
instanceManager.setAllInstances(Instance.PRIMARY);
repository = new SemanticAssetMetadataRepository(elasticsearchOperations, instanceManager);
}

@NotNull
Expand Down Expand Up @@ -66,6 +69,7 @@ void shouldIndexAndSearch() {
.repoUrl(repoName)
.type(type)
.iri(iri)
.instance(Instance.PRIMARY.name())
.build();
entries.add(entry);
}
Expand All @@ -76,7 +80,7 @@ void shouldIndexAndSearch() {

elasticsearchOperations.indexOps(SemanticAssetMetadata.class).refresh();

List<SemanticAssetMetadata> vocabs = repository.findVocabulariesForRepoUrl("http://repo1");
List<SemanticAssetMetadata> vocabs = repository.findVocabulariesForRepoUrl("http://repo1", Instance.PRIMARY);
assertThat(vocabs).hasSize(ASSET_COUNT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class TripleStoreRepositoryTest {
private static String sparqlUrl;
private static String sparqlGraphUrl;
private static final String graphName = "http://www.fantasy.org/graph";
private static final String oldGraphName = "http://old.www.fantasy.org/graph";

private static TripleStoreRepository repository;

Expand Down Expand Up @@ -75,7 +76,7 @@ void shouldSaveGivenModelInVirtuosoTestcontainer() {
"<http://purl.org/dc/terms/accrualPeriodicity>",
"?o"
)
.from(graphName)
.from(oldGraphName)
.build();
QueryExecution queryExecution =
QueryExecutionFactory.sparqlService(sparqlUrl, findPeriodicity);
Expand Down Expand Up @@ -105,7 +106,7 @@ void shouldSaveGivenModelWithBlankNodeInVirtuosoTestcontainer() {
"<http://www.w3.org/ns/dcat#keyword>",
"?k"
)
.from(graphName)
.from(oldGraphName)
.build();
QueryExecution queryExecution =
QueryExecutionFactory.sparqlService(sparqlUrl, keywordQuery);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
import it.gov.innovazione.ndc.harvester.exception.HarvesterAlreadyInProgressException;
import it.gov.innovazione.ndc.harvester.exception.HarvesterException;
import it.gov.innovazione.ndc.harvester.exception.RepoContainsNdcIssueException;
import it.gov.innovazione.ndc.harvester.model.Instance;
import it.gov.innovazione.ndc.harvester.service.HarvesterRunService;
import it.gov.innovazione.ndc.model.harvester.HarvesterRun;
import it.gov.innovazione.ndc.model.harvester.Repository;
import it.gov.innovazione.ndc.service.GithubService;
import it.gov.innovazione.ndc.service.InstanceManager;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ThreadUtils;
Expand Down Expand Up @@ -47,6 +49,7 @@ public class SimpleHarvestRepositoryProcessor {
private final GithubService githubService;

private final List<String> locks = new ArrayList<>();
private final InstanceManager instanceManager;

public static List<String> getAllRunningHarvestThreadNames() {
return ThreadUtils.getAllThreads().stream()
Expand All @@ -64,7 +67,10 @@ private void setThreadName(String runId, String repoId, String revision, String
public void execute(String runId, Repository repository, String correlationId, String revision, boolean force, String currentUserLogin) {
try {
setThreadName(runId, repository.getId(), revision, "RUNNING");
publishHarvesterStartedEvent(repository, correlationId, revision, runId, currentUserLogin);

Instance instanceToHarvest = instanceManager.getNextOnlineInstance(repository);

publishHarvesterStartedEvent(repository, correlationId, revision, runId, currentUserLogin, instanceToHarvest);

synchronized (locks) {
if (locks.contains(repository.getId() + revision)) {
Expand Down Expand Up @@ -98,15 +104,19 @@ public void execute(String runId, Repository repository, String correlationId, S
.correlationId(correlationId)
.runId(runId)
.currentUserId(currentUserLogin)
.instance(instanceToHarvest)
.build());

verifyNoNdcIssuesInRepoIfNecessary(repository);

harvesterService.harvest(repository, revision);
harvesterService.harvest(repository, revision, instanceToHarvest);

githubService.openIssueIfNecessary();

publishHarvesterSuccessfulEvent(repository, correlationId, revision, runId, currentUserLogin);

instanceManager.switchInstances(repository);

setThreadName(runId, repository.getId(), revision, "IDLE");
} catch (HarvesterException e) {
publishHarvesterFailedEvent(repository, correlationId, revision, runId, e.getHarvesterRunStatus(), e, currentUserLogin);
Expand Down Expand Up @@ -153,7 +163,7 @@ private synchronized void verifyHarvestingIsNotInProgress(String runId, Reposito
}
}

public void publishHarvesterStartedEvent(Repository repository, String correlationId, String revision, String runId, String currentUserLogin) {
public void publishHarvesterStartedEvent(Repository repository, String correlationId, String revision, String runId, String currentUserLogin, Instance instance) {
ndcEventPublisher.publishEvent(
"harvester",
"harvester.started",
Expand All @@ -162,6 +172,7 @@ public void publishHarvesterStartedEvent(Repository repository, String correlati
HarvesterStartedEvent.builder()
.runId(runId)
.repository(repository)
.instance(instance)
.revision(revision)
.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public <T> Optional<T> fromGlobal(ActualConfigService.ConfigKey key) {
}
}

private <T> Optional<T> fromRepo(ActualConfigService.ConfigKey key, String repoId) {
public <T> Optional<T> fromRepo(ActualConfigService.ConfigKey key, String repoId) {
try {
Class<T> type = null;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package it.gov.innovazione.ndc.eventhandler.event;

import it.gov.innovazione.ndc.harvester.model.Instance;
import it.gov.innovazione.ndc.alerter.entities.EventCategory;
import it.gov.innovazione.ndc.alerter.entities.Severity;
import it.gov.innovazione.ndc.alerter.event.AlertableEvent;
Expand All @@ -14,16 +15,17 @@
public class HarvesterStartedEvent implements AlertableEvent {
private final String runId;
private final Repository repository;
private final Instance instance;
private final String revision;

@Override
public String getName() {
return "Run " + runId + " started";
return "Run " + runId + " started on instance " + instance;
}

@Override
public String getDescription() {
return "Harvester run " + runId + " started";
return "Run " + runId + " started on instance " + instance + " for repository " + repository.getUrl() + " with revision " + revision;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ private void handleHarvesterStartedEvent(NdcEventWrapper<HarvesterStartedEvent>
.correlationId(event.getCorrelationId())
.repositoryId(event.getPayload().getRepository().getId())
.repositoryUrl(event.getPayload().getRepository().getUrl())
.instance(event.getPayload().getInstance().name())
.startedAt(event.getTimestamp())
.startedBy(event.getUser())
.revision(event.getPayload().getRevision())
Expand Down
Loading

0 comments on commit 9643664

Please sign in to comment.