Skip to content

Commit

Permalink
Fix bug in CCMimeCounter and update dependencies
Browse files Browse the repository at this point in the history
  • Loading branch information
tballison committed Oct 18, 2024
1 parent 2c106d1 commit e8a89cc
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 49 deletions.
27 changes: 16 additions & 11 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@
<project.build.resourceEncoding>UTF-8</project.build.resourceEncoding>
<file.encoding>UTF-8</file.encoding>
<maven.compile.encoding>UTF-8</maven.compile.encoding>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>${project.build.sourceEncoding}</project.reporting.outputEncoding>
<httpcore.version>4.4.16</httpcore.version>
<httpclient.version>4.5.14</httpclient.version>
<httpcomponents.version>4.4.14</httpcomponents.version>
<jackson.version>2.15.2</jackson.version>
<jupiter.version>5.9.3</jupiter.version>
<tika.version>2.7.0</tika.version>
<jackson.version>2.18.0</jackson.version>
<jupiter.version>5.11.2</jupiter.version>
<tika.version>2.9.2</tika.version>
<maven.shade.version>3.4.1</maven.shade.version>
</properties>
<dependencyManagement>
Expand Down Expand Up @@ -71,7 +71,7 @@
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.15</version>
<version>1.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
Expand All @@ -81,7 +81,12 @@
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.11.0</version>
<version>2.17.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.16</version>
</dependency>
</dependencies>
</dependencyManagement>
Expand Down Expand Up @@ -118,7 +123,7 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
<version>3.17.0</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
Expand All @@ -131,7 +136,7 @@
<dependency>
<groupId>org.netpreserve</groupId>
<artifactId>jwarc</artifactId>
<version>0.23.1</version>
<version>0.30.0</version>
</dependency>
<!-- logging -->
<dependency>
Expand All @@ -147,7 +152,7 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
<version>1.10.0</version>
<version>1.12.0</version>
</dependency>
<!-- test -->
<dependency>
Expand Down Expand Up @@ -201,7 +206,7 @@
<dependency>
<groupId>com.puppycrawl.tools</groupId>
<artifactId>checkstyle</artifactId>
<version>10.12.1</version>
<version>10.18.2</version>
</dependency>

</dependencies>
Expand Down
107 changes: 69 additions & 38 deletions src/main/java/org/tallison/cc/index/extractor/CCMimeCounter.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ public class CCMimeCounter {
private static final Long INDEX_WORKER_ID = 1l;
private static final Long INDEX_READER_ID = 2l;
private static final Logger LOGGER = LoggerFactory.getLogger(CCMimeCounter.class);
private static final int BATCH_SIZE = 100000;

public static void main(String[] args) throws Exception {
ExtractorConfig fetcherConfig =
new ObjectMapper().readValue(new File(args[0]), ExtractorConfig.class);
ExtractorConfig fetcherConfig = new ObjectMapper().readValue(new File(args[0]), ExtractorConfig.class);
execute(fetcherConfig);
}

Expand All @@ -93,8 +93,7 @@ private static void execute(ExtractorConfig fetcherConfig) throws IOException, T
int totalThreads = fetcherConfig.getNumThreads() + 1;

ExecutorService executorService = Executors.newFixedThreadPool(totalThreads);
ExecutorCompletionService<Long> executorCompletionService =
new ExecutorCompletionService<>(executorService);
ExecutorCompletionService<Long> executorCompletionService = new ExecutorCompletionService<>(executorService);

IndexIterator indexIterator = fetcherConfig.getIndexIterator();
indexIterator.initialize(Collections.EMPTY_MAP);
Expand All @@ -106,12 +105,11 @@ private static void execute(ExtractorConfig fetcherConfig) throws IOException, T
for (int i = 0; i < fetcherConfig.getNumThreads(); i++) {
DetectedMimeCounter processor = new DetectedMimeCounter(fetcherConfig, counter);
detectedMimeCounters.add(processor);
executorCompletionService.submit(
new IndexWorker(fetcherConfig, indexPathsList, processor));
executorCompletionService.submit(new IndexWorker(fetcherConfig, indexPathsList, processor));
}


while (finishedWorkers < totalThreads) {
while (finishedWorkers < fetcherConfig.getNumThreads()) {
//blocking
Future<Long> future = executorCompletionService.take();
if (future != null) {
Expand Down Expand Up @@ -141,8 +139,7 @@ private static void execute(ExtractorConfig fetcherConfig) throws IOException, T
summarize(detectedMimeCounters);
}

private static void summarize(List<DetectedMimeCounter> detectedMimeCounters)
throws IOException {
private static void summarize(List<DetectedMimeCounter> detectedMimeCounters) throws IOException {
Map<String, Long> total = new HashMap<>();
Map<String, Long> truncated = new HashMap<>();
Map<String, Long> nonTruncated = new HashMap<>();
Expand All @@ -156,8 +153,8 @@ private static void summarize(List<DetectedMimeCounter> detectedMimeCounters)
report("non-truncated", nonTruncated);
}

private static void calcNonTruncated(Map<String, Long> truncated, Map<String, Long> total,
Map<String, Long> nonTruncated) {
private static void calcNonTruncated(Map<String, Long> truncated,
Map<String, Long> total, Map<String, Long> nonTruncated) {
for (Map.Entry<String, Long> e : total.entrySet()) {
Long val = e.getValue();
Long t = truncated.getOrDefault(e.getKey(), 0l);
Expand All @@ -167,11 +164,14 @@ private static void calcNonTruncated(Map<String, Long> truncated, Map<String, Lo
}

private static void report(String name, Map<String, Long> m) throws IOException {
try (BufferedWriter writer = Files.newBufferedWriter(Paths.get(name + ".csv"),
StandardCharsets.UTF_8)) {
try (BufferedWriter writer = Files.newBufferedWriter(
Paths.get(name + ".csv"), StandardCharsets.UTF_8)) {
try (CSVPrinter printer = new CSVPrinter(writer, CSVFormat.EXCEL)) {
printer.printRecord("mime", "count");
m.entrySet().stream().sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
m
.entrySet()
.stream()
.sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
.forEach(e -> {
try {
printer.printRecord(e.getKey(), e.getValue());
Expand All @@ -189,7 +189,9 @@ private static void update(Map<String, MutableLong> from, Map<String, Long> to)
if (cnt == null) {
cnt = 0l;
}
cnt += e.getValue().getValue();
cnt += e
.getValue()
.getValue();
to.put(e.getKey(), cnt);
}
}
Expand Down Expand Up @@ -230,50 +232,72 @@ public Long call() throws Exception {
}

private boolean processFile(FetchEmitTuple fetchEmitTuple,
AbstractRecordProcessor recordProcessor)
throws InterruptedException {
AbstractRecordProcessor recordProcessor) throws InterruptedException {
long start = System.currentTimeMillis();
LOGGER.info("starting to fetch index gz: {}",
fetchEmitTuple.getFetchKey().getFetchKey());
try (TikaInputStream tis = (TikaInputStream) fetcher.fetch(
fetchEmitTuple.getFetchKey().getFetchKey(), new Metadata())) {
LOGGER.info("starting to fetch index gz path={} with fetcher class={}", fetchEmitTuple
.getFetchKey()
.getFetchKey(), fetcher.getClass());
try (TikaInputStream tis = (TikaInputStream) fetcher.fetch(fetchEmitTuple
.getFetchKey()
.getFetchKey(), new Metadata())) {
try (InputStream is = new BufferedInputStream(new GZIPInputStream(tis))) {
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(is, StandardCharsets.UTF_8))) {
String line = reader.readLine();
int lines = 0;
int lineCount = 0;
long elapsed = System.currentTimeMillis() - start;
LOGGER.info("Finished fetching {} bytes in {} ms for index gz: {}",
String.format(Locale.US, "%,d", tis.getLength()),
String.format(Locale.US, "%,d", elapsed),
fetchEmitTuple.getFetchKey().getFetchKey());
String.format(Locale.US, "%,d", elapsed), fetchEmitTuple
.getFetchKey()
.getFetchKey());
List<String> lines = new ArrayList<>();
String line = reader.readLine();
while (line != null) {
LOGGER.trace("about to add a line");
if (StringUtils.isBlank(line)) {
line = reader.readLine();
continue;
}
try {
boolean shouldContinue = recordProcessor.process(line);
lines.add(line);
if (lines.size() >= BATCH_SIZE) {
boolean shouldContinue = processLines(lines, recordProcessor);
if (!shouldContinue) {
return shouldContinue;
}
} catch (IOException e) {
LOGGER.warn("bad json: " + line);
lines.clear();
}
lines++;
line = reader.readLine();
}
boolean shouldContinue = processLines(lines, recordProcessor);
if (!shouldContinue) {
return shouldContinue;
}
}
}
} catch (TikaException | IOException e) {
LOGGER.error(
"failed while processing " + fetchEmitTuple.getFetchKey().getFetchKey(), e);
LOGGER.error("failed while processing " + fetchEmitTuple
.getFetchKey()
.getFetchKey(), e);
}
long elapsed = System.currentTimeMillis() - start;
LOGGER.info("finished processing index gz in ({}) ms: {}",
String.format(Locale.US, "%,d", elapsed),
fetchEmitTuple.getFetchKey().getFetchKey());
String.format(Locale.US, "%,d", elapsed), fetchEmitTuple
.getFetchKey()
.getFetchKey());
return true;
}

/**
 * Feeds a batch of index lines to the record processor.
 *
 * <p>Lines that fail to parse (the processor throws {@link IOException}) are
 * logged and skipped so one bad record does not abort the batch.
 *
 * @param lines batch of raw JSON index lines to process
 * @param recordProcessor processor applied to each line
 * @return {@code false} as soon as the processor signals that processing
 *         should stop; {@code true} if the whole batch was consumed
 * @throws InterruptedException if the processor is interrupted
 */
private boolean processLines(List<String> lines,
        AbstractRecordProcessor recordProcessor) throws InterruptedException {
    for (String line : lines) {
        try {
            // processor returns false to signal "stop processing" (e.g. a
            // configured record limit was hit) -- propagate that immediately
            if (!recordProcessor.process(line)) {
                return false;
            }
        } catch (IOException e) {
            // parameterized logging: no string concatenation when WARN is disabled
            LOGGER.warn("bad json: {}", line);
        }
    }
    return true;
}
}
Expand All @@ -283,14 +307,17 @@ private static class DetectedMimeCounter extends AbstractRecordProcessor {
private final CCIndexReaderCounter counter;
private final Map<String, MutableLong> totalCounts = new HashMap<>();
private final Map<String, MutableLong> truncatedCounts = new HashMap<>();

/**
 * Creates a counter bound to the run configuration and the shared
 * cross-thread record counter.
 *
 * @param fetcherConfig configuration for this extraction run
 * @param counter shared counter of records read/truncated across workers
 */
public DetectedMimeCounter(ExtractorConfig fetcherConfig, CCIndexReaderCounter counter) {
    this.fetcherConfig = fetcherConfig;
    this.counter = counter;
}

@Override
public boolean process(String json) throws IOException, InterruptedException {
long totalRead = counter.getRecordsRead().incrementAndGet();
long totalRead = counter
.getRecordsRead()
.incrementAndGet();
if (totalRead % 1000000 == 0) {
LOGGER.info("processed: {}", counter);
}
Expand All @@ -307,12 +334,16 @@ public boolean process(String json) throws IOException, InterruptedException {
return true;
}
CCIndexRecord r = record.get();
if (!fetcherConfig.getRecordSelector().select(r)) {
if (!fetcherConfig
.getRecordSelector()
.select(r)) {
return true;
}
increment(totalCounts, r.getNormalizedMimeDetected());
if (!StringUtils.isBlank(r.getTruncated())) {
long truncated = counter.getTruncated().incrementAndGet();
long truncated = counter
.getTruncated()
.incrementAndGet();
if (fetcherConfig.getMaxFilesTruncated() > -1 &&
truncated >= fetcherConfig.getMaxFilesTruncated()) {
LOGGER.info("hit max truncated files");
Expand Down

0 comments on commit e8a89cc

Please sign in to comment.