Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix issue with same doc type in spawned processes #6062

Merged
merged 3 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ jobs:
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/test_parameters_as_pydantic.py
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/test_streaming.py
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/sagemaker/test_sagemaker.py
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/docker
echo "flag it as jina for codeoverage"
echo "codecov_flag=jina" >> $GITHUB_OUTPUT
timeout-minutes: 45
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ jobs:
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/test_parameters_as_pydantic.py
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/test_streaming.py
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/sagemaker/test_sagemaker.py
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/docker
echo "flag it as jina for codeoverage"
echo "codecov_flag=jina" >> $GITHUB_OUTPUT
timeout-minutes: 45
Expand Down
3 changes: 2 additions & 1 deletion jina/serve/runtimes/gateway/http_fastapi_app_docarrayv2.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ async def post(body: input_model, response: Response):
else:
docs = DocList[input_doc_list_model]([data])
if body.header is None:
req_id = docs[0].id
if hasattr(docs[0], 'id'):
req_id = docs[0].id

try:
async for resp in streamer.stream_docs(
Expand Down
11 changes: 10 additions & 1 deletion jina/serve/stream/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ async def _get_endpoints_input_output_models(
connection_pool, retry_forever=True, is_cancel=is_cancel
)
self.logger.debug(f'Got all endpoints from TopologyGraph {endpoints}')

if endpoints is not None:
for endp in endpoints:
for origin_node in topology_graph.origin_nodes:
Expand All @@ -102,6 +101,16 @@ async def _get_endpoints_input_output_models(
and len(leaf_input_output_model) > 0
):
_endpoints_models_map[endp] = leaf_input_output_model[0]
cached_models = {}
for k, v in _endpoints_models_map.items():
if v['input'].__name__ not in cached_models:
cached_models[v['input'].__name__] = v['input']
else:
v['input'] = cached_models[v['input'].__name__]
if v['output'].__name__ not in cached_models:
cached_models[v['output'].__name__] = v['output']
else:
v['output'] = cached_models[v['output'].__name__]
return _endpoints_models_map

async def stream_doc(
Expand Down
Empty file.
7 changes: 7 additions & 0 deletions tests/integration/docarray_v2/docker/executor1/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Image for a single Jina executor, built on the `jinaai/jina:test-pip` base image.
FROM jinaai/jina:test-pip

# Copy the whole build context into /executor_root/ inside the image.
COPY . /executor_root/

# Work from the copied directory so the relative `config.yml` below resolves.
WORKDIR /executor_root

# On container start, run `jina executor` with the configuration in config.yml.
ENTRYPOINT ["jina", "executor", "--uses", "config.yml"]
Empty file.
5 changes: 5 additions & 0 deletions tests/integration/docarray_v2/docker/executor1/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Executor configuration file (the image entrypoint passes it via `--uses config.yml`).
# NOTE(review): key nesting/indentation is not visible in this view — confirm the
# layout of `metas`, `name` and `py_modules` against the original file.
jtype: Encoder
metas:
name: EncoderPrivate
py_modules:
# Python module file listed for loading.
- executor.py
23 changes: 23 additions & 0 deletions tests/integration/docarray_v2/docker/executor1/executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from typing import Optional
from docarray import DocList, BaseDoc
from docarray.typing import NdArray
from jina import Executor, requests
import numpy as np

class MyDoc(BaseDoc):
    # Required text content of the document.
    text: str
    # Optional embedding array; None until something assigns it.
    embedding: Optional[NdArray] = None


class Encoder(Executor):
    """Executor that assigns a random 128-element embedding to every document."""

    def __init__(
        self,
        *args,
        **kwargs,
    ):
        """Forward all positional and keyword arguments to ``Executor``."""
        super().__init__(*args, **kwargs)

    @requests
    def encode(self, docs: DocList[MyDoc], **kwargs) -> DocList[MyDoc]:
        """Set ``embedding`` on each document in place and return the list.

        :param docs: documents to process; each one is mutated in place.
        :param kwargs: extra request arguments; not used by this method.
        :return: the same ``docs`` object that was passed in.
        """
        for doc in docs:
            # np.random.random(128) yields 128 floats in the half-open range [0, 1).
            doc.embedding = np.random.random(128)
        # Return explicitly so the value matches the declared return type
        # instead of implicitly returning None.
        return docs
7 changes: 7 additions & 0 deletions tests/integration/docarray_v2/docker/executor2/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Image for a single Jina executor, built on the `jinaai/jina:test-pip` base image.
FROM jinaai/jina:test-pip

# Copy the whole build context into /executor_root/ inside the image.
COPY . /executor_root/

# Work from the copied directory so the relative `config.yml` below resolves.
WORKDIR /executor_root

# On container start, run `jina executor` with the configuration in config.yml.
ENTRYPOINT ["jina", "executor", "--uses", "config.yml"]
Empty file.
5 changes: 5 additions & 0 deletions tests/integration/docarray_v2/docker/executor2/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Executor configuration file (the image entrypoint passes it via `--uses config.yml`).
# NOTE(review): key nesting/indentation is not visible in this view — confirm the
# layout of `metas`, `name` and `py_modules` against the original file.
jtype: Indexer
metas:
name: IndexerPrivate
py_modules:
# Python module file listed for loading.
- executor.py
39 changes: 39 additions & 0 deletions tests/integration/docarray_v2/docker/executor2/executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from typing import Optional, List
from docarray import DocList, BaseDoc
from docarray.typing import NdArray
from docarray.index import InMemoryExactNNIndex
from jina import Executor, requests


class MyDoc(BaseDoc):
    # Required text content of the document.
    text: str
    # Optional embedding array; None until something assigns it.
    embedding: Optional[NdArray] = None


class MyDocWithMatches(MyDoc):
    # Documents matched for this query; defaults to empty.
    matches: DocList[MyDoc] = []
    # One float score per entry in `matches`, as filled in by `Indexer.search`.
    scores: List[float] = []


class Indexer(Executor):
    """Executor that keeps documents in an ``InMemoryExactNNIndex`` and searches it."""

    def __init__(self, *args, **kwargs):
        """Initialise the base executor and create an empty index over ``MyDoc``."""
        super().__init__(*args, **kwargs)
        self._indexer = InMemoryExactNNIndex[MyDoc]()

    @requests(on='/index')
    def index(self, docs: DocList[MyDoc], **kwargs) -> DocList[MyDoc]:
        """Add ``docs`` to the index and hand the same list back.

        :param docs: documents to store.
        :param kwargs: extra request arguments; not used by this method.
        :return: the ``docs`` object that was passed in.
        """
        self._indexer.index(docs)
        return docs

    @requests(on='/search')
    def search(self, docs: DocList[MyDoc], **kwargs) -> DocList[MyDocWithMatches]:
        """Query the index with every document in ``docs``.

        :param docs: query documents; the index is searched on their ``embedding`` field.
        :param kwargs: extra request arguments; not used by this method.
        :return: one ``MyDocWithMatches`` per query, carrying its matches and scores.
        """
        found = self._indexer.find_batched(docs, search_field='embedding')
        results = DocList[MyDocWithMatches]()
        for query, doc_matches, doc_scores in zip(docs, found.documents, found.scores):
            # Copy the query's own fields, then attach what the index returned for it.
            enriched = MyDocWithMatches(**query.dict())
            enriched.matches = doc_matches
            enriched.scores = doc_scores.tolist()
            results.append(enriched)
        return results
61 changes: 61 additions & 0 deletions tests/integration/docarray_v2/docker/test_with_docker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import os
import time

import pytest
import requests as general_requests

from jina import Flow

cur_dir = os.path.dirname(os.path.abspath(__file__))


@pytest.fixture
def executor_images_built():
    """Build the two executor images before the test and prune containers after it.

    Setup builds ``encoder-executor`` from ``executor1`` and ``indexer-executor``
    from ``executor2`` (both directories sit next to this file). Teardown waits
    two seconds and then calls ``containers.prune()``.
    """
    import docker

    client = docker.from_env()
    try:
        client.images.build(path=os.path.join(cur_dir, 'executor1'), tag='encoder-executor')
        client.images.build(path=os.path.join(cur_dir, 'executor2'), tag='indexer-executor')
    finally:
        # Release the client's connections even when a build fails.
        client.close()
    yield
    time.sleep(2)
    client = docker.from_env()
    try:
        client.containers.prune()
    finally:
        # The teardown client was previously left open; close it as well.
        client.close()


@pytest.mark.parametrize('protocol', ['http', 'grpc'])
def test_flow_with_docker(executor_images_built, protocol):
    """Index three sentences through a two-executor Docker Flow, then search with two of them.

    Runs once per protocol. For ``http`` it also fetches ``/openapi.json`` from
    the gateway and parses the response as JSON.
    """
    from typing import List, Optional

    from docarray import BaseDoc, DocList
    from docarray.typing import NdArray

    # Client-side models, declared with the field layout used by this test.
    class MyDoc(BaseDoc):
        text: str
        embedding: Optional[NdArray] = None

    class MyDocWithMatches(MyDoc):
        matches: DocList[MyDoc] = []
        scores: List[float] = []

    sentences = [
        'This framework generates embeddings for each input sentence',
        'Sentences are passed as a list of string.',
        'The quick brown fox jumps over the lazy dog.',
    ]

    flow = (
        Flow(protocol=protocol)
        .add(uses='docker://encoder-executor')
        .add(uses='docker://indexer-executor')
    )

    with flow:
        if protocol == 'http':
            # Parsing the body raises if the schema endpoint did not return JSON.
            schema_response = general_requests.get(f'http://localhost:{flow.port}/openapi.json')
            schema_response.json()

        inputs = DocList[MyDoc]([MyDoc(text=sentence) for sentence in sentences])
        flow.post(on='/index', inputs=inputs)

        queries = inputs[:2]
        search_results = flow.post(
            on='/search',
            inputs=queries,
            return_type=DocList[MyDocWithMatches],
        )

        assert len(search_results) == len(queries)
        for hit in search_results:
            assert hit.text in sentences
            # Every query is expected to come back with all indexed sentences as matches.
            assert len(hit.matches) == len(sentences)
            for match in hit.matches:
                assert match.text in sentences