8000 Start uvicorn server in multiple process as per worker count by Suresh-Nakkeran · Pull Request #2573 · kserve/kserve · GitHub

Start uvicorn server in multiple process as per worker count #2573


Merged · 13 commits · Dec 24, 2022
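
This PR starts one uvicorn server process per configured worker instead of running a single process. The diffs shown here cover the CI tweaks, example models, transformer, error handling, and gRPC server options updated alongside that change. A minimal sketch of the multi-process uvicorn pattern, not the PR's actual server code; the app, port, and worker count below are placeholders:

import multiprocessing
import socket

import uvicorn
from fastapi import FastAPI

app = FastAPI()  # placeholder ASGI app


def serve(sock: socket.socket) -> None:
    # Each worker process runs its own uvicorn server on the shared,
    # pre-bound socket, so the kernel distributes incoming connections.
    config = uvicorn.Config(app, log_level="info")
    uvicorn.Server(config).run(sockets=[sock])


if __name__ == "__main__":
    workers = 4  # placeholder worker count
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(("0.0.0.0", 8080))
    processes = [multiprocessing.Process(target=serve, args=(sock,)) for _ in range(workers)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
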
2 changes: 1 addition & 1 deletion .github/actions/minikube-setup/action.yml
@@ -10,7 +10,7 @@ runs:
minikube version: 'v1.28.0'
kubernetes version: 'v1.23.15'
driver: 'none'
start args: --wait-timeout=60s
start args: --wait-timeout=120s
github token: ${{ env.GITHUB_TOKEN }}
- name: Setup port-forward
shell: bash
2 changes: 1 addition & 1 deletion .github/workflows/e2e-test.yml
@@ -11,7 +11,7 @@ on:
- '.github/workflows/e2e-test.yml'

env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

jobs:
kserve-image-build:
65 changes: 49 additions & 16 deletions python/custom_model/model.py
@@ -18,8 +18,13 @@
from PIL import Image
import base64
import io
import numpy as np
from kserve.utils.utils import generate_uuid


# This custom predictor example implements a custom model following the KServe REST v1/v2 protocol.
# The input can be raw base64-encoded image bytes, or an image tensor that has been pre-processed by
# the transformer and passed to the predictor; the output is the prediction response.
class AlexNetModel(kserve.Model):
def __init__(self, name: str):
super().__init__(name)
@@ -33,37 +38,65 @@ def load(self):
self.model.eval()
self.ready = True

def predict(self, payload: Dict, headers: Dict[str, str] = None) -> Dict:
inputs = payload["instances"]
def preprocess(self, payload: Dict, headers: Dict[str, str] = None) -> torch.Tensor:
raw_img_data = None
if "instances" in payload:
headers["request-type"] = "v1"
if "data" in payload["instances"][0]:
# assume the data is already preprocessed in transformer
input_tensor = torch.Tensor(np.asarray(payload["instances"][0]["data"]))
return input_tensor.unsqueeze(0)
elif "image" in payload["instances"][0]:
# Input follows the Tensorflow V1 HTTP API for binary values
# https://www.tensorflow.org/tfx/serving/api_rest#encoding_binary_values
data = payload["instances"][0]["image"]["b64"]
raw_img_data = base64.b64decode(data)
elif "inputs" in payload:
headers["request-type"] = "v2"
inputs = payload["inputs"]
data = inputs[0]["data"][0]
if inputs[0]["datatype"] == "BYTES":
raw_img_data = base64.b64decode(data)
elif inputs[0]["datatype"] == "FP32":
# assume the data is already preprocessed in transformer
input_tensor = torch.Tensor(np.asarray(data))
return input_tensor.unsqueeze(0)

# Input follows the Tensorflow V1 HTTP API for binary values
# https://www.tensorflow.org/tfx/serving/api_rest#encoding_binary_values
data = inputs[0]["image"]["b64"]

raw_img_data = base64.b64decode(data)
input_image = Image.open(io.BytesIO(raw_img_data))

preprocess = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]),
])

input_tensor = preprocess(input_image)
input_batch = input_tensor.unsqueeze(0)

output = self.model(input_batch)
return input_tensor.unsqueeze(0)

def predict(self, input_tensor: torch.Tensor, headers: Dict[str, str] = None) -> Dict:
output = self.model(input_tensor)
torch.nn.functional.softmax(output, dim=1)

values, top_5 = torch.topk(output, 5)

return {"predictions": values.tolist()}
if headers["request-type"] == "v1":
return {"predictions": values.tolist()}
else:
result = values.tolist()
response_id = generate_uuid()
response = {
"id": response_id,
"model_name": "custom-model",
"outputs": [
{
"data": result,
"datatype": "FP32",
"name": "output-0",
"shape": list(values.shape)
}
]}
return response


if __name__ == "__main__":
model = AlexNetModel("custom-model")
model.load()
kserve.ModelServer(workers=1).start([model])
kserve.ModelServer().start([model])
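
With preprocess and predict split out, the same predictor accepts both the v1 and v2 REST payload shapes. A hypothetical client call for each protocol, assuming the server's default local REST port and a placeholder image file:

import base64

import numpy as np
import requests

with open("cat.jpg", "rb") as f:  # placeholder image path
    b64 = base64.b64encode(f.read()).decode()

# v1: raw image bytes, base64 encoded per the Tensorflow V1 HTTP API convention
v1_payload = {"instances": [{"image": {"b64": b64}}]}
print(requests.post("http://localhost:8080/v1/models/custom-model:predict", json=v1_payload).json())

# v2: a tensor that was already pre-processed elsewhere, sent as FP32 data
tensor = np.zeros((3, 224, 224), dtype=np.float32)
v2_payload = {"inputs": [{"name": "input-0", "shape": [1, 3, 224, 224],
                          "datatype": "FP32", "data": [tensor.tolist()]}]}
print(requests.post("http://localhost:8080/v2/models/custom-model/infer", json=v2_payload).json())
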
97 changes: 39 additions & 58 deletions python/custom_model/model_grpc.py
@@ -12,19 +12,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import base64
import io
from typing import Dict, Union
from typing import Dict
import numpy as np

import kserve
import torch
from kserve.grpc.grpc_predict_v2_pb2 import (ModelInferRequest,
ModelInferResponse)
from kserve.grpc.grpc_predict_v2_pb2 import ModelInferRequest
from kserve.utils.utils import generate_uuid
from PIL import Image
from torchvision import models, transforms


# This custom predictor example implements a custom model following the KServe v2 inference gRPC protocol.
# The input can be raw image bytes, or an image tensor that has been pre-processed by the transformer
# and passed to the predictor; the output is the prediction response.
class AlexNetModel(kserve.Model):
def __init__(self, name: str):
super().__init__(name)
@@ -38,70 +40,49 @@ def load(self):
self.model.eval()
self.ready = True

def predict(
self,
payload: Union[Dict, ModelInferRequest],
headers: Dict[str, str] = None
) -> Union[Dict, ModelInferResponse]:
raw_img_data = ""
if isinstance(payload, Dict):
input = payload["inputs"][0]
# Input follows the Tensorflow V1 HTTP API for binary values
# https://www.tensorflow.org/tfx/serving/api_rest#encoding_binary_values
data = input["data"][0]
raw_img_data = base64.b64decode(data)
elif isinstance(payload, ModelInferRequest):
req = payload.inputs[0]
def preprocess(self, payload: ModelInferRequest, headers: Dict[str, str] = None) -> torch.Tensor:
req = payload.inputs[0]
if req.datatype == "BYTES":
raw_img_data = req.contents.bytes_contents[0]
input_image = Image.open(io.BytesIO(raw_img_data))
preprocess = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]),
])

input_image = Image.open(io.BytesIO(raw_img_data))
preprocess = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]),
])

input_tensor = preprocess(input_image)
input_batch = input_tensor.unsqueeze(0)

output = self.model(input_batch)
input_tensor = preprocess(input_image)
return input_tensor.unsqueeze(0)
elif req.datatype == "FP32":
result = np.frombuffer(payload.raw_input_contents[0], dtype="float32")
batched_result = np.reshape(result, req.shape)
return torch.Tensor(batched_result)

def predict(self, input_tensor: torch.Tensor, headers: Dict[str, str] = None) -> Dict:
output = self.model(input_tensor)
torch.nn.functional.softmax(output, dim=1)
values, top_5 = torch.topk(output, 5)
result = values.tolist()
id = generate_uuid()
if isinstance(payload, Dict):
response = {
"id": id,
"model_name": self.name,
"outputs": [
{
"data": result,
"datatype": "FP32",
"name": "output-0",
"shape": list(values.shape)
}
]}
else:
response = {
"id": id,
"model_name": payload.model_name,
"outputs": [
{
"contents": {
"fp32_contents": result[0],
},
"datatype": "FP32",
"name": "output-0",
"shape": list(values.shape)
}
]}
response = {
"id": id,
"model_name": "custom-model",
"outputs": [
{
"contents": {
"fp32_contents": result[0],
},
"datatype": "FP32",
"name": "output-0",
"shape": list(values.shape)
}
]}
return response


if __name__ == "__main__":
model = AlexNetModel("custom-model")
model.load()
kserve.ModelServer(workers=1).start([model])
kserve.ModelServer().start([model])
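
A rough gRPC client sketch for the v2 predictor above. The channel target and image path are placeholders, and the message and stub names follow the open inference protocol stubs generated under kserve.grpc; treat the exact field names as an assumption:

import grpc
import numpy as np
from kserve.grpc import grpc_predict_v2_pb2 as pb
from kserve.grpc import grpc_predict_v2_pb2_grpc as pb_grpc

channel = grpc.insecure_channel("localhost:8081")  # placeholder target
stub = pb_grpc.GRPCInferenceServiceStub(channel)

# BYTES input: raw (not base64-encoded) image bytes, decoded by preprocess()
with open("cat.jpg", "rb") as f:  # placeholder image path
    raw = f.read()
bytes_request = pb.ModelInferRequest(
    model_name="custom-model",
    inputs=[pb.ModelInferRequest.InferInputTensor(
        name="input-0", datatype="BYTES", shape=[1],
        contents=pb.InferTensorContents(bytes_contents=[raw]))])
print(stub.ModelInfer(bytes_request))

# FP32 input: a pre-processed batch shipped through raw_input_contents
batch = np.zeros((1, 3, 224, 224), dtype=np.float32)
fp32_request = pb.ModelInferRequest(
    model_name="custom-model",
    inputs=[pb.ModelInferRequest.InferInputTensor(
        name="input-0", datatype="FP32", shape=list(batch.shape))],
    raw_input_contents=[batch.tobytes()])
print(stub.ModelInfer(fp32_request))
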
38 changes: 24 additions & 14 deletions python/custom_transformer/model.py
@@ -42,7 +42,6 @@ def image_transform(instance):
byte_array = base64.b64decode(instance["image"]["b64"])
image = Image.open(io.BytesIO(byte_array))
tensor = image_processing(image).numpy()
print(tensor.shape)
return tensor


@@ -57,31 +56,42 @@ def preprocess(self, payload: Dict, headers: Dict[str, str] = None) -> ModelInfe
# Input follows the Tensorflow V1 HTTP API for binary values
# https://www.tensorflow.org/tfx/serving/api_rest#encoding_binary_values
input_tensors = [image_transform(instance) for instance in payload["instances"]]

input_tensors = numpy.asarray(input_tensors)
print(input_tensors.shape)
# Transform to KServe v1/v2 inference protocol
if self.protocol == PredictorProtocol.GRPC_V2.value:
return self.v2_request_transform(numpy.asarray(input_tensors))
else:
return self.v2_request_transform(input_tensors)
elif self.protocol == PredictorProtocol.REST_V1.value:
inputs = [{"data": input_tensor.tolist()} for input_tensor in input_tensors]
payload = {"instances": inputs}
return payload
else:
return {
'inputs': [
{
'name': "INPUT__0",
'shape': input_tensors.shape,
'datatype': 'FP32',
'data': input_tensors.tolist()
}
]
}

def v2_request_transform(self, input_tensors):
request = ModelInferRequest()
request.model_name = self.name
tensor = {
payload = [{
'name': "INPUT__0",
'shape': input_tensors.shape,
'datatype': "FP32",
}
request.inputs.extend([tensor])
request.raw_input_contents.extend([input_tensors.tobytes()])
return request
'datatype': "FP32"
}]
return ModelInferRequest(model_name=self.name, inputs=payload,
raw_input_contents=[input_tensors.tobytes()])

def postprocess(self, infer_response: ModelInferResponse, headers: Dict[str, str] = None) -> Dict:
if self.protocol == PredictorProtocol.GRPC_V2.value:
res = super.postprocess(infer_response, headers)
return {"predictions": res["contents"]["fp32_contents"]}
res = super().postprocess(infer_response, headers)
return {"predictions": res["outputs"][0]["data"]}
elif self.protocol == PredictorProtocol.REST_V2.value:
return {"predictions": infer_response["outputs"][0]["data"]}
else:
return infer_response
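
The raw FP32 tensors placed in raw_input_contents above are recovered on the predictor side with frombuffer plus the shape carried in the request. A quick self-contained check of that round trip:

import numpy as np

batch = np.random.rand(1, 3, 224, 224).astype(np.float32)
wire = batch.tobytes()                    # transformer side: raw_input_contents
restored = np.frombuffer(wire, dtype="float32").reshape(batch.shape)  # predictor side
assert np.array_equal(batch, restored)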

2 changes: 1 addition & 1 deletion python/custom_transformer/model_grpc.py
@@ -27,7 +27,7 @@ def __init__(self, name: str, predictor_host: str, protocol: str):
self.protocol = protocol
self.model_name = name

def preprocess(self, request: Union[Dict, ModelInferRequest], headers=None) -> ModelInferRequest:
def preprocess(self, request: Union[Dict, ModelInferRequest], headers: Dict[str, str] = None) -> ModelInferRequest:
if isinstance(request, ModelInferRequest):
return request
else:
5 changes: 5 additions & 0 deletions python/kserve/kserve/errors.py
@@ -81,6 +81,11 @@ async def inference_error_handler(_, exc):
return JSONResponse(status_code=HTTPStatus.INTERNAL_SERVER_ERROR, content={"error": str(exc)})


async def generic_exception_handler(_, exc):
return JSONResponse(status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
content={"error": f"{type(exc).__name__} : {str(exc)}"})


async def model_not_found_handler(_, exc):
return JSONResponse(status_code=HTTPStatus.NOT_FOUND, content={"error": str(exc)})
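
The new generic_exception_handler acts as a catch-all so unexpected errors still come back as JSON 500 responses rather than bare tracebacks. A hypothetical registration on a FastAPI app, assuming the handler is importable from kserve.errors:

from fastapi import FastAPI
from kserve.errors import generic_exception_handler  # assumed import path

app = FastAPI()
# Registering against Exception makes Starlette route any unhandled error
# through this handler.
app.add_exception_handler(Exception, generic_exception_handler)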

10 changes: 9 additions & 1 deletion python/kserve/kserve/grpc/server.py
@@ -25,6 +25,9 @@
from grpc import aio


MAX_GRPC_MESSAGE_LENGTH = 8388608


class GRPCServer:
def __init__(
self,
@@ -43,7 +46,12 @@ async def start(self, max_workers):
self._model_repository_extension)
self._server = aio.server(
futures.ThreadPoolExecutor(max_workers=max_workers),
interceptors=(LoggingInterceptor(),)
interceptors=(LoggingInterceptor(),),
options=[
("grpc.max_message_length", MAX_GRPC_MESSAGE_LENGTH),
("grpc.max_send_message_length", MAX_GRPC_MESSAGE_LENGTH),
("grpc.max_receive_message_length", MAX_GRPC_MESSAGE_LENGTH)
]
)
grpc_predict_v2_pb2_grpc.add_GRPCInferenceServiceServicer_to_server(
inference_servicer, self._server)
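
These options raise the server-side limits to 8 MiB (8388608 bytes); a client keeping gRPC's default 4 MiB receive limit may still reject large responses unless it passes matching channel options, roughly like this (target address is a placeholder):

import grpc

MAX_GRPC_MESSAGE_LENGTH = 8 * 1024 * 1024  # 8 MiB, matching the server setting

channel = grpc.insecure_channel(
    "localhost:8081",  # placeholder target
    options=[
        ("grpc.max_send_message_length", MAX_GRPC_MESSAGE_LENGTH),
        ("grpc.max_receive_message_length", MAX_GRPC_MESSAGE_LENGTH),
    ],
)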