Skip to content

Commit

Permalink
Fix replicate random due to source instances bug (#341)
Browse files Browse the repository at this point in the history
* Fix replicate random due bug in source_instances calculation

* Simpler logic

* Remove logging statements

* Only return gateways
  • Loading branch information
parasj authored May 13, 2022
1 parent ce19463 commit a0b8059
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 18 deletions.
4 changes: 2 additions & 2 deletions skylark/compute/aws/aws_cloud_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,8 @@ def remove_ip_from_security_group(self, aws_region: str, ip: str):
)
except botocore.exceptions.ClientError as e:
logger.fs.error(f"[AWS] Error removing IP {ip} from security group {sg.group_name}: {e}")
if not str(e).endswith("NotFound"):
logger.warn("[AWS] Error removing IP from security group")
if "The specified rule does not exist in this security group." not in str(e):
logger.warn(f"[AWS] Error removing IP from security group: {e}")

def ensure_keyfile_exists(self, aws_region, prefix=key_root / "aws"):
ec2 = self.auth.get_boto3_resource("ec2", aws_region)
Expand Down
16 changes: 5 additions & 11 deletions skylark/replicate/replication_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,18 +103,12 @@ def get_incoming_paths(self, dest: ReplicationTopologyNode):
return {src_gateway: num_connections for dest_gateway, src_gateway, num_connections in self.edges if dest_gateway == dest}

def source_instances(self) -> Set[ReplicationTopologyNode]:
nodes = set()
for src, dest, _ in self.edges:
if isinstance(src, ReplicationTopologyObjectStore):
nodes.add(dest)
return nodes
nodes = self.nodes - {v for u, v, _ in self.edges if not isinstance(u, ReplicationTopologyObjectStore)}
return [n for n in nodes if isinstance(n, ReplicationTopologyGateway)]

def sink_instances(self) -> Set[ReplicationTopologyNode]:
nodes = set()
for src, dest, _ in self.edges:
if isinstance(dest, ReplicationTopologyObjectStore):
nodes.add(src)
return nodes
nodes = self.nodes - {u for u, v, _ in self.edges if not isinstance(v, ReplicationTopologyObjectStore)}
return [n for n in nodes if isinstance(n, ReplicationTopologyGateway)]

def source_region(self) -> str:
instances = list(self.source_instances())
Expand Down Expand Up @@ -197,7 +191,7 @@ def to_graphviz(self):
g.edge(
src_node,
dest_node,
label=f"{n_connections} connections" if src_instance is not "objstore" and dest_instance is not "objstore" else None,
label=f"{n_connections} connections" if src_instance != "objstore" and dest_instance != "objstore" else None,
)

for subgraph in subgraphs.values():
Expand Down
5 changes: 0 additions & 5 deletions skylark/replicate/replicator_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,18 +490,14 @@ def monitor_transfer(
try:
with Timer() as t:
while True:
logger.fs.debug(f"Refreshing status, {t.elapsed:.2f}s elapsed")

# check for errors and exit if there are any
logger.fs.debug("Checking for errors")
errors = self.check_error_logs()
if any(errors.values()):
return {
"errors": errors,
"monitor_status": "error",
}

logger.fs.debug("Checking for completion")
log_df = self.get_chunk_status_log_df()
if log_df.empty:
logger.warning("No chunk status log entries yet")
Expand All @@ -521,7 +517,6 @@ def monitor_transfer(
)

# update progress bar
logger.fs.debug("Updating progress bar")
total_runtime_s = (log_df.time.max() - log_df.time.min()).total_seconds()
throughput_gbits = completed_bytes * 8 / GB / total_runtime_s if total_runtime_s > 0 else 0.0

Expand Down

0 comments on commit a0b8059

Please sign in to comment.