Skip to content

Commit

Permalink
Close Zstd Dictionary after execution to avoid any memory leak. (open…
Browse files Browse the repository at this point in the history
…search-project#9403)

* Close Dictionary after every execution to avoid any memory leak

Signed-off-by: Mohit Godwani <[email protected]>

* Close Dictionary after every execution to avoid any memory leak

Signed-off-by: Mohit Godwani <[email protected]>

* Add changelog

Signed-off-by: Mohit Godwani <[email protected]>

---------

Signed-off-by: Mohit Godwani <[email protected]>
(cherry picked from commit 5cc7313)
  • Loading branch information
mgodwan authored and andrross committed Aug 17, 2023
1 parent d555d1d commit 18d3f5a
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

### Fixed
- Fix flaky ResourceAwareTasksTests.testBasicTaskResourceTracking test ([#8993](https://github.com/opensearch-project/OpenSearch/pull/8993))
- Fix memory leak when using Zstd Dictionary ([#9403](https://github.com/opensearch-project/OpenSearch/pull/9403))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,13 @@ private void compress(byte[] bytes, int offset, int length, DataOutput out) thro

// dictionary compression first
doCompress(bytes, offset, dictLength, cctx, out);
cctx.loadDict(new ZstdDictCompress(bytes, offset, dictLength, compressionLevel));
try (ZstdDictCompress dictCompress = new ZstdDictCompress(bytes, offset, dictLength, compressionLevel)) {
cctx.loadDict(dictCompress);

for (int start = offset + dictLength; start < end; start += blockLength) {
int l = Math.min(blockLength, end - start);
doCompress(bytes, start, l, cctx, out);
for (int start = offset + dictLength; start < end; start += blockLength) {
int l = Math.min(blockLength, end - start);
doCompress(bytes, start, l, cctx, out);
}
}
}
}
Expand Down Expand Up @@ -170,32 +172,33 @@ public void decompress(DataInput in, int originalLength, int offset, int length,

// decompress dictionary first
doDecompress(in, dctx, bytes, dictLength);

dctx.loadDict(new ZstdDictDecompress(bytes.bytes, 0, dictLength));

int offsetInBlock = dictLength;
int offsetInBytesRef = offset;

// Skip unneeded blocks
while (offsetInBlock + blockLength < offset) {
final int compressedLength = in.readVInt();
in.skipBytes(compressedLength);
offsetInBlock += blockLength;
offsetInBytesRef -= blockLength;
try (ZstdDictDecompress dictDecompress = new ZstdDictDecompress(bytes.bytes, 0, dictLength)) {
dctx.loadDict(dictDecompress);

int offsetInBlock = dictLength;
int offsetInBytesRef = offset;

// Skip unneeded blocks
while (offsetInBlock + blockLength < offset) {
final int compressedLength = in.readVInt();
in.skipBytes(compressedLength);
offsetInBlock += blockLength;
offsetInBytesRef -= blockLength;
}

// Read blocks that intersect with the interval we need
while (offsetInBlock < offset + length) {
bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + blockLength);
int l = Math.min(blockLength, originalLength - offsetInBlock);
doDecompress(in, dctx, bytes, l);
offsetInBlock += blockLength;
}

bytes.offset = offsetInBytesRef;
bytes.length = length;

assert bytes.isValid() : "decompression output is corrupted";
}

// Read blocks that intersect with the interval we need
while (offsetInBlock < offset + length) {
bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + blockLength);
int l = Math.min(blockLength, originalLength - offsetInBlock);
doDecompress(in, dctx, bytes, l);
offsetInBlock += blockLength;
}

bytes.offset = offsetInBytesRef;
bytes.length = length;

assert bytes.isValid() : "decompression output is corrupted";
}
}

Expand Down

0 comments on commit 18d3f5a

Please sign in to comment.