cascache.py: Preparation for remote execution
Refactor the push() and pull() implementations so that the API
additions needed for remote execution are easier to make.

https://gitlab.com/BuildStream/buildstream/issues/454
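
The refactor keeps the ref bookkeeping (_push_refs_to_remote) separate from
the CAS upload path (_send_directory and _send_blob), so remote-execution
code can upload a directory tree without going through artifact refs. A
minimal sketch of the intended reuse; upload_input_root and its parameter
names are illustrative, not part of this commit:

    # Hypothetical caller, assuming a CASCache instance and an initialized
    # remote: upload a remote-execution input root with the new helper.
    def upload_input_root(cascache, remote, input_root_digest):
        # _send_directory() asks the server which blobs it is missing
        # and streams only those via _send_blob().
        cascache._send_directory(remote, input_root_digest)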
jmacarthur authored and gtristan committed Oct 3, 2018
1 parent 6e82036 commit 9568824
Showing 1 changed file with 111 additions and 94 deletions.
205 changes: 111 additions & 94 deletions buildstream/_artifactcache/cascache.py
@@ -81,6 +81,7 @@ def __init__(self, context, *, enable_push=True):
################################################
# Implementation of abstract methods #
################################################

def contains(self, element, key):
refpath = self._refpath(self.get_artifact_fullname(element, key))

@@ -156,6 +157,7 @@ def initialize_remotes(self, *, on_failure=None):
q = multiprocessing.Queue()
for remote_spec in remote_specs:
# Use subprocess to avoid creation of gRPC threads in main BuildStream process
# See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
p = multiprocessing.Process(target=self._initialize_remote, args=(remote_spec, q))

try:
@@ -268,109 +270,69 @@ def link_key(self, element, oldkey, newkey):

self.set_ref(newref, tree)

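# _push_refs_to_remote():
#
# Push the given artifact refs to a single remote, uploading any
# blobs the server is missing first.
#
# Args:
#     refs (list): The artifact refs to push
#     remote: The remote CAS cache to push to
#
# Returns:
#     (bool): True if something was pushed, False if the remote
#             already had all refs
#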
def _push_refs_to_remote(self, refs, remote):
skipped_remote = True
try:
for ref in refs:
tree = self.resolve_ref(ref)

# Check whether ref is already on the server in which case
# there is no need to push the artifact
try:
request = buildstream_pb2.GetReferenceRequest()
request.key = ref
response = remote.ref_storage.GetReference(request)

if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
# ref is already on the server with the same tree
continue

except grpc.RpcError as e:
if e.code() != grpc.StatusCode.NOT_FOUND:
# Intentionally re-raise RpcError for outer except block.
raise

self._send_directory(remote, tree)

request = buildstream_pb2.UpdateReferenceRequest()
request.keys.append(ref)
request.digest.hash = tree.hash
request.digest.size_bytes = tree.size_bytes
remote.ref_storage.UpdateReference(request)

skipped_remote = False
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e

return not skipped_remote

def push(self, element, keys):
refs = [self.get_artifact_fullname(element, key) for key in keys]

refs = [self.get_artifact_fullname(element, key) for key in list(keys)]

project = element._get_project()

push_remotes = [r for r in self._remotes[project] if r.spec.push]

pushed = False
display_key = element._get_brief_display_key()

for remote in push_remotes:
remote.init()
skipped_remote = True
display_key = element._get_brief_display_key()
element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))

try:
for ref in refs:
tree = self.resolve_ref(ref)

# Check whether ref is already on the server in which case
# there is no need to push the artifact
try:
request = buildstream_pb2.GetReferenceRequest()
request.key = ref
response = remote.ref_storage.GetReference(request)

if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
# ref is already on the server with the same tree
continue

except grpc.RpcError as e:
if e.code() != grpc.StatusCode.NOT_FOUND:
# Intentionally re-raise RpcError for outer except block.
raise

missing_blobs = {}
required_blobs = self._required_blobs(tree)

# Limit size of FindMissingBlobs request
for required_blobs_group in _grouper(required_blobs, 512):
request = remote_execution_pb2.FindMissingBlobsRequest()

for required_digest in required_blobs_group:
d = request.blob_digests.add()
d.hash = required_digest.hash
d.size_bytes = required_digest.size_bytes

response = remote.cas.FindMissingBlobs(request)
for digest in response.missing_blob_digests:
d = remote_execution_pb2.Digest()
d.hash = digest.hash
d.size_bytes = digest.size_bytes
missing_blobs[d.hash] = d

# Upload any blobs missing on the server
skipped_remote = False
for digest in missing_blobs.values():
uuid_ = uuid.uuid4()
resource_name = '/'.join(['uploads', str(uuid_), 'blobs',
digest.hash, str(digest.size_bytes)])

def request_stream(resname):
with open(self.objpath(digest), 'rb') as f:
assert os.fstat(f.fileno()).st_size == digest.size_bytes
offset = 0
finished = False
remaining = digest.size_bytes
while not finished:
chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
remaining -= chunk_size

request = bytestream_pb2.WriteRequest()
request.write_offset = offset
# max. _MAX_PAYLOAD_BYTES chunks
request.data = f.read(chunk_size)
request.resource_name = resname
request.finish_write = remaining <= 0
yield request
offset += chunk_size
finished = request.finish_write
response = remote.bytestream.Write(request_stream(resource_name))

request = buildstream_pb2.UpdateReferenceRequest()
request.keys.append(ref)
request.digest.hash = tree.hash
request.digest.size_bytes = tree.size_bytes
remote.ref_storage.UpdateReference(request)

pushed = True

if not skipped_remote:
element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))

except grpc.RpcError as e:
if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e

if skipped_remote:
if self._push_refs_to_remote(refs, remote):
element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
pushed = True
else:
self.context.message(Message(
None,
MessageType.INFO,
"Remote ({}) already has {} cached".format(
remote.spec.url, element._get_brief_display_key())
))

return pushed

################################################
@@ -599,6 +561,7 @@ def prune(self):
################################################
# Local Private Methods #
################################################

def _checkout(self, dest, tree):
os.makedirs(dest, exist_ok=True)

@@ -776,16 +739,16 @@ def _initialize_remote(self, remote_spec, q):
#
q.put(str(e))

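# _required_blobs():
#
# Generator that yields the digest of a directory object itself and,
# recursively, the digests of all files and subdirectories it
# references.
#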
def _required_blobs(self, tree):
def _required_blobs(self, directory_digest):
# parse directory, and recursively add blobs
d = remote_execution_pb2.Digest()
d.hash = tree.hash
d.size_bytes = tree.size_bytes
d.hash = directory_digest.hash
d.size_bytes = directory_digest.size_bytes
yield d

directory = remote_execution_pb2.Directory()

with open(self.objpath(tree), 'rb') as f:
with open(self.objpath(directory_digest), 'rb') as f:
directory.ParseFromString(f.read())

for filenode in directory.files:
@@ -797,16 +760,16 @@ def _required_blobs(self, tree):
for dirnode in directory.directories:
yield from self._required_blobs(dirnode.digest)

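# _fetch_blob():
#
# Stream a single blob from the remote into a local file object via
# the ByteStream Read() RPC.
#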
def _fetch_blob(self, remote, digest, out):
def _fetch_blob(self, remote, digest, stream):
resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
request = bytestream_pb2.ReadRequest()
request.resource_name = resource_name
request.read_offset = 0
for response in remote.bytestream.Read(request):
out.write(response.data)
stream.write(response.data)
stream.flush()

out.flush()
assert digest.size_bytes == os.fstat(out.fileno()).st_size
assert digest.size_bytes == os.fstat(stream.fileno()).st_size

# _ensure_blob():
#
@@ -922,6 +885,60 @@ def _fetch_directory(self, remote, dir_digest):
# Fetch final batch
self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)

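# _send_blob():
#
# Upload a single blob to the remote with the ByteStream Write() RPC,
# splitting the data into chunks of at most _MAX_PAYLOAD_BYTES.
#
# Args:
#     remote: The remote CAS cache to upload to
#     digest (Digest): Digest of the blob to send
#     stream: Readable file object with the blob data
#     u_uid (uuid): uuid to use in the upload resource name
#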
def _send_blob(self, remote, digest, stream, u_uid=uuid.uuid4()):
resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
digest.hash, str(digest.size_bytes)])

def request_stream(resname, instream):
offset = 0
finished = False
remaining = digest.size_bytes
while not finished:
chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
remaining -= chunk_size

request = bytestream_pb2.WriteRequest()
request.write_offset = offset
# max. _MAX_PAYLOAD_BYTES chunks
request.data = instream.read(chunk_size)
request.resource_name = resname
request.finish_write = remaining <= 0

yield request

offset += chunk_size
finished = request.finish_write

response = remote.bytestream.Write(request_stream(resource_name, stream))

assert response.committed_size == digest.size_bytes

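# _send_directory():
#
# Upload a directory tree to the remote, querying FindMissingBlobs()
# in groups of 512 digests and sending only the blobs the server
# does not already have.
#
# Args:
#     remote: The remote CAS cache to upload to
#     digest (Digest): Digest of the root directory object
#     u_uid (uuid): uuid to use in upload resource names
#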
def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
required_blobs = self._required_blobs(digest)

missing_blobs = dict()
# Limit size of FindMissingBlobs request
for required_blobs_group in _grouper(required_blobs, 512):
request = remote_execution_pb2.FindMissingBlobsRequest()

for required_digest in required_blobs_group:
d = request.blob_digests.add()
d.hash = required_digest.hash
d.size_bytes = required_digest.size_bytes

response = remote.cas.FindMissingBlobs(request)
for missing_digest in response.missing_blob_digests:
d = remote_execution_pb2.Digest()
d.hash = missing_digest.hash
d.size_bytes = missing_digest.size_bytes
missing_blobs[d.hash] = d

# Upload any blobs missing on the server
for blob_digest in missing_blobs.values():
with open(self.objpath(blob_digest), 'rb') as f:
assert os.fstat(f.fileno()).st_size == blob_digest.size_bytes
self._send_blob(remote, blob_digest, f, u_uid=u_uid)


# Represents a single remote CAS cache.
#
