Merge pull request #948 from mediathekview/feature/ratelimiter
Feature/ratelimiter
codingPF authored Nov 16, 2023
2 parents b1af62d + 8a0a937 commit f35a682
Showing 13 changed files with 62 additions and 83 deletions.
26 changes: 18 additions & 8 deletions MServer-Config.yaml
@@ -17,12 +17,22 @@ maximumRequestsPerSecond: 999.0

# If set, only these Sender will be crawled; all others will be ignored.
senderIncluded:
#- MDR
#- NDR
#- ARD
#- ARTE_DE
#- ARGE_FR
#- ARTE_EN
#- ARTE_PL
#- ARTE_IT
#- ARTE_ES
#- 3SAT
#- FUNK
#- KIKA
- DW
#- BR
#- DW
#- ORF
#- PHOENIX
#- SRF
- SR
#- ZDF

# If set, the server will stay awake after the crawler run and restart the run after the given amount of time.
#schedules:
@@ -111,14 +121,14 @@ checkImportListUrlTimeoutInSec: 1800

#### Default crawler configurations ####
# The maximum amount of URLs to be processed per task.
maximumUrlsPerTask: 50
maximumUrlsPerTask: 10

# The maximum duration in minutes a crawler may run.
maximumCrawlDurationInMinutes: 120

# Enables the topics search
# maximumSubpages limits the depth of the topics search
topicsSearchEnabled: false
topicsSearchEnabled: true

# The maximum amount of sub pages to be crawled.
# Example: If a Sendung overview page has 10 pages with videos for this Sendung and
@@ -164,14 +174,14 @@ senderConfigurations:
KIKA:
maximumSubpages: 2
maximumRequestsPerSecond: 8.0
SR:
maximumRequestsPerSecond: 2.0
ZDF:
maximumRequestsPerSecond: 10.0
FUNK:
maximumUrlsPerTask: 99
DW:
maximumSubpages: 0
SR:
maximumSubpages: 5

# configure string variables
crawlerApiParams:
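
The per-sender maximumRequestsPerSecond values above are what the new rate limiter consumes. A minimal sketch of the throttling semantics, assuming Guava's RateLimiter (which the Java changes below wire in); the 2.0 rate mirrors the SR entry in the config above:

import com.google.common.util.concurrent.RateLimiter;

public class RateLimiterDemo {
  public static void main(String[] args) {
    // 2.0 permits per second, mirroring the SR entry in the config above.
    RateLimiter limiter = RateLimiter.create(2.0);
    for (int i = 0; i < 5; i++) {
      // acquire() blocks until a permit is free and returns the seconds slept,
      // smoothing calls to at most ~2 requests per second.
      double waited = limiter.acquire();
      System.out.printf("request %d waited %.2fs%n", i, waited);
    }
  }
}
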
JsoupConnection.java
@@ -1,5 +1,6 @@
package de.mediathekview.mserver.base.webaccess;

import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
@@ -22,12 +23,13 @@ public class JsoupConnection {
private static final String FILE_TYPE_M3U8 = "m3u8";
protected OkHttpClient client;

public JsoupConnection(final int timeout) {
public JsoupConnection(final int timeout, final int threadPoolSize) {
client =
new OkHttpClient.Builder()
.connectTimeout(timeout, TimeUnit.SECONDS)
.readTimeout(timeout, TimeUnit.SECONDS)
.callTimeout(timeout, TimeUnit.SECONDS)
.connectionPool(new ConnectionPool(threadPoolSize, 5L, TimeUnit.MINUTES))
.build();
}

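
The new threadPoolSize parameter sizes OkHttp's idle-connection pool to the crawler's thread count, so each worker thread can reuse a keep-alive connection instead of opening a fresh one per request. A self-contained sketch of the same builder calls, assuming OkHttp 3.12+ (the timeout and pool values are placeholders):

import java.util.concurrent.TimeUnit;

import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;

public class ClientFactory {
  // Builds a client whose idle-connection pool matches the worker thread count.
  static OkHttpClient build(int timeoutSeconds, int threadPoolSize) {
    return new OkHttpClient.Builder()
        .connectTimeout(timeoutSeconds, TimeUnit.SECONDS)
        .readTimeout(timeoutSeconds, TimeUnit.SECONDS)
        .callTimeout(timeoutSeconds, TimeUnit.SECONDS)
        // maxIdleConnections = threadPoolSize, keep-alive = 5 minutes,
        // mirroring the constructor change above.
        .connectionPool(new ConnectionPool(threadPoolSize, 5L, TimeUnit.MINUTES))
        .build();
  }
}
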
ArteTaskBase.java
@@ -1,6 +1,5 @@
package de.mediathekview.mserver.crawler.arte.tasks;

import com.google.common.util.concurrent.RateLimiter;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import de.mediathekview.mserver.crawler.basic.AbstractCrawler;
@@ -19,7 +18,6 @@

public abstract class ArteTaskBase<T, D extends CrawlerUrlDTO> extends AbstractRestTask<T, D> {
private static final Logger LOG = LogManager.getLogger(ArteTaskBase.class);
private static RateLimiter limiter = null;
private final transient GsonBuilder gsonBuilder;

protected ArteTaskBase(
@@ -106,11 +104,6 @@ private Response executeRequest(final WebTarget aTarget) {
if (authKey.isPresent()) {
request = request.header(HEADER_AUTHORIZATION, authKey.get());
}

if (limiter == null) {
limiter = RateLimiter.create(crawler.getCrawlerConfig().getMaximumRequestsPerSecond());
}
limiter.acquire();
return request
.header(HEADER_ACCEPT_ENCODING, ENCODING_GZIP)
.header(HEADER_ACCEPT, APPLICATION_JSON)
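
The same removal recurs in the DW, ZDF, and JSON REST task bases below: each kept a lazily initialized static RateLimiter, which is class-scoped (one limiter shared by every crawler that uses the task type) and racy, since the unsynchronized null check can construct two limiters under contention. A sketch of the hazard being removed (class and method names are hypothetical):

import com.google.common.util.concurrent.RateLimiter;

class RacyTaskSketch {
  private static RateLimiter limiter = null; // shared by ALL instances of the class

  void throttle(double permitsPerSecond) {
    // Two threads may both observe null here and create separate limiters;
    // the thread whose write loses then throttles against a limiter nobody else sees.
    if (limiter == null) {
      limiter = RateLimiter.create(permitsPerSecond);
    }
    limiter.acquire();
  }
}

The replacement, introduced in AbstractCrawler below, is a single limiter created eagerly per crawler instance.
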
AbstractCrawler.java
@@ -15,6 +15,8 @@
import org.apache.logging.log4j.Logger;
import org.jsoup.nodes.Document;

import com.google.common.util.concurrent.RateLimiter;

import java.io.IOException;
import java.time.Duration;
import java.time.LocalDateTime;
@@ -41,6 +43,7 @@ public abstract class AbstractCrawler implements Callable<Set<Film>> {
protected Set<Film> films;
private LocalDateTime startTime;
protected JsoupConnection jsoupConnection;
protected RateLimiter rateLimiter;

protected AbstractCrawler(
final ForkJoinPool aForkJoinPool,
@@ -58,8 +61,11 @@ protected AbstractCrawler(

runtimeConfig = rootConfig.getConfig();
crawlerConfig = rootConfig.getSenderConfig(getSender());
jsoupConnection = new JsoupConnection(crawlerConfig.getSocketTimeoutInSeconds());

jsoupConnection = new JsoupConnection(
rootConfig.getSenderConfig(getSender()).getSocketTimeoutInSeconds(),
runtimeConfig.getMaximumCpuThreads());
rateLimiter = RateLimiter.create(rootConfig.getSenderConfig(getSender()).getMaximumRequestsPerSecond());

films = ConcurrentHashMap.newKeySet();
}

@@ -137,6 +143,14 @@ public JsoupConnection getConnection() {
public void setConnection(JsoupConnection connection) {
jsoupConnection = connection;
}

public RateLimiter getRateLimiter() {
return rateLimiter;
}

public void setRateLimiter(RateLimiter rateLimiter) {
this.rateLimiter = rateLimiter;
}

/**
* Request a URL and receive the body as String
@@ -145,6 +159,7 @@ public void setConnection(JsoupConnection connection) {
* @throws IOException
*/
public String requestBodyAsString(String url) throws IOException {
getRateLimiter().acquire();
return getConnection().requestBodyAsString(url);
}

@@ -155,6 +170,7 @@ public String requestBodyAsString(String url) throws IOException {
* @throws IOException
*/
public Document requestBodyAsHtmlDocument(String url) throws IOException {
getRateLimiter().acquire();
return getConnection().requestBodyAsHtmlDocument(url);
}

@@ -165,6 +181,7 @@ public Document requestBodyAsHtmlDocument(String url) throws IOException {
* @throws IOException
*/
public Document requestBodyAsXmlDocument(String url) throws IOException {
getRateLimiter().acquire();
return getConnection().requestBodyAsXmlDocument(url);
}

@@ -176,6 +193,7 @@ public Document requestBodyAsXmlDocument(String url) throws IOException {
* @return size of the response in KB or -1 in case we could not determine the size.
*/
public long determineFileSizeInKB(String url) {
getRateLimiter().acquire();
return getConnection().determineFileSize(url) / 1024;
}

@@ -185,6 +203,7 @@ public long determineFileSizeInKB(String url) {
* @return true if the request was successfully processed by the server
*/
public boolean requestUrlExists(String url) {
getRateLimiter().acquire();
return getConnection().requestUrlExists(url);
}
/**
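
With the limiter owned by AbstractCrawler, every request helper above throttles through the same instance, and the new setter lets callers replace it. A hypothetical usage sketch (the helper class and the near-unlimited rate are assumptions, e.g. for tests against a local stub server):

import com.google.common.util.concurrent.RateLimiter;

import de.mediathekview.mserver.crawler.basic.AbstractCrawler;

final class CrawlerTestSupport {
  // Hypothetical test helper: effectively disables throttling by
  // swapping in a limiter with a practically unlimited rate.
  static void disableRateLimit(AbstractCrawler crawler) {
    crawler.setRateLimiter(RateLimiter.create(Double.MAX_VALUE));
  }
}
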
AbstractJsonRestTask.java
@@ -1,6 +1,5 @@
package de.mediathekview.mserver.crawler.basic;

import com.google.common.util.concurrent.RateLimiter;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

@@ -23,7 +22,6 @@ public abstract class AbstractJsonRestTask<T, R, D extends CrawlerUrlDTO>
protected static final String ENCODING_GZIP = "gzip";
private static final long serialVersionUID = -1090560363478964885L;
protected final transient GsonBuilder gsonBuilder;
private static RateLimiter limiter = null;

protected AbstractJsonRestTask(
final AbstractCrawler crawler,
@@ -63,10 +61,6 @@ protected void processRestTarget(final D aDTO, final WebTarget aTarget) {
}

protected Response createResponse(final Builder request, final D aDTO) {
if (limiter == null) {
limiter = RateLimiter.create(crawler.getCrawlerConfig().getMaximumRequestsPerSecond());
}
limiter.acquire();
request.header(ACCEPT_CHARSET, StandardCharsets.UTF_8);
return request.header(ACCEPT_ENCODING, ENCODING_GZIP).header("User-Agent", "Mozilla").get();
}
AbstractRestTask.java
@@ -72,6 +72,7 @@ protected void processElement(final D aDTO) {
* @return the {@link WebTarget} to access the url.
*/
protected WebTarget createWebTarget(final String aUrl) {
crawler.getRateLimiter().acquire();
return client.target(aUrl);
}

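
Acquiring in createWebTarget means every REST task pays one permit per target URL before a request is built, no matter which subclass issues it, which is why the per-class limiters above and below could be deleted. A self-contained sketch of the same flow, assuming a JAX-RS client alongside Guava (the URL, the 10.0 rate, and the javax namespace are assumptions):

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;

import com.google.common.util.concurrent.RateLimiter;

public class ThrottledTargetDemo {
  public static void main(String[] args) {
    Client client = ClientBuilder.newClient();
    RateLimiter limiter = RateLimiter.create(10.0);
    limiter.acquire(); // one permit per target, as createWebTarget() does above
    WebTarget target = client.target("https://example.org/api/items"); // placeholder URL
    Response response = target.request().get();
    System.out.println(response.getStatus());
    response.close();
    client.close();
  }
}
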
DWTaskBase.java
@@ -1,10 +1,8 @@
package de.mediathekview.mserver.crawler.dw;

import com.google.common.util.concurrent.RateLimiter;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

import de.mediathekview.mlib.daten.Sender;
import de.mediathekview.mserver.crawler.basic.AbstractCrawler;
import de.mediathekview.mserver.crawler.basic.AbstractRestTask;
import de.mediathekview.mserver.crawler.basic.CrawlerUrlDTO;
@@ -23,9 +21,6 @@
@SuppressWarnings("serial")
public abstract class DWTaskBase<T, D extends CrawlerUrlDTO> extends AbstractRestTask<T, D> {
private static final Logger LOG = LogManager.getLogger(DWTaskBase.class);

private static RateLimiter limiter = null;

private final transient GsonBuilder gsonBuilder;

protected DWTaskBase(
@@ -78,10 +73,6 @@ private Response executeRequest(final WebTarget aTarget) {
request.header(
ZdfConstants.HEADER_AUTHENTIFICATION, AUTHORIZATION_BEARER + authKey.get());
}
if (limiter == null) {
limiter = RateLimiter.create(crawler.getRuntimeConfig().getSenderConfig(Sender.DW).getMaximumRequestsPerSecond());
}
limiter.acquire();
return request.header(HEADER_ACCEPT_ENCODING, ENCODING_GZIP).get();
}
}
@@ -117,8 +117,14 @@ protected void postProcessing(KikaApiVideoInfoDto aResponseObj, KikaApiFilmDto a
aFilm.setUrls(getVideoUrls(aResponseObj, aDTO));
aFilm.addAllSubtitleUrls(getSubtitle(aResponseObj, aDTO));
//
taskResults.add(aFilm);
crawler.incrementAndGetActualCount();


if (!taskResults.add(aFilm)) {
LOG.debug("Rejected duplicate {}",aFilm);
crawler.incrementAndGetErrorCount();
} else {
crawler.incrementAndGetActualCount();
}
crawler.updateProgress();
}

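
Because taskResults is a Set, add() returns false when an equal Film is already present; the new branch counts that duplicate as an error instead of silently overcounting. A minimal sketch of the Set contract involved, using ConcurrentHashMap.newKeySet (the collection type AbstractCrawler uses for films above) with String standing in for Film:

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class DuplicateDemo {
  public static void main(String[] args) {
    Set<String> results = ConcurrentHashMap.newKeySet();
    System.out.println(results.add("film-1")); // true: newly added
    System.out.println(results.add("film-1")); // false: duplicate rejected
  }
}
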
SrFilmDetailTask.java
@@ -10,6 +10,7 @@
import de.mediathekview.mserver.crawler.ard.json.ArdVideoInfoDto;
import de.mediathekview.mserver.crawler.ard.json.ArdVideoInfoJsonDeserializer;
import de.mediathekview.mserver.crawler.basic.AbstractCrawler;
import de.mediathekview.mserver.crawler.basic.AbstractDocumentTask;
import de.mediathekview.mserver.crawler.basic.AbstractUrlTask;
import de.mediathekview.mserver.crawler.sr.SrTopicUrlDTO;
import org.apache.commons.lang3.StringUtils;
@@ -27,7 +28,7 @@
import java.time.format.DateTimeParseException;
import java.util.*;

public class SrFilmDetailTask extends SrRateLimitedDocumentTask<Film, SrTopicUrlDTO> {
public class SrFilmDetailTask extends AbstractDocumentTask<Film, SrTopicUrlDTO> {

private static final org.apache.logging.log4j.Logger LOG =
LogManager.getLogger(SrFilmDetailTask.class);
@@ -156,8 +157,12 @@ protected void processDocument(final SrTopicUrlDTO aUrlDTO, final Document aDocu

addUrls(film, videoInfo.getVideoUrls());

taskResults.add(film);
crawler.incrementAndGetActualCount();
if (taskResults.add(film)) {
crawler.incrementAndGetActualCount();
} else {
crawler.incrementAndGetErrorCount();
LOG.error("Rejected duplicate {}", film);
}
crawler.updateProgress();
} else {
LOG.error("SrFilmDetailTask: no title or video found for url {}", aUrlDTO.getUrl());

SrRateLimitedDocumentTask.java: This file was deleted.

SrTopicArchivePageTask.java
@@ -2,6 +2,7 @@

import de.mediathekview.mserver.base.HtmlConsts;
import de.mediathekview.mserver.crawler.basic.AbstractCrawler;
import de.mediathekview.mserver.crawler.basic.AbstractDocumentTask;
import de.mediathekview.mserver.crawler.basic.AbstractUrlTask;
import de.mediathekview.mserver.crawler.sr.SrConstants;
import de.mediathekview.mserver.crawler.sr.SrTopicUrlDTO;
@@ -15,7 +16,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;

public class SrTopicArchivePageTask
extends SrRateLimitedDocumentTask<SrTopicUrlDTO, SrTopicUrlDTO> {
extends AbstractDocumentTask<SrTopicUrlDTO, SrTopicUrlDTO> {

private static final String NEXT_PAGE_SELECTOR = "div.pagination__item > a[title*=weiter]";
private static final String SHOW_SELECTOR = "h3.teaser__text__header";
ZdfTaskBase.java
@@ -1,9 +1,7 @@
package de.mediathekview.mserver.crawler.zdf.tasks;

import com.google.common.util.concurrent.RateLimiter;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import de.mediathekview.mlib.daten.Sender;
import de.mediathekview.mserver.crawler.basic.AbstractCrawler;
import de.mediathekview.mserver.crawler.basic.AbstractRestTask;
import de.mediathekview.mserver.crawler.basic.CrawlerUrlDTO;
@@ -20,9 +18,6 @@

public abstract class ZdfTaskBase<T, D extends CrawlerUrlDTO> extends AbstractRestTask<T, D> {
private static final Logger LOG = LogManager.getLogger(ZdfTaskBase.class);

private static RateLimiter limiter = null;

private final GsonBuilder gsonBuilder;

protected ZdfTaskBase(
@@ -73,11 +68,6 @@ private Response executeRequest(final WebTarget aTarget) {
request.header(
ZdfConstants.HEADER_AUTHENTIFICATION, AUTHORIZATION_BEARER + authKey.get());
}
if (limiter == null) {
limiter = RateLimiter.create(crawler.getRuntimeConfig().getSenderConfig(Sender.ZDF).getMaximumRequestsPerSecond());
}

limiter.acquire();
return request.header(HEADER_ACCEPT_ENCODING, ENCODING_GZIP).get();
}
}
