Skip to content

Commit

Permalink
Atomic writer fixes (#137)
Browse files Browse the repository at this point in the history
* Update README.md

* atomic writer fixes
  • Loading branch information
ndc-dxc authored Aug 26, 2024
1 parent 9643664 commit 45bdde0
Show file tree
Hide file tree
Showing 13 changed files with 87 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ public void switchInstances(Repository repository) {
instances.put(repository.getUrl(), toPut);
}

@Override
public void rollbackInstance(Repository repository) {
switchInstances(repository);
}

@Override
public List<RepositoryInstance> getCurrentInstances() {
return instances.entrySet().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class TripleStoreRepositoryTest {
private static String sparqlUrl;
private static String sparqlGraphUrl;
private static final String graphName = "http://www.fantasy.org/graph";
private static final String oldGraphName = "http://old.www.fantasy.org/graph";
private static final String oldGraphName = "http://tmp.www.fantasy.org/graph";

private static TripleStoreRepository repository;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,13 @@ public void clearRepo(@RequestParam("repo_url") String repoUrl) {
.build());
}

@PostMapping(value = "jobs/rollback", params = "repositoryId")
@ResponseStatus(HttpStatus.ACCEPTED)
public void rollback(@RequestParam("repositoryId") String repositoryId) {
log.info("Starting rollback job at " + LocalDateTime.now() + "for repository " + repositoryId);
harvesterJob.rollback(repositoryId);
}

@Getter
@Builder
private static class WebHarversterAlertableEvent implements AlertableEvent {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import it.gov.innovazione.ndc.harvester.util.GitUtils;
import it.gov.innovazione.ndc.model.harvester.Repository;
import it.gov.innovazione.ndc.repository.HarvestJobException;
import it.gov.innovazione.ndc.service.DefaultInstanceManager;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -25,6 +26,7 @@ public class HarvesterJob {
private final RepositoryService repositoryService;
private final SimpleHarvestRepositoryProcessor simpleHarvestRepositoryProcessor;
private final GitUtils gitUtils;
private final DefaultInstanceManager defaultInstanceManager;

public List<JobExecutionResponse> harvest(Boolean force) {
List<Repository> allRepos = repositoryService.getActiveRepos();
Expand Down Expand Up @@ -84,4 +86,9 @@ private JobExecutionResponse harvest(Repository repository, String correlationId

}

public void rollback(String repositoryId) {
Repository repository = repositoryService.findActiveRepoById(repositoryId)
.orElseThrow(() -> new HarvestJobException(String.format("Repository %s not found", repositoryId)));
defaultInstanceManager.rollbackInstance(repository);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,15 +135,15 @@ private void storeRightsHolders(Repository repository) {

private void clearRepo(String repoUrl, Instance instance) {
cleanUpWithHarvesters(repoUrl, instance);
cleanUpTripleStore(repoUrl, OLD_GRAPH_PREFIX);
cleanUpTripleStore(repoUrl, TMP_GRAPH_PREFIX);
cleanUpIndexedMetadata(repoUrl, instance);
}

private void clearRepoAllInstances(String repoUrl) {
cleanUpWithHarvesters(repoUrl, Instance.PRIMARY);
cleanUpWithHarvesters(repoUrl, Instance.SECONDARY);
cleanUpTripleStore(repoUrl, OLD_GRAPH_PREFIX);
cleanUpTripleStore(repoUrl, TMP_GRAPH_PREFIX);
cleanUpTripleStore(repoUrl, OLD_GRAPH_PREFIX);
cleanUpTripleStore(repoUrl, ONLINE_GRAPH_PREFIX);
cleanUpIndexedMetadata(repoUrl, Instance.PRIMARY);
cleanUpIndexedMetadata(repoUrl, Instance.SECONDARY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import lombok.Builder;
import lombok.Data;
import lombok.experimental.FieldNameConstants;
import org.springframework.data.annotation.AccessType;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
Expand All @@ -18,14 +19,14 @@
import static org.springframework.data.elasticsearch.annotations.FieldType.Keyword;
import static org.springframework.data.elasticsearch.annotations.FieldType.Text;

@Document(indexName = "semantic-asset-metadata-3")
@Document(indexName = "semantic-asset-metadata-4")
@Setting(settingPath = "elasticsearch-settings.json")
@Data
@Builder(toBuilder = true)
@FieldNameConstants
public class SemanticAssetMetadata {

@Id
@Field(type = Keyword)
private String iri;
@Field(type = Keyword)
private SemanticAssetType type;
Expand Down Expand Up @@ -105,4 +106,10 @@ public class SemanticAssetMetadata {
//for the searchable content in multiple fields
@JsonIgnore
private String searchableText;

@Id
@AccessType(AccessType.Type.PROPERTY)
public String getElasticsearchId() {
return iri + '-' + instance;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,21 @@ private TermsQuery termsQuery(String field, Set<String> values) {
}

public Optional<SemanticAssetMetadata> findByIri(String iri) {
return Optional.ofNullable(esOps.get(iri, SemanticAssetMetadata.class));
List<Query> queries = new ArrayList<>();
queries.add(termQuery("iri", iri)._toQuery());
getConditionForInstances()
.map(QueryVariant::_toQuery)
.ifPresent(queries::add);
NativeQuery query = NativeQuery.builder()
.withQuery(BoolQuery.of(bq -> bq.must(queries))._toQuery())
.build();
try {
SearchHits<SemanticAssetMetadata> search = esOps.search(query, SemanticAssetMetadata.class);
return search.get().findFirst().map(SearchHit::getContent);
} catch (Exception e) {
log.error("Error while searching for asset with iri: {}", iri, e);
}
return Optional.empty();
}

public long deleteByRepoUrl(String repoUrl, Instance instance) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
@Repository
public class TripleStoreRepository {
private static final String DROP_SILENT_GRAPH_WITH_LOG_ENABLE_3 = "DEFINE sql:log-enable 3%nDROP SILENT GRAPH <%s>%n";
public static final String TMP_GRAPH_PREFIX = "tmp";
public static final String OLD_GRAPH_PREFIX = "old";
public static final String TMP_GRAPH_PREFIX = "old";
public static final String ONLINE_GRAPH_PREFIX = "";
private static final String RENAME_GRAPH = "DEFINE sql:log-enable 3%nMOVE SILENT GRAPH <%s> to <%s>%n";

Expand Down Expand Up @@ -86,17 +86,26 @@ public void clearExistingNamedGraph(String repoUrl, String prefix) {
public void save(String graphName, Model model) {
log.info("Saving model to Virtuoso");
try (RDFConnection connection = virtuosoClient.getConnection()) {
saveWithConnection(reworkRepoUrlIfNecessary(graphName, OLD_GRAPH_PREFIX), model, connection);
saveWithConnection(reworkRepoUrlIfNecessary(graphName, TMP_GRAPH_PREFIX), model, connection);
}
log.info("Model saved to Virtuoso");
}

public void switchInstances(it.gov.innovazione.ndc.model.harvester.Repository repository) {
String temp = reworkRepoUrlIfNecessary(repository.getUrl(), OLD_GRAPH_PREFIX);
String temp2 = reworkRepoUrlIfNecessary(repository.getUrl(), TMP_GRAPH_PREFIX);
rename(repository.getUrl(), temp2);
rename(temp, repository.getUrl());
rename(temp2, temp);
String tmpGraphName = reworkRepoUrlIfNecessary(repository.getUrl(), TMP_GRAPH_PREFIX);
String oldGraphName = reworkRepoUrlIfNecessary(repository.getUrl(), OLD_GRAPH_PREFIX);
clearExistingNamedGraph(repository.getUrl(), OLD_GRAPH_PREFIX);
rename(repository.getUrl(), oldGraphName);
rename(tmpGraphName, repository.getUrl());
}


public void rollbackInstance(it.gov.innovazione.ndc.model.harvester.Repository repository) {
String tmpGraphName = reworkRepoUrlIfNecessary(repository.getUrl(), TMP_GRAPH_PREFIX);
String oldGraphName = reworkRepoUrlIfNecessary(repository.getUrl(), OLD_GRAPH_PREFIX);
rename(repository.getUrl(), tmpGraphName);
rename(oldGraphName, repository.getUrl());
rename(tmpGraphName, oldGraphName);
}

public void rename(String oldGraph, String newGraph) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ public Instance getNextOnlineInstance(Repository repository) {
return getCurrentInstance(repository).switchInstance();
}

public Instance getOldOnlineInstance(Repository repository) {
return getCurrentInstance(repository).switchInstance();
}

public Instance getCurrentInstance(Repository repository) {
Optional<Instance> instance = configService.fromRepo(ACTIVE_INSTANCE, repository.getId());
return instance.orElse(Instance.PRIMARY);
Expand All @@ -47,6 +51,13 @@ public void switchInstances(Repository repository) {
tripleStoreRepository.switchInstances(repository);
}

public void rollbackInstance(Repository repository) {
// rollback instance on Repositories
configService.writeConfigKey(ACTIVE_INSTANCE, "system", getOldOnlineInstance(repository), repository.getId());
// rollback instance on Virtuoso
tripleStoreRepository.rollbackInstance(repository);
}

@Override
public List<RepositoryInstance> getCurrentInstances() {
return repositoryService.getActiveRepos().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ public interface InstanceManager {

void switchInstances(Repository repository);

void rollbackInstance(Repository repository);

List<RepositoryInstance> getCurrentInstances();

@Data
Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/application-local.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
harvester.repositories=https://github.com/FrankMaverick/inail-ndc,https://github.com/FrankMaverick/dati-semantic-cookiecutter,https://github.com/FrankMaverick/Leo-OpenData,https://github.com/FrankMaverick/ts-ontologie-vocabolari-controllati
harvester.repositories=https://github.com/FrankMaverick/Leo-OpenData

virtuoso.sparql=http://localhost:8890/sparql-auth
virtuoso.sparql-graph-store=http://localhost:8890/sparql-graph-crud-auth
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.client.elc.NativeQuery;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.SearchHit;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.SearchPage;
import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
Expand All @@ -30,6 +31,7 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand All @@ -50,10 +52,12 @@ class SemanticAssetMetadataRepositoryTest {

@Test
void shouldFindById() {
when(esOps.get("http://www.example.org/asset/1", SemanticAssetMetadata.class))
.thenReturn(SemanticAssetMetadata.builder()
.iri("http://www.example.org/asset/1")
.build());
SearchHit<SemanticAssetMetadata> searchHit = mock(SearchHit.class);
SearchHits<SemanticAssetMetadata> searchHits = mock(SearchHits.class);
when(searchHits.get()).thenReturn(Stream.of(searchHit));
when(searchHit.getContent()).thenReturn(SemanticAssetMetadata.builder().iri("http://www.example.org/asset/1").build());

when(esOps.search(any(Query.class), any(Class.class))).thenReturn(searchHits);

Optional<SemanticAssetMetadata> asset =
repository.findByIri("http://www.example.org/asset/1");
Expand All @@ -64,8 +68,8 @@ void shouldFindById() {

@Test
void shouldNotFindById() {
when(esOps.get("http://www.example.org/asset/1", SemanticAssetMetadata.class))
.thenReturn(null);
when(esOps.search(any(Query.class), any(Class.class))).thenReturn(null);


Optional<SemanticAssetMetadata> asset =
repository.findByIri("http://www.example.org/asset/1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
@ExtendWith(MockitoExtension.class)
class TripleStoreRepositoryTest {
private static final String REPO_URL = "http://www.repos.org/reponame";
private static final String OLD_REPO_URL = "http://old.www.repos.org/reponame";
private static final String OLD_REPO_URL = "http://tmp.www.repos.org/reponame";

@Mock
RDFConnection connection;
Expand Down

0 comments on commit 45bdde0

Please sign in to comment.