Echo Example#

An echo server is usually the very first server you wanna implement to get familiar with the framework.

This server sleeps for a given period and return. It is a simple illustration of how multi-stage workload is implemented. It also shows how to write a simple validation for input data.

The default JSON protocol will be used since the (de)serialization methods are not overridden in this demo. In particular, the input data of Preprocess’s forward is a dictionary decoded by JSON from the request body’s bytes; and the output dictionary of Postprocess’s forward will be JSON-encoded as a mirrored process.

echo.py#

# Copyright 2022 MOSEC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Example: Sample structures for using mosec server."""

import time
from types import MappingProxyType as ImmutableDict
from typing import List

from mosec import Server, ValidationError, Worker, get_logger

logger = get_logger()


class Preprocess(Worker):
    """Sample Class."""

    example = ImmutableDict({"time": 0})

    def forward(self, data: dict) -> float:
        logger.debug("pre received %s", data)
        # Customized, simple input validation
        try:
            count_time = float(data["time"])
        except KeyError as err:
            raise ValidationError(f"cannot find key {err}") from err
        return count_time


class Inference(Worker):
    """Sample Class."""

    example = (0, 1e-5, 2e-4)

    def forward(self, data: List[float]) -> List[float]:
        logger.info("sleeping for %s seconds", max(data))
        time.sleep(max(data))
        return data


class Postprocess(Worker):
    """Sample Class."""

    def forward(self, data: float) -> dict:
        logger.debug("post received %f", data)
        return {"msg": f"sleep {data} seconds"}


if __name__ == "__main__":
    server = Server()
    server.append_worker(Preprocess)
    server.append_worker(Inference, max_batch_size=32)
    server.append_worker(Postprocess)
    server.run()

Start#

python echo.py

Test#

http :8000/inference time=1.5