Commit

Initial Experiments
cgivre committed Jul 24, 2024
1 parent ac98f34 commit ef3d567
Showing 2 changed files with 29 additions and 2 deletions.
6 changes: 6 additions & 0 deletions contrib/storage-splunk/pom.xml
@@ -58,6 +58,12 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>com.github.ben-manes.caffeine</groupId>
+      <artifactId>caffeine</artifactId>
+      <version>3.1.8</version>
+    </dependency>
+
     <!-- Test dependencies -->
     <dependency>
       <groupId>org.apache.drill.exec</groupId>
25 changes: 23 additions & 2 deletions contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java
@@ -18,6 +18,9 @@
 
 package org.apache.drill.exec.store.splunk;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.collect.Sets;
 import com.splunk.IndexCollection;
 import org.apache.calcite.schema.Table;
 import org.apache.drill.common.exceptions.UserException;
@@ -28,7 +31,6 @@
 import org.apache.drill.exec.planner.logical.ModifyTableEntry;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.StorageStrategy;
-import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,19 +40,26 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 public class SplunkSchema extends AbstractSchema {
   private final static Logger logger = LoggerFactory.getLogger(SplunkSchema.class);
   private static final String SPL_TABLE_NAME = "spl";
   private final Map<String, DynamicDrillTable> activeTables = new HashMap<>();
   private final SplunkStoragePlugin plugin;
   private final String queryUserName;
+  private final Cache<String, Set<String>> cache;
 
   public SplunkSchema(SplunkStoragePlugin plugin, String queryUserName) {
     super(Collections.emptyList(), plugin.getName());
     this.plugin = plugin;
     this.queryUserName = queryUserName;
 
+    // TODO Add Configuration Parameters for the schema cache
+    this.cache = Caffeine.newBuilder()
+      .expireAfterAccess(90, TimeUnit.MINUTES)
+      .maximumSize(100)
+      .build();
 
     registerIndexes();
   }
@@ -148,8 +157,20 @@ private void registerIndexes() {
     registerTable(SPL_TABLE_NAME, new DynamicDrillTable(plugin, plugin.getName(),
       new SplunkScanSpec(plugin.getName(), SPL_TABLE_NAME, plugin.getConfig(), queryUserName)));
 
+    Set<String> indexList;
     // Retrieve and add all other Splunk indexes
-    for (String indexName : connection.getIndexes().keySet()) {
+    // First check the cache to see if we have a list of indexes.
+    String nameKey = queryUserName + "-" + plugin.getName();
+    indexList = cache.getIfPresent(nameKey);
+
+    // If the index list is not in the cache, query Splunk, retrieve the index list and add it to the cache.
+    if (indexList == null) {
+      logger.debug("Index list not in Splunk schema cache. Retrieving from Splunk.");
+      indexList = connection.getIndexes().keySet();
+      cache.put(nameKey, indexList);
+    }
+
+    for (String indexName : indexList) {
       logger.debug("Registering {}", indexName);
       registerTable(indexName, new DynamicDrillTable(plugin, plugin.getName(),
         new SplunkScanSpec(plugin.getName(), indexName, plugin.getConfig(), queryUserName)));

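For context, the change wires a Caffeine cache into SplunkSchema: the constructor builds a cache bounded to 100 entries that expire 90 minutes after last access, and registerIndexes() keys it by queryUserName plus the plugin name, so each user's index list is cached separately. Below is a minimal standalone sketch of the same lookup flow, assuming only the caffeine dependency added above; the IndexCacheSketch class and fetchIndexNames() method are illustrative stand-ins, not part of the Drill code, and the sketch uses Cache.get(key, mappingFunction), which folds the commit's getIfPresent()/put() sequence into a single atomic call.

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

import java.util.Set;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only; class and method names below are hypothetical.
public class IndexCacheSketch {

  // Same bounds as the commit: at most 100 entries, evicted 90 minutes after last access.
  private final Cache<String, Set<String>> cache = Caffeine.newBuilder()
      .expireAfterAccess(90, TimeUnit.MINUTES)
      .maximumSize(100)
      .build();

  // Returns the index list for this user/plugin pair, fetching from Splunk only on a cache miss.
  public Set<String> indexesFor(String queryUserName, String pluginName) {
    String nameKey = queryUserName + "-" + pluginName;
    // get(key, mappingFunction) checks the cache and populates it atomically on a miss.
    return cache.get(nameKey, key -> fetchIndexNames());
  }

  // Stand-in for connection.getIndexes().keySet() in the real plugin.
  private Set<String> fetchIndexNames() {
    return Set.of("main", "_internal", "_audit");
  }
}

Since the 90-minute expiry and 100-entry cap are hard-coded (the constructor's TODO flags this), they would presumably become plugin configuration options in a later revision.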