Preliminary support for on-prem transfers with HDFS and POSIX path support #735

Merged on Mar 13, 2023 (35 commits)

Commits
42aa7a9  File system interface (ShishirPatil, Nov 16, 2022)
3b1e8b2  Moving away from mnt (ShishirPatil, Nov 22, 2022)
a5ee402  Iterator return object of type file (ShishirPatil, Nov 22, 2022)
a103950  Merge branch 'dev/gateway' into 'dev/shishir/on-prem' (ShishirPatil, Nov 22, 2022)
4b13c6d  [onprem] HDFS Interface (#684) (HaileyJang, Dec 7, 2022)
e9d333c  Revert "[onprem] HDFS Interface" (#716) (ShishirPatil, Dec 7, 2022)
ee4fa78  [onprem] HDFS Interface (#719) (HaileyJang, Dec 30, 2022)
a1aa397  Fix test_hdfs workflow to terminate the cluster whenever (HaileyJang, Dec 30, 2022)
295b37a  Upstream POSIX file system interface (ShishirPatil, Jan 2, 2023)
353964e  [On-prem] Fix Test_HDFS (#729) (HaileyJang, Jan 2, 2023)
8357a77  Catch multipart for hdfs (ShishirPatil, Jan 2, 2023)
6035bd6  Fix: test_hdfs bugs (ShishirPatil, Jan 6, 2023)
dedb577  Use gateways (ShishirPatil, Jan 6, 2023)
2bbd728  Fix: File system interface (ShishirPatil, Jan 6, 2023)
b16267e  port from aws to gcp (ShishirPatil, Jan 12, 2023)
75aae41  List object fixed (HaileyJang, Jan 17, 2023)
987b5b3  Fix: test hdfs for aws, posix file system (ShishirPatil, Jan 21, 2023)
131c5eb  Fix Merge to main merge conflict (HaileyJang, Jan 27, 2023)
f5a3241  Adding local, nfs, hdfs portion to CLI (HaileyJang, Jan 27, 2023)
c21bd05  Fix dependency issues (HaileyJang, Jan 27, 2023)
9174d82  Minor mods: Resolving comments for PR (ShishirPatil, Jan 30, 2023)
c99ca36  Merge branch 'main' into dev/shishir/on-prem (ShishirPatil, Jan 30, 2023)
5601cf3  Fix upto gateway creation (HaileyJang, Feb 1, 2023)
0497e5d  Update skyplane/obj_store/object_store_interface.py (HaileyJang, Feb 1, 2023)
f9042e3  Merge branch 'main' into dev/shishir/on-prem (ShishirPatil, Feb 14, 2023)
a00829c  fix: lint (ShishirPatil, Feb 14, 2023)
b8d766c  End-to-End Integration for HDFS (#758) (HaileyJang, Feb 24, 2023)
b1c0bfe  Fix legacy codes from previous runs (HaileyJang, Feb 28, 2023)
a66073c  Fix all the comments (HaileyJang, Mar 9, 2023)
9b9c445  Fix black pytype issue (HaileyJang, Mar 10, 2023)
e05c323  Update Pyproject for dataproc (HaileyJang, Mar 10, 2023)
21e834d  Add readme (HaileyJang, Mar 13, 2023)
4cadac8  Delete hostname (HaileyJang, Mar 13, 2023)
b039b35  Fix merge conflict with main (HaileyJang, Mar 13, 2023)
f3052d0  GCP test should pass now (HaileyJang, Mar 13, 2023)
19 changes: 17 additions & 2 deletions Dockerfile
@@ -2,11 +2,25 @@
FROM python:3.11-slim

# install apt packages
RUN --mount=type=cache,target=/var/cache/apt apt update \
&& apt-get install --no-install-recommends -y curl ca-certificates stunnel4 gcc libc-dev \
RUN --mount=type=cache,target=/var/cache/apt apt-get update \
&& apt-get install --no-install-recommends -y curl ca-certificates stunnel4 gcc libc-dev wget \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*

#install HDFS Onprem Packages
RUN apt-get update && \
apt-get install -y openjdk-11-jdk && \
apt-get clean

ENV JAVA_HOME /usr/lib/jvm/java-11-openjdk-amd64

RUN wget https://archive.apache.org/dist/hadoop/core/hadoop-3.3.0/hadoop-3.3.0.tar.gz -P /tmp \
&& tar -xzf /tmp/hadoop-3.3.0.tar.gz -C /tmp \
&& mv /tmp/hadoop-3.3.0 /usr/local/hadoop \
&& rm /tmp/hadoop-3.3.0.tar.gz

ENV HADOOP_HOME /usr/local/hadoop

# configure stunnel
RUN mkdir -p /etc/stunnel \
&& openssl genrsa -out key.pem 2048 \
@@ -31,6 +45,7 @@ RUN (echo 'net.ipv4.ip_local_port_range = 12000 65535' >> /etc/sysctl.conf) \

# install gateway
COPY scripts/requirements-gateway.txt /tmp/requirements-gateway.txt
COPY scripts/hostname /tmp/hostname
RUN --mount=type=cache,target=/root/.cache/pip pip3 install --no-cache-dir -r /tmp/requirements-gateway.txt && rm -r /tmp/requirements-gateway.txt

WORKDIR /pkg
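The JDK, Hadoop 3.3.0 tarball, JAVA_HOME, and HADOOP_HOME installed above are what pyarrow's HDFS bindings need at runtime inside the gateway container. A minimal sketch of how a process in this image might wire that up (illustrative only; the namenode host and the CLASSPATH bootstrap are assumptions, not code from this PR):

import os
import subprocess
from pyarrow import fs

# Matches the ENV lines in the Dockerfile above.
os.environ.setdefault("JAVA_HOME", "/usr/lib/jvm/java-11-openjdk-amd64")
os.environ.setdefault("HADOOP_HOME", "/usr/local/hadoop")
# pyarrow's libhdfs loader discovers the Hadoop jars via CLASSPATH.
os.environ["CLASSPATH"] = subprocess.check_output(
    ["/usr/local/hadoop/bin/hadoop", "classpath", "--glob"], text=True
).strip()

hdfs = fs.HadoopFileSystem(host="namenode.example.internal", port=8020)  # hypothetical namenode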
2,668 changes: 1,455 additions & 1,213 deletions poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
@@ -52,6 +52,7 @@ lz4 = { version = "^4.0.0", optional = true }
pynacl = { version = "^1.5.0", optional = true }
pyopenssl = { version = "^22.0.0", optional = true }
werkzeug = { version = "^2.1.2", optional = true }
pyarrow = "^10.0.1"

[tool.poetry.extras]
aws = ["boto3"]
121 changes: 121 additions & 0 deletions scripts/on_prem/benchmark.py
@@ -0,0 +1,121 @@
import os
import time
from pyarrow import fs
import argparse
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

KB = 1024
MB = 1024 * 1024
GB = MB * 1024

THREADS = 32


def transfer_file(in_fs, in_path, out_fs, out_path, BATCH_SIZE, final):
print("Starting transfer...")
before = time.time()
if out_fs is not None:
with in_fs.open_input_stream(in_path) as in_file:
with out_fs.open_output_stream(out_path) as out_file:
while True:
buf = in_file.read(BATCH_SIZE)
if buf:
out_file.write(buf)
else:
break
else:
in_file = in_fs.open_input_stream(in_path)
curr = 0
while curr < final:
buf = in_file.read(BATCH_SIZE)
# print(f"Reading!{threading.get_ident()}", flush=True)
curr += BATCH_SIZE
if not buf:
break
print(f"Thread Finished. Time: {time.time()-before}")


def setup_files_and_dirs(outdir, hdfs):
# setup 10GB file
hdfs.create_dir(f"/data")
if not os.path.exists(outdir):
os.mkdir(outdir)
os.system(f"dd if=/dev/zero of={outdir}/10GBdata.bin bs=128KB count=78125")


def cleanup_files_and_dirs(outdir, hdfs):
# setup 10GB file
hdfs.delete_dir(f"/data")
os.system(f"rm -rf {outdir}")


def transfer_local_to_hdfs(hdfs, local, outdir):
# 32/64/128/156 MBs
transfer_file(local, f"{outdir}/10GBdata.bin", hdfs, f"/data/10GBdata.bin", 32 * MB, 10 * GB)

transfer_file(local, f"{outdir}/10GBdata.bin", hdfs, f"/data/10GBdata.bin", 64 * MB, 10 * GB)

transfer_file(local, f"{outdir}/10GBdata.bin", hdfs, f"/data/10GBdata.bin", 128 * MB, 10 * GB)

transfer_file(local, f"{outdir}/10GBdata.bin", hdfs, f"/data/10GBdata.bin", 156 * MB, 10 * GB)


def transfer_hdfs_to_local(hdfs, local, outdir):
# 32/64/128/156 MBs
transfer_file(hdfs, f"/data/10GBdata.bin", local, f"{outdir}/10GBdata.bin", 32 * MB, 10 * GB)

transfer_file(hdfs, f"/data/10GBdata.bin", local, f"{outdir}/10GBdata.bin", 64 * MB, 10 * GB)

transfer_file(hdfs, f"/data/10GBdata.bin", local, f"{outdir}/10GBdata.bin", 128 * MB, 10 * GB)

transfer_file(hdfs, f"/data/10GBdata.bin", local, f"{outdir}/10GBdata.bin", 156 * MB, 10 * GB)


def parallel_reads(args):
(hdfs, final) = args
new_hdfs = fs.HadoopFileSystem(host=hdfs, port=8020, extra_conf={"dfs.client.use.datanode.hostname": "false"})
transfer_file(new_hdfs, f"/data/10GBdata.bin", None, f"data/10GBdata.bin", 128 * MB, final)


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("HDFS", type=str, help="HDFS host")
parser.add_argument("--outdir", type=str, default="/tmp/data")
parser.add_argument("--threads", type=bool, default=True)
args = parser.parse_args()

hdfs = fs.HadoopFileSystem(host=args.HDFS, port=8020, user="hadoop", extra_conf={"dfs.client.use.datanode.hostname": "false"})
local = fs.LocalFileSystem()
thread = args.threads
# setup_files_and_dirs(args.outdir, hdfs)
transfer_local_to_hdfs(hdfs, local, args.outdir)
transfer_hdfs_to_local(hdfs, local, args.outdir)
arg = []
increment = 10 * GB / THREADS
curr = 0
multiprocessing.set_start_method("spawn")
# prepare args
for i in range(THREADS):
arg.append((args.HDFS, increment))
curr += increment
if thread:
# execute the threads
with ThreadPoolExecutor(max_workers=THREADS) as p:
before = time.time()
future = [p.submit(parallel_reads, arg[i]) for i in range(THREADS)]
# p.map(parallel_reads, args)
else:
with ProcessPoolExecutor(max_workers=THREADS) as p:
before = time.time()
future = [p.submit(parallel_reads, arg[i]) for i in range(THREADS)]
# print(future.result())
# p.map(example, args)

print(f"Finished! Time:{time.time()-before}")
"""
t1 = threading.Thread(target=parallel_reads, args=(hdfs, lock, sema))
t2 = threading.Thread(target=parallel_reads, args=(hdfs, lock, sema))
t1.start()
t2.start()
t2.join()"""
1 change: 1 addition & 0 deletions scripts/requirements-gateway.txt
@@ -24,3 +24,4 @@ pyopenssl
werkzeug
numpy
pandas
pyarrow
27 changes: 18 additions & 9 deletions skyplane/api/transfer_job.py
@@ -18,6 +18,7 @@
from skyplane.api.config import TransferConfig
from skyplane.chunk import Chunk, ChunkRequest
from skyplane.obj_store.azure_blob_interface import AzureBlobObject
from skyplane.obj_store.file_system_interface import FileSystemInterface
from skyplane.obj_store.gcs_interface import GCSObject
from skyplane.obj_store.object_store_interface import ObjectStoreInterface, ObjectStoreObject
from skyplane.obj_store.s3_interface import S3Object
@@ -37,8 +38,8 @@ class Chunker:

def __init__(
self,
src_iface: ObjectStoreInterface,
dst_iface: ObjectStoreInterface,
src_iface: ObjectStoreObject or FileSystemInterface,
dst_iface: ObjectStoreObject or FileSystemInterface,
transfer_config: TransferConfig,
concurrent_multipart_chunk_threads: int = 64,
):
@@ -370,13 +371,16 @@ def src_prefix(self) -> Optional[str]:
return self._src_prefix

@property
def src_iface(self) -> ObjectStoreInterface:
def src_iface(self) -> ObjectStoreInterface or FileSystemInterface:
"""Return the source object store interface"""
if not hasattr(self, "_src_iface"):
provider_src, bucket_src, _ = parse_path(self.src_path)
self._src_iface = ObjectStoreInterface.create(f"{provider_src}:infer", bucket_src)
if self.requester_pays:
self._src_iface.set_requester_bool(True)
provider_src, bucket_src, path_src = parse_path(self.src_path)
if provider_src in ("local", "nfs"):
self._src_iface = FileSystemInterface.create(f"{provider_src}:infer", path_src)
else:
self._src_iface = ObjectStoreInterface.create(f"{provider_src}:infer", bucket_src)
if self.requester_pays:
self._src_iface.set_requester_bool(True)
return self._src_iface

@property
@@ -444,13 +448,16 @@ def http_pool(self):
def estimate_cost(self):
raise NotImplementedError()

def gen_transfer_pairs(self, chunker: Optional[Chunker] = None) -> Generator[Tuple[ObjectStoreObject, ObjectStoreObject], None, None]:
def gen_transfer_pairs(
self, chunker: Optional[Chunker] = None
) -> Generator[Tuple[ObjectStoreObject or FileSystemInterface, ObjectStoreObject or FileSystemInterface], None, None]:
"""Generate transfer pairs for the transfer job.

:param chunker: chunker that makes the chunk requests
:type chunker: Chunker
"""
if chunker is None: # used for external access to transfer pair list
logger.fs.debug("Generating transfer pairs for external access, {} -> {}".format(self.src_iface, self.dst_iface))
chunker = Chunker(self.src_iface, self.dst_iface, TransferConfig())
yield from chunker.transfer_pair_generator(self.src_prefix, self.dst_prefix, self.recursive, self._pre_filter_fn)

@@ -549,7 +556,9 @@ class SyncJob(CopyJob):
def estimate_cost(self):
raise NotImplementedError()

def gen_transfer_pairs(self, chunker: Optional[Chunker] = None) -> Generator[Tuple[ObjectStoreObject, ObjectStoreObject], None, None]:
def gen_transfer_pairs(
self, chunker: Optional[Chunker] = None
) -> Generator[Tuple[ObjectStoreObject or FileSystemInterface, ObjectStoreObject or FileSystemInterface], None, None]:
"""Generate transfer pairs for the transfer job.

:param chunker: chunker that makes the chunk requests
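The hunks above repeat one dispatch pattern: local and nfs paths are backed by the new FileSystemInterface, while cloud buckets keep the existing ObjectStoreInterface. A condensed sketch of that branch, using only names that appear in this diff (not the library's exact code):

from skyplane.obj_store.file_system_interface import FileSystemInterface
from skyplane.obj_store.object_store_interface import ObjectStoreInterface

def make_interface(provider: str, bucket: str, path: str):
    # local/nfs sources resolve to a file system interface rooted at the path;
    # everything else resolves to an object store interface for the bucket.
    if provider in ("local", "nfs"):
        return FileSystemInterface.create(f"{provider}:infer", path)
    return ObjectStoreInterface.create(f"{provider}:infer", bucket)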
47 changes: 38 additions & 9 deletions skyplane/cli/cli_transfer.py
@@ -23,6 +23,7 @@
from skyplane.config import SkyplaneConfig
from skyplane.config_paths import cloud_config, config_path
from skyplane.obj_store.object_store_interface import ObjectStoreInterface
from skyplane.obj_store.file_system_interface import FileSystemInterface
from skyplane.cli.impl.progress_bar import ProgressBarTransferHook
from skyplane.utils import logger
from skyplane.utils.definitions import GB, format_bytes
@@ -166,7 +167,11 @@ def transfer_sync_small(self, src: str, dst: str) -> bool:
return False

def make_dataplane(self, **solver_args) -> skyplane.Dataplane:
dp = self.client.dataplane(*self.src_region_tag.split(":"), *self.dst_region_tag.split(":"), **solver_args)
if self.src_region_tag.split(":")[0] == "hdfs":
new_region_tag = "gcp:" + self.src_region_tag.split(":")[1]
else:
new_region_tag = self.src_region_tag
dp = self.client.dataplane(*new_region_tag.split(":"), *self.dst_region_tag.split(":"), **solver_args)
logger.fs.debug(f"Using dataplane: {dp}")
return dp

@@ -305,8 +310,16 @@ def cp(
print_header()
provider_src, bucket_src, path_src = parse_path(src)
provider_dst, bucket_dst, path_dst = parse_path(dst)
src_region_tag = ObjectStoreInterface.create(f"{provider_src}:infer", bucket_src).region_tag()
dst_region_tag = ObjectStoreInterface.create(f"{provider_dst}:infer", bucket_dst).region_tag()
if provider_src in ("local", "nfs"):
src_region_tag = FileSystemInterface.create(f"{provider_src}:infer", path_src).region_tag()
else:
src_region_tag = ObjectStoreInterface.create(f"{provider_src}:infer", bucket_src).region_tag()

if provider_dst in ("local", "nfs"):
dst_region_tag = FileSystemInterface.create(f"{provider_dst}:infer", path_dst).region_tag()
else:
dst_region_tag = ObjectStoreInterface.create(f"{provider_dst}:infer", bucket_dst).region_tag()

args = {
"cmd": "cp",
"recursive": recursive,
@@ -326,12 +339,28 @@
)
return 1

if provider_src in ("local", "hdfs", "nfs") or provider_dst in ("local", "hdfs", "nfs"):
if provider_src == "hdfs" or provider_dst == "hdfs":
typer.secho("HDFS is not supported yet.", fg="red")
return 1
return 0 if cli.transfer_cp_onprem(src, dst, recursive) else 1
elif provider_src in ("aws", "gcp", "azure") and provider_dst in ("aws", "gcp", "azure"):
dp = cli.make_dataplane(
solver_type=solver,
n_vms=max_instances,
n_connections=max_connections,
solver_required_throughput_gbits=solver_required_throughput_gbits,
)

if provider_src in ("local", "nfs") and provider_dst in ("aws", "gcp", "azure"):
with dp.auto_deprovision():
dp.queue_copy(src, dst, recursive=recursive)
try:
if not cli.confirm_transfer(dp, 5, ask_to_confirm_transfer=not confirm):
return 1
dp.provision(spinner=True)
dp.run(ProgressBarTransferHook())
except skyplane.exceptions.SkyplaneException as e:
console.print(f"[bright_black]{traceback.format_exc()}[/bright_black]")
console.print(e.pretty_print_str())
UsageClient.log_exception("cli_query_objstore", e, args, src_region_tag, dst_region_tag)
return 1
# return 0 if cli.transfer_cp_onprem(src, dst, recursive) else 1
elif provider_src in ("aws", "gcp", "azure", "hdfs") and provider_dst in ("aws", "gcp", "azure"):
# todo support ILP solver params
dp = cli.make_dataplane(
solver_type=solver,
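One detail worth noting from make_dataplane above: an hdfs source region tag is rewritten to a gcp tag for the same region before a dataplane is requested, so provisioning proceeds against GCP rather than the on-prem tag. A condensed sketch of that remapping (illustrative, not the CLI's exact code):

def remap_region_tag(src_region_tag: str) -> str:
    # "hdfs:<region>" is provisioned as "gcp:<region>"; all other tags pass through.
    provider, region = src_region_tag.split(":")
    if provider == "hdfs":
        return f"gcp:{region}"
    return src_region_tag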
4 changes: 4 additions & 0 deletions skyplane/compute/cloud_provider.py
@@ -36,6 +36,10 @@ def get_transfer_cost(src_key, dst_key, premium_tier=True):
from skyplane.compute.azure.azure_cloud_provider import AzureCloudProvider

return AzureCloudProvider.get_transfer_cost(src_key, dst_key, premium_tier)
elif src_provider == "hdfs":
from skyplane.compute.gcp.gcp_cloud_provider import GCPCloudProvider

return GCPCloudProvider.get_transfer_cost(f"gcp:{_}", dst_key, premium_tier)
else:
raise NotImplementedError

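Cost estimation follows the same mapping: an HDFS source is priced as if it were the GCP region that hosts the cluster. A usage sketch under that assumption (region names are illustrative, and the exact call site may differ):

from skyplane.compute.cloud_provider import CloudProvider

# Priced as GCP egress from the cluster's region to the destination cloud.
cost_per_gb = CloudProvider.get_transfer_cost("hdfs:us-central1-a", "aws:us-east-1")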
1 change: 1 addition & 0 deletions skyplane/compute/gcp/gcp_cloud_provider.py
@@ -105,6 +105,7 @@ def teardown_global(self):
@imports.inject("googleapiclient.errors", pip_extra="gcp")
def authorize_gateways(errors, self, ips: List[str], rule_name: Optional[str] = None) -> str:
firewall_name = f"skyplane-{uuid.uuid4().hex[:8]}" if rule_name is None else rule_name
self.network.create_firewall_rule(f"allow-default-{uuid.uuid4().hex[:8]}", "0.0.0.0/0", ["0-65535"], ["tcp"])
self.network.create_firewall_rule(firewall_name, ips, ["0-65535"], ["tcp", "udp", "icmp"])
return firewall_name

3 changes: 3 additions & 0 deletions skyplane/gateway/chunk_store.py
@@ -42,6 +42,7 @@ def get_chunk_state(self, chunk_id: str) -> Optional[ChunkState]:
return self.chunk_status[chunk_id] if chunk_id in self.chunk_status else None

def set_chunk_state(self, chunk_id: str, new_status: ChunkState, log_metadata: Optional[Dict] = None):
logger.info(f"Chunk {chunk_id} state transition {self.get_chunk_state(chunk_id)} -> {new_status}")
self.chunk_status[chunk_id] = new_status
rec = {"chunk_id": chunk_id, "state": new_status.name, "time": str(datetime.utcnow().isoformat())}
if log_metadata is not None:
@@ -61,6 +62,7 @@ def state_queue_download(self, chunk_id: str):
def state_queue_download(self, chunk_id: str):
state = self.get_chunk_state(chunk_id)
if state in [ChunkState.registered, ChunkState.download_queued]:
logger.info(f"Queuing download for chunk {chunk_id} (state={state}")
self.set_chunk_state(chunk_id, ChunkState.download_queued)
else:
raise ValueError(f"Invalid transition queue_download from {state} (id={chunk_id})")
@@ -123,6 +125,7 @@ def get_chunk_request(self, chunk_id: str) -> ChunkRequest:
return self.chunk_requests[chunk_id]

def add_chunk_request(self, chunk_request: ChunkRequest, state=ChunkState.registered):
logger.info(f"Add_chunk_request: Adding chunk request {chunk_request}")
self.set_chunk_state(chunk_request.chunk.chunk_id, state)
self.chunk_requests[chunk_request.chunk.chunk_id] = chunk_request
