diff --git a/alluxio/config.py b/alluxio/config.py index 7aa5124..1a49fa2 100644 --- a/alluxio/config.py +++ b/alluxio/config.py @@ -1,14 +1,11 @@ -from typing import Any -from typing import Dict from typing import Optional import humanfriendly from alluxio.annotations import PublicAPI +from alluxio.const import ALLUXIO_CLUSTER_NAME_DEFAULT_VALUE from alluxio.const import ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE -from alluxio.const import ALLUXIO_HASH_NODE_PER_WORKER_KEY from alluxio.const import ALLUXIO_PAGE_SIZE_DEFAULT_VALUE -from alluxio.const import ALLUXIO_PAGE_SIZE_KEY from alluxio.const import ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE @@ -22,11 +19,15 @@ def __init__( self, etcd_hosts: Optional[str] = None, worker_hosts: Optional[str] = None, - alluxio_properties: Optional[Dict[str, Any]] = None, - concurrency=64, etcd_port=2379, worker_http_port=ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE, etcd_refresh_workers_interval=120, + page_size=ALLUXIO_PAGE_SIZE_DEFAULT_VALUE, + hash_node_per_worker=ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE, + cluster_name=ALLUXIO_CLUSTER_NAME_DEFAULT_VALUE, + etcd_username: Optional[str] = None, + etcd_password: Optional[str] = None, + concurrency=64, ): """ Initializes Alluxio client configuration. @@ -36,8 +37,6 @@ def __init__( in 'host1,host2,host3' format. Either etcd_hosts or worker_hosts should be provided, not both. worker_hosts (Optional[str], optional): The worker hostnames in 'host1,host2,host3' format. Either etcd_hosts or worker_hosts should be provided, not both. - alluxio_properties (Optional[Dict[str, Any]], optional): A dictionary of Alluxio property key and values. - Note that Alluxio Client only supports a limited set of Alluxio properties. concurrency (int, optional): The maximum number of concurrent operations for HTTP requests, default to 64. etcd_port (int, optional): The port of each etcd server. worker_http_port (int, optional): The port of the HTTP server on each Alluxio worker node. @@ -70,21 +69,9 @@ def __init__( ) self.etcd_hosts = etcd_hosts self.worker_hosts = worker_hosts - self.alluxio_properties = alluxio_properties - self.concurrency = concurrency self.etcd_port = etcd_port self.worker_http_port = worker_http_port self.etcd_refresh_workers_interval = etcd_refresh_workers_interval - # parse options - page_size = ALLUXIO_PAGE_SIZE_DEFAULT_VALUE - hash_node_per_worker = ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE - if alluxio_properties is not None: - if ALLUXIO_PAGE_SIZE_KEY in alluxio_properties: - page_size = alluxio_properties[ALLUXIO_PAGE_SIZE_KEY] - if ALLUXIO_HASH_NODE_PER_WORKER_KEY in alluxio_properties: - hash_node_per_worker = int( - alluxio_properties[ALLUXIO_HASH_NODE_PER_WORKER_KEY] - ) if ( not isinstance(hash_node_per_worker, int) or hash_node_per_worker <= 0 @@ -95,3 +82,12 @@ def __init__( self.hash_node_per_worker = hash_node_per_worker self.page_size = humanfriendly.parse_size(page_size, binary=True) + self.cluster_name = cluster_name + + if (etcd_username is None) != (etcd_password is None): + raise ValueError( + "Both ETCD username and password must be set or both should be unset." + ) + self.etcd_username = etcd_username + self.etcd_password = etcd_password + self.concurrency = concurrency diff --git a/alluxio/const.py b/alluxio/const.py index 3f68e67..6c55ca6 100644 --- a/alluxio/const.py +++ b/alluxio/const.py @@ -1,12 +1,6 @@ -ALLUXIO_CLUSTER_NAME_KEY = "alluxio.cluster.name" ALLUXIO_CLUSTER_NAME_DEFAULT_VALUE = "DefaultAlluxioCluster" -ALLUXIO_ETCD_USERNAME_KEY = "alluxio.etcd.username" -ALLUXIO_ETCD_PASSWORD_KEY = "alluxio.etcd.password" ALLUXIO_PAGE_SIZE_KEY = "alluxio.worker.page.store.page.size" ALLUXIO_PAGE_SIZE_DEFAULT_VALUE = "1MB" -ALLUXIO_HASH_NODE_PER_WORKER_KEY = ( - "alluxio.user.consistent.hash.virtual.node.count.per.worker" -) ALLUXIO_WORKER_HTTP_SERVER_PORT_KEY = "alluxio.worker.http.server.port" ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE = 28080 ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE = 5 diff --git a/alluxio/core.py b/alluxio/core.py index 6956d4e..31f00e4 100644 --- a/alluxio/core.py +++ b/alluxio/core.py @@ -785,8 +785,8 @@ def __init__( etcd_port=int(etcd_port), worker_hosts=worker_hosts, worker_http_port=int(http_port), - alluxio_properties=options, etcd_refresh_workers_interval=120, + page_size=page_size, ), self.logger, ) @@ -1012,12 +1012,13 @@ async def write_page( return 200 <= status < 300 async def _range_page_generator( - self, worker_host: str, path_id: str, offset: float, length: float + self, worker_host: str, path_id: str, offset: int, length: int ): start_page_index = offset // self.page_size start_page_offset = offset % self.page_size # Determine the end page index and the read-to position + end_page_read_to = self.page_size if length == -1: end_page_index = None else: diff --git a/alluxio/worker_ring.py b/alluxio/worker_ring.py index 9ec06d6..bc1abee 100644 --- a/alluxio/worker_ring.py +++ b/alluxio/worker_ring.py @@ -5,7 +5,6 @@ import time import uuid from dataclasses import dataclass -from typing import Any from typing import Dict from typing import List from typing import Optional @@ -16,10 +15,6 @@ from sortedcontainers import SortedDict from alluxio.config import AlluxioClientConfig -from alluxio.const import ALLUXIO_CLUSTER_NAME_DEFAULT_VALUE -from alluxio.const import ALLUXIO_CLUSTER_NAME_KEY -from alluxio.const import ALLUXIO_ETCD_PASSWORD_KEY -from alluxio.const import ALLUXIO_ETCD_USERNAME_KEY from alluxio.const import ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE from alluxio.const import ETCD_PREFIX_FORMAT @@ -144,9 +139,9 @@ def from_host_and_port(worker_host, worker_http_port): class EtcdClient: def __init__( self, + config: AlluxioClientConfig, host="localhost", port=2379, - alluxio_properties: Optional[Dict[str, Any]] = None, ): self._host = host self._port = port @@ -155,26 +150,11 @@ def __init__( self._etcd_username = None self._etcd_password = None self._prefix = ETCD_PREFIX_FORMAT.format( - cluster_name=ALLUXIO_CLUSTER_NAME_DEFAULT_VALUE + cluster_name=config.cluster_name ) - if alluxio_properties is not None: - if ALLUXIO_ETCD_USERNAME_KEY in alluxio_properties: - self._etcd_username = alluxio_properties[ - ALLUXIO_ETCD_USERNAME_KEY - ] - if ALLUXIO_ETCD_PASSWORD_KEY in alluxio_properties: - self._etcd_password = alluxio_properties[ - ALLUXIO_ETCD_PASSWORD_KEY - ] - if ALLUXIO_CLUSTER_NAME_KEY in alluxio_properties: - self._prefix = ETCD_PREFIX_FORMAT.format( - cluster_name=alluxio_properties[ALLUXIO_CLUSTER_NAME_KEY] - ) - - if (self._etcd_username is None) != (self._etcd_password is None): - raise ValueError( - "Both ETCD username and password must be set or both should be unset." - ) + if config.etcd_username is not None: + self._etcd_username = config.etcd_username + self._etcd_password = config.etcd_password def get_worker_entities(self) -> Set[WorkerEntity]: """ @@ -324,7 +304,7 @@ def _fetch_workers_and_update_ring(self): worker_entities = EtcdClient( host=host, port=self.config.etcd_port, - alluxio_properties=self.config.alluxio_properties, + config=self.config, ).get_worker_entities() break except Exception: