Skip to content

Commit

Permalink
feat: add scraper for RSD aggregator
Browse files Browse the repository at this point in the history
  • Loading branch information
ewan-escience committed Nov 28, 2024
1 parent a1c0fe6 commit e5591de
Show file tree
Hide file tree
Showing 9 changed files with 351 additions and 7 deletions.
15 changes: 10 additions & 5 deletions database/024-remote-rsd.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ CREATE TABLE remote_rsd (
label VARCHAR (50) NOT NULL CHECK (LENGTH(label) >= 3),
domain VARCHAR(200) NOT NULL UNIQUE,
active BOOLEAN DEFAULT TRUE,
refresh_rate_min BIGINT DEFAULT 5 CHECK (refresh_rate_min >= 5),
refreshed_at TIMESTAMPTZ,
scrape_interval_minutes BIGINT DEFAULT 5 CHECK (scrape_interval_minutes >= 5),
scraped_at TIMESTAMPTZ,
last_err_msg VARCHAR(1000),
created_at TIMESTAMPTZ NOT NULL,
updated_at TIMESTAMPTZ NOT NULL
Expand Down Expand Up @@ -46,8 +46,12 @@ CREATE TRIGGER sanitise_update_remote_rsd BEFORE UPDATE ON remote_rsd
FOR EACH ROW EXECUTE PROCEDURE sanitise_update_remote_rsd();

-- RLS REMOTE_RSD
-- rsd-admin access only

ALTER TABLE remote_rsd ENABLE ROW LEVEL SECURITY;

CREATE POLICY anyone_can_read ON remote_rsd FOR SELECT TO rsd_web_anon, rsd_user
USING (TRUE);

CREATE POLICY admin_all_rights ON remote_rsd TO rsd_admin
USING (TRUE)
WITH CHECK (TRUE);
Expand All @@ -58,7 +62,8 @@ CREATE POLICY admin_all_rights ON remote_rsd TO rsd_admin
-- Results are returned from software_overview RPC from remote RSD and enriched with remote_rsd id
CREATE TABLE remote_software (
id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
remote_rsd UUID NOT NULL REFERENCES remote_rsd(id),
remote_rsd_id UUID NOT NULL REFERENCES remote_rsd(id),
remote_software_id UUID NOT NULL,
slug VARCHAR(200) NOT NULL CHECK (slug ~ '^[a-z0-9]+(-[a-z0-9]+)*$'),
is_published BOOLEAN DEFAULT FALSE NOT NULL,
brand_name VARCHAR(200) NOT NULL,
Expand All @@ -72,7 +77,7 @@ CREATE TABLE remote_software (
prog_lang TEXT[],
licenses VARCHAR[],
scraped_at TIMESTAMPTZ NOT NULL,
UNIQUE(remote_rsd, slug)
UNIQUE(remote_rsd_id, remote_software_id)
);

CREATE POLICY anyone_can_read ON remote_software FOR SELECT TO rsd_web_anon, rsd_user
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
-- SPDX-FileCopyrightText: 2024 Dusan Mijatovic (Netherlands eScience Center)
-- SPDX-FileCopyrightText: 2024 Ewan Cahen (Netherlands eScience Center) <[email protected]>
-- SPDX-FileCopyrightText: 2024 Netherlands eScience Center
--
-- SPDX-License-Identifier: Apache-2.0
Expand Down Expand Up @@ -41,7 +42,7 @@ SELECT
software_overview.licenses
FROM
software_overview()
UNION
UNION ALL
SELECT
remote_software.id,
remote_rsd.label AS source,
Expand All @@ -61,7 +62,7 @@ SELECT
FROM
remote_software
INNER JOIN
remote_rsd ON remote_rsd.id=remote_software.remote_rsd
remote_rsd ON remote_rsd.id = remote_software.remote_rsd_id
;
$$;

Expand Down
1 change: 1 addition & 0 deletions scrapers/jobs.cron
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@
5-59/6 * * * * /opt/java/openjdk/bin/java -cp /usr/myjava/scrapers.jar nl.esciencecenter.rsd.scraper.git.MainContributors > /proc/$(cat /var/run/crond.pid)/fd/1 2>&1
5-59/6 * * * * /opt/java/openjdk/bin/java -cp /usr/myjava/scrapers.jar nl.esciencecenter.rsd.scraper.doi.MainCitations > /proc/$(cat /var/run/crond.pid)/fd/1 2>&1
2-59/6 * * * * /opt/java/openjdk/bin/java -cp /usr/myjava/scrapers.jar nl.esciencecenter.rsd.scraper.ror.MainRor > /proc/$(cat /var/run/crond.pid)/fd/1 2>&1
* * * * * /opt/java/openjdk/bin/java -cp /usr/myjava/scrapers.jar nl.esciencecenter.rsd.scraper.aggregator.MainAggregator > /proc/$(cat /var/run/crond.pid)/fd/1 2>&1
21 changes: 21 additions & 0 deletions scrapers/src/main/java/nl/esciencecenter/rsd/scraper/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,27 @@ public static String patchAsAdmin(String uri, String json, String... extraHeader
return response.body();
}

public static String deleteAsAdmin(String uri, String... extraHeaders) throws IOException, InterruptedException, RsdResponseException {
String jwtString = adminJwt();
HttpRequest.Builder builder = HttpRequest.newBuilder()
.method("DELETE", HttpRequest.BodyPublishers.noBody())
.uri(URI.create(uri))
.timeout(Duration.ofSeconds(30))
.header("Authorization", "Bearer " + jwtString);
if (extraHeaders != null && extraHeaders.length > 0) {
builder.headers(extraHeaders);
}
HttpRequest request = builder.build();
HttpResponse<String> response;
try (HttpClient client = HttpClient.newHttpClient()) {
response = client.send(request, HttpResponse.BodyHandlers.ofString());
}
if (response.statusCode() >= 300) {
throw new RsdResponseException(response.statusCode(), response.uri(), response.body(), "Error deleting data as admin");
}
return response.body();
}

private static String adminJwt() {
String signingSecret = Config.jwtSigningSecret();
Algorithm signingAlgorithm = Algorithm.HMAC256(signingSecret);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// SPDX-FileCopyrightText: 2024 Ewan Cahen (Netherlands eScience Center) <[email protected]>
// SPDX-FileCopyrightText: 2024 Netherlands eScience Center
//
// SPDX-License-Identifier: Apache-2.0

package nl.esciencecenter.rsd.scraper.aggregator;

import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import nl.esciencecenter.rsd.scraper.Config;
import nl.esciencecenter.rsd.scraper.RsdResponseException;
import nl.esciencecenter.rsd.scraper.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class MainAggregator {

private static final Logger LOGGER = LoggerFactory.getLogger(MainAggregator.class);
private static final URI BASE_URL = URI.create(Config.backendBaseUrl());
static final String REMOTE_SOFTWARE_TABLE_NAME = "remote_software";
static final String AGGREGATOR_SERVICE_NAME = "Aggregator";


public static void main(String[] args) {
LOGGER.info("Start aggregating RSDs");
long start = System.nanoTime();

ZonedDateTime now = ZonedDateTime.now();
Collection<RemoteRsdData> allRemoteEntries = PostgrestConnector.allActiveDomains(BASE_URL);

Collection<RemoteRsdData> remoteEntriesToScrape = allRemoteEntries.stream()
.filter(entry -> entry.refreshedAt() == null || entry.refreshedAt()
.plus(entry.refreshInterval())
// subtracting 10 seconds to take into account variations in when this scraper starts
.minus(Duration.ofSeconds(10L))
.isBefore(now))
.toList();

ConcurrentMap<UUID, JsonArray> softwarePerId = new ConcurrentHashMap<>(remoteEntriesToScrape.size());
Collection<Callable<Void>> tasks = remoteEntriesToScrape.stream()
.<Callable<Void>>map(entry -> () -> {
JsonArray scrapedSoftware = RemoteRsdConnector.getAllSoftware(entry.domain());
softwarePerId.put(entry.id(), scrapedSoftware);
return null;
})
.toList();

try (ExecutorService executorService = Executors.newFixedThreadPool(8)) {
List<Future<Void>> completedFutures = executorService.invokeAll(tasks);
for (Future<Void> completedFuture : completedFutures) {
try {
completedFuture.get();
} catch (ExecutionException e) {
LOGGER.error("Unknown error", e);
Utils.saveExceptionInDatabase(AGGREGATOR_SERVICE_NAME, REMOTE_SOFTWARE_TABLE_NAME, null, e);
} catch (InterruptedException e) {
Utils.saveExceptionInDatabase(AGGREGATOR_SERVICE_NAME, REMOTE_SOFTWARE_TABLE_NAME, null, e);
LOGGER.error("Got interrupted, early exiting aggregating RSDs", e);
Thread.currentThread().interrupt();
return;
}
}
} catch (InterruptedException e) {
Utils.saveExceptionInDatabase(AGGREGATOR_SERVICE_NAME, REMOTE_SOFTWARE_TABLE_NAME, null, e);
LOGGER.error("Got interrupted, early exiting aggregating RSDs", e);
Thread.currentThread().interrupt();
return;
}

JsonArray allSoftware = new JsonArray();
for (Map.Entry<UUID, JsonArray> entry : softwarePerId.entrySet()) {
JsonArray softwareArray = entry.getValue();
UUID id = entry.getKey();
for (JsonElement jsonElement : softwareArray) {
JsonObject jsonObject = jsonElement.getAsJsonObject();
jsonObject.addProperty("remote_rsd_id", id.toString());
jsonObject.addProperty("remote_software_id", jsonObject.getAsJsonPrimitive("id").getAsString());
jsonObject.remove("id");
jsonObject
.addProperty("scraped_at", now.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME));

allSoftware.add(jsonElement);
}
}

PostgrestConnector.saveRemoteSoftware(BASE_URL, allSoftware);

for (UUID id : softwarePerId.keySet()) {
PostgrestConnector.updateRefreshedTimeAndErrorMessage(BASE_URL, id, now, null);
}

Map<UUID, Collection<UUID>> softwareScrapedPerRemoteId = new HashMap<>();
for (JsonElement jsonElement : allSoftware) {
JsonObject jsonObject = jsonElement.getAsJsonObject();
UUID remoteRsdId = UUID.fromString(jsonObject.getAsJsonPrimitive("remote_rsd_id").getAsString());
UUID remoteSoftwareId = UUID.fromString(jsonObject.getAsJsonPrimitive("remote_software_id").getAsString());
Collection<UUID> softwareOfRemote = softwareScrapedPerRemoteId.computeIfAbsent(remoteRsdId, id -> new HashSet<>());
softwareOfRemote.add(remoteSoftwareId);
}

for (RemoteRsdData remoteRsdData : remoteEntriesToScrape) {
UUID remoteId = remoteRsdData.id();
if (!softwareScrapedPerRemoteId.containsKey(remoteId)) {
continue;
}

Collection<UUID> scrapedSoftware = softwareScrapedPerRemoteId.get(remoteId);
Collection<UUID> previouslyStoredSoftwareIds = remoteRsdData.softwareIds();

for (UUID previouslyStoredSoftwareId : previouslyStoredSoftwareIds) {
if (!scrapedSoftware.contains(previouslyStoredSoftwareId)) {
try {
PostgrestConnector.deleteSoftware(BASE_URL, remoteId, previouslyStoredSoftwareId);
} catch (RsdResponseException e) {
LOGGER.error("Unknown error", e);
Utils.saveExceptionInDatabase(AGGREGATOR_SERVICE_NAME, REMOTE_SOFTWARE_TABLE_NAME, previouslyStoredSoftwareId, e);
} catch (IOException e) {
LOGGER.error("Unknown error", e);
Utils.saveExceptionInDatabase(AGGREGATOR_SERVICE_NAME, REMOTE_SOFTWARE_TABLE_NAME, previouslyStoredSoftwareId, e);
} catch (InterruptedException e) {
LOGGER.error("Got interrupted, early exiting deleting old entries", e);
Utils.saveExceptionInDatabase(AGGREGATOR_SERVICE_NAME, REMOTE_SOFTWARE_TABLE_NAME, previouslyStoredSoftwareId, e);
Thread.currentThread().interrupt();
return;
}
}
}
}

for (RemoteRsdData entry : remoteEntriesToScrape) {
UUID id = entry.id();
if (!softwarePerId.containsKey(id)) {
PostgrestConnector.updateRefreshedTimeAndErrorMessage(BASE_URL, id, now, "Unknown error while scraping");
}
}

long stop = System.nanoTime();
LOGGER.info("Done aggregating RSDs ({} ms)", (stop - start) / 1000_000L);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// SPDX-FileCopyrightText: 2024 Ewan Cahen (Netherlands eScience Center) <[email protected]>
// SPDX-FileCopyrightText: 2024 Netherlands eScience Center
//
// SPDX-License-Identifier: Apache-2.0

package nl.esciencecenter.rsd.scraper.aggregator;

import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import nl.esciencecenter.rsd.scraper.RsdResponseException;
import nl.esciencecenter.rsd.scraper.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;

public class PostgrestConnector {

private static final Logger LOGGER = LoggerFactory.getLogger(PostgrestConnector.class);

private PostgrestConnector() {
}

public static Collection<RemoteRsdData> allActiveDomains(URI baseUrl) {
String filter = "select=id,domain,scrape_interval_minutes,scraped_at,remote_software(remote_software_id)&active=is.true";
String url = baseUrl.toString() + "/remote_rsd?" + filter;

String response = Utils.getAsAdmin(url);

return parseDomainsToScrapeResponse(response);
}

static List<RemoteRsdData> parseDomainsToScrapeResponse(String response) {
JsonArray jsonTree = JsonParser.parseString(response).getAsJsonArray();
List<RemoteRsdData> result = new ArrayList<>(jsonTree.size());

for (JsonElement element : jsonTree) {
try {
JsonObject jsonObject = element.getAsJsonObject();
UUID id = UUID.fromString(jsonObject.getAsJsonPrimitive("id").getAsString());
URI domain = URI.create(jsonObject.getAsJsonPrimitive("domain").getAsString());
Duration refreshInterval = Duration.ofMinutes(jsonObject.getAsJsonPrimitive("scrape_interval_minutes")
.getAsLong());
JsonElement refreshedAtElement = jsonObject.get("scraped_at");
ZonedDateTime refreshedAt = refreshedAtElement.isJsonNull() ? null : ZonedDateTime.parse(refreshedAtElement.getAsString());

Collection<UUID> softwareIds = new HashSet<>();
JsonArray idsArray = jsonObject.getAsJsonArray("remote_software");
for (JsonElement jsonElement : idsArray) {
UUID softwareId = UUID.fromString(jsonElement.getAsJsonObject()
.getAsJsonPrimitive("remote_software_id")
.getAsString());
softwareIds.add(softwareId);
}

result.add(new RemoteRsdData(
id,
domain,
refreshInterval,
refreshedAt,
softwareIds
));
} catch (RuntimeException e) {
LOGGER.error("Exception when parsing item", e);
Utils.saveExceptionInDatabase(MainAggregator.AGGREGATOR_SERVICE_NAME, MainAggregator.REMOTE_SOFTWARE_TABLE_NAME, null, e);
}
}

return result;
}

public static void deleteSoftware(URI baseUrl, UUID remoteRsdId, UUID remoteSoftwareId) throws RsdResponseException, IOException, InterruptedException {
String filter = "remote_rsd_id=eq." + remoteRsdId + "&remote_software_id=eq." + remoteSoftwareId;
String url = baseUrl.toString() + "/remote_software?" + filter;

Utils.deleteAsAdmin(url);
}

public static void saveRemoteSoftware(URI baseUrl, JsonArray softwareArray) {
String url = baseUrl + "/remote_software?on_conflict=remote_rsd_id,remote_software_id";

Utils.postAsAdmin(url, softwareArray.toString(), "Prefer", "resolution=merge-duplicates");
}

public static void updateRefreshedTimeAndErrorMessage(URI baseUrl, UUID id, ZonedDateTime refreshedAt, String errorMessage) {
String filter = "id=eq." + id;
String url = baseUrl.toString() + "/remote_rsd?" + filter;

JsonObject body = new JsonObject();
body.addProperty("scraped_at", refreshedAt.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME));
body.addProperty("last_err_msg", errorMessage);

Utils.patchAsAdmin(url, body.toString());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// SPDX-FileCopyrightText: 2024 Ewan Cahen (Netherlands eScience Center) <[email protected]>
// SPDX-FileCopyrightText: 2024 Netherlands eScience Center
//
// SPDX-License-Identifier: Apache-2.0

package nl.esciencecenter.rsd.scraper.aggregator;

import com.google.gson.JsonArray;
import com.google.gson.JsonParser;
import nl.esciencecenter.rsd.scraper.RsdResponseException;
import nl.esciencecenter.rsd.scraper.Utils;

import java.io.IOException;
import java.net.URI;

public class RemoteRsdConnector {

private RemoteRsdConnector() {
}

public static JsonArray getAllSoftware(URI remoteDomain) throws RsdResponseException, IOException, InterruptedException {
String path = "/api/v1/rpc/software_overview";
String url = remoteDomain.toString() + path;

String response = Utils.get(url);
return JsonParser.parseString(response).getAsJsonArray();
}
}
Loading

0 comments on commit e5591de

Please sign in to comment.