Multi-Route
This example shows how to use the multi-route feature.
You will need this feature if you want to:

- Serve multiple models in one service on different endpoints, e.g. register `/embedding` and `/classify` with different models.
- Serve one model on multiple endpoints in one service, e.g. register LLaMA with both `/inference` and `/v1/chat/completions` to make it compatible with the OpenAI API.
- Share a worker across different routes. The shared worker collects its dynamic batch from the preceding stages of every route it belongs to. If you want several runtimes built from the same worker class, you can declare multiple `Runtime` instances with that class.
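Conceptually, a shared worker sits behind one queue that every upstream route feeds, so a single dynamic batch can mix requests from different routes, and each result must be sent back to the route it came from. The toy model below illustrates only that idea in plain Python. It is not mosec's implementation: `SharedStage`, `submit`, and `run_once` are hypothetical names, and the model ignores waiting and timeouts entirely.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Any, Callable, Deque, Dict, List, Tuple


@dataclass
class SharedStage:
    """Toy model of one worker shared by several routes (hypothetical, not mosec internals)."""

    handler: Callable[[List[Any]], List[Any]]  # processes a whole batch at once
    max_batch_size: int = 16
    queue: Deque[Tuple[str, int, Any]] = field(default_factory=deque)

    def submit(self, route: str, request_id: int, data: Any) -> None:
        """Every upstream route pushes its output into the same queue."""
        self.queue.append((route, request_id, data))

    def run_once(self) -> Dict[Tuple[str, int], Any]:
        """Drain up to max_batch_size items, whichever route they came from."""
        size = min(self.max_batch_size, len(self.queue))
        batch = [self.queue.popleft() for _ in range(size)]
        outputs = self.handler([data for _, _, data in batch])
        # Tag each output with its (route, request_id) so it can be sent back.
        return {(route, rid): out for (route, rid, _), out in zip(batch, outputs)}


stage = SharedStage(handler=lambda batch: [len(x) for x in batch], max_batch_size=2)
stage.submit("/inference", 0, b"hello mosec")
stage.submit("/v1/inference", 0, b"hello mosec with type check")
stage.submit("/inference", 1, b"hi")
print(stage.run_once())  # {('/inference', 0): 11, ('/v1/inference', 0): 27}
print(stage.run_once())  # {('/inference', 1): 2}
```

The first batch holds one request from each route; the per-request tag is what lets the shared stage hand results back to the correct route.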
The worker definition part is the same as for a single route. The only difference is how you register the worker with the server.
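Here we expose a new concept called `Runtime`.

You can create a `Runtime` for each worker and register the runtimes on the server with a `{endpoint: [Runtime]}` dictionary, where each endpoint maps to the ordered list of runtimes that handle its requests.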
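In this dictionary, sharing a worker means placing the same runtime object in more than one endpoint's list. The sketch below models that structure without importing mosec: `Stage` is a hypothetical stand-in for `Runtime`, and `shared_stages` is a hypothetical helper that lists the stages used by more than one route. It assumes sharing is decided by object identity, as the demo suggests by putting the single `inf` object in both lists.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Dict, List


@dataclass(eq=False)  # eq=False keeps object-identity hashing and equality
class Stage:
    """Hypothetical stand-in for mosec's Runtime: a worker name and batch size."""

    worker: str
    max_batch_size: int = 1


def shared_stages(routes: Dict[str, List[Stage]]) -> List[Stage]:
    """Return the stage objects (by identity) that appear in more than one route."""
    # dict.fromkeys de-duplicates within a route while preserving order.
    counts = Counter(s for stages in routes.values() for s in dict.fromkeys(stages))
    return [s for s, n in counts.items() if n > 1]


typed_pre = Stage("TypedPreprocess")
pre = Stage("Preprocess")
inf = Stage("Inference", max_batch_size=16)
typed_post = Stage("TypedPostprocess")

routes = {
    "/v1/inference": [typed_pre, inf, typed_post],
    "/inference": [pre, inf],
}
print([s.worker for s in shared_stages(routes)])  # ['Inference']

# Two separate instances built from the same worker are different objects.
separate = {
    "/v1/inference": [typed_pre, Stage("Inference", max_batch_size=16), typed_post],
    "/inference": [pre, Stage("Inference", max_batch_size=16)],
}
print(shared_stages(separate))  # []
```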
See the complete demo code below. It runs a service with two endpoints:

- `/inference` with `Preprocess` and `Inference`
- `/v1/inference` with `TypedPreprocess`, `Inference` and `TypedPostprocess`

The `Inference` worker is shared between the two routes.
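To see what each route returns, the two stage chains can be traced in plain Python. The functions below are simplified stand-ins that mirror the dummy `forward` methods of the demo workers; they do not call mosec. The typed request is modelled as a plain dict instead of the msgspec `Request` struct, serialization (JSON on `/inference`, msgpack on `/v1/inference`) is omitted, and `run_route` is a hypothetical helper that applies per-request stages item by item and the batched `Inference` stage to the whole list.

```python
from typing import Any, Callable, Dict, List, Tuple


# Simplified stand-ins for the demo's forward() methods; no mosec involved.
def preprocess(data: bytes) -> bytes:
    return data  # Preprocess: pass the raw bytes through


def typed_preprocess(data: Dict[str, Any]) -> bytes:
    return data["bin"]  # TypedPreprocess: keep only the `bin` field


def inference(batch: List[bytes]) -> List[Dict[str, int]]:
    return [{"length": len(datum)} for datum in batch]  # batched stage


def typed_postprocess(data: Dict[str, int]) -> Dict[str, int]:
    return data  # TypedPostprocess: pass through (msgpack encoding omitted)


# Each step is (function, takes_whole_batch).
StageSpec = Tuple[Callable[[Any], Any], bool]
ROUTES: Dict[str, List[StageSpec]] = {
    "/inference": [(preprocess, False), (inference, True)],
    "/v1/inference": [
        (typed_preprocess, False),
        (inference, True),
        (typed_postprocess, False),
    ],
}


def run_route(route: str, requests: List[Any]) -> List[Any]:
    """Push a list of requests through one route's stages, in order."""
    items = list(requests)
    for func, takes_batch in ROUTES[route]:
        items = func(items) if takes_batch else [func(item) for item in items]
    return items


print(run_route("/inference", [b"hello mosec"]))
# [{'length': 11}]
print(run_route("/v1/inference", [{"bin": b"hello mosec with type check", "name": "type check"}]))
# [{'length': 27}]
```

Both routes reach the same `inference` function, so the payload they return is computed identically; only the surrounding stages and the wire format differ.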
Server
multi_route_server.py

```python
# Copyright 2023 MOSEC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any

from msgspec import Struct

from mosec import Runtime, Server, Worker
from mosec.mixin import TypedMsgPackMixin


class Request(Struct):
    """User request struct."""

    # pylint: disable=too-few-public-methods

    bin: bytes
    name: str = "test"


class TypedPreprocess(TypedMsgPackMixin, Worker):
    """Dummy preprocess to exit early if the validation failed."""

    def forward(self, data: Request) -> Any:
        """Input will be parsed as the `Request`."""
        print(f"received from {data.name} with {data.bin!r}")
        return data.bin


class Preprocess(Worker):
    """Dummy preprocess worker."""

    def deserialize(self, data: bytes) -> Any:
        return data

    def forward(self, data: Any) -> Any:
        return data


class Inference(Worker):
    """Dummy inference worker."""

    def forward(self, data: Any) -> Any:
        return [{"length": len(datum)} for datum in data]


class TypedPostprocess(TypedMsgPackMixin, Worker):
    """Dummy postprocess with msgpack."""

    def forward(self, data: Any) -> Any:
        return data


if __name__ == "__main__":
    server = Server()
    typed_pre = Runtime(TypedPreprocess)
    pre = Runtime(Preprocess)
    inf = Runtime(Inference, max_batch_size=16)
    typed_post = Runtime(TypedPostprocess)
    server.register_runtime(
        {
            "/v1/inference": [typed_pre, inf, typed_post],
            "/inference": [pre, inf],
        }
    )
    server.run()
```
Client
multi_route_client.py

```python
# Copyright 2023 MOSEC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
from http import HTTPStatus

import httpx
import msgpack  # type: ignore

typed_req = {
    "bin": b"hello mosec with type check",
    "name": "type check",
}

print(">> requesting for the typed route with msgpack serde")
resp = httpx.post(
    "http://127.0.0.1:8000/v1/inference", content=msgpack.packb(typed_req)
)
if resp.status_code == HTTPStatus.OK:
    print(f"OK: {msgpack.unpackb(resp.content)}")
else:
    print(f"err[{resp.status_code}] {resp.text}")

print(">> requesting for the untyped route with json serde")
resp = httpx.post("http://127.0.0.1:8000/inference", content=b"hello mosec")
if resp.status_code == HTTPStatus.OK:
    print(f"OK: {json.loads(resp.content)}")
else:
    print(f"err[{resp.status_code}] {resp.text}")
```
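The untyped `/inference` route takes raw bytes and answers with JSON, so it can also be called without `httpx`. The sketch below uses only the standard library's `urllib.request`; `build_request` and `call_inference` are hypothetical helper names, and it assumes the demo server is listening on `127.0.0.1:8000`, as in the client above. The typed route still needs a msgpack encoder, so it is left out.

```python
import json
import urllib.request
from typing import Any

BASE_URL = "http://127.0.0.1:8000"  # assumed address, same as the httpx client


def build_request(payload: bytes, base_url: str = BASE_URL) -> urllib.request.Request:
    """Build a POST request for the untyped /inference route (no network I/O yet)."""
    return urllib.request.Request(f"{base_url}/inference", data=payload, method="POST")


def call_inference(payload: bytes, timeout: float = 5.0) -> Any:
    """Send the request and decode the JSON body.

    urlopen raises urllib.error.HTTPError for error status codes and
    urllib.error.URLError if the server is unreachable.
    """
    with urllib.request.urlopen(build_request(payload), timeout=timeout) as resp:
        return json.loads(resp.read())
```

With the server running, `call_inference(b"hello mosec")` sends the same request as the second half of the httpx client.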