Skip to content

Commit

Permalink
Move alluxio_properties into config
Browse files Browse the repository at this point in the history
  • Loading branch information
LuQQiu committed Mar 7, 2024
1 parent 8e646a4 commit ca3a938
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 54 deletions.
36 changes: 16 additions & 20 deletions alluxio/config.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
from typing import Any
from typing import Dict
from typing import Optional

import humanfriendly

from alluxio.annotations import PublicAPI
from alluxio.const import ALLUXIO_CLUSTER_NAME_DEFAULT_VALUE
from alluxio.const import ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE
from alluxio.const import ALLUXIO_HASH_NODE_PER_WORKER_KEY
from alluxio.const import ALLUXIO_PAGE_SIZE_DEFAULT_VALUE
from alluxio.const import ALLUXIO_PAGE_SIZE_KEY
from alluxio.const import ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE


Expand All @@ -22,11 +19,15 @@ def __init__(
self,
etcd_hosts: Optional[str] = None,
worker_hosts: Optional[str] = None,
alluxio_properties: Optional[Dict[str, Any]] = None,
concurrency=64,
etcd_port=2379,
worker_http_port=ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE,
etcd_refresh_workers_interval=120,
page_size=ALLUXIO_PAGE_SIZE_DEFAULT_VALUE,
hash_node_per_worker=ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE,
cluster_name=ALLUXIO_CLUSTER_NAME_DEFAULT_VALUE,
etcd_username: Optional[str] = None,
etcd_password: Optional[str] = None,
concurrency=64,
):
"""
Initializes Alluxio client configuration.
Expand All @@ -36,8 +37,6 @@ def __init__(
in 'host1,host2,host3' format. Either etcd_hosts or worker_hosts should be provided, not both.
worker_hosts (Optional[str], optional): The worker hostnames in 'host1,host2,host3' format.
Either etcd_hosts or worker_hosts should be provided, not both.
alluxio_properties (Optional[Dict[str, Any]], optional): A dictionary of Alluxio property key and values.
Note that Alluxio Client only supports a limited set of Alluxio properties.
concurrency (int, optional): The maximum number of concurrent operations for HTTP requests, default to 64.
etcd_port (int, optional): The port of each etcd server.
worker_http_port (int, optional): The port of the HTTP server on each Alluxio worker node.
Expand Down Expand Up @@ -70,21 +69,9 @@ def __init__(
)
self.etcd_hosts = etcd_hosts
self.worker_hosts = worker_hosts
self.alluxio_properties = alluxio_properties
self.concurrency = concurrency
self.etcd_port = etcd_port
self.worker_http_port = worker_http_port
self.etcd_refresh_workers_interval = etcd_refresh_workers_interval
# parse options
page_size = ALLUXIO_PAGE_SIZE_DEFAULT_VALUE
hash_node_per_worker = ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE
if alluxio_properties is not None:
if ALLUXIO_PAGE_SIZE_KEY in alluxio_properties:
page_size = alluxio_properties[ALLUXIO_PAGE_SIZE_KEY]
if ALLUXIO_HASH_NODE_PER_WORKER_KEY in alluxio_properties:
hash_node_per_worker = int(
alluxio_properties[ALLUXIO_HASH_NODE_PER_WORKER_KEY]
)
if (
not isinstance(hash_node_per_worker, int)
or hash_node_per_worker <= 0
Expand All @@ -95,3 +82,12 @@ def __init__(

self.hash_node_per_worker = hash_node_per_worker
self.page_size = humanfriendly.parse_size(page_size, binary=True)
self.cluster_name = cluster_name

if (etcd_username is None) != (etcd_password is None):
raise ValueError(
"Both ETCD username and password must be set or both should be unset."
)
self.etcd_username = etcd_username
self.etcd_password = etcd_password
self.concurrency = concurrency
6 changes: 0 additions & 6 deletions alluxio/const.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
ALLUXIO_CLUSTER_NAME_KEY = "alluxio.cluster.name"
ALLUXIO_CLUSTER_NAME_DEFAULT_VALUE = "DefaultAlluxioCluster"
ALLUXIO_ETCD_USERNAME_KEY = "alluxio.etcd.username"
ALLUXIO_ETCD_PASSWORD_KEY = "alluxio.etcd.password"
ALLUXIO_PAGE_SIZE_KEY = "alluxio.worker.page.store.page.size"
ALLUXIO_PAGE_SIZE_DEFAULT_VALUE = "1MB"
ALLUXIO_HASH_NODE_PER_WORKER_KEY = (
"alluxio.user.consistent.hash.virtual.node.count.per.worker"
)
ALLUXIO_WORKER_HTTP_SERVER_PORT_KEY = "alluxio.worker.http.server.port"
ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE = 28080
ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE = 5
Expand Down
5 changes: 3 additions & 2 deletions alluxio/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -785,8 +785,8 @@ def __init__(
etcd_port=int(etcd_port),
worker_hosts=worker_hosts,
worker_http_port=int(http_port),
alluxio_properties=options,
etcd_refresh_workers_interval=120,
page_size=page_size,
),
self.logger,
)
Expand Down Expand Up @@ -1012,12 +1012,13 @@ async def write_page(
return 200 <= status < 300

async def _range_page_generator(
self, worker_host: str, path_id: str, offset: float, length: float
self, worker_host: str, path_id: str, offset: int, length: int
):
start_page_index = offset // self.page_size
start_page_offset = offset % self.page_size

# Determine the end page index and the read-to position
end_page_read_to = self.page_size
if length == -1:
end_page_index = None
else:
Expand Down
32 changes: 6 additions & 26 deletions alluxio/worker_ring.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import time
import uuid
from dataclasses import dataclass
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
Expand All @@ -16,10 +15,6 @@
from sortedcontainers import SortedDict

from alluxio.config import AlluxioClientConfig
from alluxio.const import ALLUXIO_CLUSTER_NAME_DEFAULT_VALUE
from alluxio.const import ALLUXIO_CLUSTER_NAME_KEY
from alluxio.const import ALLUXIO_ETCD_PASSWORD_KEY
from alluxio.const import ALLUXIO_ETCD_USERNAME_KEY
from alluxio.const import ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE
from alluxio.const import ETCD_PREFIX_FORMAT

Expand Down Expand Up @@ -144,9 +139,9 @@ def from_host_and_port(worker_host, worker_http_port):
class EtcdClient:
def __init__(
self,
config: AlluxioClientConfig,
host="localhost",
port=2379,
alluxio_properties: Optional[Dict[str, Any]] = None,
):
self._host = host
self._port = port
Expand All @@ -155,26 +150,11 @@ def __init__(
self._etcd_username = None
self._etcd_password = None
self._prefix = ETCD_PREFIX_FORMAT.format(
cluster_name=ALLUXIO_CLUSTER_NAME_DEFAULT_VALUE
cluster_name=config.cluster_name
)
if alluxio_properties is not None:
if ALLUXIO_ETCD_USERNAME_KEY in alluxio_properties:
self._etcd_username = alluxio_properties[
ALLUXIO_ETCD_USERNAME_KEY
]
if ALLUXIO_ETCD_PASSWORD_KEY in alluxio_properties:
self._etcd_password = alluxio_properties[
ALLUXIO_ETCD_PASSWORD_KEY
]
if ALLUXIO_CLUSTER_NAME_KEY in alluxio_properties:
self._prefix = ETCD_PREFIX_FORMAT.format(
cluster_name=alluxio_properties[ALLUXIO_CLUSTER_NAME_KEY]
)

if (self._etcd_username is None) != (self._etcd_password is None):
raise ValueError(
"Both ETCD username and password must be set or both should be unset."
)
if config.etcd_username is not None:
self._etcd_username = config.etcd_username
self._etcd_password = config.etcd_password

def get_worker_entities(self) -> Set[WorkerEntity]:
"""
Expand Down Expand Up @@ -324,7 +304,7 @@ def _fetch_workers_and_update_ring(self):
worker_entities = EtcdClient(
host=host,
port=self.config.etcd_port,
alluxio_properties=self.config.alluxio_properties,
config=self.config,
).get_worker_entities()
break
except Exception:
Expand Down

0 comments on commit ca3a938

Please sign in to comment.