From fb9cd96e4c3f35bce62b420b05305d0a3896578f Mon Sep 17 00:00:00 2001 From: Hai Nguyen Date: Sat, 21 Oct 2023 23:50:59 +0700 Subject: [PATCH 1/4] fix: Adopt connection pooling for HBase Signed-off-by: Hai Nguyen --- .../contrib/hbase_online_store/hbase.py | 13 +- sdk/python/feast/infra/utils/hbase_utils.py | 125 +++++++++++------- 2 files changed, 87 insertions(+), 51 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py index 2636cf95e2..179893e6b4 100644 --- a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py +++ b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py @@ -3,7 +3,7 @@ from datetime import datetime from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple -from happybase import Connection +from happybase import Connection, ConnectionPool from pydantic.typing import Literal from feast import Entity @@ -29,6 +29,9 @@ class HbaseOnlineStoreConfig(FeastConfigBaseModel): port: str """Port in which Hbase Thrift server is running""" + connection_pool_size: int = 4 + """Number of connections to Hbase Thrift server to keep in the connection pool""" + class HbaseConnection: """ @@ -62,7 +65,7 @@ class HbaseOnlineStore(OnlineStore): _conn: Happybase Connection to connect to hbase thrift server. """ - _conn: Connection = None + _conn: ConnectionPool = None def _get_conn(self, config: RepoConfig): """ @@ -76,7 +79,11 @@ def _get_conn(self, config: RepoConfig): assert isinstance(store_config, HbaseOnlineStoreConfig) if not self._conn: - self._conn = Connection(host=store_config.host, port=int(store_config.port)) + self._conn = ConnectionPool( + host=store_config.host, + port=int(store_config.port), + size=int(store_config.connection_pool_size), + ) return self._conn @log_exceptions_and_usage(online_store="hbase") diff --git a/sdk/python/feast/infra/utils/hbase_utils.py b/sdk/python/feast/infra/utils/hbase_utils.py index 4816a60087..94d3f56213 100644 --- a/sdk/python/feast/infra/utils/hbase_utils.py +++ b/sdk/python/feast/infra/utils/hbase_utils.py @@ -1,9 +1,6 @@ from typing import List -from happybase import Connection - -from feast.infra.key_encoding_utils import serialize_entity_key -from feast.protos.feast.types.EntityKey_pb2 import EntityKey +from happybase import ConnectionPool class HbaseConstants: @@ -40,14 +37,22 @@ class HbaseUtils: """ def __init__( - self, conn: Connection = None, host: str = None, port: int = None, timeout=None + self, + pool: ConnectionPool = None, + host: str = None, + port: int = None, + connection_pool_size: int = 4, ): - if conn is None: + if pool is None: self.host = host self.port = port - self.conn = Connection(host=host, port=port, timeout=timeout) + self.pool = ConnectionPool( + host=host, + port=port, + size=connection_pool_size, + ) else: - self.conn = conn + self.pool = pool def create_table(self, table_name: str, colm_family: List[str]): """ @@ -60,7 +65,9 @@ def create_table(self, table_name: str, colm_family: List[str]): cf_dict: dict = {} for cf in colm_family: cf_dict[cf] = dict() - return self.conn.create_table(table_name, cf_dict) + + with self.pool.connection() as conn: + return conn.create_table(table_name, cf_dict) def create_table_with_default_cf(self, table_name: str): """ @@ -69,7 +76,8 @@ def create_table_with_default_cf(self, table_name: str): Arguments: table_name: Name of the Hbase table. """ - return self.conn.create_table(table_name, {"default": dict()}) + with self.pool.connection() as conn: + return conn.create_table(table_name, {"default": dict()}) def check_if_table_exist(self, table_name: str): """ @@ -78,16 +86,18 @@ def check_if_table_exist(self, table_name: str): Arguments: table_name: Name of the Hbase table. """ - return bytes(table_name, "utf-8") in self.conn.tables() + with self.pool.connection() as conn: + return bytes(table_name, "utf-8") in conn.tables() def batch(self, table_name: str): """ - Returns a 'Batch' instance that can be used for mass data manipulation in the hbase table. + Returns a "Batch" instance that can be used for mass data manipulation in the hbase table. Arguments: table_name: Name of the Hbase table. """ - return self.conn.table(table_name).batch() + with self.pool.connection() as conn: + return conn.table(table_name).batch() def put(self, table_name: str, row_key: str, data: dict): """ @@ -98,8 +108,9 @@ def put(self, table_name: str, row_key: str, data: dict): row_key: Row key of the row to be inserted to hbase table. data: Mapping of column family name:column name to column values """ - table = self.conn.table(table_name) - table.put(row_key, data) + with self.pool.connection() as conn: + table = conn.table(table_name) + table.put(row_key, data) def row( self, @@ -119,8 +130,9 @@ def row( timestamp: timestamp specifies the maximum version the cells can have. include_timestamp: specifies if (column, timestamp) to be return instead of only column. """ - table = self.conn.table(table_name) - return table.row(row_key, columns, timestamp, include_timestamp) + with self.pool.connection() as conn: + table = conn.table(table_name) + return table.row(row_key, columns, timestamp, include_timestamp) def rows( self, @@ -140,52 +152,69 @@ def rows( timestamp: timestamp specifies the maximum version the cells can have. include_timestamp: specifies if (column, timestamp) to be return instead of only column. """ - table = self.conn.table(table_name) - return table.rows(row_keys, columns, timestamp, include_timestamp) + with self.pool.connection() as conn: + table = conn.table(table_name) + return table.rows(row_keys, columns, timestamp, include_timestamp) def print_table(self, table_name): """Prints the table scanning all the rows of the hbase table.""" - table = self.conn.table(table_name) - scan_data = table.scan() - for row_key, cols in scan_data: - print(row_key.decode("utf-8"), cols) + with self.pool.connection() as conn: + table = conn.table(table_name) + scan_data = table.scan() + for row_key, cols in scan_data: + print(row_key.decode("utf-8"), cols) def delete_table(self, table: str): """Deletes the hbase table given the table name.""" if self.check_if_table_exist(table): - self.conn.delete_table(table, disable=True) + with self.pool.connection() as conn: + conn.delete_table(table, disable=True) def close_conn(self): """Closes the happybase connection.""" - self.conn.close() + with self.pool.connection() as conn: + conn.close() def main(): + from feast.infra.key_encoding_utils import serialize_entity_key + from feast.protos.feast.types.EntityKey_pb2 import EntityKey from feast.protos.feast.types.Value_pb2 import Value - connection = Connection(host="localhost", port=9090) - table = connection.table("test_hbase_driver_hourly_stats") - row_keys = [ - serialize_entity_key( - EntityKey(join_keys=["driver_id"], entity_values=[Value(int64_val=1004)]), - entity_key_serialization_version=2, - ).hex(), - serialize_entity_key( - EntityKey(join_keys=["driver_id"], entity_values=[Value(int64_val=1005)]), - entity_key_serialization_version=2, - ).hex(), - serialize_entity_key( - EntityKey(join_keys=["driver_id"], entity_values=[Value(int64_val=1024)]), - entity_key_serialization_version=2, - ).hex(), - ] - rows = table.rows(row_keys) - - for row_key, row in rows: - for key, value in row.items(): - col_name = bytes.decode(key, "utf-8").split(":")[1] - print(col_name, value) - print() + pool = ConnectionPool( + host="localhost", + port=9090, + size=2, + ) + with pool.connection() as connection: + table = connection.table("test_hbase_driver_hourly_stats") + row_keys = [ + serialize_entity_key( + EntityKey( + join_keys=["driver_id"], entity_values=[Value(int64_val=1004)] + ), + entity_key_serialization_version=2, + ).hex(), + serialize_entity_key( + EntityKey( + join_keys=["driver_id"], entity_values=[Value(int64_val=1005)] + ), + entity_key_serialization_version=2, + ).hex(), + serialize_entity_key( + EntityKey( + join_keys=["driver_id"], entity_values=[Value(int64_val=1024)] + ), + entity_key_serialization_version=2, + ).hex(), + ] + rows = table.rows(row_keys) + + for _, row in rows: + for key, value in row.items(): + col_name = bytes.decode(key, "utf-8").split(":")[1] + print(col_name, value) + print() if __name__ == "__main__": From 9a1c5a2643926cdb443b1291738826f99347ac2f Mon Sep 17 00:00:00 2001 From: Hai Nguyen Date: Sat, 21 Oct 2023 23:51:19 +0700 Subject: [PATCH 2/4] chore: Rename hbaseutils to HBaseConnector Signed-off-by: Hai Nguyen --- .../contrib/hbase_online_store/hbase.py | 36 ++++--------------- sdk/python/feast/infra/utils/hbase_utils.py | 2 +- 2 files changed, 7 insertions(+), 31 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py index 179893e6b4..6c9e4d4b63 100644 --- a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py +++ b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py @@ -3,14 +3,14 @@ from datetime import datetime from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple -from happybase import Connection, ConnectionPool +from happybase import ConnectionPool from pydantic.typing import Literal from feast import Entity from feast.feature_view import FeatureView from feast.infra.online_stores.helpers import compute_entity_id from feast.infra.online_stores.online_store import OnlineStore -from feast.infra.utils.hbase_utils import HbaseConstants, HbaseUtils +from feast.infra.utils.hbase_utils import HbaseConstants, HBaseConnector from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel, RepoConfig @@ -33,30 +33,6 @@ class HbaseOnlineStoreConfig(FeastConfigBaseModel): """Number of connections to Hbase Thrift server to keep in the connection pool""" -class HbaseConnection: - """ - Hbase connecttion to connect to hbase. - - Attributes: - store_config: Online store config for Hbase store. - """ - - def __init__(self, store_config: HbaseOnlineStoreConfig): - self._store_config = store_config - self._real_conn = Connection( - host=store_config.host, port=int(store_config.port) - ) - - @property - def real_conn(self) -> Connection: - """Stores the real happybase Connection to connect to hbase.""" - return self._real_conn - - def close(self) -> None: - """Close the happybase connection.""" - self.real_conn.close() - - class HbaseOnlineStore(OnlineStore): """ Online feature store for Hbase. @@ -109,7 +85,7 @@ def online_write_batch( the online store. Can be used to display progress. """ - hbase = HbaseUtils(self._get_conn(config)) + hbase = HBaseConnector(self._get_conn(config)) project = config.project table_name = self._table_id(project, table) @@ -161,7 +137,7 @@ def online_read( entity_keys: a list of entity keys that should be read from the FeatureStore. requested_features: a list of requested feature names. """ - hbase = HbaseUtils(self._get_conn(config)) + hbase = HBaseConnector(self._get_conn(config)) project = config.project table_name = self._table_id(project, table) @@ -213,7 +189,7 @@ def update( tables_to_delete: Tables to delete from the Hbase Online Store. tables_to_keep: Tables to keep in the Hbase Online Store. """ - hbase = HbaseUtils(self._get_conn(config)) + hbase = HBaseConnector(self._get_conn(config)) project = config.project # We don't create any special state for the entites in this implementation. @@ -239,7 +215,7 @@ def teardown( config: The RepoConfig for the current FeatureStore. tables: Tables to delete from the feature repo. """ - hbase = HbaseUtils(self._get_conn(config)) + hbase = HBaseConnector(self._get_conn(config)) project = config.project for table in tables: diff --git a/sdk/python/feast/infra/utils/hbase_utils.py b/sdk/python/feast/infra/utils/hbase_utils.py index 94d3f56213..d44f93f161 100644 --- a/sdk/python/feast/infra/utils/hbase_utils.py +++ b/sdk/python/feast/infra/utils/hbase_utils.py @@ -25,7 +25,7 @@ def get_col_from_feature(feature): return HbaseConstants.DEFAULT_COLUMN_FAMILY + ":" + feature -class HbaseUtils: +class HBaseConnector: """ Utils class to manage different Hbase operations. From d272e75f25194cbe2726faa432439b2a6b64d69b Mon Sep 17 00:00:00 2001 From: Hai Nguyen Date: Sat, 21 Oct 2023 23:51:50 +0700 Subject: [PATCH 3/4] feat: Expose thrift connection configuration to hbase online store config Signed-off-by: Hai Nguyen --- .../contrib/hbase_online_store/hbase.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py index 6c9e4d4b63..1da9de89a8 100644 --- a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py +++ b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py @@ -4,13 +4,15 @@ from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple from happybase import ConnectionPool +from happybase.connection import DEFAULT_PROTOCOL, DEFAULT_TRANSPORT +from pydantic import StrictStr from pydantic.typing import Literal from feast import Entity from feast.feature_view import FeatureView from feast.infra.online_stores.helpers import compute_entity_id from feast.infra.online_stores.online_store import OnlineStore -from feast.infra.utils.hbase_utils import HbaseConstants, HBaseConnector +from feast.infra.utils.hbase_utils import HBaseConnector, HbaseConstants from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel, RepoConfig @@ -23,15 +25,21 @@ class HbaseOnlineStoreConfig(FeastConfigBaseModel): type: Literal["hbase"] = "hbase" """Online store type selector""" - host: str + host: StrictStr """Hostname of Hbase Thrift server""" - port: str + port: StrictStr """Port in which Hbase Thrift server is running""" connection_pool_size: int = 4 """Number of connections to Hbase Thrift server to keep in the connection pool""" + protocol: StrictStr = DEFAULT_PROTOCOL + """Protocol used to communicate with Hbase Thrift server""" + + transport: StrictStr = DEFAULT_TRANSPORT + """Transport used to communicate with Hbase Thrift server""" + class HbaseOnlineStore(OnlineStore): """ @@ -59,6 +67,8 @@ def _get_conn(self, config: RepoConfig): host=store_config.host, port=int(store_config.port), size=int(store_config.connection_pool_size), + protocol=store_config.protocol, + transport=store_config.transport, ) return self._conn From 744df526cae2e266e1b6bc3e3749dd9f339f2575 Mon Sep 17 00:00:00 2001 From: Hai Nguyen Date: Sun, 22 Oct 2023 00:20:30 +0700 Subject: [PATCH 4/4] fix: check fv entities emptiness before if the first one is dummy Signed-off-by: Hai Nguyen --- sdk/python/feast/feature_store.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 70f7d3dcb7..4b8200d96f 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -287,7 +287,11 @@ def _list_feature_views( for fv in self._registry.list_feature_views( self.project, allow_cache=allow_cache ): - if hide_dummy_entity and fv.entities[0] == DUMMY_ENTITY_NAME: + if ( + hide_dummy_entity + and fv.entities + and fv.entities[0] == DUMMY_ENTITY_NAME + ): fv.entities = [] fv.entity_columns = [] feature_views.append(fv)