Skip to content

Commit

Permalink
Field-level Redaction (#1122)
Browse files Browse the repository at this point in the history
Co-authored-by: Bryan Burkholder <[email protected]>
  • Loading branch information
ermontross and bryanlb authored Nov 13, 2024
1 parent 713a6fb commit 7f46620
Show file tree
Hide file tree
Showing 39 changed files with 1,823 additions and 68 deletions.
22 changes: 16 additions & 6 deletions astra/src/main/java/com/slack/astra/chunk/ReadOnlyChunkImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.slack.astra.metadata.cache.CacheSlotMetadata;
import com.slack.astra.metadata.cache.CacheSlotMetadataStore;
import com.slack.astra.metadata.core.AstraMetadataStoreChangeListener;
import com.slack.astra.metadata.fieldredaction.FieldRedactionMetadataStore;
import com.slack.astra.metadata.replica.ReplicaMetadata;
import com.slack.astra.metadata.replica.ReplicaMetadataStore;
import com.slack.astra.metadata.schema.ChunkSchema;
Expand Down Expand Up @@ -50,6 +51,7 @@
* BlobFs.
*/
public class ReadOnlyChunkImpl<T> implements Chunk<T> {

private static final Logger LOG = LoggerFactory.getLogger(ReadOnlyChunkImpl.class);
public static final String ASTRA_S3_STREAMING_FLAG = "astra.s3Streaming.enabled";

Expand All @@ -71,6 +73,7 @@ public class ReadOnlyChunkImpl<T> implements Chunk<T> {
private final SnapshotMetadataStore snapshotMetadataStore;
private final SearchMetadataStore searchMetadataStore;
private CacheNodeAssignmentStore cacheNodeAssignmentStore;
private final FieldRedactionMetadataStore fieldRedactionMetadataStore;
private final MeterRegistry meterRegistry;
private final BlobStore blobStore;

Expand Down Expand Up @@ -104,7 +107,8 @@ public ReadOnlyChunkImpl(
SearchMetadataStore searchMetadataStore,
CacheNodeAssignmentStore cacheNodeAssignmentStore,
CacheNodeAssignment assignment,
SnapshotMetadata snapshotMetadata)
SnapshotMetadata snapshotMetadata,
FieldRedactionMetadataStore fieldRedactionMetadataStore)
throws Exception {
this(
curatorFramework,
Expand All @@ -117,7 +121,8 @@ public ReadOnlyChunkImpl(
cacheSlotMetadataStore,
replicaMetadataStore,
snapshotMetadataStore,
searchMetadataStore);
searchMetadataStore,
fieldRedactionMetadataStore);
this.assignment = assignment;
this.lastKnownAssignmentState = assignment.state;
this.snapshotMetadata = snapshotMetadata;
Expand All @@ -135,7 +140,8 @@ public ReadOnlyChunkImpl(
CacheSlotMetadataStore cacheSlotMetadataStore,
ReplicaMetadataStore replicaMetadataStore,
SnapshotMetadataStore snapshotMetadataStore,
SearchMetadataStore searchMetadataStore)
SearchMetadataStore searchMetadataStore,
FieldRedactionMetadataStore fieldRedactionMetadataStore)
throws Exception {
this.meterRegistry = meterRegistry;
this.blobStore = blobStore;
Expand All @@ -147,6 +153,7 @@ public ReadOnlyChunkImpl(
this.replicaMetadataStore = replicaMetadataStore;
this.snapshotMetadataStore = snapshotMetadataStore;
this.searchMetadataStore = searchMetadataStore;
this.fieldRedactionMetadataStore = fieldRedactionMetadataStore;

if (!Boolean.getBoolean(ASTRA_NG_DYNAMIC_CHUNK_SIZES_FLAG)) {
CacheSlotMetadata cacheSlotMetadata =
Expand Down Expand Up @@ -230,7 +237,8 @@ public void downloadChunkData() {
this.logSearcher =
(LogIndexSearcher<T>)
new LogIndexSearcherImpl(
LogIndexSearcherImpl.searcherManagerFromChunkId(chunkInfo.chunkId, blobStore),
LogIndexSearcherImpl.searcherManagerFromChunkId(
chunkInfo.chunkId, blobStore, fieldRedactionMetadataStore),
chunkSchema.fieldDefMap);
} else {
// get data directory
Expand Down Expand Up @@ -264,7 +272,8 @@ public void downloadChunkData() {
this.logSearcher =
(LogIndexSearcher<T>)
new LogIndexSearcherImpl(
LogIndexSearcherImpl.searcherManagerFromPath(dataDirectory),
LogIndexSearcherImpl.searcherManagerFromPath(
dataDirectory, fieldRedactionMetadataStore),
chunkSchema.fieldDefMap);
}

Expand Down Expand Up @@ -414,7 +423,8 @@ private void handleChunkAssignment(CacheSlotMetadata cacheSlotMetadata) {
this.logSearcher =
(LogIndexSearcher<T>)
new LogIndexSearcherImpl(
LogIndexSearcherImpl.searcherManagerFromPath(dataDirectory),
LogIndexSearcherImpl.searcherManagerFromPath(
dataDirectory, fieldRedactionMetadataStore),
chunkSchema.fieldDefMap);

// we first mark the slot LIVE before registering the search metadata as available
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.slack.astra.logstore.LogStore;
import com.slack.astra.logstore.LuceneIndexStoreImpl;
import com.slack.astra.metadata.fieldredaction.FieldRedactionMetadataStore;
import com.slack.astra.metadata.search.SearchMetadataStore;
import com.slack.astra.metadata.snapshot.SnapshotMetadataStore;
import com.slack.astra.proto.config.AstraConfigs;
Expand All @@ -24,6 +25,7 @@ public class RecoveryChunkFactoryImpl<T> implements ChunkFactory<T> {
private final MeterRegistry meterRegistry;
private final SearchMetadataStore searchMetadataStore;
private final SnapshotMetadataStore snapshotMetadataStore;
private final FieldRedactionMetadataStore fieldRedactionMetadataStore;
private final SearchContext searchContext;
private final AstraConfigs.IndexerConfig indexerConfig;
private String kafkaPartitionId = null;
Expand All @@ -34,13 +36,15 @@ public RecoveryChunkFactoryImpl(
MeterRegistry meterRegistry,
SearchMetadataStore searchMetadataStore,
SnapshotMetadataStore snapshotMetadataStore,
FieldRedactionMetadataStore fieldRedactionMetadataStore,
SearchContext searchContext) {
checkNotNull(indexerConfig, "indexerConfig can't be null");
this.indexerConfig = indexerConfig;
this.chunkDataPrefix = chunkDataPrefix;
this.meterRegistry = meterRegistry;
this.searchMetadataStore = searchMetadataStore;
this.snapshotMetadataStore = snapshotMetadataStore;
this.fieldRedactionMetadataStore = fieldRedactionMetadataStore;
this.searchContext = searchContext;
}

Expand All @@ -49,9 +53,13 @@ public ReadWriteChunk<T> makeChunk() throws IOException {
ensureNonNullString(kafkaPartitionId, "kafkaPartitionId can't be null and should be set.");
ensureNonNullString(indexerConfig.getDataDirectory(), "The data directory shouldn't be empty");
final File dataDirectory = new File(indexerConfig.getDataDirectory());

LogStore logStore =
LuceneIndexStoreImpl.makeLogStore(
dataDirectory, indexerConfig.getLuceneConfig(), meterRegistry);
dataDirectory,
indexerConfig.getLuceneConfig(),
meterRegistry,
fieldRedactionMetadataStore);

return new RecoveryChunkImpl<>(
logStore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.slack.astra.metadata.cache.CacheNodeMetadataStore;
import com.slack.astra.metadata.cache.CacheSlotMetadataStore;
import com.slack.astra.metadata.core.AstraMetadataStoreChangeListener;
import com.slack.astra.metadata.fieldredaction.FieldRedactionMetadataStore;
import com.slack.astra.metadata.replica.ReplicaMetadataStore;
import com.slack.astra.metadata.search.SearchMetadataStore;
import com.slack.astra.metadata.snapshot.SnapshotMetadata;
Expand Down Expand Up @@ -56,6 +57,7 @@ public class CachingChunkManager<T> extends ChunkManagerBase<T> {
private SnapshotMetadataStore snapshotMetadataStore;
private SearchMetadataStore searchMetadataStore;
private CacheSlotMetadataStore cacheSlotMetadataStore;
private FieldRedactionMetadataStore fieldRedactionMetadataStore;

// for flag "astra.ng.dynamicChunkSizes"
private final String cacheNodeId;
Expand Down Expand Up @@ -101,6 +103,7 @@ protected void startUp() throws Exception {
cacheNodeAssignmentStore =
new CacheNodeAssignmentStore(curatorFramework, zkConfig, cacheNodeId);
cacheNodeMetadataStore = new CacheNodeMetadataStore(curatorFramework, zkConfig);
fieldRedactionMetadataStore = new FieldRedactionMetadataStore(curatorFramework, zkConfig, true);

if (Boolean.getBoolean(ASTRA_NG_DYNAMIC_CHUNK_SIZES_FLAG)) {
cacheNodeAssignmentStore.addListener(cacheNodeAssignmentChangeListener);
Expand All @@ -122,7 +125,8 @@ protected void startUp() throws Exception {
cacheSlotMetadataStore,
replicaMetadataStore,
snapshotMetadataStore,
searchMetadataStore);
searchMetadataStore,
fieldRedactionMetadataStore);

chunkMap.put(newChunk.getSlotId(), newChunk);
}
Expand Down Expand Up @@ -155,6 +159,7 @@ protected void shutDown() throws Exception {
searchMetadataStore.close();
snapshotMetadataStore.close();
replicaMetadataStore.close();
fieldRedactionMetadataStore.close();

LOG.info("Closed caching chunk manager.");
}
Expand Down Expand Up @@ -235,7 +240,8 @@ private void onAssignmentHandler(CacheNodeAssignment assignment) {
searchMetadataStore,
cacheNodeAssignmentStore,
assignment,
snapshotsBySnapshotId.get(assignment.snapshotId));
snapshotsBySnapshotId.get(assignment.snapshotId),
fieldRedactionMetadataStore);
executorService.submit(newChunk::downloadChunkData);
chunkMap.put(assignment.assignmentId, newChunk);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.slack.astra.logstore.LogMessage;
import com.slack.astra.logstore.LogStore;
import com.slack.astra.logstore.LuceneIndexStoreImpl;
import com.slack.astra.metadata.fieldredaction.FieldRedactionMetadataStore;
import com.slack.astra.metadata.search.SearchMetadataStore;
import com.slack.astra.metadata.snapshot.SnapshotMetadataStore;
import com.slack.astra.proto.config.AstraConfigs;
Expand Down Expand Up @@ -93,6 +94,7 @@ public class IndexingChunkManager<T> extends ChunkManagerBase<T> {
private SnapshotMetadataStore snapshotMetadataStore;

private SearchMetadataStore searchMetadataStore;
private FieldRedactionMetadataStore fieldRedactionMetadataStore;

/**
* For capacity planning, we want to control how many roll overs are in progress at the same time.
Expand Down Expand Up @@ -261,7 +263,10 @@ private ReadWriteChunk<T> getOrCreateActiveChunk(
@SuppressWarnings("unchecked")
LogStore logStore =
LuceneIndexStoreImpl.makeLogStore(
dataDirectory, indexerConfig.getLuceneConfig(), meterRegistry);
dataDirectory,
indexerConfig.getLuceneConfig(),
meterRegistry,
fieldRedactionMetadataStore);

chunkRollOverStrategy.setActiveChunkDirectory(logStore.getDirectory());

Expand Down Expand Up @@ -389,6 +394,7 @@ protected void startUp() throws Exception {

searchMetadataStore = new SearchMetadataStore(curatorFramework, zkConfig, false);
snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework, zkConfig);
fieldRedactionMetadataStore = new FieldRedactionMetadataStore(curatorFramework, zkConfig, true);

stopIngestion = false;
}
Expand Down Expand Up @@ -440,6 +446,7 @@ protected void shutDown() throws IOException {

searchMetadataStore.close();
snapshotMetadataStore.close();
fieldRedactionMetadataStore.close();
LOG.info("Closed indexing chunk manager.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.slack.astra.chunk.SearchContext;
import com.slack.astra.chunkrollover.NeverRolloverChunkStrategy;
import com.slack.astra.logstore.LogMessage;
import com.slack.astra.metadata.fieldredaction.FieldRedactionMetadataStore;
import com.slack.astra.metadata.search.SearchMetadataStore;
import com.slack.astra.metadata.snapshot.SnapshotMetadataStore;
import com.slack.astra.proto.config.AstraConfigs;
Expand Down Expand Up @@ -217,6 +218,7 @@ public static RecoveryChunkManager<LogMessage> fromConfig(
MeterRegistry meterRegistry,
SearchMetadataStore searchMetadataStore,
SnapshotMetadataStore snapshotMetadataStore,
FieldRedactionMetadataStore fieldRedactionMetadataStore,
AstraConfigs.IndexerConfig indexerConfig,
BlobStore blobStore)
throws Exception {
Expand All @@ -230,6 +232,7 @@ public static RecoveryChunkManager<LogMessage> fromConfig(
meterRegistry,
searchMetadataStore,
snapshotMetadataStore,
fieldRedactionMetadataStore,
searchContext);

ChunkRolloverFactory chunkRolloverFactory =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.slack.astra.logstore;

import com.slack.astra.logstore.schema.SchemaAwareLogDocumentBuilderImpl;
import com.slack.astra.logstore.search.fieldRedaction.RedactionFilterDirectoryReader;
import com.slack.astra.metadata.fieldredaction.FieldRedactionMetadataStore;
import com.slack.astra.metadata.schema.LuceneFieldDef;
import com.slack.astra.proto.config.AstraConfigs;
import com.slack.astra.util.RuntimeHalterImpl;
Expand All @@ -21,6 +23,7 @@
import org.apache.commons.io.FileUtils;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
Expand Down Expand Up @@ -81,15 +84,19 @@ public class LuceneIndexStoreImpl implements LogStore {

// TODO: Set the policy via a lucene config file.
public static LuceneIndexStoreImpl makeLogStore(
File dataDirectory, AstraConfigs.LuceneConfig luceneConfig, MeterRegistry metricsRegistry)
File dataDirectory,
AstraConfigs.LuceneConfig luceneConfig,
MeterRegistry metricsRegistry,
FieldRedactionMetadataStore fieldRedactionMetadataStore)
throws IOException {
return makeLogStore(
dataDirectory,
LuceneIndexStoreConfig.getCommitDuration(luceneConfig.getCommitDurationSecs()),
LuceneIndexStoreConfig.getRefreshDuration(luceneConfig.getRefreshDurationSecs()),
luceneConfig.getEnableFullTextSearch(),
SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.CONVERT_VALUE_AND_DUPLICATE_FIELD,
metricsRegistry);
metricsRegistry,
fieldRedactionMetadataStore);
}

public static LuceneIndexStoreImpl makeLogStore(
Expand All @@ -98,7 +105,8 @@ public static LuceneIndexStoreImpl makeLogStore(
Duration refreshInterval,
boolean enableFullTextSearch,
SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy fieldConflictPolicy,
MeterRegistry metricsRegistry)
MeterRegistry metricsRegistry,
FieldRedactionMetadataStore fieldRedactionMetadataStore)
throws IOException {
// TODO: Move all these config values into chunk?
// TODO: Chunk should create log store?
Expand All @@ -110,11 +118,15 @@ public static LuceneIndexStoreImpl makeLogStore(
indexStoreCfg,
SchemaAwareLogDocumentBuilderImpl.build(
fieldConflictPolicy, enableFullTextSearch, metricsRegistry),
metricsRegistry);
metricsRegistry,
fieldRedactionMetadataStore);
}

public LuceneIndexStoreImpl(
LuceneIndexStoreConfig config, DocumentBuilder documentBuilder, MeterRegistry registry)
LuceneIndexStoreConfig config,
DocumentBuilder documentBuilder,
MeterRegistry registry,
FieldRedactionMetadataStore fieldRedactionMetadataStore)
throws IOException {

this.documentBuilder = documentBuilder;
Expand All @@ -126,7 +138,11 @@ public LuceneIndexStoreImpl(
buildIndexWriterConfig(analyzer, this.snapshotDeletionPolicy, config, registry);
indexDirectory = new MMapDirectory(config.indexFolder(id).toPath());
indexWriter = Optional.of(new IndexWriter(indexDirectory, indexWriterConfig));
this.searcherManager = new SearcherManager(indexWriter.get(), false, false, null);

RedactionFilterDirectoryReader reader =
new RedactionFilterDirectoryReader(
DirectoryReader.open(indexWriter.get(), false, false), fieldRedactionMetadataStore);
this.searcherManager = new SearcherManager(reader, null);

scheduledCommit.scheduleWithFixedDelay(
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import com.slack.astra.logstore.LogMessage.SystemField;
import com.slack.astra.logstore.LogWireMessage;
import com.slack.astra.logstore.opensearch.OpenSearchAdapter;
import com.slack.astra.logstore.search.fieldRedaction.RedactionFilterDirectoryReader;
import com.slack.astra.metadata.fieldredaction.FieldRedactionMetadataStore;
import com.slack.astra.metadata.schema.LuceneFieldDef;
import com.slack.astra.util.JsonUtil;
import java.io.IOException;
Expand All @@ -24,6 +26,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MultiCollectorManager;
Expand Down Expand Up @@ -58,16 +61,26 @@ public class LogIndexSearcherImpl implements LogIndexSearcher<LogMessage> {
private final ReferenceManager.RefreshListener refreshListener;

@VisibleForTesting
public static SearcherManager searcherManagerFromChunkId(String chunkId, BlobStore blobStore)
public static SearcherManager searcherManagerFromChunkId(
String chunkId, BlobStore blobStore, FieldRedactionMetadataStore fieldRedactionMetadataStore)
throws IOException {
Directory directory = new S3RemoteDirectory(chunkId, blobStore);
return new SearcherManager(directory, null);
DirectoryReader directoryReader = DirectoryReader.open(directory);

RedactionFilterDirectoryReader reader =
new RedactionFilterDirectoryReader(directoryReader, fieldRedactionMetadataStore);
return new SearcherManager(reader, null);
}

@VisibleForTesting
public static SearcherManager searcherManagerFromPath(Path path) throws IOException {
public static SearcherManager searcherManagerFromPath(
Path path, FieldRedactionMetadataStore fieldRedactionMetadataStore) throws IOException {
MMapDirectory directory = new MMapDirectory(path);
return new SearcherManager(directory, null);
DirectoryReader directoryReader = DirectoryReader.open(directory);

RedactionFilterDirectoryReader reader =
new RedactionFilterDirectoryReader(directoryReader, fieldRedactionMetadataStore);
return new SearcherManager(reader, null);
}

public LogIndexSearcherImpl(
Expand All @@ -87,7 +100,6 @@ public void afterRefresh(boolean didRefresh) {
};
this.searcherManager = searcherManager;
this.searcherManager.addListener(refreshListener);

// initialize the adapter with whatever the default schema is

openSearchAdapter.loadSchema();
Expand Down
Loading

0 comments on commit 7f46620

Please sign in to comment.