-
Notifications
You must be signed in to change notification settings - Fork 30
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
remove option to transform message in the indexer
- Loading branch information
Showing
22 changed files
with
585 additions
and
699 deletions.
There are no files selected for viewing
375 changes: 188 additions & 187 deletions
375
benchmarks/src/main/java/com/slack/kaldb/IndexAPILog.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,187 +1,188 @@ | ||
package com.slack.kaldb; | ||
|
||
import com.google.protobuf.ByteString; | ||
import com.slack.kaldb.logstore.LogMessage; | ||
import com.slack.kaldb.logstore.LuceneIndexStoreImpl; | ||
import com.slack.kaldb.logstore.schema.SchemaAwareLogDocumentBuilderImpl; | ||
import com.slack.kaldb.writer.LogMessageWriterImpl; | ||
import com.slack.service.murron.Murron; | ||
import io.micrometer.core.instrument.MeterRegistry; | ||
import io.micrometer.core.instrument.simple.SimpleMeterRegistry; | ||
import java.io.BufferedReader; | ||
import java.io.File; | ||
import java.io.IOException; | ||
import java.nio.charset.StandardCharsets; | ||
import java.nio.file.Files; | ||
import java.nio.file.Path; | ||
import java.nio.file.Paths; | ||
import java.text.SimpleDateFormat; | ||
import java.time.Duration; | ||
import java.util.Comparator; | ||
import java.util.Random; | ||
import java.util.stream.Stream; | ||
import org.apache.commons.io.FileUtils; | ||
import org.apache.kafka.clients.consumer.ConsumerRecord; | ||
import org.apache.kafka.common.record.TimestampType; | ||
import org.apache.lucene.store.Directory; | ||
import org.openjdk.jmh.annotations.*; | ||
|
||
@State(Scope.Thread) | ||
public class IndexAPILog { | ||
|
||
private Random random; | ||
private final Duration commitInterval = Duration.ofSeconds(5 * 60); | ||
private final Duration refreshInterval = Duration.ofSeconds(5 * 60); | ||
|
||
private Path tempDirectory; | ||
private MeterRegistry registry; | ||
LuceneIndexStoreImpl logStore; | ||
|
||
private String apiLogFile; | ||
private BufferedReader reader; | ||
private static SimpleDateFormat df = new SimpleDateFormat("yyyy-mm-ddHH:mm:ss.SSSzzz"); | ||
private int skipCount; | ||
private int indexCount; | ||
|
||
@Setup(Level.Iteration) | ||
public void createIndexer() throws Exception { | ||
random = new Random(); | ||
registry = new SimpleMeterRegistry(); | ||
tempDirectory = | ||
Files.createDirectories( | ||
Paths.get("jmh-output", String.valueOf(random.nextInt(Integer.MAX_VALUE)))); | ||
logStore = | ||
LuceneIndexStoreImpl.makeLogStore( | ||
tempDirectory.toFile(), | ||
commitInterval, | ||
refreshInterval, | ||
true, | ||
SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.CONVERT_VALUE_AND_DUPLICATE_FIELD, | ||
registry); | ||
|
||
apiLogFile = System.getProperty("jmh.api.log.file", "api_logs.txt"); | ||
reader = Files.newBufferedReader(Path.of(apiLogFile)); | ||
skipCount = 0; | ||
indexCount = 0; | ||
} | ||
|
||
@TearDown(Level.Iteration) | ||
public void tearDown() throws IOException { | ||
Directory directory = logStore.getIndexWriter().getDirectory(); | ||
String[] segmentFiles = directory.listAll(); | ||
long indexedBytes = 0; | ||
for (String segmentFile : segmentFiles) { | ||
indexedBytes += directory.fileLength(segmentFile); | ||
} | ||
if (indexCount != 0) { | ||
// Displaying indexCount only makes sense in measureAPILogIndexingSlingshotMode | ||
System.out.println( | ||
"Indexed = " | ||
+ indexCount | ||
+ " Skipped = " | ||
+ skipCount | ||
+ " Index size = " | ||
+ FileUtils.byteCountToDisplaySize(indexedBytes)); | ||
} else { | ||
System.out.println( | ||
"Skipped = " | ||
+ skipCount | ||
+ " Index size = " | ||
+ FileUtils.byteCountToDisplaySize(indexedBytes)); | ||
} | ||
logStore.close(); | ||
try (Stream<Path> walk = Files.walk(tempDirectory)) { | ||
walk.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete); | ||
} | ||
registry.close(); | ||
if (reader != null) { | ||
reader.close(); | ||
} | ||
} | ||
|
||
@Benchmark | ||
public void measureAPILogIndexing() throws IOException { | ||
String line = reader.readLine(); | ||
if (line != null) { | ||
// Work that ideally shouldn't count towards benchmark performance result | ||
ConsumerRecord<String, byte[]> kafkaRecord = makeConsumerRecord(line); | ||
if (kafkaRecord == null) { | ||
// makeConsumerRecord will print why we skipped | ||
return; | ||
} | ||
// Mimic LogMessageWriterImpl#insertRecord kinda without the chunk rollover logic | ||
try { | ||
LogMessage localLogMessage = | ||
LogMessageWriterImpl.apiLogTransformer.toLogMessage(kafkaRecord).get(0); | ||
logStore.addMessage(localLogMessage); | ||
} catch (Exception e) { | ||
System.out.println("skipping - cannot transform " + e); | ||
skipCount++; | ||
} | ||
} else { | ||
System.out.println("resetting - reach EOF"); | ||
reader = Files.newBufferedReader(Path.of(apiLogFile)); | ||
} | ||
} | ||
|
||
@Benchmark | ||
public void measureAPILogIndexingSlingshotMode() throws IOException { | ||
String line; | ||
do { | ||
line = reader.readLine(); | ||
if (line != null) { | ||
// Work that ideally shouldn't count towards benchmark performance result | ||
ConsumerRecord<String, byte[]> kafkaRecord = makeConsumerRecord(line); | ||
if (kafkaRecord == null) { | ||
// makeConsumerRecord will print why we skipped | ||
continue; | ||
} | ||
// Mimic LogMessageWriterImpl#insertRecord kinda without the chunk rollover logic | ||
try { | ||
LogMessage localLogMessage = | ||
LogMessageWriterImpl.apiLogTransformer.toLogMessage(kafkaRecord).get(0); | ||
logStore.addMessage(localLogMessage); | ||
indexCount++; | ||
} catch (Exception e) { | ||
System.out.println("skipping - cannot transform " + e); | ||
} | ||
} | ||
} while (line != null); | ||
} | ||
|
||
public ConsumerRecord<String, byte[]> makeConsumerRecord(String line) { | ||
try { | ||
// get start of messageBody | ||
int messageDivision = line.indexOf("{"); | ||
|
||
// Everything will there is metadata | ||
String[] splitLine = line.substring(0, messageDivision - 1).split("\\s+"); | ||
String ts = splitLine[0] + splitLine[1] + splitLine[2] + splitLine[3]; | ||
long timestamp = df.parse(ts).toInstant().toEpochMilli(); | ||
|
||
String message = line.substring(messageDivision); | ||
Murron.MurronMessage testMurronMsg = | ||
Murron.MurronMessage.newBuilder() | ||
.setMessage(ByteString.copyFrom((message).getBytes(StandardCharsets.UTF_8))) | ||
.setType(splitLine[5]) | ||
.setHost(splitLine[4]) | ||
.setTimestamp(timestamp) | ||
.build(); | ||
return new ConsumerRecord<>( | ||
"testTopic", | ||
1, | ||
10, | ||
0L, | ||
TimestampType.CREATE_TIME, | ||
0L, | ||
0, | ||
0, | ||
"testKey", | ||
testMurronMsg.toByteString().toByteArray()); | ||
} catch (Exception e) { | ||
System.out.println("skipping - cannot parse input" + e); | ||
skipCount++; | ||
return null; | ||
} | ||
} | ||
} | ||
// package com.slack.kaldb; | ||
// | ||
// import com.google.protobuf.ByteString; | ||
// import com.slack.kaldb.logstore.LogMessage; | ||
// import com.slack.kaldb.logstore.LuceneIndexStoreImpl; | ||
// import com.slack.kaldb.logstore.schema.SchemaAwareLogDocumentBuilderImpl; | ||
// import com.slack.kaldb.writer.LogMessageWriterImpl; | ||
// import com.slack.service.murron.Murron; | ||
// import io.micrometer.core.instrument.MeterRegistry; | ||
// import io.micrometer.core.instrument.simple.SimpleMeterRegistry; | ||
// import java.io.BufferedReader; | ||
// import java.io.File; | ||
// import java.io.IOException; | ||
// import java.nio.charset.StandardCharsets; | ||
// import java.nio.file.Files; | ||
// import java.nio.file.Path; | ||
// import java.nio.file.Paths; | ||
// import java.text.SimpleDateFormat; | ||
// import java.time.Duration; | ||
// import java.util.Comparator; | ||
// import java.util.Random; | ||
// import java.util.stream.Stream; | ||
// import org.apache.commons.io.FileUtils; | ||
// import org.apache.kafka.clients.consumer.ConsumerRecord; | ||
// import org.apache.kafka.common.record.TimestampType; | ||
// import org.apache.lucene.store.Directory; | ||
// import org.openjdk.jmh.annotations.*; | ||
// | ||
// @State(Scope.Thread) | ||
// public class IndexAPILog { | ||
// | ||
// private Random random; | ||
// private final Duration commitInterval = Duration.ofSeconds(5 * 60); | ||
// private final Duration refreshInterval = Duration.ofSeconds(5 * 60); | ||
// | ||
// private Path tempDirectory; | ||
// private MeterRegistry registry; | ||
// LuceneIndexStoreImpl logStore; | ||
// | ||
// private String apiLogFile; | ||
// private BufferedReader reader; | ||
// private static SimpleDateFormat df = new SimpleDateFormat("yyyy-mm-ddHH:mm:ss.SSSzzz"); | ||
// private int skipCount; | ||
// private int indexCount; | ||
// | ||
// @Setup(Level.Iteration) | ||
// public void createIndexer() throws Exception { | ||
// random = new Random(); | ||
// registry = new SimpleMeterRegistry(); | ||
// tempDirectory = | ||
// Files.createDirectories( | ||
// Paths.get("jmh-output", String.valueOf(random.nextInt(Integer.MAX_VALUE)))); | ||
// logStore = | ||
// LuceneIndexStoreImpl.makeLogStore( | ||
// tempDirectory.toFile(), | ||
// commitInterval, | ||
// refreshInterval, | ||
// true, | ||
// | ||
// SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.CONVERT_VALUE_AND_DUPLICATE_FIELD, | ||
// registry); | ||
// | ||
// apiLogFile = System.getProperty("jmh.api.log.file", "api_logs.txt"); | ||
// reader = Files.newBufferedReader(Path.of(apiLogFile)); | ||
// skipCount = 0; | ||
// indexCount = 0; | ||
// } | ||
// | ||
// @TearDown(Level.Iteration) | ||
// public void tearDown() throws IOException { | ||
// Directory directory = logStore.getIndexWriter().getDirectory(); | ||
// String[] segmentFiles = directory.listAll(); | ||
// long indexedBytes = 0; | ||
// for (String segmentFile : segmentFiles) { | ||
// indexedBytes += directory.fileLength(segmentFile); | ||
// } | ||
// if (indexCount != 0) { | ||
// // Displaying indexCount only makes sense in measureAPILogIndexingSlingshotMode | ||
// System.out.println( | ||
// "Indexed = " | ||
// + indexCount | ||
// + " Skipped = " | ||
// + skipCount | ||
// + " Index size = " | ||
// + FileUtils.byteCountToDisplaySize(indexedBytes)); | ||
// } else { | ||
// System.out.println( | ||
// "Skipped = " | ||
// + skipCount | ||
// + " Index size = " | ||
// + FileUtils.byteCountToDisplaySize(indexedBytes)); | ||
// } | ||
// logStore.close(); | ||
// try (Stream<Path> walk = Files.walk(tempDirectory)) { | ||
// walk.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete); | ||
// } | ||
// registry.close(); | ||
// if (reader != null) { | ||
// reader.close(); | ||
// } | ||
// } | ||
// | ||
// @Benchmark | ||
// public void measureAPILogIndexing() throws IOException { | ||
// String line = reader.readLine(); | ||
// if (line != null) { | ||
// // Work that ideally shouldn't count towards benchmark performance result | ||
// ConsumerRecord<String, byte[]> kafkaRecord = makeConsumerRecord(line); | ||
// if (kafkaRecord == null) { | ||
// // makeConsumerRecord will print why we skipped | ||
// return; | ||
// } | ||
// // Mimic LogMessageWriterImpl#insertRecord kinda without the chunk rollover logic | ||
// try { | ||
// LogMessage localLogMessage = | ||
// LogMessageWriterImpl.apiLogTransformer.toLogMessage(kafkaRecord).get(0); | ||
// logStore.addMessage(localLogMessage); | ||
// } catch (Exception e) { | ||
// System.out.println("skipping - cannot transform " + e); | ||
// skipCount++; | ||
// } | ||
// } else { | ||
// System.out.println("resetting - reach EOF"); | ||
// reader = Files.newBufferedReader(Path.of(apiLogFile)); | ||
// } | ||
// } | ||
// | ||
// @Benchmark | ||
// public void measureAPILogIndexingSlingshotMode() throws IOException { | ||
// String line; | ||
// do { | ||
// line = reader.readLine(); | ||
// if (line != null) { | ||
// // Work that ideally shouldn't count towards benchmark performance result | ||
// ConsumerRecord<String, byte[]> kafkaRecord = makeConsumerRecord(line); | ||
// if (kafkaRecord == null) { | ||
// // makeConsumerRecord will print why we skipped | ||
// continue; | ||
// } | ||
// // Mimic LogMessageWriterImpl#insertRecord kinda without the chunk rollover logic | ||
// try { | ||
// LogMessage localLogMessage = | ||
// LogMessageWriterImpl.apiLogTransformer.toLogMessage(kafkaRecord).get(0); | ||
// logStore.addMessage(localLogMessage); | ||
// indexCount++; | ||
// } catch (Exception e) { | ||
// System.out.println("skipping - cannot transform " + e); | ||
// } | ||
// } | ||
// } while (line != null); | ||
// } | ||
// | ||
// public ConsumerRecord<String, byte[]> makeConsumerRecord(String line) { | ||
// try { | ||
// // get start of messageBody | ||
// int messageDivision = line.indexOf("{"); | ||
// | ||
// // Everything will there is metadata | ||
// String[] splitLine = line.substring(0, messageDivision - 1).split("\\s+"); | ||
// String ts = splitLine[0] + splitLine[1] + splitLine[2] + splitLine[3]; | ||
// long timestamp = df.parse(ts).toInstant().toEpochMilli(); | ||
// | ||
// String message = line.substring(messageDivision); | ||
// Murron.MurronMessage testMurronMsg = | ||
// Murron.MurronMessage.newBuilder() | ||
// .setMessage(ByteString.copyFrom((message).getBytes(StandardCharsets.UTF_8))) | ||
// .setType(splitLine[5]) | ||
// .setHost(splitLine[4]) | ||
// .setTimestamp(timestamp) | ||
// .build(); | ||
// return new ConsumerRecord<>( | ||
// "testTopic", | ||
// 1, | ||
// 10, | ||
// 0L, | ||
// TimestampType.CREATE_TIME, | ||
// 0L, | ||
// 0, | ||
// 0, | ||
// "testKey", | ||
// testMurronMsg.toByteString().toByteArray()); | ||
// } catch (Exception e) { | ||
// System.out.println("skipping - cannot parse input" + e); | ||
// skipCount++; | ||
// return null; | ||
// } | ||
// } | ||
// } |
Oops, something went wrong.