Multi-Route

This example shows how to use the multi-route feature.

You will need this feature if you want to:

  • Serve multiple models in one service on different endpoints.

    • e.g. register /embedding and /classify with different models

  • Serve one model to multiple different endpoints in one service.

    • e.g. register LLaMA at both /inference and /v1/chat/completions to make it compatible with the OpenAI API

  • Share a worker across different routes.

    • The shared worker collects its dynamic batch from the preceding stages of every route that uses it.

    • If you want multiple runtimes of the same worker that are not shared, declare multiple Runtime instances with the same worker class.
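
To make the shared-batch idea concrete, here is a minimal plain-Python sketch. It does not use mosec; the `Item` alias, the `take_batch` helper, and the single in-memory queue are hypothetical stand-ins, and the real server's scheduling may differ. It only shows that a batch is filled from whatever is pending, regardless of which route each request arrived on.

```python
from collections import deque
from typing import Deque, List, Tuple

# Hypothetical representation: (originating route, payload).
Item = Tuple[str, bytes]


def take_batch(queue: Deque[Item], max_batch_size: int) -> List[Item]:
    """Pop up to max_batch_size items, whichever route they came from."""
    batch: List[Item] = []
    while queue and len(batch) < max_batch_size:
        batch.append(queue.popleft())
    return batch


# Requests from two routes, interleaved in the queue feeding the shared worker.
queue: Deque[Item] = deque(
    [
        ("/inference", b"a"),
        ("/v1/chat/completions", b"bb"),
        ("/inference", b"ccc"),
    ]
)

first = take_batch(queue, max_batch_size=2)
second = take_batch(queue, max_batch_size=2)
print([route for route, _ in first])   # ['/inference', '/v1/chat/completions']
print([route for route, _ in second])  # ['/inference']
```

The first batch mixes both routes, which is the behavior a shared worker relies on. Keeping the route next to each payload is what lets the results be sent back to the right place afterwards.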

The worker definition part is the same as for a single route. The only difference is how you register the worker with the server.

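This is done through a concept called Runtime, which wraps a worker class together with its options (for example max_batch_size).

Create the Runtime instances and register them on the server with a {endpoint: [Runtime]} dictionary, where each list holds the stages of that route in order.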
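
The shape of that dictionary, and the difference between reusing one instance and declaring a second one, can be sketched without mosec. `ToyRuntime` and `shared_runtimes` below are hypothetical names used only for illustration. The sketch assumes that sharing is decided by object identity, meaning the same instance placed in several routes.

```python
from typing import Dict, List


class ToyRuntime:
    """Hypothetical stand-in for a runtime: remembers its worker's name."""

    def __init__(self, worker_name: str) -> None:
        self.worker_name = worker_name


def shared_runtimes(routes: Dict[str, List[ToyRuntime]]) -> List[str]:
    """Return worker names of instances that are used more than once."""
    counts: Dict[int, int] = {}
    names: Dict[int, str] = {}
    for stages in routes.values():
        for runtime in stages:
            counts[id(runtime)] = counts.get(id(runtime), 0) + 1
            names[id(runtime)] = runtime.worker_name
    return [names[key] for key, count in counts.items() if count > 1]


embed = ToyRuntime("Embed")
model = ToyRuntime("Model")       # one instance, reused in both routes
model_copy = ToyRuntime("Model")  # a second instance of the same worker

shared = {"/a": [embed, model], "/b": [model]}
separate = {"/a": [embed, model], "/b": [model_copy]}

print(shared_runtimes(shared))    # ['Model']
print(shared_runtimes(separate))  # []
```

In the second dictionary both routes use the same worker class, but nothing is shared because each route holds its own instance.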

See the complete demo code below. This will run a service with two endpoints:

  • /inference with Preprocess and Inference

  • /v1/inference with TypedPreprocess, Inference and TypedPostprocess

The Inference worker is shared between the two routes.

Server

multi_route_server.py
# Copyright 2023 MOSEC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any

from msgspec import Struct

from mosec import Runtime, Server, Worker
from mosec.mixin import TypedMsgPackMixin


class Request(Struct):
    """User request struct."""

    # pylint: disable=too-few-public-methods

    bin: bytes
    name: str = "test"


class TypedPreprocess(TypedMsgPackMixin, Worker):
    """Dummy preprocess that exits early if validation fails."""

    def forward(self, data: Request) -> Any:
        """Input will be parsed as the `Request`."""
        print(f"received from {data.name} with {data.bin!r}")
        return data.bin


class Preprocess(Worker):
    """Dummy preprocess worker."""

    def deserialize(self, data: bytes) -> Any:
        return data

    def forward(self, data: Any) -> Any:
        return data


class Inference(Worker):
    """Dummy inference worker."""

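    def forward(self, data: Any) -> Any:
        # `data` is a batch (list) because this runtime sets max_batch_size > 1;
        # return one result per item, in the same order.
        return [{"length": len(datum)} for datum in data]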


class TypedPostprocess(TypedMsgPackMixin, Worker):
    """Dummy postprocess with msgpack."""

    def forward(self, data: Any) -> Any:
        return data


if __name__ == "__main__":
    server = Server()
    typed_pre = Runtime(TypedPreprocess)
    pre = Runtime(Preprocess)
    # This single instance is placed in both routes below, so it is shared.
    inf = Runtime(Inference, max_batch_size=16)
    typed_post = Runtime(TypedPostprocess)
    server.register_runtime(
        {
            "/v1/inference": [typed_pre, inf, typed_post],
            "/inference": [pre, inf],
        }
    )
    server.run()
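
To see what each route should return before starting the server, the worker logic can be traced with plain functions. This is a sketch only: the three functions are hypothetical stand-ins that copy the `forward` bodies above, they run without mosec, and they skip msgpack decoding and validation.

```python
import json
from typing import Any, Dict, List


def typed_preprocess(request: Dict[str, Any]) -> bytes:
    """Mirrors TypedPreprocess.forward: keep only the binary payload."""
    return request["bin"]


def preprocess(body: bytes) -> bytes:
    """Mirrors Preprocess: the raw request body is passed through."""
    return body


def inference(batch: List[bytes]) -> List[Dict[str, int]]:
    """Mirrors Inference.forward: one result per item in the batch."""
    return [{"length": len(datum)} for datum in batch]


typed_request = {"bin": b"hello mosec with type check", "name": "type check"}

# Each route feeds its own preprocessing output into the same inference step.
typed_out = inference([typed_preprocess(typed_request)])[0]
untyped_out = inference([preprocess(b"hello mosec")])[0]

print(typed_out)                # {'length': 27}
print(json.dumps(untyped_out))  # {"length": 11}
```

The lengths are simply the byte counts of the two payloads, so any other result from the running service points to a problem in serialization or routing rather than in the inference step.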

Client

multi_route_client.py
# Copyright 2023 MOSEC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
from http import HTTPStatus

import httpx
import msgpack  # type: ignore

typed_req = {
    "bin": b"hello mosec with type check",
    "name": "type check",
}

print(">> requesting the typed route with msgpack serde")
resp = httpx.post(
    "http://127.0.0.1:8000/v1/inference", content=msgpack.packb(typed_req)
)
if resp.status_code == HTTPStatus.OK:
    print(f"OK: {msgpack.unpackb(resp.content)}")
else:
    print(f"err[{resp.status_code}] {resp.text}")

print(">> requesting the untyped route with json serde")
resp = httpx.post("http://127.0.0.1:8000/inference", content=b"hello mosec")
if resp.status_code == HTTPStatus.OK:
    print(f"OK: {json.loads(resp.content)}")
else:
    print(f"err[{resp.status_code}] {resp.text}")
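
If httpx is not installed, the untyped route can also be called with the standard library alone. The sketch below swaps in `urllib.request` for httpx. The helper names `build_untyped_request` and `send` are hypothetical, and the address is the same local one used by the client above. Only the request is built when the block runs, so it works without a live server.

```python
import json
import urllib.request
from typing import Any

BASE_URL = "http://127.0.0.1:8000"  # same address as in the client above


def build_untyped_request(payload: bytes) -> urllib.request.Request:
    """Build a POST to the untyped route with the raw bytes as the body."""
    return urllib.request.Request(
        f"{BASE_URL}/inference", data=payload, method="POST"
    )


def send(request: urllib.request.Request) -> Any:
    """Send the request and decode the JSON response (needs a running server)."""
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read())


req = build_untyped_request(b"hello mosec")
print(req.get_method(), req.full_url)
```

With the server running, `send(req)` should return the same dictionary that the httpx client prints for the untyped route.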