Skip to content

Commit

Permalink
Close input stream when reading a file (#98)
Browse files Browse the repository at this point in the history
  • Loading branch information
george-zubrienko authored Oct 24, 2024
1 parent 03f2710 commit 8a88c65
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 8 deletions.
7 changes: 7 additions & 0 deletions hadoop_fs_wrapper/models/buffered_input_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,10 @@ def from_input_stream(cls, jvm, input_stream):
:return: BufferedOutputStream
"""
return cls(jvm.java.io.BufferedInputStream(input_stream))

def close(self):
"""
Closes this input stream and releases any system resources associated with the stream.
:return:
"""
return self.underlying.close()
20 changes: 13 additions & 7 deletions hadoop_fs_wrapper/wrappers/file_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

from pyspark.sql import SparkSession

from hadoop_fs_wrapper.models.buffered_input_stream import BufferedInputStream
from hadoop_fs_wrapper.wrappers.hadoop_fs_wrapper import HadoopFsWrapper
from hadoop_fs_wrapper.models.hadoop_file_status import HadoopFileStatus
from hadoop_fs_wrapper.models.file_status import FileStatus
Expand Down Expand Up @@ -157,14 +158,19 @@ def write(self, path: str, data: str, overwrite=False, encoding="utf-8"):

def read(self, path: str, encoding="utf-8") -> str:
"""
reads stringdata from file
reads string data from file
:param path: path to read
:param encoding: encoding to read
:return: string
"""
stream = self._fsw.open(self._fsw.path(path))
buffered_stream = self._fsw.buffered_input_stream(stream)
input_stream_reader = self._fsw.input_stream_reader(buffered_stream, encoding)
buffered_reader = self._fsw.buffered_reader(input_stream_reader)
lines_collector = self._jvm.java.util.stream.Collectors.joining("\n")
return buffered_reader.underlying.lines().collect(lines_collector)
buffered_stream: BufferedInputStream | None = None
try:
stream = self._fsw.open(self._fsw.path(path))
buffered_stream = self._fsw.buffered_input_stream(stream)
input_stream_reader = self._fsw.input_stream_reader(buffered_stream, encoding)
buffered_reader = self._fsw.buffered_reader(input_stream_reader)
lines_collector = self._jvm.java.util.stream.Collectors.joining("\n")
return buffered_reader.underlying.lines().collect(lines_collector)
finally:
if buffered_stream is not None:
buffered_stream.close()
2 changes: 1 addition & 1 deletion hadoop_fs_wrapper/wrappers/hadoop_fs_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ def buffered_reader(self, input_reader: InputStreamReader) -> BufferedReader:
"""
return BufferedReader.from_reader(self._jvm, input_reader.underlying)

def input_stream_reader(self, input_stream: InputStreamReader, charset_name: str = None) -> InputStreamReader:
def input_stream_reader(self, input_stream: BufferedInputStream, charset_name: str = None) -> InputStreamReader:
"""
Wraps constructor java.io.InputStreamReader(InputStream in)
Creates an InputStreamReader that uses the named charset.
Expand Down

0 comments on commit 8a88c65

Please sign in to comment.