-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathhive_metastore_client.py
386 lines (316 loc) · 13.8 KB
/
hive_metastore_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
"""Hive Metastore Client main class."""
import copy
from types import TracebackType
from typing import Any, List, Optional, Tuple, Type

from thrift.protocol import TBinaryProtocol
from thrift.transport import TSocket, TTransport

from thrift_files.libraries.thrift_hive_metastore_client.ThriftHiveMetastore import (  # type: ignore # noqa: E501
    Client as ThriftClient,
)
from thrift_files.libraries.thrift_hive_metastore_client.ttypes import (  # type: ignore # noqa: E501
    StorageDescriptor,
    Partition,
    FieldSchema,
    Database,
    AlreadyExistsException,
    Table,
    PartitionValuesRequest,
    NoSuchObjectException,
)
class HiveMetastoreClient(ThriftClient):
    """
    User main interface with the metastore server methods.

    Extends the generated Thrift client with connection handling
    (``open``/``close`` and context-manager support) and with convenience
    helpers for columns, partitions, databases and external tables.
    """

    # Metastore configuration key that is set to "false" right before the
    # ALTER TABLE issued by ``drop_columns_from_table``.
    COL_TYPE_INCOMPATIBILITY_DISALLOW_CONFIG = (
        "hive.metastore.disallow.incompatible.col.type.changes"
    )

    def __init__(self, host: str, port: int = 9083) -> None:
        """
        Instantiates the client object for given host and port.

        The connection is not opened here; call ``open`` or use the client
        in a ``with`` block.

        :param host: hive metastore host name or IP address,
            e.g.: metastore.example.com
        :param port: hive metastore port. Default is 9083.
        """
        protocol = self._init_protocol(host, port)
        super().__init__(protocol)

    @staticmethod
    def _init_protocol(host: str, port: int) -> TBinaryProtocol:
        """
        Instantiates the binary protocol object.

        This object contains the implementation of the Thrift protocol driver.
        It wraps a buffered transport built on top of a socket for the given
        host and port.

        :param host: hive metastore host name or IP address,
            e.g.: metastore.example.com
        :param port: the hive metastore port
        :return: the Thrift protocol driver
        :rtype: thrift.protocol.TBinaryProtocol.TBinaryProtocol
        """
        # int() tolerates a port that arrives as a numeric string
        socket = TSocket.TSocket(host, int(port))
        buffered_transport = TTransport.TBufferedTransport(socket)
        return TBinaryProtocol.TBinaryProtocol(buffered_transport)

    def open(self) -> "HiveMetastoreClient":
        """
        Opens the connection with the Thrift server.

        :return: this HiveMetastoreClient instance, so calls can be chained
        """
        self._oprot.trans.open()
        return self

    def close(self) -> None:
        """Closes the connection with the Thrift server."""
        self._oprot.trans.close()

    def __enter__(self) -> "HiveMetastoreClient":
        """Handles the conn opening whenever the 'with' block statement is used."""
        self.open()
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """
        Handles the conn closing after the code inside 'with' block is ended.

        Returns None, so an exception raised inside the block is not
        suppressed.

        :param exc_type: class of the exception raised in the block, if any
        :param exc_val: exception instance raised in the block, if any
        :param exc_tb: traceback of the exception raised in the block, if any
        """
        self.close()

    def add_columns_to_table(
        self, db_name: str, table_name: str, columns: List[FieldSchema]
    ) -> None:
        """
        Adds columns to a table.

        The table is fetched, the new columns are appended to its storage
        descriptor and the result is sent back through alter table.

        :param db_name: database name of the table
        :param table_name: table name
        :param columns: columns to be added to the table
        """
        table = self.get_table(dbname=db_name, tbl_name=table_name)

        # add more columns to the list of columns
        table.sd.cols.extend(columns)

        # call alter table to add columns
        self.alter_table(dbname=db_name, tbl_name=table_name, new_tbl=table)

    def drop_columns_from_table(
        self, db_name: str, table_name: str, columns: List[str]
    ) -> None:
        """
        Drops columns from a table.

        It encapsulates the logic of calling alter table with removed columns from
        the list of columns, since hive does not have a drop command.
        Nothing is done when ``columns`` is empty.

        :param db_name: database name of the table
        :param table_name: table name
        :param columns: names of the columns to be dropped from the table
        """
        if not columns:
            return

        table = self.get_table(dbname=db_name, tbl_name=table_name)

        # keep only the columns whose name was not requested to be dropped;
        # a set gives constant-time membership checks
        names_to_drop = set(columns)
        table.sd.cols = [col for col in table.sd.cols if col.name not in names_to_drop]

        # Hive Metastore enforces that the schema prior and after
        # an ALTER TABLE should be the same,
        # however when dropping a column the schema will definitely change
        self.setMetaConf(self.COL_TYPE_INCOMPATIBILITY_DISALLOW_CONFIG, "false")

        # call alter table to drop columns removed from list of table columns
        self.alter_table(dbname=db_name, tbl_name=table_name, new_tbl=table)

    def add_partitions_if_not_exists(
        self, db_name: str, table_name: str, partition_list: List[Partition]
    ) -> None:
        """
        Add partitions to a table if it does not exist.

        If a partition is added twice, the method handles the
        AlreadyExistsException, not raising the exception.

        :param db_name: database name where the table is at
        :param table_name: table name which the partitions belong to
        :param partition_list: list of partitions to be added to the table
        :raises: ValueError when the partition list is empty
        """
        if not partition_list:
            raise ValueError(
                "m=add_partitions_if_not_exists, msg=The partition list is empty."
            )

        table = self.get_table(dbname=db_name, tbl_name=table_name)

        partition_list_with_correct_location = self._format_partitions_location(
            partition_list=partition_list,
            table_storage_descriptor=table.sd,
            table_partition_keys=table.partitionKeys,
        )

        # partitions are added one by one so that an already existing
        # partition does not prevent the remaining ones from being added
        for partition in partition_list_with_correct_location:
            try:
                self.add_partition(partition)
            except AlreadyExistsException:
                # deliberate: an existing partition is the desired end state
                pass

    def add_partitions_to_table(
        self, db_name: str, table_name: str, partition_list: List[Partition]
    ) -> None:
        """
        Add partitions to a table.

        If any partition of partition_list already exists, an
        AlreadyExistsException, will be thrown and no partition
        will be added.

        :param db_name: database name where the table is at
        :param table_name: table name which the partitions belong to
        :param partition_list: list of partitions to be added to the table
        :raises: ValueError when the partition list is empty
        """
        if not partition_list:
            raise ValueError(
                "m=add_partitions_to_table, msg=The partition list is empty."
            )

        table = self.get_table(dbname=db_name, tbl_name=table_name)

        partition_list_with_correct_location = self._format_partitions_location(
            partition_list=partition_list,
            table_storage_descriptor=table.sd,
            table_partition_keys=table.partitionKeys,
        )

        self.add_partitions(partition_list_with_correct_location)

    def create_database_if_not_exists(self, database: Database) -> None:
        """
        Creates the database in Hive Metastore if it does not exist.

        Since hive metastore server and thrift mapping do not have the option
        of checking if the database does not exist, this method simulates
        this behavior.

        :param database: the database object
        """
        try:
            self.create_database(database)
        except AlreadyExistsException:
            # deliberate: an existing database is the desired end state
            pass

    def create_external_table(self, table: Table) -> None:
        """
        Creates an external table in Hive Metastore.

        When a table is created with tableType default (None) or equal to
        EXTERNAL_TABLE there is an error in the server that creates the table
        as a MANAGED_TABLE.
        This method enforces the parameter EXTERNAL=TRUE so the table is
        created correctly. Any other parameter already set on the table
        is preserved.

        :param table: the table object
        """
        # merge instead of overwrite, so caller-provided parameters are kept
        table.parameters = {**(table.parameters or {}), "EXTERNAL": "TRUE"}
        table.tableType = "EXTERNAL_TABLE"

        self.create_table(table)

    @staticmethod
    def _format_partitions_location(
        partition_list: List[Partition],
        table_storage_descriptor: StorageDescriptor,
        table_partition_keys: List[FieldSchema],
    ) -> List[Partition]:
        """
        Format the location of partitions, adding a specific value to each object.

        It is based on the location of the Table plus the provided
        individual values per Partition. The partitions are changed in place:
        each one receives its own copy of the table storage descriptor with
        the location extended by ``/key=value`` segments.

        :param partition_list: list of partitions
        :param table_storage_descriptor: the object StorageDescriptor related
            to the Table
        :param table_partition_keys: list of columns that are defined as the
            Table partitions
        :return: list of partitions with the correct location
        :raises: ValueError when a partition does not have one value per
            partition key
        """
        # identify partitions key from table definition
        partition_keys = [key.name for key in table_partition_keys]

        for partition in partition_list:
            HiveMetastoreClient._validate_lists_length(partition_keys, partition.values)

            # organize keys and values in partition expected format
            location_suffix = [
                partition_name + "=" + value
                for partition_name, value in zip(partition_keys, partition.values)
            ]

            # deep copy so partitions do not share (and overwrite) one descriptor
            current_storage_descriptor = copy.deepcopy(table_storage_descriptor)
            current_storage_descriptor.location += "/" + "/".join(location_suffix)

            # set the changed storage_descriptor to the current partition
            partition.sd = current_storage_descriptor

        return partition_list

    @staticmethod
    def _validate_lists_length(list_a: List[Any], list_b: List[Any]) -> None:
        """
        Validate if the two list have the same length.

        :param list_a: first list to be compared
        :param list_b: second list to be compared
        :raises: ValueError when the lengths differ
        """
        if len(list_a) != len(list_b):
            raise ValueError(
                "m=_validate_lists_length, msg=The length of the two provided "
                "lists does not match"
            )

    def get_partition_keys_objects(
        self, db_name: str, table_name: str
    ) -> List[FieldSchema]:
        """
        Gets the partition keys objects, containing the metadata, from a table.

        An empty list will be returned when ``get_table`` yields no table
        object or when the table has no partition keys.
        NOTE(review): a missing table presumably makes ``get_table`` raise
        instead of returning an empty value - confirm against the server API.

        :param db_name: database name where the table is at
        :param table_name: table name which the partition keys belong to
        :return: a new list with the partition keys of the table
        """
        table = self.get_table(dbname=db_name, tbl_name=table_name)
        if not table:
            return []

        # partitionKeys may be unset (None); treat it as "no partition keys"
        return list(table.partitionKeys or [])

    def get_partition_keys_names(self, db_name: str, table_name: str) -> List[str]:
        """
        Gets the partition keys names from a table.

        An empty list will be returned when the table has no partition keys.

        :param db_name: database name where the table is at
        :param table_name: table name which the partition keys belong to
        :return: the name of each partition key
        """
        partition_keys = self.get_partition_keys_objects(
            db_name=db_name, table_name=table_name
        )
        return [partition.name for partition in partition_keys]

    def get_partition_keys(
        self, db_name: str, table_name: str
    ) -> List[Tuple[str, str]]:
        """
        Gets the partition keys from a table as a tuple: (name, type).

        An empty list will be returned when the table has no partition keys.

        :param db_name: database name where the table is at
        :param table_name: table name which the partition keys belong to
        :return: one (name, type) tuple per partition key
        """
        partition_keys = self.get_partition_keys_objects(
            db_name=db_name, table_name=table_name
        )
        return [(partition.name, partition.type) for partition in partition_keys]

    def bulk_drop_partitions(
        self,
        db_name: str,
        table_name: str,
        partition_list: List[List[str]],
        delete_data: bool = False,
    ) -> None:
        """
        Drops the partitions values from the partition list.

        This methods simulates a bulk drop for the user, since the server only
        supports an unitary drop.

        If some partition cannot be dropped an exception will be thrown in the
        end of execution.

        :param db_name: database name of the table
        :param table_name: table name
        :param partition_list: the partitions to be dropped
        :param delete_data: indicates whether the data respective to the
            partition should be dropped in the source.
        :raises: NoSuchObjectException
        """
        partitions_not_dropped = []
        for partition_values in partition_list:
            try:
                self.drop_partition(db_name, table_name, partition_values, delete_data)
            except NoSuchObjectException:
                # remember the failure and keep going with the remaining ones
                partitions_not_dropped.append(partition_values)

        if partitions_not_dropped:
            raise NoSuchObjectException(
                "m=bulk_drop_partitions, partitions_not_dropped="
                f"{partitions_not_dropped}, msg=Some partition values were not "
                "dropped because they do not exist."
            )

    def get_partition_values_from_table(
        self, db_name: str, table_name: str
    ) -> List[List[str]]:
        """
        Gets the partition values from a table.

        It automatically fetches the table's partition keys.

        An empty list will be returned when no table is found or
        when the table has no partitions. Any other error raised while
        fetching the keys or the values also results in an empty list.

        :param db_name: database name where the table is at
        :param table_name: table name which the partitions belong to
        :return: one list of values per partition
        """
        try:
            partition_keys = self.get_partition_keys_objects(
                db_name=db_name, table_name=table_name
            )
            partition_values_response = self.get_partition_values(
                PartitionValuesRequest(
                    dbName=db_name, tblName=table_name, partitionKeys=partition_keys,
                )
            )
            return [
                partition.row for partition in partition_values_response.partitionValues
            ]
        except Exception:
            # Deliberate best-effort: the documented contract is "empty list
            # when nothing can be fetched".
            # NOTE(review): this also hides unrelated failures (e.g. transport
            # errors); narrow the exception types once the set raised by the
            # server for missing/unpartitioned tables is confirmed.
            return []