From e5591dea341472e30eb8edc150122605c04f5dad Mon Sep 17 00:00:00 2001 From: Ewan Cahen Date: Tue, 26 Nov 2024 16:12:57 +0100 Subject: [PATCH] feat: add scraper for RSD aggregator --- database/024-remote-rsd.sql | 15 +- ...nity-views.sql => 108-community-views.sql} | 0 ....sql => 124-aggregated-software-views.sql} | 5 +- scrapers/jobs.cron | 1 + .../nl/esciencecenter/rsd/scraper/Utils.java | 21 +++ .../scraper/aggregator/MainAggregator.java | 161 ++++++++++++++++++ .../aggregator/PostgrestConnector.java | 106 ++++++++++++ .../aggregator/RemoteRsdConnector.java | 28 +++ .../rsd/scraper/aggregator/RemoteRsdData.java | 21 +++ 9 files changed, 351 insertions(+), 7 deletions(-) rename database/{124-community-views.sql => 108-community-views.sql} (100%) rename database/{105-aggregated-software-views.sql => 124-aggregated-software-views.sql} (95%) create mode 100644 scrapers/src/main/java/nl/esciencecenter/rsd/scraper/aggregator/MainAggregator.java create mode 100644 scrapers/src/main/java/nl/esciencecenter/rsd/scraper/aggregator/PostgrestConnector.java create mode 100644 scrapers/src/main/java/nl/esciencecenter/rsd/scraper/aggregator/RemoteRsdConnector.java create mode 100644 scrapers/src/main/java/nl/esciencecenter/rsd/scraper/aggregator/RemoteRsdData.java diff --git a/database/024-remote-rsd.sql b/database/024-remote-rsd.sql index b5df6a6a4..4af4a9d88 100644 --- a/database/024-remote-rsd.sql +++ b/database/024-remote-rsd.sql @@ -11,8 +11,8 @@ CREATE TABLE remote_rsd ( label VARCHAR (50) NOT NULL CHECK (LENGTH(label) >= 3), domain VARCHAR(200) NOT NULL UNIQUE, active BOOLEAN DEFAULT TRUE, - refresh_rate_min BIGINT DEFAULT 5 CHECK (refresh_rate_min >= 5), - refreshed_at TIMESTAMPTZ, + scrape_interval_minutes BIGINT DEFAULT 5 CHECK (scrape_interval_minutes >= 5), + scraped_at TIMESTAMPTZ, last_err_msg VARCHAR(1000), created_at TIMESTAMPTZ NOT NULL, updated_at TIMESTAMPTZ NOT NULL @@ -46,8 +46,12 @@ CREATE TRIGGER sanitise_update_remote_rsd BEFORE UPDATE ON remote_rsd FOR EACH ROW EXECUTE PROCEDURE sanitise_update_remote_rsd(); -- RLS REMOTE_RSD --- rsd-admin access only + ALTER TABLE remote_rsd ENABLE ROW LEVEL SECURITY; + +CREATE POLICY anyone_can_read ON remote_rsd FOR SELECT TO rsd_web_anon, rsd_user + USING (TRUE); + CREATE POLICY admin_all_rights ON remote_rsd TO rsd_admin USING (TRUE) WITH CHECK (TRUE); @@ -58,7 +62,8 @@ CREATE POLICY admin_all_rights ON remote_rsd TO rsd_admin -- Results are returned from software_overview RPC from remote RSD and enriched with remote_rsd id CREATE TABLE remote_software ( id UUID DEFAULT gen_random_uuid() PRIMARY KEY, - remote_rsd UUID NOT NULL REFERENCES remote_rsd(id), + remote_rsd_id UUID NOT NULL REFERENCES remote_rsd(id), + remote_software_id UUID NOT NULL, slug VARCHAR(200) NOT NULL CHECK (slug ~ '^[a-z0-9]+(-[a-z0-9]+)*$'), is_published BOOLEAN DEFAULT FALSE NOT NULL, brand_name VARCHAR(200) NOT NULL, @@ -72,7 +77,7 @@ CREATE TABLE remote_software ( prog_lang TEXT[], licenses VARCHAR[], scraped_at TIMESTAMPTZ NOT NULL, - UNIQUE(remote_rsd, slug) + UNIQUE(remote_rsd_id, remote_software_id) ); CREATE POLICY anyone_can_read ON remote_software FOR SELECT TO rsd_web_anon, rsd_user diff --git a/database/124-community-views.sql b/database/108-community-views.sql similarity index 100% rename from database/124-community-views.sql rename to database/108-community-views.sql diff --git a/database/105-aggregated-software-views.sql b/database/124-aggregated-software-views.sql similarity index 95% rename from database/105-aggregated-software-views.sql rename to database/124-aggregated-software-views.sql index 85d1ed6fe..bc23f2783 100644 --- a/database/105-aggregated-software-views.sql +++ b/database/124-aggregated-software-views.sql @@ -1,4 +1,5 @@ -- SPDX-FileCopyrightText: 2024 Dusan Mijatovic (Netherlands eScience Center) +-- SPDX-FileCopyrightText: 2024 Ewan Cahen (Netherlands eScience Center) -- SPDX-FileCopyrightText: 2024 Netherlands eScience Center -- -- SPDX-License-Identifier: Apache-2.0 @@ -41,7 +42,7 @@ SELECT software_overview.licenses FROM software_overview() -UNION +UNION ALL SELECT remote_software.id, remote_rsd.label AS source, @@ -61,7 +62,7 @@ SELECT FROM remote_software INNER JOIN - remote_rsd ON remote_rsd.id=remote_software.remote_rsd + remote_rsd ON remote_rsd.id = remote_software.remote_rsd_id ; $$; diff --git a/scrapers/jobs.cron b/scrapers/jobs.cron index 195dc2316..0ade904a8 100644 --- a/scrapers/jobs.cron +++ b/scrapers/jobs.cron @@ -11,3 +11,4 @@ 5-59/6 * * * * /opt/java/openjdk/bin/java -cp /usr/myjava/scrapers.jar nl.esciencecenter.rsd.scraper.git.MainContributors > /proc/$(cat /var/run/crond.pid)/fd/1 2>&1 5-59/6 * * * * /opt/java/openjdk/bin/java -cp /usr/myjava/scrapers.jar nl.esciencecenter.rsd.scraper.doi.MainCitations > /proc/$(cat /var/run/crond.pid)/fd/1 2>&1 2-59/6 * * * * /opt/java/openjdk/bin/java -cp /usr/myjava/scrapers.jar nl.esciencecenter.rsd.scraper.ror.MainRor > /proc/$(cat /var/run/crond.pid)/fd/1 2>&1 +* * * * * /opt/java/openjdk/bin/java -cp /usr/myjava/scrapers.jar nl.esciencecenter.rsd.scraper.aggregator.MainAggregator > /proc/$(cat /var/run/crond.pid)/fd/1 2>&1 diff --git a/scrapers/src/main/java/nl/esciencecenter/rsd/scraper/Utils.java b/scrapers/src/main/java/nl/esciencecenter/rsd/scraper/Utils.java index 5124dbbfb..23c949cce 100644 --- a/scrapers/src/main/java/nl/esciencecenter/rsd/scraper/Utils.java +++ b/scrapers/src/main/java/nl/esciencecenter/rsd/scraper/Utils.java @@ -309,6 +309,27 @@ public static String patchAsAdmin(String uri, String json, String... extraHeader return response.body(); } + public static String deleteAsAdmin(String uri, String... extraHeaders) throws IOException, InterruptedException, RsdResponseException { + String jwtString = adminJwt(); + HttpRequest.Builder builder = HttpRequest.newBuilder() + .method("DELETE", HttpRequest.BodyPublishers.noBody()) + .uri(URI.create(uri)) + .timeout(Duration.ofSeconds(30)) + .header("Authorization", "Bearer " + jwtString); + if (extraHeaders != null && extraHeaders.length > 0) { + builder.headers(extraHeaders); + } + HttpRequest request = builder.build(); + HttpResponse response; + try (HttpClient client = HttpClient.newHttpClient()) { + response = client.send(request, HttpResponse.BodyHandlers.ofString()); + } + if (response.statusCode() >= 300) { + throw new RsdResponseException(response.statusCode(), response.uri(), response.body(), "Error deleting data as admin"); + } + return response.body(); + } + private static String adminJwt() { String signingSecret = Config.jwtSigningSecret(); Algorithm signingAlgorithm = Algorithm.HMAC256(signingSecret); diff --git a/scrapers/src/main/java/nl/esciencecenter/rsd/scraper/aggregator/MainAggregator.java b/scrapers/src/main/java/nl/esciencecenter/rsd/scraper/aggregator/MainAggregator.java new file mode 100644 index 000000000..65735f275 --- /dev/null +++ b/scrapers/src/main/java/nl/esciencecenter/rsd/scraper/aggregator/MainAggregator.java @@ -0,0 +1,161 @@ +// SPDX-FileCopyrightText: 2024 Ewan Cahen (Netherlands eScience Center) +// SPDX-FileCopyrightText: 2024 Netherlands eScience Center +// +// SPDX-License-Identifier: Apache-2.0 + +package nl.esciencecenter.rsd.scraper.aggregator; + +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import nl.esciencecenter.rsd.scraper.Config; +import nl.esciencecenter.rsd.scraper.RsdResponseException; +import nl.esciencecenter.rsd.scraper.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.time.Duration; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +public class MainAggregator { + + private static final Logger LOGGER = LoggerFactory.getLogger(MainAggregator.class); + private static final URI BASE_URL = URI.create(Config.backendBaseUrl()); + static final String REMOTE_SOFTWARE_TABLE_NAME = "remote_software"; + static final String AGGREGATOR_SERVICE_NAME = "Aggregator"; + + + public static void main(String[] args) { + LOGGER.info("Start aggregating RSDs"); + long start = System.nanoTime(); + + ZonedDateTime now = ZonedDateTime.now(); + Collection allRemoteEntries = PostgrestConnector.allActiveDomains(BASE_URL); + + Collection remoteEntriesToScrape = allRemoteEntries.stream() + .filter(entry -> entry.refreshedAt() == null || entry.refreshedAt() + .plus(entry.refreshInterval()) + // subtracting 10 seconds to take into account variations in when this scraper starts + .minus(Duration.ofSeconds(10L)) + .isBefore(now)) + .toList(); + + ConcurrentMap softwarePerId = new ConcurrentHashMap<>(remoteEntriesToScrape.size()); + Collection> tasks = remoteEntriesToScrape.stream() + .>map(entry -> () -> { + JsonArray scrapedSoftware = RemoteRsdConnector.getAllSoftware(entry.domain()); + softwarePerId.put(entry.id(), scrapedSoftware); + return null; + }) + .toList(); + + try (ExecutorService executorService = Executors.newFixedThreadPool(8)) { + List> completedFutures = executorService.invokeAll(tasks); + for (Future completedFuture : completedFutures) { + try { + completedFuture.get(); + } catch (ExecutionException e) { + LOGGER.error("Unknown error", e); + Utils.saveExceptionInDatabase(AGGREGATOR_SERVICE_NAME, REMOTE_SOFTWARE_TABLE_NAME, null, e); + } catch (InterruptedException e) { + Utils.saveExceptionInDatabase(AGGREGATOR_SERVICE_NAME, REMOTE_SOFTWARE_TABLE_NAME, null, e); + LOGGER.error("Got interrupted, early exiting aggregating RSDs", e); + Thread.currentThread().interrupt(); + return; + } + } + } catch (InterruptedException e) { + Utils.saveExceptionInDatabase(AGGREGATOR_SERVICE_NAME, REMOTE_SOFTWARE_TABLE_NAME, null, e); + LOGGER.error("Got interrupted, early exiting aggregating RSDs", e); + Thread.currentThread().interrupt(); + return; + } + + JsonArray allSoftware = new JsonArray(); + for (Map.Entry entry : softwarePerId.entrySet()) { + JsonArray softwareArray = entry.getValue(); + UUID id = entry.getKey(); + for (JsonElement jsonElement : softwareArray) { + JsonObject jsonObject = jsonElement.getAsJsonObject(); + jsonObject.addProperty("remote_rsd_id", id.toString()); + jsonObject.addProperty("remote_software_id", jsonObject.getAsJsonPrimitive("id").getAsString()); + jsonObject.remove("id"); + jsonObject + .addProperty("scraped_at", now.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME)); + + allSoftware.add(jsonElement); + } + } + + PostgrestConnector.saveRemoteSoftware(BASE_URL, allSoftware); + + for (UUID id : softwarePerId.keySet()) { + PostgrestConnector.updateRefreshedTimeAndErrorMessage(BASE_URL, id, now, null); + } + + Map> softwareScrapedPerRemoteId = new HashMap<>(); + for (JsonElement jsonElement : allSoftware) { + JsonObject jsonObject = jsonElement.getAsJsonObject(); + UUID remoteRsdId = UUID.fromString(jsonObject.getAsJsonPrimitive("remote_rsd_id").getAsString()); + UUID remoteSoftwareId = UUID.fromString(jsonObject.getAsJsonPrimitive("remote_software_id").getAsString()); + Collection softwareOfRemote = softwareScrapedPerRemoteId.computeIfAbsent(remoteRsdId, id -> new HashSet<>()); + softwareOfRemote.add(remoteSoftwareId); + } + + for (RemoteRsdData remoteRsdData : remoteEntriesToScrape) { + UUID remoteId = remoteRsdData.id(); + if (!softwareScrapedPerRemoteId.containsKey(remoteId)) { + continue; + } + + Collection scrapedSoftware = softwareScrapedPerRemoteId.get(remoteId); + Collection previouslyStoredSoftwareIds = remoteRsdData.softwareIds(); + + for (UUID previouslyStoredSoftwareId : previouslyStoredSoftwareIds) { + if (!scrapedSoftware.contains(previouslyStoredSoftwareId)) { + try { + PostgrestConnector.deleteSoftware(BASE_URL, remoteId, previouslyStoredSoftwareId); + } catch (RsdResponseException e) { + LOGGER.error("Unknown error", e); + Utils.saveExceptionInDatabase(AGGREGATOR_SERVICE_NAME, REMOTE_SOFTWARE_TABLE_NAME, previouslyStoredSoftwareId, e); + } catch (IOException e) { + LOGGER.error("Unknown error", e); + Utils.saveExceptionInDatabase(AGGREGATOR_SERVICE_NAME, REMOTE_SOFTWARE_TABLE_NAME, previouslyStoredSoftwareId, e); + } catch (InterruptedException e) { + LOGGER.error("Got interrupted, early exiting deleting old entries", e); + Utils.saveExceptionInDatabase(AGGREGATOR_SERVICE_NAME, REMOTE_SOFTWARE_TABLE_NAME, previouslyStoredSoftwareId, e); + Thread.currentThread().interrupt(); + return; + } + } + } + } + + for (RemoteRsdData entry : remoteEntriesToScrape) { + UUID id = entry.id(); + if (!softwarePerId.containsKey(id)) { + PostgrestConnector.updateRefreshedTimeAndErrorMessage(BASE_URL, id, now, "Unknown error while scraping"); + } + } + + long stop = System.nanoTime(); + LOGGER.info("Done aggregating RSDs ({} ms)", (stop - start) / 1000_000L); + } + +} diff --git a/scrapers/src/main/java/nl/esciencecenter/rsd/scraper/aggregator/PostgrestConnector.java b/scrapers/src/main/java/nl/esciencecenter/rsd/scraper/aggregator/PostgrestConnector.java new file mode 100644 index 000000000..f068d1080 --- /dev/null +++ b/scrapers/src/main/java/nl/esciencecenter/rsd/scraper/aggregator/PostgrestConnector.java @@ -0,0 +1,106 @@ +// SPDX-FileCopyrightText: 2024 Ewan Cahen (Netherlands eScience Center) +// SPDX-FileCopyrightText: 2024 Netherlands eScience Center +// +// SPDX-License-Identifier: Apache-2.0 + +package nl.esciencecenter.rsd.scraper.aggregator; + +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import nl.esciencecenter.rsd.scraper.RsdResponseException; +import nl.esciencecenter.rsd.scraper.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.time.Duration; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.UUID; + +public class PostgrestConnector { + + private static final Logger LOGGER = LoggerFactory.getLogger(PostgrestConnector.class); + + private PostgrestConnector() { + } + + public static Collection allActiveDomains(URI baseUrl) { + String filter = "select=id,domain,scrape_interval_minutes,scraped_at,remote_software(remote_software_id)&active=is.true"; + String url = baseUrl.toString() + "/remote_rsd?" + filter; + + String response = Utils.getAsAdmin(url); + + return parseDomainsToScrapeResponse(response); + } + + static List parseDomainsToScrapeResponse(String response) { + JsonArray jsonTree = JsonParser.parseString(response).getAsJsonArray(); + List result = new ArrayList<>(jsonTree.size()); + + for (JsonElement element : jsonTree) { + try { + JsonObject jsonObject = element.getAsJsonObject(); + UUID id = UUID.fromString(jsonObject.getAsJsonPrimitive("id").getAsString()); + URI domain = URI.create(jsonObject.getAsJsonPrimitive("domain").getAsString()); + Duration refreshInterval = Duration.ofMinutes(jsonObject.getAsJsonPrimitive("scrape_interval_minutes") + .getAsLong()); + JsonElement refreshedAtElement = jsonObject.get("scraped_at"); + ZonedDateTime refreshedAt = refreshedAtElement.isJsonNull() ? null : ZonedDateTime.parse(refreshedAtElement.getAsString()); + + Collection softwareIds = new HashSet<>(); + JsonArray idsArray = jsonObject.getAsJsonArray("remote_software"); + for (JsonElement jsonElement : idsArray) { + UUID softwareId = UUID.fromString(jsonElement.getAsJsonObject() + .getAsJsonPrimitive("remote_software_id") + .getAsString()); + softwareIds.add(softwareId); + } + + result.add(new RemoteRsdData( + id, + domain, + refreshInterval, + refreshedAt, + softwareIds + )); + } catch (RuntimeException e) { + LOGGER.error("Exception when parsing item", e); + Utils.saveExceptionInDatabase(MainAggregator.AGGREGATOR_SERVICE_NAME, MainAggregator.REMOTE_SOFTWARE_TABLE_NAME, null, e); + } + } + + return result; + } + + public static void deleteSoftware(URI baseUrl, UUID remoteRsdId, UUID remoteSoftwareId) throws RsdResponseException, IOException, InterruptedException { + String filter = "remote_rsd_id=eq." + remoteRsdId + "&remote_software_id=eq." + remoteSoftwareId; + String url = baseUrl.toString() + "/remote_software?" + filter; + + Utils.deleteAsAdmin(url); + } + + public static void saveRemoteSoftware(URI baseUrl, JsonArray softwareArray) { + String url = baseUrl + "/remote_software?on_conflict=remote_rsd_id,remote_software_id"; + + Utils.postAsAdmin(url, softwareArray.toString(), "Prefer", "resolution=merge-duplicates"); + } + + public static void updateRefreshedTimeAndErrorMessage(URI baseUrl, UUID id, ZonedDateTime refreshedAt, String errorMessage) { + String filter = "id=eq." + id; + String url = baseUrl.toString() + "/remote_rsd?" + filter; + + JsonObject body = new JsonObject(); + body.addProperty("scraped_at", refreshedAt.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME)); + body.addProperty("last_err_msg", errorMessage); + + Utils.patchAsAdmin(url, body.toString()); + } +} diff --git a/scrapers/src/main/java/nl/esciencecenter/rsd/scraper/aggregator/RemoteRsdConnector.java b/scrapers/src/main/java/nl/esciencecenter/rsd/scraper/aggregator/RemoteRsdConnector.java new file mode 100644 index 000000000..bb172fb11 --- /dev/null +++ b/scrapers/src/main/java/nl/esciencecenter/rsd/scraper/aggregator/RemoteRsdConnector.java @@ -0,0 +1,28 @@ +// SPDX-FileCopyrightText: 2024 Ewan Cahen (Netherlands eScience Center) +// SPDX-FileCopyrightText: 2024 Netherlands eScience Center +// +// SPDX-License-Identifier: Apache-2.0 + +package nl.esciencecenter.rsd.scraper.aggregator; + +import com.google.gson.JsonArray; +import com.google.gson.JsonParser; +import nl.esciencecenter.rsd.scraper.RsdResponseException; +import nl.esciencecenter.rsd.scraper.Utils; + +import java.io.IOException; +import java.net.URI; + +public class RemoteRsdConnector { + + private RemoteRsdConnector() { + } + + public static JsonArray getAllSoftware(URI remoteDomain) throws RsdResponseException, IOException, InterruptedException { + String path = "/api/v1/rpc/software_overview"; + String url = remoteDomain.toString() + path; + + String response = Utils.get(url); + return JsonParser.parseString(response).getAsJsonArray(); + } +} diff --git a/scrapers/src/main/java/nl/esciencecenter/rsd/scraper/aggregator/RemoteRsdData.java b/scrapers/src/main/java/nl/esciencecenter/rsd/scraper/aggregator/RemoteRsdData.java new file mode 100644 index 000000000..fc3a1f00b --- /dev/null +++ b/scrapers/src/main/java/nl/esciencecenter/rsd/scraper/aggregator/RemoteRsdData.java @@ -0,0 +1,21 @@ +// SPDX-FileCopyrightText: 2024 Ewan Cahen (Netherlands eScience Center) +// SPDX-FileCopyrightText: 2024 Netherlands eScience Center +// +// SPDX-License-Identifier: Apache-2.0 + +package nl.esciencecenter.rsd.scraper.aggregator; + +import java.net.URI; +import java.time.Duration; +import java.time.ZonedDateTime; +import java.util.Collection; +import java.util.UUID; + +public record RemoteRsdData( + UUID id, + URI domain, + Duration refreshInterval, + ZonedDateTime refreshedAt, + Collection softwareIds +) { +}