forked from pydicom/pydicom
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfilebase.py
165 lines (130 loc) · 6.08 KB
/
filebase.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# filebase.py
"""Hold DicomFile class, which does basic I/O for a dicom file."""
# Copyright (c) 2008-2012 Darcy Mason
# This file is part of pydicom, released under a modified MIT license.
# See the file license.txt included with this distribution, also
# available at https://github.com/darcymason/pydicom
from __future__ import absolute_import
from pydicom.tag import Tag
from struct import unpack, pack
from io import BytesIO
from pydicom.config import logger
class DicomIO(object):
"""File object which holds transfer syntax info and anything else we need."""
max_read_attempts = 3 # number of times to read if don't get requested bytes
defer_size = None # default
def __init__(self, *args, **kwargs):
self._implicit_VR = True # start with this by default
def __del__(self):
self.close()
def read_le_tag(self):
"""Read and return two unsigned shorts (little endian) from the file."""
bytes_read = self.read(4)
if len(bytes_read) < 4:
raise EOFError # needed for reading "next" tag when at end of file
return unpack(b"<HH", bytes_read)
def read_be_tag(self):
"""Read and return two unsigned shorts (little endian) from the file."""
bytes_read = self.read(4)
if len(bytes_read) < 4:
raise EOFError # needed for reading "next" tag when at end of file
return unpack(b">HH", bytes_read)
def write_tag(self, tag):
"""Write a dicom tag (two unsigned shorts) to the file."""
tag = Tag(tag) # make sure is an instance of class, not just a tuple or int
self.write_US(tag.group)
self.write_US(tag.element)
def read_leUS(self):
"""Return an unsigned short from the file with little endian byte order"""
return unpack(b"<H", self.read(2))[0]
def read_beUS(self):
"""Return an unsigned short from the file with big endian byte order"""
return unpack(b">H", self.read(2))[0]
def read_leUL(self):
"""Return an unsigned long read with little endian byte order"""
return unpack(b"<L", self.read(4))[0]
def read(self, length=None, need_exact_length=True):
"""Reads the required length, returns EOFError if gets less
If length is None, then read all bytes
"""
parent_read = self.parent_read # super(DicomIO, self).read
if length is None:
return parent_read() # get all of it
bytes_read = parent_read(length)
if len(bytes_read) < length and need_exact_length:
# Didn't get all the desired bytes. Keep trying to get the rest. If reading across network, might want to add a delay here
attempts = 0
while attempts < self.max_read_attempts and len(bytes_read) < length:
bytes_read += parent_read(length - len(bytes_read))
attempts += 1
if len(bytes_read) < length:
start_pos = self.tell() - len(bytes_read)
msg = "Unexpected end of file. "
msg += "Read {0} bytes of {1} expected starting at position 0x{2:x}".format(len(bytes_read), length, start_pos)
raise EOFError(msg)
return bytes_read
def write_leUS(self, val):
"""Write an unsigned short with little endian byte order"""
self.write(pack(b"<H", val))
def write_leUL(self, val):
"""Write an unsigned long with little endian byte order"""
self.write(pack(b"<L", val))
def write_beUS(self, val):
"""Write an unsigned short with big endian byte order"""
self.write(pack(b">H", val))
def write_beUL(self, val):
"""Write an unsigned long with big endian byte order"""
self.write(pack(b">L", val))
write_US = write_leUS # XXX should we default to this?
write_UL = write_leUL # XXX "
def read_beUL(self):
"""Return an unsigned long read with big endian byte order"""
return unpack(b">L", self.read(4))[0]
# Set up properties is_little_endian and is_implicit_VR
# Big/Little Endian changes functions to read unsigned short or long, e.g. length fields etc
@property
def is_little_endian(self):
return self._little_endian
@is_little_endian.setter
def is_little_endian(self, value):
self._little_endian = value
if value: # Little Endian
self.read_US = self.read_leUS
self.read_UL = self.read_leUL
self.write_US = self.write_leUS
self.write_UL = self.write_leUL
self.read_tag = self.read_le_tag
else: # Big Endian
self.read_US = self.read_beUS
self.read_UL = self.read_beUL
self.write_US = self.write_beUS
self.write_UL = self.write_beUL
self.read_tag = self.read_be_tag
@property
def is_implicit_VR(self):
return self._implicit_VR
@is_implicit_VR.setter
def is_implicit_VR(self, value):
self._implicit_VR = value
class DicomFileLike(DicomIO):
def __init__(self, file_like_obj):
self.parent = file_like_obj
self.parent_read = getattr(file_like_obj, "read", self.no_read)
self.write = getattr(file_like_obj, "write", self.no_write)
self.seek = getattr(file_like_obj, "seek", self.no_seek)
self.tell = file_like_obj.tell
self.close = file_like_obj.close
self.name = getattr(file_like_obj, 'name', '<no filename>')
def no_write(self, bytes_read):
"""Used for file-like objects where no write is available"""
raise IOError("This DicomFileLike object has no write() method")
def no_read(self, bytes_read):
"""Used for file-like objects where no read is available"""
raise IOError("This DicomFileLike object has no read() method")
def no_seek(offset, from_what):
"""Used for file-like objects where no seek is available"""
raise IOError("This DicomFileLike object has no seek() method")
def DicomFile(*args, **kwargs):
return DicomFileLike(open(*args, **kwargs))
def DicomBytesIO(*args, **kwargs):
return DicomFileLike(BytesIO(*args, **kwargs))