Skip to content

Commit

Permalink
aerospike online store changes feast-dev#1
Browse files Browse the repository at this point in the history
  • Loading branch information
rana-daoud committed Dec 14, 2023
1 parent 774ed33 commit 11b11f2
Show file tree
Hide file tree
Showing 15 changed files with 707 additions and 130 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# AEROSPIKE Online Store
Aerospike is not included in the current [Feast](https://github.com/feast-dev/feast) roadmap; this project adds
Aerospike support as an online store.

[//]: # (We create a table <project_name>_<feature_view_name> which gets updated with data on every materialize call)


#### Create a feature repository

```shell
feast init feature_repo
cd feature_repo
```

#### Edit `feature_store.yaml`

Set the `online_store` type to `aerospike`:

```yaml
project: feature_repo
registry: data/registry.db
provider: local
online_store:
type: aerospike
host: 127.0.0.1 # aerospike endpoint, default to 127.0.0.1
port: 3000 # aerospike port, default to 3000
# user: test # aerospike user, default to test
# password: test # aerospike password, default to test
database: feast # aerospike database, default to feast
timeout: 50000
```
#### Apply the feature definitions in `example.py`

```shell
feast -c feature_repo apply
```
##### Output
```
Registered entity driver_id
Registered feature view driver_hourly_stats_view
Deploying infrastructure for driver_hourly_stats_view
```
### Materialize Latest Data to Online Feature Store (Aerospike)
```
$ CURRENT_TIME=$(date -u +"%Y-%m-%dT%H:%M:%S")
$ feast -c feature_repo materialize-incremental $CURRENT_TIME
```
#### Output
```
Materializing 1 feature views from 2022-04-16 15:30:39+05:30 to 2022-04-19 15:31:04+05:30 into the aerospike online store.

driver_hourly_stats_view from 2022-04-16 15:30:39+05:30 to 2022-04-19 15:31:04+05:30:
100%|████████████████████████████████████████████████████████████████| 5/5 [00:00<00:00, 120.59it/s]
```
### Fetch the latest features for some entity id
```python
from pprint import pprint
from feast import FeatureStore
store = FeatureStore(repo_path=".")
feature_vector = store.get_online_features(
features=[
"driver_hourly_stats:conv_rate",
"driver_hourly_stats:acc_rate",
"driver_hourly_stats:avg_daily_trips",
],
entity_rows=[
{"driver_id": 1004},
{"driver_id": 1005},
],
).to_dict()
pprint(feature_vector)
```
#### Output
```
{'acc_rate': [0.01390857808291912, 0.4063614010810852],
'avg_daily_trips': [69, 706],
'conv_rate': [0.6624961495399475, 0.7595928311347961],
'driver_id': [1004, 1005]}
```
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import logging
import aerospike


# Module-level logger. AerospikeClient itself logs through the logger handed to its constructor.
logger = logging.getLogger(__name__)


class AerospikeClient:
    """Thin wrapper around the Aerospike Python client.

    Every data-access method is best effort: client errors are logged through
    the logger supplied to the constructor and reported to the caller via the
    return value (``False`` / ``None``) instead of being raised.

    Args (constructor):
        config: Mapping with the mandatory keys ``host``, ``port`` and
            ``timeout`` and the optional keys ``username`` and ``password``.
        logger: Logger used for all messages emitted by this wrapper.

    Raises (constructor):
        KeyError: If a mandatory config key is missing.
        Exception: Whatever the Aerospike client raises when connecting fails.
    """

    # Set in __init__. The annotation is quoted so it is not evaluated when the
    # class body runs; the default really is None until a connection exists.
    _client: "aerospike.Client | None" = None

    def __init__(self, config, logger):
        self.config = config
        self._logger = logger
        client_config = {
            'hosts': [(self.config['host'], int(self.config['port']))],
            'policies': {
                'timeout': int(self.config['timeout']),
            },
            'user': self.config.get('username'),
            'password': self.config.get('password'),
        }
        self._client = aerospike.client(client_config)
        self._client.connect(client_config['user'], client_config['password'])
        self._logger.info("Aerospike client is connected successfully")

    def update_record_if_existed(self, namespace, set_name, key, bins):
        """Update the bins of an existing record; never creates a new record.

        Returns:
            True when the update was applied, False when a required argument
            is missing or the client raised (for example because the record
            does not exist).
        """
        # Only None / empty string count as a missing key: falsy-but-valid
        # primary keys such as the integer 0 must not be rejected.
        key_missing = key is None or key == ""
        if not namespace or not set_name or key_missing or not bins:
            self._logger.error("One of the required params [namespace, set_name, key, bins] is None")
            return False
        try:
            update_policy = {'exists': aerospike.POLICY_EXISTS_UPDATE}  # update only if key exists
            self._client.put((namespace, set_name, key), bins, policy=update_policy)
            return True
        except Exception as ex:
            self._logger.error(f"Failed to update record with primary key [{key}] : {str(ex)}")
            return False

    def put_record(self, namespace, set_name, key, bins):
        """Write the record with the client's default policy; errors are logged only."""
        try:
            self._client.put((namespace, set_name, key), bins)
        except Exception as ex:
            self._logger.error(f"Failed to put record with primary key [{key}] : {str(ex)}")

    def remove_record(self, namespace, set_name, key):
        """Remove the record; errors are logged only."""
        try:
            self._client.remove((namespace, set_name, key))
        except Exception as ex:
            self._logger.error(f"Failed to remove record with primary key [{key}] : {str(ex)}")

    def is_record_exists(self, namespace, set_name, key):
        """Return True when the record exists.

        Returns False when the record is absent and also when the lookup
        itself failed (the failure is logged).
        """
        try:
            _, metadata = self._client.exists((namespace, set_name, key))
            return metadata is not None
        except Exception as ex:
            self._logger.error(f"Failed to check if record with primary key [{key}] exists: {str(ex)}")
            return False

    def get_record(self, namespace, set_name, key):
        """Return the bins of a single record, or None when the read failed."""
        try:
            _, _, bins = self._client.get((namespace, set_name, key))
            return bins
        except Exception as ex:
            self._logger.error(f"Failed to get record for primary key [{key}]: {str(ex)}")
            return None

    def get_records(self, namespace, set_name, primary_keys):
        """Batch-read several records.

        Returns:
            Dict mapping primary key -> bins for every record that came back
            with bins, or None when the batch read failed (failure is logged).
        """
        try:
            key_list = [(namespace, set_name, primary_key) for primary_key in primary_keys]
            records = self._client.get_many(key_list)

            result = {}
            for record in records or ():
                primary_key = record[0][2]  # record[0] is the key tuple; index 2 is the primary key
                bins = record[2]  # record[2] holds the bins
                if bins is not None:
                    result[primary_key] = bins

            self._logger.info(f"Found {len(result)} records for keys {primary_keys}.")
            return result
        except Exception as ex:
            self._logger.error("Failed to get records :" + str(ex))
            return None

    def select_bins_of_record(self, namespace, set_name, key, bins_to_select):
        """Return only the requested bins of a record, or None when the read failed."""
        try:
            _, _, bins = self._client.select((namespace, set_name, key), bins_to_select)
            return bins
        except Exception as ex:
            self._logger.error("Failed to select bins of record :" + str(ex))
            return None

    def close(self):
        """Close the connection if one is open; errors are logged only."""
        try:
            if self._client and self._client.is_connected():
                self._client.close()
                self._logger.info("Aerospike client was closed successfully")
        except Exception as ex:
            self._logger.error("Failed to close client :" + str(ex))
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
import base64
import logging
from datetime import datetime
from typing import Sequence, List, Optional, Tuple, Dict, Callable, Any

import pytz

import feast.type_map
from feast import RepoConfig, FeatureView, Entity
from feast.infra.online_stores.contrib.aerospike_online_store.aerospike_client import AerospikeClient

from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel

# Module-level logger; it is also passed to every AerospikeClient the store creates.
logger = logging.getLogger(__name__)

# Keys looked up inside each per-feature-view entry of `feature_views_config`.
AEROSPIKE_NAMESPACE = 'aerospike_namespace'
AEROSPIKE_SET_NAME = 'aerospike_set_name'


class AerospikeOnlineStoreConfig(FeastConfigBaseModel):
    """
    Configuration for the Aerospike online store.
    NOTE: The class *must* end with the `OnlineStoreConfig` suffix.
    """

    # Online store type selector. Annotated explicitly so the field is also
    # accepted by pydantic versions that reject non-annotated attributes.
    type: str = "aerospike"

    # Connection settings handed to AerospikeClient: `host`, `port` and
    # `timeout` are read as mandatory keys, `username` / `password` as optional.
    aerospike_config: dict = {}

    # Maps feature view name -> {"aerospike_namespace": ..., "aerospike_set_name": ...}
    feature_views_config: dict = {}  # map feature_view to namespace/set in aerospike


class AerospikeOnlineStore(OnlineStore):
    """
    An online store implementation that uses Aerospike.
    NOTE: The class *must* end with the `OnlineStore` suffix.

    Each feature view is stored as one map-valued bin (named after the feature
    view) on the record whose primary key is the first entity value. The
    namespace and set of that record come from
    `config.online_store.feature_views_config[<feature view name>]`.
    Event timestamps are not persisted, so reads always report `None` for them.
    """

    def __init__(self):
        logger.info("Initializing aerospike online store")

    def update(
        self,
        config: RepoConfig,
        tables_to_delete: Sequence[FeatureView],
        tables_to_keep: Sequence[FeatureView],
        entities_to_delete: Sequence[Entity],
        entities_to_keep: Sequence[Entity],
        partial: bool,
    ):
        """Invoked by `feast apply`; only logs, nothing is created or dropped."""
        logger.info("AerospikeOnlineStore - UPDATE: feast apply was invoked")

    def online_write_batch(
        self,
        config: RepoConfig,
        table: FeatureView,
        data: List[
            Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
        ],
        progress: Optional[Callable[[int], Any]],
    ) -> None:
        """Write a batch of feature rows for `table` into Aerospike.

        A new client connection is opened for the batch and is always closed,
        even when writing raises. Rows that could not be written are counted
        and reported in the final log line.
        """
        feature_view_name = table.name
        logger.info(f"AerospikeOnlineStore - Starting online write for feature view [{feature_view_name}]")
        start = datetime.now()

        client = AerospikeClient(config.online_store.aerospike_config, logger)
        try:
            failure_count = self.update_records(client, data, feature_view_name, progress, config)
        finally:
            # Release the connection even if update_records raised.
            client.close()

        elapsed_seconds = (datetime.now() - start).total_seconds()
        if failure_count:
            logger.warning(f"AerospikeOnlineStore - Finished online write for feature view [{feature_view_name}] "
                           f"with {failure_count} failed record(s). Total time in seconds: {elapsed_seconds}")
        else:
            logger.info(f"AerospikeOnlineStore - Finished online write successfully for feature view "
                        f"[{feature_view_name}]. Total time in seconds: {elapsed_seconds}")

    def update_records(self, client, data, feature_view_name, progress, config):
        """Update the feature-view bin of every row in `data`.

        Args:
            client: Connected AerospikeClient.
            data: Rows as (entity key, feature values, event ts, created ts).
            feature_view_name: Name of the feature view; also the bin name.
            progress: Optional callback invoked with 1 per successfully written row.
            config: Repo config providing `online_store.feature_views_config`.

        Returns:
            Number of rows that could not be written.

        Raises:
            KeyError: If the feature view (or its namespace/set entry) is
                missing from `feature_views_config`.
        """
        feature_view_details = config.online_store.feature_views_config[feature_view_name]
        namespace = feature_view_details[AEROSPIKE_NAMESPACE]
        set_name = feature_view_details[AEROSPIKE_SET_NAME]

        failure_count = 0
        for entity_key, values, _event_ts, _created_ts in data:
            # Read the key without mutating the caller's proto and in exactly the
            # same way online_read does, so written records can be found again.
            aerospike_key = self._extract_aerospike_key(entity_key)
            if aerospike_key is None:
                logger.error(f"AerospikeOnlineStore - Entity key without a value for feature view "
                             f"[{feature_view_name}], record skipped")
                failure_count += 1
                continue

            bins_to_update = {
                feature_view_name: self.generate_feature_values_dict(values)
            }
            # update the bin based on primary key
            success = client.update_record_if_existed(namespace, set_name, aerospike_key, bins_to_update)
            if not success:
                failure_count += 1
            elif progress:
                progress(1)
        return failure_count

    def online_read(
        self,
        config: RepoConfig,
        table: FeatureView,
        entity_keys: List[EntityKeyProto],
        requested_features: Optional[List[str]] = None,
    ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
        """Read the features of `table` for the given entity keys.

        Returns:
            One `(event_timestamp, features)` tuple per entity key, in the
            order of `entity_keys`. The timestamp is always None (it is not
            stored); `features` is None when nothing was found for that key
            or when the read failed (the failure is logged).
        """
        logger.info(f"AerospikeOnlineStore - Starting online read from feature view [{table.name}]")
        feature_views_config = config.online_store.feature_views_config

        aerospike_keys = [self._extract_aerospike_key(item) for item in entity_keys]
        # Fallback that still honours the "one row per entity key" shape.
        not_found: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = (
            [(None, None)] * len(entity_keys)
        )

        client = AerospikeClient(config.online_store.aerospike_config, logger)
        try:
            feature_view_name = table.name
            feature_view_details = feature_views_config[feature_view_name]
            records = client.get_records(feature_view_details[AEROSPIKE_NAMESPACE],
                                         feature_view_details[AEROSPIKE_SET_NAME],
                                         [key for key in aerospike_keys if key is not None])
            if records is None:
                # The client already logged the underlying error.
                logger.error(f"AerospikeOnlineStore - Failed while reading records of feature view [{table.name}]")
                return not_found

            result = self._prepare_read_result(feature_view_name, records, requested_features, aerospike_keys)
            logger.info(f"AerospikeOnlineStore - Finished online read successfully from feature view [{table.name}]")
            return result
        except Exception as ex:
            logger.error(f"AerospikeOnlineStore - Failed while reading records of feature view [{table.name}]: {ex}")
            return not_found
        finally:
            client.close()

    @staticmethod
    def _extract_aerospike_key(entity_key):
        """Return the native Python value of the first entity value, or None.

        Only the first entity value is used, so composite entity keys are not
        supported. None is returned when the key has no values or its first
        value is unset.
        """
        if not entity_key.entity_values:
            return None
        value = entity_key.entity_values[0]
        set_field = value.WhichOneof("val")
        return None if set_field is None else getattr(value, set_field)

    @staticmethod
    def generate_feature_values_dict(values: Dict[str, ValueProto]):
        """Convert feature protos to a `{feature name: native value}` dict.

        Features whose proto has no value set are left out instead of raising.
        """
        feature_values_dict = {}
        for feature_name, val in values.items():
            set_field = val.WhichOneof("val")
            if set_field is None:
                # Unset (null) feature value: nothing to store.
                continue
            feature_values_dict[feature_name] = getattr(val, set_field)

        return feature_values_dict

    @staticmethod
    def _prepare_read_result(feature_view_name, records, requested_features, entity_ids=None):
        """Build the `online_read` result from the fetched records.

        Args:
            feature_view_name: Name of the bin holding the feature map.
            records: Dict of primary key -> bins as returned by the client.
            requested_features: Feature names to return; None means all.
            entity_ids: Primary keys in request order. When omitted, the keys
                of `records` are used in their own order.

        Returns:
            One `(None, features-or-None)` tuple per entry of `entity_ids`.
        """
        records = records or {}
        if entity_ids is None:
            entity_ids = list(records)
        wanted = None if requested_features is None else set(requested_features)

        result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
        for entity_id in entity_ids:
            bins = records.get(entity_id)
            features = bins.get(feature_view_name) if bins else None
            if not features:
                result.append((None, None))
                continue
            res = {
                feature_name: feast.type_map.python_values_to_proto_values([value])[0]
                for feature_name, value in features.items()
                if wanted is None or feature_name in wanted
            }
            # timestamp is not stored, hence always None
            result.append((None, res or None))
        return result

    @staticmethod
    def _to_naive_utc(ts: datetime) -> datetime:
        """Return `ts` as a naive datetime; aware values are converted to UTC first."""
        if ts.tzinfo is None:
            return ts
        return ts.astimezone(pytz.utc).replace(tzinfo=None)

    def teardown(
        self,
        config: RepoConfig,
        tables: Sequence[FeatureView],
        entities: Sequence[Entity],
    ):
        """Invoked by `feast teardown`; only logs, no data is removed."""
        logger.info("AerospikeOnlineStore - teardown was invoked")

Empty file.
Loading

0 comments on commit 11b11f2

Please sign in to comment.