refactor: merge grpc and rest client by bwanglzu · Pull Request #2565 · jina-ai/serve · GitHub

refactor: merge grpc and rest client #2565

Merged on Jun 10, 2021 (32 commits)

Commits
3f61b1a refactor: merge grpc and rest client (bwanglzu, Jun 5, 2021)
eae9f38 feat: refine client (bwanglzu, Jun 7, 2021)
19a6d6e feat: refine client parser (bwanglzu, Jun 7, 2021)
365c115 feat: refine client parser (bwanglzu, Jun 7, 2021)
9689dd5 feat: create client method (bwanglzu, Jun 8, 2021)
24b8737 feat: finish add client method (bwanglzu, Jun 8, 2021)
54301be feat: add docstring for client (bwanglzu, Jun 8, 2021)
f9cc07b feat: fix black and docstring (bwanglzu, Jun 8, 2021)
d32db32 feat: init client it (bwanglzu, Jun 8, 2021)
5fbc09c feat: unify parameter name (bwanglzu, Jun 8, 2021)
0cf19f5 feat: unify parameter names (bwanglzu, Jun 8, 2021)
f6371ba feat: fix unit tests (bwanglzu, Jun 8, 2021)
f796823 feat: fix io test (bwanglzu, Jun 8, 2021)
dbf2238 feat: fix distribute test (bwanglzu, Jun 8, 2021)
a31d81f feat: add grpc client back to distributed test (bwanglzu, Jun 8, 2021)
e7e4630 feat: add grpc client back to distributed test (bwanglzu, Jun 8, 2021)
5d0e386 feat: add grpc client back to distributed test (bwanglzu, Jun 8, 2021)
bec39b1 feat: add grpc client back to distributed test (bwanglzu, Jun 8, 2021)
9bfb778 feat: update client assignment in base (bwanglzu, Jun 8, 2021)
dfa4e51 feat: remove unused fixture (bwanglzu, Jun 8, 2021)
6fd1461 feat: revert client initialise (bwanglzu, Jun 8, 2021)
600557d feat: revert client initialise (bwanglzu, Jun 8, 2021)
713407c feat: unify grpc client and grpc runtime (bwanglzu, Jun 9, 2021)
8ae6a1f feat: parse args using namespace (bwanglzu, Jun 9, 2021)
6e58613 feat: parse args using namespace (bwanglzu, Jun 9, 2021)
e77f8ae feat: add overload to Client (bwanglzu, Jun 9, 2021)
8aba9f1 Merge remote-tracking branch 'origin' into refactor-client (bwanglzu, Jun 9, 2021)
e423d7c feat: use overload to refactor client (bwanglzu, Jun 9, 2021)
45c4a86 feat: remove unused imports (bwanglzu, Jun 9, 2021)
5c4b1fe feat: fix flake8 (bwanglzu, Jun 9, 2021)
49a4810 feat: fix inject script for method outside class (bwanglzu, Jun 9, 2021)
68a77d0 feat: fix inject script for method outside class (bwanglzu, Jun 9, 2021)
10 changes: 5 additions & 5 deletions .github/2.0/cookbooks/Flow.md
@@ -717,7 +717,7 @@ c.post('/')
```

```text
Client@27219[S]:connected to the gateway at 192.168.1.14:12345!
GRPCClient@27219[S]:connected to the gateway at 192.168.1.14:12345!
|█ | 📃 100 ⏱️ 0.0s 🐎 26690.1/s 1 requests takes 0 seconds (0.00s)
✅ done in ⏱ 0 seconds 🐎 24854.8/s
```
@@ -834,17 +834,17 @@ When use `curl`, make sure to pass the `-N/--no-buffer` flag.

### Python Client with REST Request

In some case when `restful=True`, you may still need a Python client to query the server for debugging. You can use `WebsocketClient` in this case.
In some case when `restful=True`, you may still need a Python client to query the server for debugging. You can use `Client` with an additional parameter `restful` in this case.

```python
from jina.clients import WebSocketClient
from jina import Client

c = WebSocketClient(host='192.168.1.14', port_expose=12345)
c = Client(host='192.168.1.14', port_expose=12345, restful=True)
c.post('/')
```

```text
WebSocketClient@27622[S]:Connected to the gateway at 192.168.1.14:12345
WebSocketClient@27622[S]:Connected to the gateway at 192.168.1.14:12345
|█ | 📃 100 ⏱️ 0.0s 🐎 19476.6/s 1 requests takes 0 seconds (0.00s)
✅ done in ⏱ 0 seconds 🐎 18578.9/s
```
61 changes: 56 additions & 5 deletions jina/clients/__init__.py
@@ -1,28 +1,79 @@
"""Module wrapping the Client of Jina."""
import argparse
from typing import overload, Optional

from .base import BaseClient, CallbackFnType, InputType, InputDeleteType
from .helper import callback_exec
from .mixin import PostMixin
from .request import GeneratorSourceType
from .websocket import WebSocketClientMixin

__all__ = ['Client', 'WebSocketClient']
__all__ = ['Client', 'GRPCClient', 'WebSocketClient']


# overload_inject_start_client
@overload
def Client(
continue_on_error: Optional[bool] = False,
host: Optional[str] = '0.0.0.0',
port_expose: Optional[int] = None,
proxy: Optional[bool] = False,
request_size: Optional[int] = 100,
restful: Optional[bool] = False,
return_results: Optional[bool] = False,
show_progress: Optional[bool] = False,
**kwargs
) -> 'BaseClient':
"""Create a Client. Client is how user interact with Flow

:param continue_on_error: If set, a Request that causes error will be logged only without blocking the further requests.
:param host: The host address of the runtime, by default it is 0.0.0.0.
:param port_expose: The port of the host exposed to the public
:param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy
:param request_size: The number of Documents in each Request.
:param restful: If set, use RESTful interface instead of gRPC as the main interface
:param return_results: This feature is only used for AsyncClient.

If set, the results of all Requests will be returned as a list. This is useful when one wants
process Responses in bulk instead of using callback.
:param show_progress: If set, client will show a progress bar on receiving every request.
:return: the new Client object

.. # noqa: DAR202
.. # noqa: DAR101
.. # noqa: DAR003
"""
# overload_inject_end_client


def Client(args: Optional['argparse.Namespace'] = None, **kwargs) -> 'BaseClient':
"""Jina Python client.

class Client(PostMixin, BaseClient):
:param args: Namespace args.
:param kwargs: Additional arguments.
:return: An instance of :class:`GRPCClient` or :class:`WebSocketClient`.
"""
is_restful = kwargs.get('restful', False)
if is_restful:
return WebSocketClient(args, **kwargs)
else:
return GRPCClient(args, **kwargs)


class GRPCClient(PostMixin, BaseClient):
"""A simple Python client for connecting to the gRPC gateway.

It manages the asyncio event loop internally, so all interfaces are synchronous from the outside.
"""

@property
def client(self) -> 'Client':
def client(self) -> 'GRPCClient':
"""Return the client object itself

.. # noqa: DAR201"""
return self


class WebSocketClient(Client, WebSocketClientMixin):
class WebSocketClient(GRPCClient, WebSocketClientMixin):
"""A Python Client to stream requests from a Flow with a REST Gateway.

:class:`WebSocketClient` shares the same interface as :class:`Client` and provides methods like
Expand Down
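The dispatch that the new `Client` function performs can be sketched in isolation. This is a minimal illustration, not jina's implementation: the three classes below are hypothetical stand-ins that only store their inputs, and only the `restful` lookup mirrors the diff.

```python
import argparse
from typing import Optional


class BaseClient:
    """Hypothetical stand-in for jina's BaseClient; it only stores its inputs."""

    def __init__(self, args: Optional[argparse.Namespace] = None, **kwargs):
        self.args = args
        self.kwargs = kwargs


class GRPCClient(BaseClient):
    """Stand-in for the gRPC client."""


class WebSocketClient(GRPCClient):
    """Stand-in for the REST client (it subclasses GRPCClient, as in the diff)."""


def Client(args: Optional[argparse.Namespace] = None, **kwargs) -> BaseClient:
    """Pick the client class from the `restful` keyword, as the diff does."""
    if kwargs.get('restful', False):
        return WebSocketClient(args, **kwargs)
    return GRPCClient(args, **kwargs)


grpc = Client(host='localhost', port_expose=45678)
rest = Client(host='localhost', port_expose=45678, restful=True)
# The dispatch reads only the keyword arguments, so a Namespace that carries
# restful=True does not change the chosen class in this sketch.
from_namespace = Client(argparse.Namespace(restful=True))

print(type(grpc).__name__)            # GRPCClient
print(type(rest).__name__)            # WebSocketClient
print(type(from_namespace).__name__)  # GRPCClient
```

As far as the diff shows, `Client` consults `kwargs.get('restful', False)` and never `args.restful`, so callers that build a namespace with `--restful` may want to instantiate `WebSocketClient` directly.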
4 changes: 2 additions & 2 deletions jina/clients/mixin.py
@@ -7,7 +7,7 @@


class PostMixin:
"""The Post Mixin class for Client and Flow """
"""The Post Mixin class for Client and Flow"""

def post(
self,
@@ -63,7 +63,7 @@ async def _get_results(*args, **kwargs):


class AsyncPostMixin:
"""The Async Post Mixin class for AsyncClient and AsyncFlow """
"""The Async Post Mixin class for AsyncClient and AsyncFlow"""

async def post(
self,
18 changes: 10 additions & 8 deletions jina/flow/base.py
@@ -12,7 +12,8 @@

from .builder import build_required, _build_flow, _hanging_pods
from .. import __default_host__
from ..clients import Client, WebSocketClient
from ..clients.base import BaseClient
from ..clients import GRPCClient, WebSocketClient
from ..enums import FlowBuildLevel, PodRoleType, FlowInspectType
from ..excepts import FlowTopologyError, FlowMissingPodError
from ..helper import (
@@ -68,7 +69,9 @@ class BaseFlow(JAMLCompatible, ExitStack, metaclass=FlowType):
:param env: environment variables shared by all Pods
"""

_cls_client = Client #: the type of the Client, can be changed to other class
_cls_client = (
GRPCClient #: the type of the GRPCClient, can be changed to other class
)

# overload_inject_start_flow
@overload
@@ -211,7 +214,7 @@ def _add_gateway(self, needs, **kwargs):
name=pod_name,
ctrl_with_ipc=True, # otherwise ctrl port would be conflicted
runtime_cls='GRPCRuntime'
if self._cls_client == Client
if self._cls_client == GRPCClient
else 'RESTRuntime',
pod_role=PodRoleType.GATEWAY,
identity=self.args.identity,
@@ -800,8 +803,8 @@ def __eq__(self, other: 'BaseFlow') -> bool:

@property
@build_required(FlowBuildLevel.GRAPH)
def client(self) -> 'Client':
"""Return a :class:`Client` object attach to this Flow.
def client(self) -> 'BaseClient':
"""Return a :class:`BaseClient` object attach to this Flow.

.. # noqa: DAR201"""
kwargs = {}
@@ -816,7 +819,6 @@ def client(self) -> 'Client':
# show progress when client is used inside the flow, for better log readability
if 'show_progress' not in kwargs:
args.show_progress = True

return self._cls_client(args)

@property
@@ -1093,7 +1095,7 @@ def _show_success_message(self):
self.logger.info('\n' + '\n'.join(address_table))

def block(self):
"""Block the process until user hits KeyboardInterrupt """
"""Block the process until user hits KeyboardInterrupt"""
try:
threading.Event().wait()
except KeyboardInterrupt:
@@ -1111,7 +1113,7 @@ def use_grpc_gateway(self, port: Optional[int] = None):

def _switch_gateway(self, gateway: str, port: int):
restful = gateway == 'RESTRuntime'
client = WebSocketClient if gateway == 'RESTRuntime' else Client
client = WebSocketClient if gateway == 'RESTRuntime' else GRPCClient

# globally register this at Flow level
self._cls_client = client
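
How the Flow pairs a client class with a gateway runtime can be sketched with a toy class. `MiniFlow` and the empty client classes are hypothetical stand-ins; only the two conditional expressions follow the diff.

```python
class GRPCClient:
    """Hypothetical stand-in for the gRPC client class."""


class WebSocketClient(GRPCClient):
    """Hypothetical stand-in for the REST client class."""


class MiniFlow:
    """Toy Flow that keeps only the client/runtime pairing from the diff."""

    _cls_client = GRPCClient  # class-level default, as on BaseFlow

    def runtime_cls(self) -> str:
        # Same comparison as in _add_gateway
        return 'GRPCRuntime' if self._cls_client == GRPCClient else 'RESTRuntime'

    def switch_gateway(self, gateway: str) -> None:
        # Same selection as in _switch_gateway; assigning through `self`
        # creates an instance attribute and leaves the class default alone.
        self._cls_client = WebSocketClient if gateway == 'RESTRuntime' else GRPCClient


flow = MiniFlow()
flow.switch_gateway('RESTRuntime')
print(flow.runtime_cls())        # RESTRuntime
print(MiniFlow().runtime_cls())  # GRPCRuntime
```

In this sketch the switch affects only the instance it is called on, because the assignment goes through `self`; other instances keep the `GRPCClient` default.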
7 changes: 7 additions & 0 deletions jina/parsers/client.py
@@ -42,3 +42,10 @@ def mixin_client_cli_parser(parser):
process Responses in bulk instead of using callback.
''',
)

gp.add_argument(
'--restful',
action='store_true',
default=False,
help='If set, use RESTful interface instead of gRPC as the main interface',
)
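
The effect of the new flag can be checked with a standalone parser. In this sketch the parser and the argument group named 'Client' are assumptions made for illustration; only the `--restful` definition is copied from the diff.

```python
import argparse

# Hypothetical parser and group; jina builds `gp` with its own helpers.
parser = argparse.ArgumentParser(description='illustrative client parser')
gp = parser.add_argument_group('Client')
gp.add_argument(
    '--restful',
    action='store_true',
    default=False,
    help='If set, use RESTful interface instead of gRPC as the main interface',
)

default_args = parser.parse_args([])
rest_args = parser.parse_args(['--restful'])
print(default_args.restful, rest_args.restful)  # False True
```

Because the action is `store_true`, the flag takes no value: its presence sets `restful` to `True` and its absence leaves the default `False`.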
94 changes: 67 additions & 27 deletions scripts/inject-cli-as-overload.py
@@ -57,6 +57,7 @@ def _cli_to_schema(
return_type='BaseFlow',
filepath='../jina/flow/base.py',
overload_fn='add',
class_method=True, # if it is a method inside class.
),
dict(
cli_entrypoint='flow',
@@ -65,6 +66,16 @@ def _cli_to_schema(
return_type=None,
filepath='../jina/flow/base.py',
overload_fn='__init__',
class_method=True,
),
dict(
cli_entrypoint='client',
doc_str_title='Create a Client. Client is how user interact with Flow',
doc_str_return='the new Client object',
return_type='BaseClient',
filepath='../jina/clients/__init__.py',
overload_fn='Client',
class_method=False,
),
]

@@ -76,37 +87,66 @@ def fill_overload(
return_type,
filepath,
overload_fn,
class_method,
indent=' ' * 4,
Member:

Wait, isn't the modification the same as fill_overload(..., indent=' '*2)?

Member:

Can you revert it and simply pass indent=' '*2?

Member Author:

Yes, you're right, working on it now.

Member Author (@bwanglzu, Jun 9, 2021):

@hanxiao it's not only about the indent: the script will also insert a self inside the method, and we have different levels of indent, such as no indent (before the overload and the method name), 4 indents for docstrings, etc.

This is what happened after I changed indent = ' ' * 2:

[screenshot attached to the original comment]

):
a = _cli_to_schema(api_to_dict(), cli_entrypoint)
cli_args = [
f'{indent}{indent}{k[0]}: Optional[{k[1]["type"]}] = {k[1]["default_literal"]}'
for k in a
]
args_str = ', \n'.join(cli_args + [f'{indent}{indent}**kwargs'])
signature_str = f'def {overload_fn}(\n{indent}{indent}self,\n{args_str})'
if return_type:
signature_str += f' -> \'{return_type}\':'
return_str = f'\n{indent}{indent}:return: {doc_str_return}'
if class_method:
cli_args = [
f'{indent}{indent}{k[0]}: Optional[{k[1]["type"]}] = {k[1]["default_literal"]}'
for k in a
]
args_str = ', \n'.join(cli_args + [f'{indent}{indent}**kwargs'])
signature_str = f'def {overload_fn}(\n{indent}{indent}self,\n{args_str})'
if return_type:
signature_str += f' -> \'{return_type}\':'
return_str = f'\n{indent}{indent}:return: {doc_str_return}'
else:
signature_str += ':'
return_str = ''
else:
cli_args = [
f'{indent}{k[0]}: Optional[{k[1]["type"]}] = {k[1]["default_literal"]}'
for k in a
]
args_str = ', \n'.join(cli_args + [f'{indent}**kwargs'])
signature_str = f'def {overload_fn}(\n{args_str})'
if return_type:
signature_str += f' -> \'{return_type}\':'
return_str = f'\n{indent}:return: {doc_str_return}'
else:
signature_str += ':'
return_str = ''
if class_method:
doc_str = '\n'.join(
f'{indent}{indent}:param {k[0]}: {k[1]["description"]}' for k in a
)
noqa_str = '\n'.join(
f'{indent}{indent}.. # noqa: DAR{j}' for j in ['202', '101', '003']
)
else:
doc_str = '\n'.join(f'{indent}:param {k[0]}: {k[1]["description"]}' for k in a)
noqa_str = '\n'.join(
f'{indent}.. # noqa: DAR{j}' for j in ['202', '101', '003']
)
if class_method:
final_str = f'@overload\n{indent}{signature_str}\n{indent}{indent}"""{doc_str_title}\n\n{doc_str}{return_str}\n\n{noqa_str}\n{indent}{indent}"""'
final_code = re.sub(
rf'(# overload_inject_start_{cli_entrypoint}).*(# overload_inject_end_{cli_entrypoint})',
f'\\1\n{indent}{final_str}\n{indent}\\2',
open(filepath).read(),
0,
re.DOTALL,
)
else:
signature_str += ':'
return_str = ''
doc_str = '\n'.join(
f'{indent}{indent}:param {k[0]}: {k[1]["description"]}' for k in a
)
noqa_str = '\n'.join(
f'{indent}{indent}.. # noqa: DAR{j}' for j in ['202', '101', '003']
)

final_str = f'@overload\n{indent}{signature_str}\n{indent}{indent}"""{doc_str_title}\n\n{doc_str}{return_str}\n\n{noqa_str}\n{indent}{indent}"""'

final_code = re.sub(
rf'(# overload_inject_start_{cli_entrypoint}).*(# overload_inject_end_{cli_entrypoint})',
f'\\1\n{indent}{final_str}\n{indent}\\2',
open(filepath).read(),
0,
re.DOTALL,
)
final_str = f'@overload\n{signature_str}\n{indent}"""{doc_str_title}\n\n{doc_str}{return_str}\n\n{noqa_str}\n{indent}"""'
final_code = re.sub(
rf'(# overload_inject_start_{cli_entrypoint}).*(# overload_inject_end_{cli_entrypoint})',
f'\\1\n{final_str}\n{indent}\\2',
open(filepath).read(),
0,
re.DOTALL,
)

with open(filepath, 'w') as fp:
fp.write(final_code)
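
The two points from the review thread (a method stub needs `self` and one extra indent level, a module-level stub needs neither) and the marker-based injection can be sketched together. This is a simplified illustration under stated assumptions, not the script's code: `render_stub` and `inject` are hypothetical helpers, the stub body is `...` instead of the generated docstring, and the parameter list is hard-coded.

```python
import re

INDENT = ' ' * 4


def render_stub(fn_name, params, class_method, indent=INDENT):
    """Render an @overload stub; methods get one base indent level and `self`."""
    base = indent if class_method else ''
    arg_lines = [
        f'{base}{indent}{name}: {annotation} = {default}'
        for name, annotation, default in params
    ]
    if class_method:
        arg_lines.insert(0, f'{base}{indent}self')
    arg_lines.append(f'{base}{indent}**kwargs')
    args_str = ',\n'.join(arg_lines)
    return f'{base}@overload\n{base}def {fn_name}(\n{args_str}\n{base}):\n{base}{indent}...'


def inject(source, entrypoint, stub):
    """Replace everything between the start and end marker comments with `stub`."""
    name = re.escape(entrypoint)
    pattern = rf'(# overload_inject_start_{name}).*(# overload_inject_end_{name})'
    # A function replacement avoids backslash handling in the stub text;
    # re.DOTALL lets `.*` span the lines between the markers.
    return re.sub(
        pattern,
        lambda m: f'{m.group(1)}\n{stub}\n{m.group(2)}',
        source,
        flags=re.DOTALL,
    )


source = (
    '# overload_inject_start_client\n'
    'stale stub\n'
    '# overload_inject_end_client\n'
    'def Client(args=None, **kwargs): ...\n'
)
params = [('restful', 'Optional[bool]', 'False')]
module_stub = render_stub('Client', params, class_method=False)
method_stub = render_stub('add', params, class_method=True)
updated = inject(source, 'client', module_stub)
print(updated)
print(method_stub)
```

Deriving a single `base` prefix from `class_method` is one possible way to avoid duplicating the two branches; running `inject` again on its own output leaves the file unchanged, which is what makes the marker approach safe to rerun.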
18 changes: 14 additions & 4 deletions tests/distributed/test_join_local_from_remote/test_integration.py
@@ -3,7 +3,7 @@
import pytest

from jina import Document
from jina.clients import Client
from jina.clients import Client, GRPCClient
from jina.parsers import set_client_cli_parser
from tests import validate_callback
from ..helpers import create_flow, assert_request
@@ -23,16 +23,26 @@ def doc_to_index():

@pytest.fixture
def client():
return Client(host='localhost', port_expose=45678)


@pytest.fixture
def grpc_client():
args = set_client_cli_parser().parse_args(
['--host', 'localhost', '--port-expose', '45678']
)

return Client(args)
return GRPCClient(args)


@pytest.fixture(params=['client', 'grpc_client'])
def client_instance(request):
return request.getfixturevalue(request.param)


@pytest.mark.timeout(360)
@pytest.mark.parametrize('docker_compose', [compose_yml], indirect=['docker_compose'])
def test_flow(docker_compose, doc_to_index, client, mocker):
def test_flow(docker_compose, doc_to_index, client_instance, mocker):
def validate_resp(resp):
assert len(resp.data.docs) == 2
assert resp.data.docs[0].text == 'test'
@@ -41,7 +51,7 @@ def validate_resp(resp):
mock = mocker.Mock()
flow_id = create_flow(flow_yaml=flow_yaml, pod_dir=os.path.join(cur_dir, 'pods'))

client.search(inputs=[doc_to_index], on_done=mock)
client_instance.search(inputs=[doc_to_index], on_done=mock)

assert_request(method='get', url=f'http://localhost:8000/flows/{flow_id}')
