From 9568824f3780be032b500694bd2c78d6dd526fa4 Mon Sep 17 00:00:00 2001
From: Jim MacArthur
Date: Thu, 2 Aug 2018 15:49:33 +0100
Subject: [PATCH] cascache.py: Preparation for remote execution

Refactor the push() and pull() implementations so that the API additions
needed for remote execution are easier to make.

https://gitlab.com/BuildStream/buildstream/issues/454
---
 buildstream/_artifactcache/cascache.py | 205 +++++++++++++------------
 1 file changed, 111 insertions(+), 94 deletions(-)

diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index e2c0d44b5..ec9d78026 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -81,6 +81,7 @@ def __init__(self, context, *, enable_push=True):
     ################################################
     #     Implementation of abstract methods       #
     ################################################
+
     def contains(self, element, key):
         refpath = self._refpath(self.get_artifact_fullname(element, key))
 
@@ -156,6 +157,7 @@ def initialize_remotes(self, *, on_failure=None):
         q = multiprocessing.Queue()
         for remote_spec in remote_specs:
             # Use subprocess to avoid creation of gRPC threads in main BuildStream process
+            # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
             p = multiprocessing.Process(target=self._initialize_remote, args=(remote_spec, q))
 
             try:
@@ -268,109 +270,69 @@ def link_key(self, element, oldkey, newkey):
 
         self.set_ref(newref, tree)
 
+    def _push_refs_to_remote(self, refs, remote):
+        skipped_remote = True
+        try:
+            for ref in refs:
+                tree = self.resolve_ref(ref)
+
+                # Check whether ref is already on the server in which case
+                # there is no need to push the artifact
+                try:
+                    request = buildstream_pb2.GetReferenceRequest()
+                    request.key = ref
+                    response = remote.ref_storage.GetReference(request)
+
+                    if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
+                        # ref is already on the server with the same tree
+                        continue
+
+                except grpc.RpcError as e:
+                    if e.code() != grpc.StatusCode.NOT_FOUND:
+                        # Intentionally re-raise RpcError for outer except block.
+                        raise
+
+                self._send_directory(remote, tree)
+
+                request = buildstream_pb2.UpdateReferenceRequest()
+                request.keys.append(ref)
+                request.digest.hash = tree.hash
+                request.digest.size_bytes = tree.size_bytes
+                remote.ref_storage.UpdateReference(request)
+
+                skipped_remote = False
+        except grpc.RpcError as e:
+            if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
+                raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
+
+        return not skipped_remote
+
     def push(self, element, keys):
-        refs = [self.get_artifact_fullname(element, key) for key in keys]
+
+        refs = [self.get_artifact_fullname(element, key) for key in list(keys)]
 
         project = element._get_project()
 
         push_remotes = [r for r in self._remotes[project] if r.spec.push]
 
         pushed = False
-        display_key = element._get_brief_display_key()
+
         for remote in push_remotes:
             remote.init()
-            skipped_remote = True
+            display_key = element._get_brief_display_key()
             element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
 
-            try:
-                for ref in refs:
-                    tree = self.resolve_ref(ref)
-
-                    # Check whether ref is already on the server in which case
-                    # there is no need to push the artifact
-                    try:
-                        request = buildstream_pb2.GetReferenceRequest()
-                        request.key = ref
-                        response = remote.ref_storage.GetReference(request)
-
-                        if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
-                            # ref is already on the server with the same tree
-                            continue
-
-                    except grpc.RpcError as e:
-                        if e.code() != grpc.StatusCode.NOT_FOUND:
-                            # Intentionally re-raise RpcError for outer except block.
-                            raise
-
-                    missing_blobs = {}
-                    required_blobs = self._required_blobs(tree)
-
-                    # Limit size of FindMissingBlobs request
-                    for required_blobs_group in _grouper(required_blobs, 512):
-                        request = remote_execution_pb2.FindMissingBlobsRequest()
-
-                        for required_digest in required_blobs_group:
-                            d = request.blob_digests.add()
-                            d.hash = required_digest.hash
-                            d.size_bytes = required_digest.size_bytes
-
-                        response = remote.cas.FindMissingBlobs(request)
-                        for digest in response.missing_blob_digests:
-                            d = remote_execution_pb2.Digest()
-                            d.hash = digest.hash
-                            d.size_bytes = digest.size_bytes
-                            missing_blobs[d.hash] = d
-
-                    # Upload any blobs missing on the server
-                    skipped_remote = False
-                    for digest in missing_blobs.values():
-                        uuid_ = uuid.uuid4()
-                        resource_name = '/'.join(['uploads', str(uuid_), 'blobs',
-                                                  digest.hash, str(digest.size_bytes)])
-
-                        def request_stream(resname):
-                            with open(self.objpath(digest), 'rb') as f:
-                                assert os.fstat(f.fileno()).st_size == digest.size_bytes
-                                offset = 0
-                                finished = False
-                                remaining = digest.size_bytes
-                                while not finished:
-                                    chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
-                                    remaining -= chunk_size
-
-                                    request = bytestream_pb2.WriteRequest()
-                                    request.write_offset = offset
-                                    # max. _MAX_PAYLOAD_BYTES chunks
-                                    request.data = f.read(chunk_size)
-                                    request.resource_name = resname
-                                    request.finish_write = remaining <= 0
-                                    yield request
-                                    offset += chunk_size
-                                    finished = request.finish_write
-                        response = remote.bytestream.Write(request_stream(resource_name))
-
-                    request = buildstream_pb2.UpdateReferenceRequest()
-                    request.keys.append(ref)
-                    request.digest.hash = tree.hash
-                    request.digest.size_bytes = tree.size_bytes
-                    remote.ref_storage.UpdateReference(request)
-
-                    pushed = True
-
-                if not skipped_remote:
-                    element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
-
-            except grpc.RpcError as e:
-                if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
-                    raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
-
-            if skipped_remote:
+            if self._push_refs_to_remote(refs, remote):
+                element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
+                pushed = True
+            else:
                 self.context.message(Message(
                     None,
                     MessageType.INFO,
                     "Remote ({}) already has {} cached".format(
                         remote.spec.url, element._get_brief_display_key())
                 ))
+
         return pushed
 
     ################################################
@@ -599,6 +561,7 @@ def prune(self):
     ################################################
     #             Local Private Methods            #
     ################################################
+
     def _checkout(self, dest, tree):
         os.makedirs(dest, exist_ok=True)
 
@@ -776,16 +739,16 @@ def _initialize_remote(self, remote_spec, q):
             #
             q.put(str(e))
 
-    def _required_blobs(self, tree):
+    def _required_blobs(self, directory_digest):
         # parse directory, and recursively add blobs
         d = remote_execution_pb2.Digest()
-        d.hash = tree.hash
-        d.size_bytes = tree.size_bytes
+        d.hash = directory_digest.hash
+        d.size_bytes = directory_digest.size_bytes
         yield d
 
         directory = remote_execution_pb2.Directory()
 
-        with open(self.objpath(tree), 'rb') as f:
+        with open(self.objpath(directory_digest), 'rb') as f:
             directory.ParseFromString(f.read())
 
         for filenode in directory.files:
@@ -797,16 +760,16 @@ def _required_blobs(self, tree):
         for dirnode in directory.directories:
             yield from self._required_blobs(dirnode.digest)
 
-    def _fetch_blob(self, remote, digest, out):
+    def _fetch_blob(self, remote, digest, stream):
         resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
         request = bytestream_pb2.ReadRequest()
         request.resource_name = resource_name
         request.read_offset = 0
         for response in remote.bytestream.Read(request):
-            out.write(response.data)
+            stream.write(response.data)
+        stream.flush()
 
-        out.flush()
-        assert digest.size_bytes == os.fstat(out.fileno()).st_size
+        assert digest.size_bytes == os.fstat(stream.fileno()).st_size
 
     # _ensure_blob():
     #
@@ -922,6 +885,60 @@ def _fetch_directory(self, remote, dir_digest):
         # Fetch final batch
         self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
 
+    def _send_blob(self, remote, digest, stream, u_uid=uuid.uuid4()):
+        resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
+                                  digest.hash, str(digest.size_bytes)])
+
+        def request_stream(resname, instream):
+            offset = 0
+            finished = False
+            remaining = digest.size_bytes
+            while not finished:
+                chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
+                remaining -= chunk_size
+
+                request = bytestream_pb2.WriteRequest()
+                request.write_offset = offset
+                # max. _MAX_PAYLOAD_BYTES chunks
+                request.data = instream.read(chunk_size)
+                request.resource_name = resname
+                request.finish_write = remaining <= 0
+
+                yield request
+
+                offset += chunk_size
+                finished = request.finish_write
+
+        response = remote.bytestream.Write(request_stream(resource_name, stream))
+
+        assert response.committed_size == digest.size_bytes
+
+    def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
+        required_blobs = self._required_blobs(digest)
+
+        missing_blobs = dict()
+        # Limit size of FindMissingBlobs request
+        for required_blobs_group in _grouper(required_blobs, 512):
+            request = remote_execution_pb2.FindMissingBlobsRequest()
+
+            for required_digest in required_blobs_group:
+                d = request.blob_digests.add()
+                d.hash = required_digest.hash
+                d.size_bytes = required_digest.size_bytes
+
+            response = remote.cas.FindMissingBlobs(request)
+            for missing_digest in response.missing_blob_digests:
+                d = remote_execution_pb2.Digest()
+                d.hash = missing_digest.hash
+                d.size_bytes = missing_digest.size_bytes
+                missing_blobs[d.hash] = d
+
+        # Upload any blobs missing on the server
+        for blob_digest in missing_blobs.values():
+            with open(self.objpath(blob_digest), 'rb') as f:
+                assert os.fstat(f.fileno()).st_size == blob_digest.size_bytes
+                self._send_blob(remote, blob_digest, f, u_uid=u_uid)
+
 
 # Represents a single remote CAS cache.
 #
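
A minimal sketch of how the helpers introduced by this patch are meant to be
reused by a later remote-execution code path, assuming an initialized CASCache
instance and a remote on which remote.init() has already been called; the
function name mirror_ref_to_remote() is illustrative and not part of the patch:

    def mirror_ref_to_remote(cascache, remote, ref):
        # Resolve the artifact ref to its root directory digest in the local
        # CAS, the same way _push_refs_to_remote() does.
        tree = cascache.resolve_ref(ref)

        # _send_directory() walks the tree via _required_blobs(), batches
        # FindMissingBlobs queries, and streams any missing blobs to the
        # server with _send_blob(), so an arbitrary directory digest can be
        # uploaded without going through push().
        cascache._send_directory(remote, tree)

        return tree

Updating the server-side reference (UpdateReferenceRequest) remains inside
_push_refs_to_remote(), so a caller that only needs the blobs available on the
server can stop after _send_directory().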