Skip to content

Commit

Permalink
Fix Azure transfers with additional permissions (#435)
Browse files — browse the repository at this point in the history
* Squashed commit of the following:

commit da64fa1
Author: Paras Jain <[email protected]>
Date:   Mon Jul 11 20:38:38 2022 +0000

    Squashed commit of the following:

    commit 0764e14
    Author: Paras Jain <[email protected]>
    Date:   Mon Jul 11 20:34:58 2022 +0000

        Squashed commit of the following:

        commit cd30b15
        Merge: 200c2f4 91724d7
        Author: Paras Jain <[email protected]>
        Date:   Mon Jul 11 20:34:30 2022 +0000

            Merge branch 'main' into paras/sky-183-azure-transfers-are-broken-on-main

        commit 200c2f4
        Author: Paras Jain <[email protected]>
        Date:   Mon Jul 11 13:30:56 2022 -0700

            Remove create_bucket option from ObjStoreInterface (#443)

        commit c260d7a
        Merge: 3181f2f 0e29e02
        Author: Paras Jain <[email protected]>
        Date:   Mon Jul 11 18:56:24 2022 +0000

            Merge branch 'main' into paras/sky-183-azure-transfers-are-broken-on-main

        commit 3181f2f
        Merge: 15f380a f08b9d9
        Author: Paras Jain <[email protected]>
        Date:   Thu Jul 7 20:18:54 2022 +0000

            Merge branch 'main' into paras/sky-183-azure-transfers-are-broken-on-main

        commit 15f380a
        Merge: ed5a8be e5c97e0
        Author: Paras Jain <[email protected]>
        Date:   Wed Jun 29 18:14:02 2022 -0700

            Merge branch 'main' into paras/sky-183-azure-transfers-are-broken-on-main

        commit ed5a8be
        Merge: 80bfcc6 7402a15
        Author: Paras Jain <[email protected]>
        Date:   Wed Jun 29 12:53:19 2022 -0700

            Merge branch 'main' into paras/sky-183-azure-transfers-are-broken-on-main

        commit 80bfcc6
        Merge: 00493b5 1598038
        Author: Paras Jain <[email protected]>
        Date:   Wed Jun 29 12:45:19 2022 -0700

            Merge branch 'main' into paras/sky-183-azure-transfers-are-broken-on-main

        commit 00493b5
        Author: Paras Jain <[email protected]>
        Date:   Tue Jun 28 01:33:37 2022 +0000

            Print error message

        commit a4a6a5e
        Author: Paras Jain <[email protected]>
        Date:   Tue Jun 28 01:27:29 2022 +0000

            Format

        commit d0669c7
        Author: Paras Jain <[email protected]>
        Date:   Tue Jun 28 01:27:22 2022 +0000

            Seems to work!

        commit c3124b8
        Author: Paras Jain <[email protected]>
        Date:   Tue Jun 28 00:19:47 2022 +0000

            Missing ref

        commit e362958
        Author: Paras Jain <[email protected]>
        Date:   Tue Jun 28 00:17:07 2022 +0000

            Seems to fix Azure transfers

        commit 9c5345d
        Author: Paras Jain <[email protected]>
        Date:   Mon Jun 27 22:06:56 2022 +0000

            Bump poetry.lock

        commit 48611f5
        Merge: 4d6632f a4c6d58
        Author: Paras Jain <[email protected]>
        Date:   Mon Jun 27 19:53:32 2022 +0000

            Merge branch 'main' into paras/sky-183-azure-transfers-are-broken-on-main

        commit 4d6632f
        Merge: 50f618c 5afe0d8
        Author: Paras Jain <[email protected]>
        Date:   Mon Jun 27 16:21:13 2022 +0000

            Merge branch 'main' into paras/sky-183-azure-transfers-are-broken-on-main

        commit 50f618c
        Author: Paras Jain <[email protected]>
        Date:   Mon Jun 27 16:20:45 2022 +0000

            Verify transfer if no error occurred

        commit 8929b2f
        Author: Paras Jain <[email protected]>
        Date:   Mon Jun 27 16:20:31 2022 +0000

            Gate GCP firewall additions behind check

commit 00b5264
Author: Paras Jain <[email protected]>
Date:   Mon Jul 11 20:37:30 2022 +0000

    Partial azure changes

* Wait for role assignment propagation
  • Loading branch information
parasj authored Jul 11, 2022
1 parent 77165ec commit d5dd450
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 30 deletions.
5 changes: 4 additions & 1 deletion skyplane/cli/cli_impl/cp_replicate.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,10 @@ def launch_replication_job(
total_bytes = sum([chunk_req.chunk.chunk_length_bytes for chunk_req in job.chunk_requests])
console.print(f":rocket: [bold blue]{total_bytes / GB:.2f}GB transfer job launched[/bold blue]")
if topo.source_region().split(":")[0] == "azure" or topo.sink_region().split(":")[0] == "azure":
typer.secho(f"Warning: It can take up to 60s for role assignments to propagate on Azure. See issue #355", fg="yellow")
typer.secho(
f"Warning: For Azure transfers, your transfer may block for up to 120s waiting for role assignments to propagate. See issue #355.",
fg="yellow",
)
stats = rc.monitor_transfer(
job,
show_spinner=True,
Expand Down
48 changes: 27 additions & 21 deletions skyplane/compute/azure/azure_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,16 @@ def terminate_instance_impl(self):
compute_client = self.auth.get_compute_client()
network_client = self.auth.get_network_client()

# remove any role assignments to the VM's system-assigned identity
auth_client = self.auth.get_authorization_client()
vm = self.get_virtual_machine()
for assignment in auth_client.role_assignments.list(filter="principalId eq '{}'".format(vm.identity.principal_id)):
logger.fs.debug(f"Deleting role assignment {assignment.name}")
auth_client.role_assignments.delete(
scope=assignment.scope,
role_assignment_name=assignment.name,
)

vm_poller = compute_client.virtual_machines.begin_delete(AzureServer.resource_group_name, self.vm_name(self.name))
_ = vm_poller.result()
nic_poller = network_client.network_interfaces.begin_delete(AzureServer.resource_group_name, self.nic_name(self.name))
Expand All @@ -168,11 +178,14 @@ def terminate_instance_impl(self):
vnet_poller = network_client.virtual_networks.begin_delete(AzureServer.resource_group_name, self.vnet_name(self.name))
_ = vnet_poller.result()

def authorize_storage_account(self, storage_account_name: str):
# Assign roles to system MSI, see https://docs.microsoft.com/en-us/samples/azure-samples/compute-python-msi-vm/compute-python-msi-vm/#role-assignment
def authorize_subscription(self):
# Authorize system MSI to access subscription
auth_client = self.auth.get_authorization_client()
subscription_scope = "/subscriptions/{}".format(self.auth.subscription_id)
principal_id = self.get_virtual_machine().identity.principal_id

def grant_vm_role(principal_id, scope, role_name):
prefix = f"grant_vm_role({principal_id}, {scope.split('/')[-1]}, {role_name})"
try:
roles = list(auth_client.role_definitions.list(scope, filter="roleName eq '{}'".format(role_name)))
assert len(roles) == 1, f"Got roles {roles}"
Expand All @@ -181,35 +194,28 @@ def grant_vm_role(principal_id, scope, role_name):
)
auth_client.role_assignments.create(scope, uuid.uuid4(), params)
return roles[0]
except azure.core.exceptions.ResourceExistsError:
logger.fs.warning(f"Role {role_name} already exists")
except azure.core.exceptions.ResourceExistsError as e:
logger.fs.warning(f"{prefix}: Role '{role_name}' already exists: {e}")
return None

# get scope for storage account
def get_scope_for_storage_account(storage_account_name):
for sa in self.auth.get_storage_management_client().storage_accounts.list():
if sa.name == storage_account_name:
return sa.id
raise MissingBucketException("Storage account not found")

# grant "Storage Blob Data Contributor" role to the VM (self.get_virtual_machine().identity.principal_id)
scope = get_scope_for_storage_account(storage_account_name)
principal_id = self.get_virtual_machine().identity.principal_id
r1 = grant_vm_role(principal_id, scope, "Storage Blob Data Contributor")
r2 = grant_vm_role(principal_id, scope, "Storage Blob Data Reader")
r3 = grant_vm_role(principal_id, scope, "Storage Blob Delegator")
logger.fs.debug(f"grant_vm_role({principal_id}, {scope}, 'Storage Blob Data Contributor')")
r1 = grant_vm_role(principal_id, subscription_scope, "Storage Blob Data Contributor")
r2 = grant_vm_role(principal_id, subscription_scope, "Storage Blob Data Reader")
r3 = grant_vm_role(principal_id, subscription_scope, "Storage Blob Delegator")
r4 = grant_vm_role(principal_id, subscription_scope, "Storage Account Contributor")

# wait till the storage account is accessible by checking for roles
# wait till the subscription is accessible by checking for roles
def check_role(role):
if role is None:
return True
for assignment in auth_client.role_assignments.list_for_scope(scope, filter="principalId eq '{}'".format(principal_id)):
for assignment in auth_client.role_assignments.list_for_scope(
subscription_scope, filter="principalId eq '{}'".format(principal_id)
):
if assignment.role_definition_id == role.id:
return True
return False

wait_for(lambda: check_role(r1) and check_role(r2) and check_role(r3), timeout=60, desc="authorize_storage_account")
wait_for(lambda: all(check_role(role) for role in [r1, r2, r3, r4]), timeout=60, desc="authorize_subscription")
logger.fs.debug(f"Authorized subscription for VM {self.name}")

def get_ssh_client_impl(self, uname="skyplane", ssh_key_password="skyplane"):
"""Return paramiko client that connects to this instance."""
Expand Down
11 changes: 11 additions & 0 deletions skyplane/gateway/gateway_obj_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from skyplane.gateway.chunk_store import ChunkStore
from skyplane.obj_store.object_store_interface import ObjectStoreInterface
from skyplane.utils import logger
from skyplane.utils.fn import wait_for
from skyplane.utils.retry import retry_backoff


Expand Down Expand Up @@ -47,6 +48,16 @@ def get_obj_store_interface(self, region: str, bucket: str) -> ObjectStoreInterf
self.obj_store_interfaces[key] = ObjectStoreInterface.create(region, bucket)
except Exception as e:
raise ValueError(f"Failed to create obj store interface {str(e)}")

def exists():
try:
return self.obj_store_interfaces[key].bucket_exists()
except Exception as e:
logger.error(f"[gateway_daemon] Failed to check bucket exists {str(e)}")
return False

propogation_time = wait_for(exists, timeout=120, desc=f"Wait for obj store interface {key}")
logger.info(f"[gateway_daemon] Created obj store interface {key}, waited {propogation_time:.2f}s for bucket to exist")
return self.obj_store_interfaces[key]

def start_workers(self):
Expand Down
10 changes: 3 additions & 7 deletions skyplane/replicate/replicator_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,13 +304,9 @@ def run_replication_plan(
# assign source and destination gateways permission to buckets
assign_jobs = []
if job.source_region.split(":")[0] == "azure":
for location, gateway in self.bound_nodes.items():
if isinstance(gateway, AzureServer) and location.region == job.source_region:
assign_jobs.append(partial(gateway.authorize_storage_account, job.source_bucket.split("/", 1)[0]))
if job.dest_region.split(":")[0] == "azure":
for location, gateway in self.bound_nodes.items():
if isinstance(gateway, AzureServer) and location.region == job.dest_region:
assign_jobs.append(partial(gateway.authorize_storage_account, job.dest_bucket.split("/", 1)[0]))
for gateway in self.bound_nodes.values():
if isinstance(gateway, AzureServer):
assign_jobs.append(gateway.authorize_subscription)
do_parallel(lambda fn: fn(), assign_jobs, spinner=True, spinner_persist=True, desc="Assigning gateways permissions to buckets")

with Progress(
Expand Down
2 changes: 1 addition & 1 deletion skyplane/utils/fn.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
R = TypeVar("R")


def wait_for(fn: Callable[[], bool], timeout=60, interval=0.25, desc="Waiting") -> Optional[float]:
def wait_for(fn: Callable[[], bool], timeout=60, interval=0.25, desc="Waiting", debug=False) -> Optional[float]:
"""Wait for fn to return True. Returns number of seconds waited."""
start = time.time()
while time.time() - start < timeout:
Expand Down

0 comments on commit d5dd450

Please sign in to comment.