Skip to content

Commit

Permalink
Merge pull request #477 from seung-lab/ingest-handle-empty-input
Browse files Browse the repository at this point in the history
fix(ingest): handle empty input to parse_edges
  • Loading branch information
akhileshh authored Nov 21, 2023
2 parents f5c8503 + f445849 commit c6947a9
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 3 deletions.
8 changes: 6 additions & 2 deletions pychunkedgraph/ingest/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ def ingest_chunk(queue: str, chunk_info):
def ingest_chunk_local(graph_id: str, chunk_info, n_threads: int):
"""Manually ingest a chunk on a local machine."""
from .create.abstract_layers import add_layer
from .cluster import _create_atomic_chunk

cg = ChunkedGraph(graph_id=graph_id)
add_layer(cg, chunk_info[0], chunk_info[1:], n_threads=n_threads)
if chunk_info[0] == 2:
_create_atomic_chunk(chunk_info[1:])
else:
cg = ChunkedGraph(graph_id=graph_id)
add_layer(cg, chunk_info[0], chunk_info[1:], n_threads=n_threads)
4 changes: 3 additions & 1 deletion pychunkedgraph/io/edges.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ def deserialize(edges_message: EdgesMsg) -> Tuple[np.ndarray, np.ndarray, np.nda


def _parse_edges(compressed: List[bytes]) -> List[Dict]:
result = []
if(len(compressed) == 0):
return result
zdc = zstd.ZstdDecompressor()
try:
n_threads = int(os.environ.get("ZSTD_THREADS", 1))
except ValueError:
n_threads = 1
decompressed = zdc.multi_decompress_to_buffer(compressed, threads=n_threads)
result = []
for content in decompressed:
chunk_edges = ChunkEdgesMsg()
chunk_edges.ParseFromString(memoryview(content))
Expand Down

0 comments on commit c6947a9

Please sign in to comment.