result = new ArrayList<>();
+
+ if (pmemBlockLocations.length == 0) {
+ result.addAll(Arrays.asList(hdfsBlockLocations));
+ return result;
+ }
+
+ long currentOffset = start;
+ int pmemIndex = 0;
+ int hdfsIndex = 0;
+ while (currentOffset < start + len) {
+
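+      // next candidate offset from each list; an exhausted list is treated
+      // as starting at Long.MAX_VALUE so it is never chosen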
+ long pmemOffset = pmemIndex >= pmemBlockLocations.length ?
+ Long.MAX_VALUE : pmemBlockLocations[pmemIndex].getOffset();
+ long hdfsOffset = hdfsIndex >= hdfsBlockLocations.length ?
+ Long.MAX_VALUE : hdfsBlockLocations[hdfsIndex].getOffset();
+
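+      // prefer the cached (pmem) location whenever it covers the current offset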
+ if (pmemOffset <= currentOffset) {
+
+ result.add(pmemBlockLocations[pmemIndex]);
+ currentOffset = pmemBlockLocations[pmemIndex].getOffset()
+ + pmemBlockLocations[pmemIndex].getLength();
+        pmemIndex++;
+
+ } else if (hdfsOffset <= currentOffset) {
+
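+        // otherwise fall back to the hdfs location, trimmed to end at the next
+        // cached block or at the hdfs block's own end, whichever comes first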
+ if (hdfsOffset + hdfsBlockLocations[hdfsIndex].getLength() > currentOffset) {
+        // copy the block location so hdfsBlockLocations[hdfsIndex] itself stays unchanged
+ BlockLocation temp = new BlockLocation(hdfsBlockLocations[hdfsIndex]);
+
+ temp.setOffset(currentOffset);
+ temp.setLength(Math.min(
+ hdfsOffset + temp.getLength() - currentOffset,
+ pmemOffset - currentOffset
+ ));
+
+ result.add(temp);
+ currentOffset = temp.getOffset() + temp.getLength();
+ } else {
+          hdfsIndex++;
+ }
+
+ } else {
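+        // neither list covers currentOffset; stop merging here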
+ break;
+ }
+ }
+
+ return result;
+ }
+
+ public FSDataOutputStream create(Path path,
+ FsPermission fsPermission,
+ boolean overwrite,
+ int bufferSize,
+ short replication,
+ long blockSize,
+ Progressable progressable) throws IOException {
+ return this.hdfs.create(PathConverter.toHDFSScheme(path),
+ fsPermission,
+ overwrite,
+ bufferSize,
+ replication,
+ blockSize,
+ progressable);
+ }
+
+ public FSDataOutputStream append(Path path, int bufferSize, Progressable progressable)
+ throws IOException {
+ return this.hdfs.append(PathConverter.toHDFSScheme(path), bufferSize, progressable);
+ }
+
+ public boolean rename(Path srcPath, Path dstPath) throws IOException {
+ return this.hdfs.rename(PathConverter.toHDFSScheme(srcPath),
+ PathConverter.toHDFSScheme(dstPath));
+ }
+
+ public boolean delete(Path path, boolean recursive) throws IOException {
+ return this.hdfs.delete(PathConverter.toHDFSScheme(path), recursive);
+ }
+
+ public FileStatus[] listStatus(Path path) throws FileNotFoundException, IOException {
+ FileStatus[] result = this.hdfs.listStatus(PathConverter.toHDFSScheme(path));
+ for (FileStatus status : result) {
+ // convert scheme back
+ status.setPath(PathConverter.toScheme(status.getPath(), scheme));
+ }
+ return result;
+ }
+
+ public void setWorkingDirectory(Path path) {
+ this.hdfs.setWorkingDirectory(PathConverter.toHDFSScheme(path));
+ }
+
+ public Path getWorkingDirectory() {
+ Path result = this.hdfs.getWorkingDirectory();
+ // convert scheme back
+ return PathConverter.toScheme(result, this.scheme);
+ }
+
+ public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException {
+ return this.hdfs.mkdirs(PathConverter.toHDFSScheme(path), fsPermission);
+ }
+
+ public FileStatus getFileStatus(Path path) throws IOException {
+ FileStatus result = this.hdfs.getFileStatus(PathConverter.toHDFSScheme(path));
+ // convert scheme back
+ result.setPath(PathConverter.toScheme(result.getPath(), scheme));
+ return result;
+ }
+}
diff --git a/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/CachedFileSystemUtils.java b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/CachedFileSystemUtils.java
new file mode 100644
index 000000000..2ed70d19a
--- /dev/null
+++ b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/CachedFileSystemUtils.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.oap.fs.hadoop.cachedfs;
+
+import org.apache.hadoop.fs.Path;
+
+public class CachedFileSystemUtils {
+
+ public static PMemBlock[] computePossiblePMemBlocks(Path path,
+ long start,
+ long len,
+ long blockSize) {
+ PMemBlock[] ret = new PMemBlock[0];
+
+ if (path == null || start < 0 || len <= 0 || blockSize <= 0) {
+ return ret;
+ }
+
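+    // align the requested range outward to block boundaries;
+    // e.g. start=5MB, len=20MB, blockSize=16MB => blkStart=0, blkEnd=32MB, blkNum=2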
+ long blkStart = start - (start % blockSize);
+ long blkEnd = ((start + len) % blockSize == 0) ? start + len
+ : start + len - ((start + len) % blockSize) + blockSize;
+ long blkNum = (blkEnd - blkStart) / blockSize;
+
+ ret = new PMemBlock[(int)blkNum];
+
+ for (int i = 0; i < blkNum; i++) {
+ ret[i] = new PMemBlock(path, blkStart + (i * blockSize), blockSize);
+ }
+
+ return ret;
+ }
+}
diff --git a/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/CachedHadoopFileSystem.java b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/CachedHadoopFileSystem.java
new file mode 100644
index 000000000..16deccdab
--- /dev/null
+++ b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/CachedHadoopFileSystem.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.oap.fs.hadoop.cachedfs;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.DelegateToFileSystem;
+
+/**
+ * The implementation of Hadoop AbstractFileSystem. The implementation delegates to the
+ * existing {@link CachedFileSystem} and is only necessary for use with
+ * Hadoop 2.x. Configuration example in Hadoop core-site.xml file:
+ *
+ * <property>
+ * <name>fs.AbstractFileSystem.cachedFs.impl</name>
+ * <value>com.intel.oap.fs.hadoop.cachedfs.CachedHadoopFileSystem</value>
+ * </property>
+ *
+ *
+ * The above configuration is needed when you start a hadoop cluster
+ * after changing 'fs.defaultFS' in core-site.xml to
+ * a URI whose scheme is 'cachedFs'.
+ *
+ */
+public class CachedHadoopFileSystem extends DelegateToFileSystem {
+ /**
+ * This constructor has the signature needed by
+ * {@link org.apache.hadoop.fs.AbstractFileSystem#createFileSystem(URI, Configuration)}
+ * in Hadoop 2.x.
+ *
+ * @param uri the uri for this cached filesystem
+ * @param conf Hadoop configuration
+ * @throws URISyntaxException if uri has a syntax error
+ */
+ protected CachedHadoopFileSystem(final URI uri, final Configuration conf)
+ throws IOException, URISyntaxException {
+ super(uri, new CachedFileSystem(), conf, Constants.CACHED_FS_SCHEME, false);
+ }
+}
diff --git a/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/CachedInputStream.java b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/CachedInputStream.java
new file mode 100644
index 000000000..91b27e273
--- /dev/null
+++ b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/CachedInputStream.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.oap.fs.hadoop.cachedfs;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import com.intel.oap.fs.hadoop.cachedfs.cacheutil.CacheManager;
+import com.intel.oap.fs.hadoop.cachedfs.cacheutil.CacheManagerFactory;
+import com.intel.oap.fs.hadoop.cachedfs.cacheutil.FiberCache;
+import com.intel.oap.fs.hadoop.cachedfs.cacheutil.ObjectId;
+import com.intel.oap.fs.hadoop.cachedfs.cacheutil.SimpleFiberCache;
+import com.intel.oap.fs.hadoop.cachedfs.redis.RedisGlobalPMemCacheStatisticsStore;
+import com.intel.oap.fs.hadoop.cachedfs.redis.RedisPMemBlockLocationStore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CachedInputStream extends FSInputStream {
+ private static final Logger LOG = LoggerFactory.getLogger(CachedInputStream.class);
+
+ private final FSDataInputStream hdfsInputStream;
+ private final Configuration conf;
+ private final Path path;
+ private final int bufferSize;
+ private final long contentLength;
+
+ private long pos;
+ private long currentCachePos;
+ private boolean closed;
+
+ private byte[] oneByte;
+
+ private PMemBlock currentBlock;
+ private CacheManager cacheManager;
+ private PMemBlockLocationStore locationStore;
+ private PMemCacheStatisticsStore statisticsStore;
+
+ private final long pmemCachedBlockSize;
+ private ObjectId[] ids;
+
+ private int cacheMissCount = 0;
+ private int cacheHitCount = 0;
+  private List<PMemBlock> cachedBlocks = new ArrayList<>();
+
+ // white list and black list regular expressions that decide whether to cache or not
+ private String cacheWhiteListRegexp;
+ private String cacheBlackListRegexp;
+ private boolean fileShouldBeCached;
+
+ public CachedInputStream(FSDataInputStream hdfsInputStream, Configuration conf,
+ Path path, int bufferSize, long contentLength) {
+ this.hdfsInputStream = hdfsInputStream;
+ this.conf = conf;
+ this.path = path;
+ this.bufferSize = bufferSize;
+ this.contentLength = contentLength;
+
+ this.pos = 0;
+ this.currentCachePos = 0;
+ this.closed = false;
+
+ this.currentBlock = null;
+ this.cacheManager = CacheManagerFactory.getOrCreate();
+ this.locationStore = new RedisPMemBlockLocationStore(conf);
+ this.pmemCachedBlockSize = conf.getLong(Constants.CONF_KEY_CACHED_FS_BLOCK_SIZE,
+ Constants.DEFAULT_CACHED_BLOCK_SIZE);
+ this.statisticsStore = new RedisGlobalPMemCacheStatisticsStore(conf);
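+    // one ObjectId slot per cache block: ceil(contentLength / pmemCachedBlockSize)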
+    this.ids = new ObjectId[
+        (int)((contentLength + pmemCachedBlockSize - 1) / pmemCachedBlockSize)];
+
+ cacheWhiteListRegexp = conf.get(Constants.CONF_KEY_CACHE_WHITE_LIST_REGEXP,
+ Constants.DEFAULT_CACHE_WHITE_LIST_REGEXP);
+
+ cacheBlackListRegexp = conf.get(Constants.CONF_KEY_CACHE_BLACK_LIST_REGEXP,
+ Constants.DEFAULT_CACHE_BLACK_LIST_REGEXP);
+
+ fileShouldBeCached = checkFileShouldBeCached();
+
+ LOG.info("Opening file: {} for reading. fileShouldBeCached: {}", path, fileShouldBeCached);
+ }
+
+ private boolean checkFileShouldBeCached() {
+ return (cacheWhiteListRegexp.isEmpty()
+ || Pattern.compile(cacheWhiteListRegexp).matcher(path.toString()).find())
+ && (cacheBlackListRegexp.isEmpty()
+ || !Pattern.compile(cacheBlackListRegexp).matcher(path.toString()).find());
+ }
+
+ private void advanceCachePosition(long pos) {
+ if ((pos >= currentCachePos) && (pos < currentCachePos + pmemCachedBlockSize)) {
+ return;
+ }
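+    // otherwise align the cache position down to the enclosing block boundary
+    // and invalidate the current block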
+ currentCachePos = pos / pmemCachedBlockSize * pmemCachedBlockSize;
+ currentBlock = null;
+ }
+
+ @Override
+ public synchronized void seek(long pos) throws IOException {
+ checkNotClosed();
+
+ if (this.pos == pos) {
+ return;
+ }
+
+ if (pos < 0) {
+ throw new EOFException("Cannot seek to negative offset");
+ }
+
+ if (contentLength > 0 && pos > contentLength - 1) {
+ throw new EOFException("Cannot seek after EOF");
+ }
+
+ this.pos = pos;
+
+ advanceCachePosition(pos);
+
+ LOG.debug("Seeking file: {} to pos: {}.", path, pos);
+ }
+
+ @Override
+ public synchronized long getPos() throws IOException {
+ return pos;
+ }
+
+ @Override
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ boolean newNode = hdfsInputStream.seekToNewSource(targetPos);
+ if (!newNode) {
+ seek(targetPos);
+ }
+ return newNode;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (oneByte == null) {
+ oneByte = new byte[1];
+ }
+ if (read(oneByte, 0, 1) <= 0) {
+ return -1;
+ }
+ return oneByte[0] & 0xFF;
+ }
+
+ private boolean dataAvailableInCache() {
+ return currentBlock != null && pos >= currentCachePos
+ && pos < currentCachePos + currentBlock.getLength();
+ }
+
+ private boolean ensureDataInCache() throws IOException {
+ if (dataAvailableInCache()) {
+ return true;
+ }
+ advanceCachePosition(pos);
+ if (currentCachePos >= contentLength) {
+ return false;
+ }
+ final long bytesToRead = Math.min(pmemCachedBlockSize, contentLength - currentCachePos);
+ currentBlock = new PMemBlock(path, currentCachePos, bytesToRead);
+ ObjectId id = new ObjectId(currentBlock.getCacheKey());
+
+ boolean cacheHit = false;
+ boolean cacheValid = false;
+ ByteBuffer cachedByteBuffer = null;
+ if (fileShouldBeCached) {
+ cacheHit = cacheManager.contains(id);
+ }
+
+ // read block from cache
+ if (cacheHit) {
+ LOG.debug("read block from cache: {}", currentBlock);
+
+ // get cache
+ try {
+ cachedByteBuffer = ((SimpleFiberCache)cacheManager.get(id)).getBuffer();
+ cacheHitCount += 1;
+ cacheValid = true;
+ } catch (Exception ex) {
+ // fail
+        LOG.warn("exception when getting cache: {}, block: {}", ex.toString(), currentBlock);
+ cacheValid = false;
+
+ // remove cache
+ try {
+ cacheManager.delete(id);
+ LOG.info("block cache removed, block: {}", currentBlock);
+ } catch (Exception ex1) {
+ // ignore
+ LOG.warn("exception when removing block cache: {}, block: {}",
+ ex1.toString(), currentBlock);
+ }
+ }
+ }
+
+ // read block from HDFS
+    if (!cacheHit || !cacheValid) {
+ LOG.info("read block from hdfs: {}", currentBlock);
+ if (fileShouldBeCached) {
+ cacheMissCount += 1;
+ }
+ hdfsInputStream.seek(currentCachePos);
+ byte[] cacheBytes = new byte[(int)bytesToRead];
+ hdfsInputStream.readFully(cacheBytes);
+ cachedByteBuffer = ByteBuffer.wrap(cacheBytes);
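+      // restore the caller-visible position after the block-aligned readFully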
+ hdfsInputStream.seek(pos);
+
+ // save to cache
+ if (fileShouldBeCached && !cacheManager.contains(id)) {
+ try {
+ FiberCache fiberCache = cacheManager.create(id, bytesToRead);
+ ((SimpleFiberCache)fiberCache).getBuffer().put(cacheBytes);
+ cacheManager.seal(id);
+ ids[(int)(currentCachePos / pmemCachedBlockSize)] = id;
+ LOG.info("data cached to pmem for block: {}", currentBlock);
+ cacheValid = true;
+ } catch (Exception ex) {
+ cacheValid = false;
+          LOG.warn("exception: {}, data not cached to pmem for block: {}",
+              ex.toString(), currentBlock);
+ }
+ } else {
+        LOG.debug("data not cached: path is blacklisted or block is already cached: {}",
+            currentBlock);
+ }
+ }
+
+ if (fileShouldBeCached && cacheValid) {
+ cachedBlocks.add(new PMemBlock(currentBlock.getPath(),
+ currentBlock.getOffset(),
+ currentBlock.getLength()));
+ }
+
+ currentBlock.setData(cachedByteBuffer);
+ return true;
+ }
+
+ @Override
+ public synchronized int read(byte[] buf, int off, int len) throws IOException {
+ checkNotClosed();
+
+ int totalBytesRead = 0;
+ while (len > 0 && pos < contentLength) {
+ if (!ensureDataInCache()) {
+ return totalBytesRead;
+ }
+ int currentOffsetInCache = (int)(pos - currentCachePos);
+ int bytesRemainingInCurrentCache = (int)(currentBlock.getLength() - currentOffsetInCache);
+ int bytesToRead = Math.min(len, bytesRemainingInCurrentCache);
+
+ currentBlock.getData().position(currentOffsetInCache);
+ currentBlock.getData().get(buf, off, bytesToRead);
+
+ totalBytesRead += bytesToRead;
+ pos += bytesToRead;
+ off += bytesToRead;
+ len -= bytesToRead;
+ }
+
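+    // bytes were requested but none could be served: signal EOF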
+ if (len > 0 && totalBytesRead == 0) {
+ return -1;
+ }
+
+ return totalBytesRead;
+ }
+
+ @Override
+ public synchronized int available() throws IOException {
+ checkNotClosed();
+
+    long remaining = this.contentLength - this.pos;
+    return (int) Math.min(Integer.MAX_VALUE, remaining);
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ if (!closed) {
+ super.close();
+ hdfsInputStream.close();
+ closed = true;
+
+ // set cache locations to redis
+ String host = "";
+ try {
+ host = InetAddress.getLocalHost().getHostName();
+ locationStore.addBlockLocations(cachedBlocks, host);
+ LOG.debug("block locations saved. path: {}, host: {}", path.toString(), host);
+ } catch (Exception ex) {
+ // ignore
+        LOG.warn("block locations failed to be saved. path: {}, host: {}, ex: {}",
+            path.toString(), host, ex.toString());
+ }
+
+ // set cache hit/miss count
+ statisticsStore.incrementCacheHit(cacheHitCount);
+ statisticsStore.incrementCacheMissed(cacheMissCount);
+
+ for (int i = 0; i < ids.length; i++) {
+ if (ids[i] != null) {
+ cacheManager.release(ids[i]);
+ LOG.debug("release id: {}", ids[i]);
+ }
+ }
+ }
+ }
+
+ private void checkNotClosed() throws IOException {
+ if (closed) {
+ throw new IOException("Stream is closed!");
+ }
+ }
+}
diff --git a/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/Constants.java b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/Constants.java
new file mode 100644
index 000000000..c551e5bcc
--- /dev/null
+++ b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/Constants.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.oap.fs.hadoop.cachedfs;
+
+public class Constants {
+ public static final String HDFS_SCHEME = "hdfs";
+
+ public static final String CACHED_FS_SCHEME = "cachedFs";
+
+ public static final long DEFAULT_CACHED_BLOCK_SIZE = 1024 * 1024 * 16;
+
+ public static final String DEFAULT_REDIS_HOST = "localhost";
+
+ public static final int DEFAULT_REDIS_PORT = 6379;
+
+ public static final String DEFAULT_REDIS_AUTH = "";
+
+ public static final int DEFAULT_REDIS_POOL_MAX_TOTAL = 100;
+
+ public static final int DEFAULT_REDIS_POOL_MAX_IDLE = 1000;
+
+ public static final String CONF_KEY_CACHED_FS_BLOCK_SIZE = "fs.cachedFs.block.size";
+
+ public static final String CONF_KEY_CACHED_FS_REDIS_HOST = "fs.cachedFs.redis.host";
+
+ public static final String CONF_KEY_CACHED_FS_REDIS_PORT = "fs.cachedFs.redis.port";
+
+ public static final String CONF_KEY_CACHED_FS_REDIS_AUTH = "fs.cachedFs.redis.auth";
+
+ public static final String CONF_KEY_CACHED_FS_REDIS_MAX_TOTAL = "fs.cachedFs.redis.maxTotal";
+
+ public static final String CONF_KEY_CACHED_FS_REDIS_MAX_IDLE = "fs.cachedFs.redis.maxIdle";
+
+ public static final String REDIS_KEY_PMEM_CACHE_GLOBAL_STATISTICS_CACHE_HIT
+ = "pmem_cache_global_cache_hit";
+
+ public static final String REDIS_KEY_PMEM_CACHE_GLOBAL_STATISTICS_CACHE_MISSED
+ = "pmem_cache_global_cache_missed";
+
+ public static final long UNSAFE_COPY_MEMORY_STEP_LENGTH = 1024 * 1024;
+
+ public static final String CONF_KEY_CACHED_FS_BLOCK_LOCATION_POLICY
+ = "fs.cachedFs.blockLocation.policy";
+
+ // default policy. file block locations consist of cached blocks
+ // and hdfs blocks (if cached blocks are incomplete)
+ public static final String CACHE_LOCATION_POLICY_DEFAULT = "default";
+ public static final String CACHE_LOCATION_POLICY_MERGING_HDFS = "cache_merging_hdfs";
+
+ // use cached block location only if all requested content is cached,
+ // otherwise use HDFS block locations.
+ public static final String CACHE_LOCATION_POLICY_OVER_HDFS = "cache_over_hdfs";
+
+ // use HDFS file block locations directly.
+ // ignoring cached blocks when finding file block locations
+ public static final String CACHE_LOCATION_POLICY_HDFS_ONLY = "hdfs_only";
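+
+  // example core-site.xml entry selecting a policy (any of the values above):
+  //   <property>
+  //     <name>fs.cachedFs.blockLocation.policy</name>
+  //     <value>cache_over_hdfs</value>
+  //   </property>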
+
+ // regular expression that contains patterns of paths which will be cached.
+ // files will not be cached when their paths match black list regexp.
+ // an empty regexp results in matching everything.
+ // eg. cachedFs://localhost:9000/dir/
+ public static final String CONF_KEY_CACHE_WHITE_LIST_REGEXP = "fs.cachedFs.whiteList.regexp";
+
+ public static final String DEFAULT_CACHE_WHITE_LIST_REGEXP = ".*";
+
+ // regular expression that contains patterns of paths which will not be cached.
+ // an empty regexp results in no matching of black list.
+ // eg. io_data|io_control
+ public static final String CONF_KEY_CACHE_BLACK_LIST_REGEXP = "fs.cachedFs.blacklist.regexp";
+
+ public static final String DEFAULT_CACHE_BLACK_LIST_REGEXP = "";
+}
diff --git a/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/PMemBlock.java b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/PMemBlock.java
new file mode 100644
index 000000000..a4817625d
--- /dev/null
+++ b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/PMemBlock.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.intel.oap.fs.hadoop.cachedfs;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * pmem cache block
+ */
+public class PMemBlock {
+
+ private Path path;
+
+ private long offset;
+
+ private long length;
+
+ private String cacheKey;
+
+ private ByteBuffer data;
+
+ public PMemBlock(Path path, long offset, long length) {
+ this(path, offset, length, null);
+ }
+
+ public PMemBlock(Path path, long offset, long length, ByteBuffer data) {
+ this.path = path;
+ this.offset = offset;
+ this.length = length;
+ this.cacheKey = "pmem_hcfs_blk:" + path.toUri().toString() + ":" + offset + "_" + length;
+ this.data = data;
+ }
+
+ public Path getPath() {
+ return path;
+ }
+
+ public void setPath(Path path) {
+ this.path = path;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ public void setOffset(long offset) {
+ this.offset = offset;
+ }
+
+ public long getLength() {
+ return length;
+ }
+
+ public void setLength(long length) {
+ this.length = length;
+ }
+
+ public String getCacheKey() {
+ return cacheKey;
+ }
+
+ public void setCacheKey(String cacheKey) {
+ this.cacheKey = cacheKey;
+ }
+
+ public ByteBuffer getData() {
+ return data;
+ }
+
+ public void setData(ByteBuffer data) {
+ this.data = data;
+ }
+
+ @Override
+ public String toString() {
+ return "path: " + path.toString() + ", offset: " + offset + ", length: " + length;
+ }
+}
diff --git a/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/PMemBlockLocation.java b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/PMemBlockLocation.java
new file mode 100644
index 000000000..3e3340d9e
--- /dev/null
+++ b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/PMemBlockLocation.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.oap.fs.hadoop.cachedfs;
+
+import org.apache.hadoop.fs.BlockLocation;
+
+/**
+ * pmem cache block location
+ */
+public class PMemBlockLocation extends BlockLocation {
+ private PMemBlock cachedBlock;
+
+ public PMemBlockLocation(String[] hosts, PMemBlock cachedBlock) {
+ super(null, hosts, cachedBlock.getOffset(), cachedBlock.getLength());
+ this.cachedBlock = cachedBlock;
+ }
+
+ public PMemBlock getCachedBlock() {
+ return cachedBlock;
+ }
+
+ public void setCachedBlock(PMemBlock cachedBlock) {
+ this.cachedBlock = cachedBlock;
+ }
+
+}
diff --git a/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/PMemBlockLocationStore.java b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/PMemBlockLocationStore.java
new file mode 100644
index 000000000..97cfb70e8
--- /dev/null
+++ b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/PMemBlockLocationStore.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.oap.fs.hadoop.cachedfs;
+
+import java.util.List;
+
+public interface PMemBlockLocationStore {
+ void addBlockLocation(PMemBlock block, String host);
+
+  void addBlockLocations(List<PMemBlock> blocks, String host);
+
+ PMemBlockLocation getBlockLocation(PMemBlock block);
+
+ PMemBlockLocation[] getBlockLocations(PMemBlock[] blocks, boolean consecutive);
+}
diff --git a/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/PMemCacheStatisticsStore.java b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/PMemCacheStatisticsStore.java
new file mode 100644
index 000000000..e319311ce
--- /dev/null
+++ b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/PMemCacheStatisticsStore.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.oap.fs.hadoop.cachedfs;
+
+public interface PMemCacheStatisticsStore {
+ void reset();
+
+ void incrementCacheHit(int count);
+
+ void incrementCacheMissed(int count);
+
+ long getCacheHit();
+
+ long getCacheMissed();
+}
diff --git a/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/PathConverter.java b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/PathConverter.java
new file mode 100644
index 000000000..ce3d8f55c
--- /dev/null
+++ b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/PathConverter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.oap.fs.hadoop.cachedfs;
+
+import org.apache.hadoop.fs.Path;
+
+public class PathConverter {
+ public static Path toHDFSScheme(Path path) {
+ if (path == null || path.toUri() == null
+ || path.toUri().getScheme() == null
+ || path.toUri().getScheme().length() == 0) {
+ return path;
+ }
+
+ return new Path(URIConverter.toHDFSScheme(path.toUri()));
+ }
+
+ public static Path toCachedFSScheme(Path path) {
+ if (path == null || path.toUri() == null
+ || path.toUri().getScheme() == null
+ || path.toUri().getScheme().length() == 0) {
+ return path;
+ }
+
+ return new Path(URIConverter.toCachedFSScheme(path.toUri()));
+ }
+
+  public static Path toScheme(Path path, String newScheme) {
+ if (path == null || path.toUri() == null
+ || path.toUri().getScheme() == null
+ || path.toUri().getScheme().length() == 0) {
+ return path;
+ }
+
+ return new Path(URIConverter.toScheme(path.toUri(), newScheme));
+ }
+}
diff --git a/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/SimpleCachedInputStream.java b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/SimpleCachedInputStream.java
new file mode 100644
index 000000000..989a7c9c2
--- /dev/null
+++ b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/SimpleCachedInputStream.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.oap.fs.hadoop.cachedfs;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+
+import com.intel.oap.fs.hadoop.cachedfs.cacheutil.CacheManager;
+import com.intel.oap.fs.hadoop.cachedfs.cacheutil.CacheManagerFactory;
+import com.intel.oap.fs.hadoop.cachedfs.cacheutil.FiberCache;
+import com.intel.oap.fs.hadoop.cachedfs.cacheutil.ObjectId;
+import com.intel.oap.fs.hadoop.cachedfs.cacheutil.SimpleFiberCache;
+import com.intel.oap.fs.hadoop.cachedfs.redis.RedisGlobalPMemCacheStatisticsStore;
+import com.intel.oap.fs.hadoop.cachedfs.redis.RedisPMemBlockLocationStore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SimpleCachedInputStream extends FSInputStream {
+ private static final Logger LOG = LoggerFactory.getLogger(SimpleCachedInputStream.class);
+
+ private FSDataInputStream hdfsInputStream;
+
+ private Configuration conf;
+ private Path path;
+
+ private int bufferSize;
+
+ private long contentLength;
+ private long position;
+ private boolean closed;
+ private long partRemaining;
+
+ private long expectNextPos;
+ private long lastByteStart;
+
+ private long pmemCachedBlockSize;
+
+ private PMemBlock currentBlock;
+
+ private CacheManager cacheManager;
+ private PMemBlockLocationStore locationStore;
+ private PMemCacheStatisticsStore statisticsStore;
+
+ public SimpleCachedInputStream(FSDataInputStream hdfsInputStream,
+ Configuration conf,
+ Path path,
+ int bufferSize,
+ Long contentLength) {
+ this.hdfsInputStream = hdfsInputStream;
+
+ this.conf = conf;
+ this.path = path;
+ this.bufferSize = bufferSize;
+
+ this.contentLength = contentLength;
+ this.expectNextPos = 0L;
+ this.lastByteStart = -1L;
+ this.closed = false;
+
+ this.pmemCachedBlockSize = conf.getLong(Constants.CONF_KEY_CACHED_FS_BLOCK_SIZE,
+ Constants.DEFAULT_CACHED_BLOCK_SIZE);
+
+ this.cacheManager = CacheManagerFactory.getOrCreate();
+ this.locationStore = new RedisPMemBlockLocationStore(conf);
+ this.statisticsStore = new RedisGlobalPMemCacheStatisticsStore(conf);
+ }
+
+ public synchronized void seek(long pos) throws IOException {
+ LOG.info("seek, path: {}, pos: {}", path, pos);
+
+ if (this.position == pos) {
+ return;
+ }
+
+ // compute cache block
+ PMemBlock block = CachedFileSystemUtils
+ .computePossiblePMemBlocks(path, pos, 1, pmemCachedBlockSize)[0];
+
+ // create new block
+ if (currentBlock == null || currentBlock.getOffset() != block.getOffset()) {
+ this.fetchBlockDataAndCache(block);
+ this.currentBlock = block;
+ this.partRemaining = block.getLength();
+
+ LOG.info("new block created to seek, path: {}, pos: {}", path, pos);
+ }
+
+ // seek in current block
+ long len = pos - this.currentBlock.getOffset();
+ this.partRemaining = this.currentBlock.getLength() - len;
+ this.position = pos;
+
+ LOG.info("seek in current block, path: {}, pos: {}", path, pos);
+
+ // seek in backend stream
+ this.hdfsInputStream.seek(pos);
+ }
+
+ private void fetchBlockDataAndCache(PMemBlock block) throws IOException {
+ LOG.info("fetch block: {}", block);
+
+ // read data from backend stream
+ long len = block.getOffset() + block.getLength() > this.contentLength ?
+ this.contentLength - block.getOffset() : block.getLength();
+ block.setLength(len);
+ byte[] buffer = new byte[(int)len];
+
+ LOG.info("init block buffer with length: {}", len);
+
+ // check pmem cache for new block
+ ObjectId objectId = new ObjectId(block.getCacheKey());
+
+ boolean cached = true;
+ if (cacheManager.contains(objectId)) {
+ LOG.info("pmem cache found for block: {}", block);
+
+ this.statisticsStore.incrementCacheHit(1);
+
+ // read data from local pmem cache
+ FiberCache cacheObject = cacheManager.get(objectId);
+ ByteBuffer cacheBuffer = ((SimpleFiberCache)cacheObject).getBuffer();
+ block.setData(cacheBuffer);
+
+ LOG.info("data read from pmem for block: {}", block);
+ } else {
+ LOG.info("pmem cache NOT found for block: {}", block);
+
+ this.statisticsStore.incrementCacheMissed(1);
+
+ // read data from backend stream
+ this.hdfsInputStream.seek(block.getOffset());
+ this.hdfsInputStream.readFully(buffer, 0, (int)len);
+ block.setData(ByteBuffer.wrap(buffer));
+
+ LOG.info("data read from HDFS for block: {}", block);
+
+ // reset backend stream position
+ this.hdfsInputStream.seek(this.position);
+
+ // cache data to pmem
+ // double check
+ if (!cacheManager.contains(objectId)) {
+ try {
+ FiberCache cacheObject = cacheManager.create(objectId, block.getLength());
+ ((SimpleFiberCache)cacheObject).getBuffer().put(buffer);
+ cacheManager.seal(objectId);
+ LOG.info("data cached to pmem for block: {}", block);
+ } catch (Exception exception) {
+            LOG.warn("exception: {}, data not cached to pmem for block: {}",
+                exception.toString(), block);
+ cached = false;
+ }
+ } else {
+ LOG.info("data already cached to pmem by others for block: {}", block);
+ }
+ }
+
+ if (cached) {
+ // save location info to redis
+ String host = "";
+ try {
+ host = InetAddress.getLocalHost().getHostName();
+
+ this.locationStore.addBlockLocation(block, host);
+ LOG.info("block location saved for block: {}, host: {}", block, host);
+ } catch (Exception ex) {
+ // ignore
+ }
+ }
+ }
+
+ public synchronized long getPos() throws IOException {
+ return this.position;
+ }
+
+ public synchronized boolean seekToNewSource(long targetPos) throws IOException {
+    LOG.info("seekToNewSource, path: {}, targetPos: {}", path, targetPos);
+
+ boolean ret = this.hdfsInputStream.seekToNewSource(targetPos);
+ if (ret) {
+ this.seek(targetPos);
+ }
+ return ret;
+ }
+
+ public synchronized int read() throws IOException {
+ checkNotClosed();
+
+ // create new block
+ if ((currentBlock == null || partRemaining <= 0) && this.position < this.contentLength) {
+ // compute cache block
+ PMemBlock block = CachedFileSystemUtils
+ .computePossiblePMemBlocks(path, this.position, 1, pmemCachedBlockSize)[0];
+ this.fetchBlockDataAndCache(block);
+ this.currentBlock = block;
+ this.partRemaining = block.getLength();
+      LOG.info("read new block, remaining: {}", this.partRemaining);
+ }
+
+ // seek in current block
+ long len = this.position - this.currentBlock.getOffset();
+ this.partRemaining = this.currentBlock.getLength() - len;
+
+ // read byte
+ int byteRead = -1;
+ if (this.partRemaining != 0L) {
+ int idx = (int)this.currentBlock.getLength() - (int)this.partRemaining;
+      byteRead = this.currentBlock.getData().get(idx) & 0xFF;
+    }
+
+ if (byteRead >= 0) {
+ ++this.position;
+ --this.partRemaining;
+ }
+
+ return byteRead;
+ }
+
+ @Override
+ public synchronized int available() throws IOException {
+ return hdfsInputStream.available();
+ }
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ this.hdfsInputStream.close();
+ this.closed = true;
+ }
+
+ private void checkNotClosed() throws IOException {
+ if (this.closed) {
+ throw new IOException("Stream is closed!");
+ }
+ }
+}
diff --git a/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/URIConverter.java b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/URIConverter.java
new file mode 100644
index 000000000..d809eec53
--- /dev/null
+++ b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/URIConverter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.oap.fs.hadoop.cachedfs;
+
+import java.net.URI;
+import javax.ws.rs.core.UriBuilder;
+
+public class URIConverter {
+ public static URI toHDFSScheme(URI name) {
+ if (name == null || name.getScheme() == null || name.getScheme().length() == 0) {
+ return name;
+ }
+ return UriBuilder.fromUri(name).scheme(Constants.HDFS_SCHEME).build();
+ }
+
+ public static URI toCachedFSScheme(URI name) {
+ if (name == null || name.getScheme() == null || name.getScheme().length() == 0) {
+ return name;
+ }
+ return UriBuilder.fromUri(name).scheme(Constants.CACHED_FS_SCHEME).build();
+ }
+
+ public static URI toScheme(URI name, String newScheme) {
+ if (name == null || name.getScheme() == null || name.getScheme().length() == 0) {
+ return name;
+ }
+ return UriBuilder.fromUri(name).scheme(newScheme).build();
+ }
+}
diff --git a/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/cacheUtil/CacheManager.java b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/cacheUtil/CacheManager.java
new file mode 100644
index 000000000..447c7d247
--- /dev/null
+++ b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/cacheUtil/CacheManager.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.oap.fs.hadoop.cachedfs.cacheutil;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+
+public interface CacheManager {
+
+ void init();
+
+ void put(ObjectId id);
+
+ FiberCache get(ObjectId id);
+
+ Boolean contains(ObjectId id);
+
+ void delete(ObjectId id);
+
+ void status();
+
+ FiberCache create(ObjectId id, Long length);
+
+ void seal(ObjectId id);
+
+ void release(ObjectId id);
+
+ FiberCache reCreate(ObjectId id, Long length);
+
+}
+
+class HashHelper {
+ static HashFunction hf = Hashing.murmur3_128();
+
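+  // murmur3_128 produces 16 bytes; writeBytesTo fills the first 16 slots of the
+  // 20-byte array (the Plasma object-id size), leaving the last 4 bytes zero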
+ public static byte[] hash(byte[] key) {
+ byte[] ret = new byte[20];
+ hf.newHasher().putBytes(key).hash().writeBytesTo(ret, 0, 20);
+ return ret;
+ }
+
+ public static byte[] hash(String key) {
+ return hash(key.getBytes());
+ }
+}
+
diff --git a/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/cacheUtil/CacheManagerException.java b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/cacheUtil/CacheManagerException.java
new file mode 100644
index 000000000..e06bdf825
--- /dev/null
+++ b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/cacheUtil/CacheManagerException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.oap.fs.hadoop.cachedfs.cacheutil;
+
+public class CacheManagerException extends RuntimeException {
+ public CacheManagerException() {
+ super();
+ }
+
+ public CacheManagerException(String message) {
+ super(message);
+ }
+}
diff --git a/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/cacheUtil/CacheManagerFactory.java b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/cacheUtil/CacheManagerFactory.java
new file mode 100644
index 000000000..b394a67b9
--- /dev/null
+++ b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/cacheUtil/CacheManagerFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.oap.fs.hadoop.cachedfs.cacheutil;
+
+public class CacheManagerFactory {
+ private static final Object lock = new Object();
+ private static CacheManager manager;
+
+ public static CacheManager getOrCreate() {
+ synchronized (lock) {
+      if (manager == null) {
+ manager = createCacheManager();
+ manager.init();
+ }
+ return manager;
+ }
+ }
+
+ private static CacheManager createCacheManager() {
+    // TODO: will use reflection to construct a new instance. For now, let's
+    // just create a PlasmaCacheManager directly.
+ return new PlasmaCacheManager();
+ }
+}
diff --git a/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/cacheUtil/FiberCache.java b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/cacheUtil/FiberCache.java
new file mode 100644
index 000000000..675507436
--- /dev/null
+++ b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/cacheUtil/FiberCache.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.oap.fs.hadoop.cachedfs.cacheutil;
+
+public interface FiberCache {
+}
diff --git a/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/cacheUtil/ObjectId.java b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/cacheUtil/ObjectId.java
new file mode 100644
index 000000000..d73948b1d
--- /dev/null
+++ b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/cacheUtil/ObjectId.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.oap.fs.hadoop.cachedfs.cacheutil;
+
+public class ObjectId {
+ private String key;
+
+ public ObjectId(String key) {
+ this.key = key;
+ }
+
+ public String toString() {
+ return this.key;
+ }
+
+ public byte[] toByteArray() {
+ return HashHelper.hash(this.key);
+ }
+}
diff --git a/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/cacheUtil/PlasmaCacheManager.java b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/cacheUtil/PlasmaCacheManager.java
new file mode 100644
index 000000000..4b0046b13
--- /dev/null
+++ b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/cacheUtil/PlasmaCacheManager.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.oap.fs.hadoop.cachedfs.cacheutil;
+
+import org.apache.arrow.plasma.PlasmaClient;
+import org.apache.arrow.plasma.exceptions.DuplicateObjectException;
+import org.apache.arrow.plasma.exceptions.PlasmaClientException;
+import org.apache.arrow.plasma.exceptions.PlasmaGetException;
+import org.apache.arrow.plasma.exceptions.PlasmaOutOfMemoryException;
+
+public class PlasmaCacheManager implements CacheManager {
+ private PlasmaClient client;
+
+ public void init() {
+ try {
+ System.loadLibrary("plasma_java");
+    } catch (UnsatisfiedLinkError e) {
+      // ignored: System.loadLibrary reports failure as an Error,
+      // which a 'catch (Exception)' clause would never see
+    }
+ // TODO: get socket via conf(hadoopConf/SparkConf)
+ client = new PlasmaClient("/tmp/plasmaStore", "", 0);
+ }
+
+ public void put(ObjectId id) {
+    throw new UnsupportedOperationException("Not supported yet");
+ }
+
+ public FiberCache get(ObjectId id) {
+ // TODO: what if get an unsealed object? Let's throw an exception here,
+ // higher level should catch this exception and do some fall back.
+ try {
+      // TODO: should not return a SimpleFiberCache directly
+ return new SimpleFiberCache(client.getObjAsByteBuffer(id.toByteArray(), -1, false));
+ } catch(PlasmaGetException e) {
+ throw new CacheManagerException("Plasma exception:" + e.getMessage());
+ }
+ }
+
+ public Boolean contains(ObjectId id) {
+ return client.contains(id.toByteArray());
+ }
+
+ public void delete(ObjectId id) {
+ client.delete(id.toByteArray());
+ }
+
+ public void status() {
+    throw new UnsupportedOperationException("Not supported yet");
+ }
+
+ public FiberCache create(ObjectId id, Long length) {
+ try {
+ // TODO: We should extend plasma.create to support larger size object.
+ if (length > Integer.MAX_VALUE) {
+        throw new ArithmeticException("Can't create " + length + " bytes object");
+ }
+ return new SimpleFiberCache(client.create(id.toByteArray(), length.intValue()));
+ } catch (DuplicateObjectException | PlasmaOutOfMemoryException e) {
+ throw new CacheManagerException("Plasma exception:" + e.getMessage());
+ }
+ }
+
+ public void seal(ObjectId id) {
+ try {
+ client.seal(id.toByteArray());
+ } catch (PlasmaClientException e) {
+ // TODO: print some log
+ }
+ }
+
+ public void release(ObjectId id) {
+ client.release(id.toByteArray());
+ }
+
+ public FiberCache reCreate(ObjectId id, Long length) {
+ seal(id);
+ release(id);
+ delete(id);
+ return create(id, length);
+ }
+}
diff --git a/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/cacheUtil/SimpleFiberCache.java b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/cacheUtil/SimpleFiberCache.java
new file mode 100644
index 000000000..9874b56b9
--- /dev/null
+++ b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/cacheUtil/SimpleFiberCache.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.oap.fs.hadoop.cachedfs.cacheutil;
+
+import java.nio.ByteBuffer;
+
+public class SimpleFiberCache implements FiberCache {
+ private ByteBuffer buffer;
+
+ public SimpleFiberCache(ByteBuffer buffer) {
+ this.buffer = buffer;
+ }
+
+ public ByteBuffer getBuffer() {
+ return buffer;
+ }
+}
diff --git a/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/redis/RedisClient.java b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/redis/RedisClient.java
new file mode 100644
index 000000000..f15fe02d5
--- /dev/null
+++ b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/redis/RedisClient.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.oap.fs.hadoop.cachedfs.redis;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPoolConfig;
+import redis.clients.jedis.Tuple;
+import redis.clients.jedis.exceptions.JedisException;
+
+/**
+ * read and write data with redis.
+ * a singleton instance mechanism is needed.
+ */
+public class RedisClient {
+ private static final Logger LOG = LoggerFactory.getLogger(RedisClient.class);
+
+ private JedisPool jedisPool;
+
+ private String password = "";
+
+ private static volatile RedisClient instance;
+
+ private RedisClient(String host, int port, String auth, int maxTotal, int maxIdle) {
+ JedisPoolConfig config = new JedisPoolConfig();
+ config.setMaxTotal(maxTotal);
+ config.setMaxIdle(maxIdle);
+ jedisPool = new JedisPool(config, host, port);
+
+ password = auth;
+ }
+
+ public static RedisClient getInstance(String host,
+ int port, String auth, int maxTotal, int maxIdle) {
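+        // double-checked locking; the volatile field makes the instance safe to publish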
+ if (instance == null) {
+ synchronized (RedisClient.class) {
+ if (instance == null) {
+ instance = new RedisClient(host, port, auth, maxTotal, maxIdle);
+ }
+ }
+ }
+
+ return instance;
+ }
+
+ public JedisPool getJedisPool() {
+ return jedisPool;
+ }
+
+ /**
+ * @return Jedis
+ */
+ public Jedis getJedis() {
+ Jedis jedis = jedisPool.getResource();
+ if (!password.isEmpty()) {
+ jedis.auth(password);
+ }
+ return jedis;
+ }
+
+ /**
+     * Release a Jedis instance back to the pool.
+ *
+ * @param jedis Jedis
+ */
+ public void close(Jedis jedis) {
+ if (jedis != null) {
+ jedis.close();
+ }
+ }
+
+ /**
+ * get
+ *
+ * @param key String
+ * @return String
+ */
+ public String get(String key) {
+ Jedis jedis = null;
+ try {
+ jedis = getJedis();
+ return jedis.get(key);
+ } catch (Exception e) {
+ LOG.error("redis exception: {}, when: {}, key: {}", e.toString(), "get", key);
+ throw new JedisException(e.getMessage(), e);
+ } finally {
+ close(jedis);
+ }
+ }
+
+ /**
+ * set
+ *
+ * @param key String
+ * @param value String
+ */
+ public void set(String key, String value) {
+ Jedis jedis = null;
+ try {
+ jedis = getJedis();
+ jedis.set(key, value);
+ } catch (Exception e) {
+ LOG.error("redis exception: {}, when: {}, key: {}", e.toString(), "set", key);
+ throw new JedisException(e.getMessage(), e);
+ } finally {
+ close(jedis);
+ }
+ }
+
+ /**
+ * @param key String
+ * @return Long
+ */
+ public Long incr(String key) {
+ Jedis jedis = null;
+ try {
+ jedis = getJedis();
+ return jedis.incr(key);
+ } catch (Exception e) {
+ LOG.error("redis exception: {}, when: {}, key: {}", e.toString(), "incr", key);
+ throw new JedisException(e.getMessage(), e);
+ } finally {
+ close(jedis);
+ }
+ }
+
+ /**
+ * @param key String
+ * @param count long
+ * @return Long
+ */
+ public Long incrBy(String key, long count) {
+ Jedis jedis = null;
+ try {
+ jedis = getJedis();
+ return jedis.incrBy(key, count);
+ } catch (Exception e) {
+ LOG.error("redis exception: {}, when: {}, key: {}", e.toString(), "incrBy", key);
+ throw new JedisException(e.getMessage(), e);
+ } finally {
+ close(jedis);
+ }
+ }
+
+ /**
+ * hset
+ *
+ * @param key String
+ * @param field String
+ * @param value String
+ */
+ public void hset(String key, String field, String value) {
+ Jedis jedis = null;
+ try {
+ jedis = getJedis();
+ jedis.hset(key, field, value);
+ } catch (Exception e) {
+ LOG.error("redis exception: {}, when: {}, key: {}", e.toString(), "hset", key);
+ throw new JedisException(e.getMessage(), e);
+ } finally {
+ close(jedis);
+ }
+ }
+
+ /**
+ * hget
+ *
+ * @param key String
+ * @param field String
+ * @return String
+ */
+ public String hget(String key, String field) {
+ Jedis jedis = null;
+ try {
+ jedis = getJedis();
+ return jedis.hget(key, field);
+ } catch (Exception e) {
+ LOG.error("redis exception: {}, when: {}, key: {}", e.toString(), "hget", key);
+ throw new JedisException(e.getMessage(), e);
+ } finally {
+ close(jedis);
+ }
+ }
+
+ /**
+ * hgetAll
+ *
+ * @param key String
+ * @return Map
+ */
+    public Map<String, String> hgetAll(String key) {
+ Jedis jedis = null;
+ try {
+ jedis = getJedis();
+ return jedis.hgetAll(key);
+ } catch (Exception e) {
+ LOG.error("redis exception: {}, when: {}, key: {}", e.toString(), "hgetAll", key);
+ throw new JedisException(e.getMessage(), e);
+ } finally {
+ close(jedis);
+ }
+ }
+
+ public void lpush(String key, String... value) {
+ Jedis jedis = null;
+ try {
+ jedis = getJedis();
+ jedis.lpush(key, value);
+ } catch (Exception e) {
+ LOG.error("redis exception: {}, when: {}, key: {}", e.toString(), "lpush", key);
+ throw new JedisException(e.getMessage(), e);
+        } finally {
+            close(jedis);
+        }
+    }
+
+ /**
+ * @param key String
+ * @param value String...
+ * @return Long
+ */
+ public Long sadd(String key, String... value) {
+ Jedis jedis = null;
+ try {
+ jedis = getJedis();
+ return jedis.sadd(key, value);
+ } catch (Exception e) {
+ LOG.error("redis exception: {}, when: {}, key: {}", e.toString(), "sadd", key);
+ throw new JedisException(e.getMessage(), e);
+ } finally {
+ close(jedis);
+ }
+ }
+
+ /**
+ * @param key String
+ * @return Set
+ */
+    public Set<String> smembers(String key) {
+ Jedis jedis = null;
+ try {
+ jedis = getJedis();
+ return jedis.smembers(key);
+ } catch (Exception e) {
+ LOG.error("redis exception: {}, when: {}, key: {}", e.toString(), "smembers", key);
+ throw new JedisException(e.getMessage(), e);
+ } finally {
+ close(jedis);
+ }
+ }
+
+ /**
+ * @param key String
+ * @param score double
+ * @param member String
+ * @return Long
+ */
+ public Long zadd(String key, double score, String member) {
+ Jedis jedis = null;
+ try {
+ jedis = getJedis();
+ return jedis.zadd(key, score, member);
+ } catch (Exception e) {
+ LOG.error("redis exception: {}, when: {}, key: {}", e.toString(), "zadd", key);
+ throw new JedisException(e.getMessage(), e);
+ } finally {
+ close(jedis);
+ }
+ }
+
+    public Long zadd(String key, Map<String, Double> scoreMembers) {
+ Jedis jedis = null;
+ try {
+ jedis = getJedis();
+ return jedis.zadd(key, scoreMembers);
+ } catch (Exception e) {
+ LOG.error("redis exception: {}, when: {}, key: {}", e.toString(), "zadd", key);
+ throw new JedisException(e.getMessage(), e);
+ } finally {
+ close(jedis);
+ }
+ }
+
+ public Long zrem(String key, String... members) {
+ Jedis jedis = null;
+ try {
+ jedis = getJedis();
+ return jedis.zrem(key, members);
+ } catch (Exception e) {
+ LOG.error("redis exception: {}, when: {}, key: {}", e.toString(), "zrem", key);
+ throw new JedisException(e.getMessage(), e);
+ } finally {
+ close(jedis);
+ }
+ }
+
+ /**
+ * @param key String
+ * @param start long
+ * @param stop long
+ * @return Set
+ */
+    public Set<String> zrange(String key, long start, long stop) {
+ Jedis jedis = null;
+ try {
+ jedis = getJedis();
+ return jedis.zrange(key, start, stop);
+ } catch (Exception e) {
+ LOG.error("redis exception: {}, when: {}, key: {}", e.toString(), "zrange", key);
+ throw new JedisException(e.getMessage(), e);
+ } finally {
+ close(jedis);
+ }
+ }
+
+ /**
+ * @param key String
+ * @param min double
+ * @param max double
+ * @return Set
+ */
+    public Set<Tuple> zrangeByScoreWithScores(String key, double min, double max) {
+ Jedis jedis = null;
+ try {
+ jedis = getJedis();
+ return jedis.zrangeByScoreWithScores(key, min, max);
+ } catch (Exception e) {
+ LOG.error("redis exception: {}, when: {}, key: {}",
+ e.toString(), "zrangeByScoreWithScores", key);
+ throw new JedisException(e.getMessage(), e);
+ } finally {
+ close(jedis);
+ }
+    }
+
+ /**
+ * @param key String
+ * @param seconds int
+ * @return Long
+ */
+ public Long expire(String key, int seconds) {
+ Jedis jedis = null;
+ try {
+ jedis = getJedis();
+ return jedis.expire(key, seconds);
+ } catch (Exception e) {
+ LOG.error("redis exception: {}, when: {}, key: {}", e.toString(), "expire", key);
+ throw new JedisException(e.getMessage(), e);
+ } finally {
+ close(jedis);
+ }
+ }
+
+ /**
+     * Get the remaining time-to-live of a key.
+ *
+ * @param key String
+ * @return long
+ */
+ public long ttl(String key) {
+ Jedis jedis = null;
+ try {
+ jedis = getJedis();
+ return jedis.ttl(key);
+ } catch (Exception e) {
+ LOG.error("redis exception: {}, when: {}, key: {}", e.toString(), "ttl", key);
+ throw new JedisException(e.getMessage(), e);
+ } finally {
+ close(jedis);
+ }
+ }
+
+}
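
As a sanity check on the API above, a short usage sketch; the connection settings here are illustrative values, and in practice they come from Hadoop configuration via RedisUtils (added below):

```java
import com.intel.oap.fs.hadoop.cachedfs.redis.RedisClient;

public class RedisClientExample {
    public static void main(String[] args) {
        // Illustrative settings: local Redis, no auth, pool sizes 100/10.
        RedisClient client = RedisClient.getInstance("localhost", 6379, "", 100, 10);

        client.set("greeting", "hello");
        System.out.println(client.get("greeting")); // hello

        client.incrBy("pmem_cache_hit", 3);
        System.out.println(client.get("pmem_cache_hit")); // 3

        // Sorted-set usage mirrors how block locations are stored below:
        // the member encodes the block, the score is the block offset.
        client.zadd("hdfs://ns/file.parquet", 0d, "0_1048576_host-a");
        System.out.println(client.zrange("hdfs://ns/file.parquet", 0, -1));
    }
}
```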
diff --git a/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/redis/RedisGlobalPMemCacheStatisticsStore.java b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/redis/RedisGlobalPMemCacheStatisticsStore.java
new file mode 100644
index 000000000..277f592df
--- /dev/null
+++ b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/redis/RedisGlobalPMemCacheStatisticsStore.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.oap.fs.hadoop.cachedfs.redis;
+
+import com.intel.oap.fs.hadoop.cachedfs.Constants;
+import com.intel.oap.fs.hadoop.cachedfs.PMemCacheStatisticsStore;
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RedisGlobalPMemCacheStatisticsStore implements PMemCacheStatisticsStore {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(RedisGlobalPMemCacheStatisticsStore.class);
+
+ private final Configuration conf;
+
+ public RedisGlobalPMemCacheStatisticsStore(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public void reset() {
+ RedisUtils
+ .getRedisClient(this.conf)
+ .set(Constants.REDIS_KEY_PMEM_CACHE_GLOBAL_STATISTICS_CACHE_HIT, "0");
+
+ RedisUtils
+ .getRedisClient(this.conf)
+ .set(Constants.REDIS_KEY_PMEM_CACHE_GLOBAL_STATISTICS_CACHE_MISSED, "0");
+ }
+
+ @Override
+ public void incrementCacheHit(int count) {
+ RedisUtils
+ .getRedisClient(this.conf)
+ .incrBy(Constants.REDIS_KEY_PMEM_CACHE_GLOBAL_STATISTICS_CACHE_HIT, count);
+ }
+
+ @Override
+ public void incrementCacheMissed(int count) {
+ RedisUtils
+ .getRedisClient(this.conf)
+ .incrBy(Constants.REDIS_KEY_PMEM_CACHE_GLOBAL_STATISTICS_CACHE_MISSED, count);
+ }
+
+ @Override
+ public long getCacheHit() {
+ String result = RedisUtils
+ .getRedisClient(this.conf)
+ .get(Constants.REDIS_KEY_PMEM_CACHE_GLOBAL_STATISTICS_CACHE_HIT);
+
+ if (result != null) {
+ try {
+ return Long.parseLong(result);
+ } catch (Exception ex) {
+ LOG.error("exception when parse cache hit count: {}", ex.toString());
+ throw ex;
+ }
+ }
+
+ return 0;
+ }
+
+ @Override
+ public long getCacheMissed() {
+ String result = RedisUtils
+ .getRedisClient(this.conf)
+ .get(Constants.REDIS_KEY_PMEM_CACHE_GLOBAL_STATISTICS_CACHE_MISSED);
+
+ if (result != null) {
+ try {
+ return Long.parseLong(result);
+ } catch (Exception ex) {
+ LOG.error("exception when parse cache missed count: {}", ex.toString());
+ throw ex;
+ }
+ }
+
+ return 0;
+ }
+}
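
A sketch of exercising these counters; it assumes the Configuration points at a reachable Redis instance:

```java
import org.apache.hadoop.conf.Configuration;

import com.intel.oap.fs.hadoop.cachedfs.redis.RedisGlobalPMemCacheStatisticsStore;

public class CacheStatisticsExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration(); // assumed to carry Redis settings
        RedisGlobalPMemCacheStatisticsStore store =
                new RedisGlobalPMemCacheStatisticsStore(conf);

        store.reset();                  // zero both global counters
        store.incrementCacheHit(3);     // e.g. three blocks served from PMem
        store.incrementCacheMissed(1);  // one block read through to HDFS

        long hits = store.getCacheHit();
        long misses = store.getCacheMissed();
        System.out.printf("hit ratio: %.2f%n", (double) hits / (hits + misses));
    }
}
```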
diff --git a/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/redis/RedisPMemBlockLocationStore.java b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/redis/RedisPMemBlockLocationStore.java
new file mode 100644
index 000000000..c78fec262
--- /dev/null
+++ b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/redis/RedisPMemBlockLocationStore.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.oap.fs.hadoop.cachedfs.redis;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.intel.oap.fs.hadoop.cachedfs.PMemBlock;
+import com.intel.oap.fs.hadoop.cachedfs.PMemBlockLocation;
+import com.intel.oap.fs.hadoop.cachedfs.PMemBlockLocationStore;
+import org.apache.hadoop.conf.Configuration;
+import redis.clients.jedis.Tuple;
+
+/**
+ * Class for storage of block locations.
+ */
+public class RedisPMemBlockLocationStore implements PMemBlockLocationStore {
+ private static final String REDIS_ZSET_VALUE_DELIM = "_";
+
+ private final Configuration conf;
+
+ public RedisPMemBlockLocationStore(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public void addBlockLocation(PMemBlock block, String host) {
+ if (block == null || host == null || host.trim().isEmpty()) {
+ return;
+ }
+
+ RedisUtils
+ .getRedisClient(this.conf)
+ .zadd(
+ block.getPath().toString(),
+ block.getOffset(),
+ String.format(
+ "%d%s%d%s%s",
+ block.getOffset(),
+ REDIS_ZSET_VALUE_DELIM,
+ block.getLength(),
+ REDIS_ZSET_VALUE_DELIM,
+ host)
+            );
+    }
+
+ @Override
+    public void addBlockLocations(List<PMemBlock> blocks, String host) {
+ if (blocks == null || blocks.isEmpty() || host == null || host.trim().isEmpty()) {
+ return;
+ }
+
+ String path = blocks.get(0).getPath().toString();
+
+        Map<String, Double> scoreMembers = new HashMap<>();
+ blocks.forEach(block -> {
+ scoreMembers.put(
+ String.format(
+ "%d%s%d%s%s",
+ block.getOffset(),
+ REDIS_ZSET_VALUE_DELIM,
+ block.getLength(),
+ REDIS_ZSET_VALUE_DELIM,
+ host
+ ),
+ (double) block.getOffset()
+ );
+ });
+
+ RedisUtils.getRedisClient(this.conf).zadd(path, scoreMembers);
+ }
+
+ @Override
+ public PMemBlockLocation getBlockLocation(PMemBlock block) {
+ if (block == null) {
+ return null;
+ }
+
+ PMemBlockLocation[] locations = this.getBlockLocations(new PMemBlock[]{block}, false);
+
+ if (locations.length > 0) {
+ return locations[0];
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Get locations from redis.
+ * @param blocks PMemBlock[]
+ * @param consecutive boolean
+ * @return PMemBlockLocation[]
+ */
+ @Override
+ public PMemBlockLocation[] getBlockLocations(PMemBlock[] blocks, boolean consecutive) {
+ PMemBlockLocation[] ret = new PMemBlockLocation[0];
+
+ if (blocks == null || blocks.length == 0) {
+ return ret;
+ }
+
+ // consecutive blocks or not
+ if (consecutive) {
+ return this.getConsecutiveBlockLocations(blocks);
+ } else {
+ return this.getDiscreteBlockLocations(blocks);
+ }
+ }
+
+ /**
+ * Get locations from redis for consecutive blocks.
+ * @param blocks PMemBlock[]
+ * @return PMemBlockLocation[]
+ */
+ private PMemBlockLocation[] getConsecutiveBlockLocations(PMemBlock[] blocks) {
+        List<PMemBlockLocation> result = new ArrayList<>();
+
+ long minOffset = blocks[0].getOffset();
+ long maxOffset = blocks[blocks.length - 1].getOffset();
+
+ // get locations with the right offset range
+        Set<Tuple> locationStrings = RedisUtils
+ .getRedisClient(this.conf)
+ .zrangeByScoreWithScores(blocks[0].getPath().toString(), minOffset, maxOffset);
+
+ // parse and get location info
+ if (locationStrings != null && locationStrings.size() > 0) {
+ for (PMemBlock block : blocks) {
+
+                PMemBlockLocation location = this.filterLocationInfo(block, locationStrings);
+
+ if (location != null) {
+ result.add(location);
+ }
+ }
+ }
+
+ return result.toArray(new PMemBlockLocation[0]);
+ }
+
+ /**
+ * Get locations from redis for discrete blocks.
+     * Discrete blocks result in one Redis lookup per block.
+ * @param blocks PMemBlock[]
+ * @return PMemBlockLocation[]
+ */
+ private PMemBlockLocation[] getDiscreteBlockLocations(PMemBlock[] blocks) {
+        List<PMemBlockLocation> result = new ArrayList<>();
+
+ for (PMemBlock block : blocks) {
+
+ // get locations with the right offset range
+            Set<Tuple> locationStrings = RedisUtils
+                .getRedisClient(this.conf)
+                .zrangeByScoreWithScores(block.getPath().toString(),
+                    block.getOffset(), block.getOffset());
+
+            PMemBlockLocation location = this.filterLocationInfo(block, locationStrings);
+
+ if (location != null) {
+ result.add(location);
+ }
+ }
+
+ return result.toArray(new PMemBlockLocation[0]);
+ }
+
+    private PMemBlockLocation filterLocationInfo(PMemBlock block, Set<Tuple> locationStrings) {
+ // hosts for current block
+        Set<String> hosts = new HashSet<>();
+
+ // parse and get location info
+ if (locationStrings != null && locationStrings.size() > 0) {
+
+ // get locations for current block
+ locationStrings.forEach(t -> {
+ String[] parts = t.getElement().split(REDIS_ZSET_VALUE_DELIM);
+
+ if (parts.length >= 3) {
+ long offset = Long.parseLong(parts[0]);
+ long length = Long.parseLong(parts[1]);
+ String host = parts[2];
+
+ // check cached block's offset and length
+ if (offset == block.getOffset() && length == block.getLength()) {
+ hosts.add(host);
+ }
+ }
+ });
+ }
+
+ if (hosts.size() > 0) {
+ return new PMemBlockLocation(hosts.toArray(new String[0]), block);
+ } else {
+ return null;
+ }
+ }
+}
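
The sorted-set member format used above ("offset_length_host", scored by offset) is easiest to see in isolation. A standalone sketch without the PMemBlock types:

```java
public class ZsetMemberFormatExample {
    private static final String DELIM = "_";

    public static void main(String[] args) {
        // Encode a cached block at offset 1 MB, length 1 MB, held on host-a,
        // exactly as addBlockLocation formats its zset member.
        long offset = 1048576L;
        long length = 1048576L;
        String member = String.format("%d%s%d%s%s",
                offset, DELIM, length, DELIM, "host-a");

        // filterLocationInfo reverses this with split(DELIM) and reads
        // parts[2] as the host; note that a host name containing '_' would
        // be truncated at its first underscore.
        String[] parts = member.split(DELIM);
        System.out.println(Long.parseLong(parts[0])); // 1048576
        System.out.println(Long.parseLong(parts[1])); // 1048576
        System.out.println(parts[2]);                 // host-a
    }
}
```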
diff --git a/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/redis/RedisUtils.java b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/redis/RedisUtils.java
new file mode 100644
index 000000000..7d6c03053
--- /dev/null
+++ b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/redis/RedisUtils.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.oap.fs.hadoop.cachedfs.redis;
+
+import com.intel.oap.fs.hadoop.cachedfs.Constants;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * redis utils
+ */
+public class RedisUtils {
+
+ /**
+ * get redis client based on config
+ * @param configuration Configuration
+ * @return RedisClient
+ */
+ public static RedisClient getRedisClient(Configuration configuration) {
+ String host = configuration.get(Constants.CONF_KEY_CACHED_FS_REDIS_HOST,
+ Constants.DEFAULT_REDIS_HOST);
+ int port = configuration.getInt(Constants.CONF_KEY_CACHED_FS_REDIS_PORT,
+ Constants.DEFAULT_REDIS_PORT);
+ String auth = configuration.get(Constants.CONF_KEY_CACHED_FS_REDIS_AUTH,
+ Constants.DEFAULT_REDIS_AUTH);
+ int maxTotal = configuration.getInt(Constants.CONF_KEY_CACHED_FS_REDIS_MAX_TOTAL,
+ Constants.DEFAULT_REDIS_POOL_MAX_TOTAL);
+ int maxIdle = configuration.getInt(Constants.CONF_KEY_CACHED_FS_REDIS_MAX_IDLE,
+ Constants.DEFAULT_REDIS_POOL_MAX_IDLE);
+
+ return RedisClient.getInstance(host, port, auth, maxTotal, maxIdle);
+ }
+}
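
A sketch of wiring a client from configuration; the host and port values are illustrative, and unset keys fall back to the defaults in Constants:

```java
import org.apache.hadoop.conf.Configuration;

import com.intel.oap.fs.hadoop.cachedfs.Constants;
import com.intel.oap.fs.hadoop.cachedfs.redis.RedisClient;
import com.intel.oap.fs.hadoop.cachedfs.redis.RedisUtils;

public class RedisUtilsExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set(Constants.CONF_KEY_CACHED_FS_REDIS_HOST, "redis.example.internal");
        conf.setInt(Constants.CONF_KEY_CACHED_FS_REDIS_PORT, 6379);

        RedisClient client = RedisUtils.getRedisClient(conf);
        client.set("probe", "ok");
        System.out.println(client.get("probe")); // ok
    }
}
```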
diff --git a/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/unsafe/UnsafeUtils.java b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/unsafe/UnsafeUtils.java
new file mode 100644
index 000000000..57f809013
--- /dev/null
+++ b/HCFS-based-cache/src/main/java/com/intel/oap/fs/hadoop/cachedfs/unsafe/UnsafeUtils.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.oap.fs.hadoop.cachedfs.unsafe;
+
+import java.lang.reflect.Field;
+
+import com.intel.oap.fs.hadoop.cachedfs.Constants;
+import sun.misc.Unsafe;
+
+
+public class UnsafeUtils {
+ private static final Unsafe _UNSAFE;
+ public static final int BYTE_ARRAY_OFFSET;
+
+ static {
+ Unsafe unsafe;
+ try {
+ Field unsafeField = Unsafe.class.getDeclaredField("theUnsafe");
+ unsafeField.setAccessible(true);
+            unsafe = (Unsafe) unsafeField.get(null);
+        } catch (Throwable t) {
+ unsafe = null;
+ }
+
+ _UNSAFE = unsafe;
+ if (_UNSAFE != null) {
+ BYTE_ARRAY_OFFSET = _UNSAFE.arrayBaseOffset(byte[].class);
+ } else {
+ BYTE_ARRAY_OFFSET = 0;
+ }
+ }
+
+ public static boolean available() {
+ return _UNSAFE != null;
+ }
+
+ public static void copyMemory(Object src, long srcOffset,
+ Object dst, long dstOffset, long length) {
+ long size;
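+        // When dst starts before src, copy forward so overlapping regions are
+        // not corrupted; copy in bounded steps so a very large copy does not
+        // delay JVM safepoint polling for too long.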
+ if (dstOffset < srcOffset) {
+            while (length > 0L) {
+ size = Math.min(length, Constants.UNSAFE_COPY_MEMORY_STEP_LENGTH);
+ _UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size);
+ length -= size;
+ srcOffset += size;
+ dstOffset += size;
+ }
+ } else {
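+            // dst starts at or after src: copy backward from the end of both regions.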
+ srcOffset += length;
+
+            for (dstOffset += length; length > 0L; length -= size) {
+ size = Math.min(length, Constants.UNSAFE_COPY_MEMORY_STEP_LENGTH);
+ srcOffset -= size;
+ dstOffset -= size;
+ _UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size);
+ }
+        }
+    }
+}
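
A sketch of copying between heap byte arrays through this helper; object-relative addresses are the array base offset plus the element index:

```java
import com.intel.oap.fs.hadoop.cachedfs.unsafe.UnsafeUtils;

public class UnsafeUtilsExample {
    public static void main(String[] args) {
        if (!UnsafeUtils.available()) {
            System.err.println("sun.misc.Unsafe is not accessible on this JVM");
            return;
        }

        byte[] src = "pmem-cache".getBytes();
        byte[] dst = new byte[src.length];

        // Copy all of src into dst; both addresses are relative to the
        // byte[] base offset reported by Unsafe.
        UnsafeUtils.copyMemory(src, UnsafeUtils.BYTE_ARRAY_OFFSET,
                dst, UnsafeUtils.BYTE_ARRAY_OFFSET, src.length);

        System.out.println(new String(dst)); // pmem-cache
    }
}
```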
diff --git a/pom.xml b/pom.xml
index 17e4de7f5..3260bc0b9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,6 +23,7 @@
     <module>Plasma-based-cache</module>
+    <module>HCFS-based-cache</module>