Skip to content

Commit

Permalink
interface IndexCache
Browse files Browse the repository at this point in the history
  • Loading branch information
ConeyLiu committed Oct 19, 2023
1 parent 017cbe6 commit 458a92c
Show file tree
Hide file tree
Showing 7 changed files with 428 additions and 77 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.hadoop;

import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.internal.column.columnindex.ColumnIndex;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;

import java.io.IOException;
import java.util.Set;

/**
* A cache for caching indexes(including: ColumnIndex, OffsetIndex and BloomFilter)
*/
/**
 * A cache for row-group level indexes: ColumnIndex, OffsetIndex and BloomFilter.
 * <p>
 * Implementations are bound to a single {@link ParquetFileReader}; callers select the
 * current row group via {@link #setBlockMetadata(BlockMetaData)} before fetching indexes.
 */
public interface IndexCache {

  enum CacheStrategy {
    /** No caching: every lookup reads straight from the file. */
    NONE,
    /** Eagerly load the selected row group's indexes for the configured columns. */
    PRECACHE_BLOCK
  }

  /**
   * Factory for an index cache backed by the given file reader.
   *
   * @param fileReader the file reader
   * @param columns the columns whose indexes should be cached
   * @param cacheStrategy the cache strategy, supports NONE and PRECACHE_BLOCK
   * @return the index cache
   */
  static IndexCache create(
      ParquetFileReader fileReader,
      Set<ColumnPath> columns,
      CacheStrategy cacheStrategy) {
    // Compare constant-first so a null strategy falls through to the error branch
    // (matching the original ==-based dispatch) instead of throwing an NPE here.
    if (CacheStrategy.NONE.equals(cacheStrategy)) {
      return new NoneIndexCache(fileReader);
    }
    if (CacheStrategy.PRECACHE_BLOCK.equals(cacheStrategy)) {
      return new PrefetchIndexCache(fileReader, columns);
    }
    throw new UnsupportedOperationException("Unknown cache strategy: " + cacheStrategy);
  }

  /**
   * Set the row group whose indexes subsequent get calls refer to.
   *
   * @param currentBlockMetadata metadata of the row group to serve indexes for
   * @throws IOException if any I/O error occurs while (pre)loading indexes
   */
  void setBlockMetadata(BlockMetaData currentBlockMetadata) throws IOException;

  /**
   * Get the ColumnIndex for the given column in the set row group.
   *
   * @param chunk the given column chunk
   * @return the ColumnIndex for the given column
   * @throws IOException if any I/O error occurs during get the ColumnIndex
   */
  ColumnIndex getColumnIndex(ColumnChunkMetaData chunk) throws IOException;

  /**
   * Get the OffsetIndex for the given column in the set row group.
   *
   * @param chunk the given column chunk
   * @return the OffsetIndex for the given column
   * @throws IOException if any I/O error occurs during get the OffsetIndex
   */
  OffsetIndex getOffsetIndex(ColumnChunkMetaData chunk) throws IOException;

  /**
   * Get the BloomFilter for the given column in the set row group.
   *
   * @param chunk the given column chunk
   * @return the BloomFilter for the given column
   * @throws IOException if any I/O error occurs during get the BloomFilter
   */
  BloomFilter getBloomFilter(ColumnChunkMetaData chunk) throws IOException;

  /**
   * Release any cached indexes held by this cache.
   */
  void clean();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.hadoop;

import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.internal.column.columnindex.ColumnIndex;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;

import java.io.IOException;

/**
* Cache nothing. All the get methods are pushed to ParquetFileReader to read the given index.
*/
/**
 * An {@link IndexCache} that caches nothing: every get method delegates directly to the
 * underlying {@link ParquetFileReader} to read the requested index on demand.
 */
class NoneIndexCache implements IndexCache {
  /** Reader all index lookups are delegated to; never null. */
  private final ParquetFileReader fileReader;

  /**
   * @param fileReader the file reader to delegate index reads to; must not be null
   * @throws NullPointerException if {@code fileReader} is null
   */
  NoneIndexCache(ParquetFileReader fileReader) {
    // Fail fast here rather than with a delayed NPE at the first index lookup.
    if (fileReader == null) {
      throw new NullPointerException("fileReader must not be null");
    }
    this.fileReader = fileReader;
  }

  @Override
  public void setBlockMetadata(BlockMetaData currentBlockMetadata) throws IOException {
    // Do nothing: there is no per-block state to (pre)load.
  }

  @Override
  public ColumnIndex getColumnIndex(ColumnChunkMetaData chunk) throws IOException {
    return fileReader.readColumnIndex(chunk);
  }

  @Override
  public OffsetIndex getOffsetIndex(ColumnChunkMetaData chunk) throws IOException {
    return fileReader.readOffsetIndex(chunk);
  }

  @Override
  public BloomFilter getBloomFilter(ColumnChunkMetaData chunk) throws IOException {
    return fileReader.readBloomFilter(chunk);
  }

  @Override
  public void clean() {
    // Do nothing: nothing is cached.
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.hadoop.rewrite;
package org.apache.parquet.hadoop;

import org.apache.parquet.Preconditions;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;
Expand All @@ -32,61 +32,79 @@
import java.util.Set;

/**
* A cacher for caching file indexes(ColumnIndex, OffsetIndex, BloomFilter)
* This index cache will prefetch those columns' indexes when calling {@link #setBlockMetadata(BlockMetaData)}.
* <p>
*
* Note: the given index will be freed from the cache after calling the related get method.
*/
class IndexCacher {
class PrefetchIndexCache implements IndexCache {
private final ParquetFileReader fileReader;
private final Set<ColumnPath> columnPathSet;
private final boolean prefetchBlockAllIndexes;
private final Set<ColumnPath> columns;

// Only used when prefetchBlockAllIndexes is true
private Map<ColumnPath, ColumnIndex> columnIndexCache;
private Map<ColumnPath, OffsetIndex> offsetIndexCache;
private Map<ColumnPath, BloomFilter> bloomIndexCache;

IndexCacher(
PrefetchIndexCache(
ParquetFileReader fileReader,
Set<ColumnPath> columnPathSet,
boolean prefetchBlockAllIndexes) {
Set<ColumnPath> columns) {
this.fileReader = fileReader;
this.columnPathSet = columnPathSet;
this.prefetchBlockAllIndexes = prefetchBlockAllIndexes;
this.columns = columns;
}

void setCurrentBlockMetadata(BlockMetaData blockMetaData) throws IOException {
if (prefetchBlockAllIndexes) {
free();
this.columnIndexCache = readAllColumnIndexes(blockMetaData);
this.offsetIndexCache = readAllOffsetIndexes(blockMetaData);
this.bloomIndexCache = readAllBloomFilters(blockMetaData);
}
@Override
public void setBlockMetadata(BlockMetaData currentBlockMetadata) throws IOException {
this.columnIndexCache = readAllColumnIndexes(currentBlockMetadata);
this.offsetIndexCache = readAllOffsetIndexes(currentBlockMetadata);
this.bloomIndexCache = readAllBloomFilters(currentBlockMetadata);
}

ColumnIndex getColumnIndex(ColumnChunkMetaData chunk) throws IOException {
if (prefetchBlockAllIndexes) {
return columnIndexCache.remove(chunk.getPath());
@Override
public ColumnIndex getColumnIndex(ColumnChunkMetaData chunk) throws IOException {
ColumnPath columnPath = chunk.getPath();
if (columns.contains(columnPath)) {
Preconditions.checkState(
columnIndexCache.containsKey(columnPath),
"Not found cached ColumnIndex for column: %s with cache strategy: %s",
columnPath.toDotString(),
CacheStrategy.PRECACHE_BLOCK);
}

return fileReader.readColumnIndex(chunk);
return columnIndexCache.remove(columnPath);
}

OffsetIndex getOffsetIndex(ColumnChunkMetaData chunk) throws IOException {
if (prefetchBlockAllIndexes) {
return offsetIndexCache.remove(chunk.getPath());
@Override
public OffsetIndex getOffsetIndex(ColumnChunkMetaData chunk) throws IOException {
ColumnPath columnPath = chunk.getPath();

if (columns.contains(columnPath)) {
Preconditions.checkState(
offsetIndexCache.containsKey(columnPath),
"Not found cached OffsetIndex for column: %s with cache strategy: %s",
columnPath.toDotString(),
CacheStrategy.PRECACHE_BLOCK);
}

return fileReader.readOffsetIndex(chunk);
return offsetIndexCache.remove(columnPath);
}

BloomFilter getBloomFilter(ColumnChunkMetaData chunk) throws IOException {
if (prefetchBlockAllIndexes) {
return bloomIndexCache.remove(chunk.getPath());
@Override
public BloomFilter getBloomFilter(ColumnChunkMetaData chunk) throws IOException {
ColumnPath columnPath = chunk.getPath();

if (columns.contains(columnPath)) {
Preconditions.checkState(
bloomIndexCache.containsKey(columnPath),
"Not found cached BloomFilter for column: %s with cache strategy: %s",
columnPath.toDotString(),
CacheStrategy.PRECACHE_BLOCK);
}

return fileReader.readBloomFilter(chunk);
return bloomIndexCache.remove(columnPath);
}

void free() {
@Override
public void clean() {
if (columnIndexCache != null) {
columnIndexCache.clear();
columnIndexCache = null;
Expand All @@ -104,9 +122,9 @@ void free() {
}

private Map<ColumnPath, ColumnIndex> readAllColumnIndexes(BlockMetaData blockMetaData) throws IOException {
Map<ColumnPath, ColumnIndex> columnIndexMap = new HashMap<>(columnPathSet.size());
Map<ColumnPath, ColumnIndex> columnIndexMap = new HashMap<>(columns.size());
for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) {
if (columnPathSet.contains(chunk.getPath())) {
if (columns.contains(chunk.getPath())) {
columnIndexMap.put(chunk.getPath(), fileReader.readColumnIndex(chunk));
}
}
Expand All @@ -115,9 +133,9 @@ private Map<ColumnPath, ColumnIndex> readAllColumnIndexes(BlockMetaData blockMet
}

private Map<ColumnPath, OffsetIndex> readAllOffsetIndexes(BlockMetaData blockMetaData) throws IOException {
Map<ColumnPath, OffsetIndex> offsetIndexMap = new HashMap<>(columnPathSet.size());
Map<ColumnPath, OffsetIndex> offsetIndexMap = new HashMap<>(columns.size());
for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) {
if (columnPathSet.contains(chunk.getPath())) {
if (columns.contains(chunk.getPath())) {
offsetIndexMap.put(chunk.getPath(), fileReader.readOffsetIndex(chunk));
}
}
Expand All @@ -126,9 +144,9 @@ private Map<ColumnPath, OffsetIndex> readAllOffsetIndexes(BlockMetaData blockMet
}

private Map<ColumnPath, BloomFilter> readAllBloomFilters(BlockMetaData blockMetaData) throws IOException {
Map<ColumnPath, BloomFilter> bloomFilterMap = new HashMap<>(columnPathSet.size());
Map<ColumnPath, BloomFilter> bloomFilterMap = new HashMap<>(columns.size());
for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) {
if (columnPathSet.contains(chunk.getPath())) {
if (columns.contains(chunk.getPath())) {
bloomFilterMap.put(chunk.getPath(), fileReader.readBloomFilter(chunk));
}
}
Expand Down
Loading

0 comments on commit 458a92c

Please sign in to comment.