Interface#

Server#

MOSEC server interface.

This module provides a way to define the service components for machine learning model serving.

Dynamic Batching

The user may enable the dynamic batching feature for any stage when the corresponding worker is appended, by setting the append_worker(max_batch_size).

Multiprocessing

The user may spawn multiple processes for any stage when the corresponding worker is appended, by setting the append_worker(num).

class mosec.server.Server[source]#

MOSEC server interface.

It allows users to sequentially append workers they implemented, builds the workflow pipeline automatically and starts up the server.

__init__()[source]#

Initialize a MOSEC Server.

register_daemon(name, proc)[source]#

Register a daemon to be monitored.

Parameters:
  • name (str) – the name of this daemon

  • proc (Popen) – the process handle of the daemon

append_worker(worker, num=1, max_batch_size=1, max_wait_time=0, start_method='spawn', env=None, timeout=0, route='/inference')[source]#

Sequentially appends workers to the workflow pipeline.

Parameters:
  • worker (Type[Worker]) – the class you inherit from Worker which implements the forward

  • num (int) – the number of processes for parallel computing (>=1)

  • max_batch_size (int) – the maximum batch size allowed (>=1), will enable the dynamic batching if it > 1

  • max_wait_time (int) – the maximum wait time (millisecond) for dynamic batching, needs to be used with max_batch_size to enable the feature. If not configure, will use the CLI argument –wait (default=10ms)

  • start_method (str) – the process starting method (“spawn” or “fork”). (DO NOT change this unless you understand the difference between them)

  • env (Optional[List[Dict[str, str]]]) – the environment variables to set before starting the process

  • timeout (int) – the timeout (second) for each worker forward processing (>=1)

  • route (Union[str, List[str]]) – the route path for this worker. If not configured, will use the default route path /inference. If a list is provided, different route paths will share the same worker.

register_runtime(routes)[source]#

Register the runtime to the routes.

run()[source]#

Start the mosec model server.

mosec.server.generate_openapi(workers)[source]#

Generate the OpenAPI specification for one pipeline.

Worker#

MOSEC worker interface.

This module provides the interface to define a worker with such behaviors:

  1. initialize

  2. serialize/deserialize data to/from another worker

  3. serialize/deserialize data to/from the client side

  4. data processing

class mosec.worker.Worker[source]#

MOSEC worker interface.

It provides default IPC (de)serialization methods, stores the worker meta data including its stage and maximum batch size, and leaves the forward method to be implemented by the users.

By default, we use JSON encoding. But users are free to customize via simply overriding the deserialize method in the first stage (we term it as ingress stage) and/or the serialize method in the last stage (we term it as egress stage).

For the encoding customization, there are many choices including MessagePack, Protocol Buffer and many other out-of-the-box protocols. Users can even define their own protocol and use it to manipulate the raw bytes! A naive customization can be found in this PyTorch example.

__init__()[source]#

Initialize the worker.

This method doesn’t require the child class to override.

serialize_ipc(data)[source]#

Define IPC serialization method.

Parameters:

data (Any) – returned data from forward()

Return type:

bytes

deserialize_ipc(data)[source]#

Define IPC deserialization method.

Parameters:

data (bytes) – input data for forward()

Return type:

Any

property stage: str#

Return the stage name.

property max_batch_size: int#

Return the maximum batch size.

property worker_id: int#

Return the ID of this worker instance.

This property returns the worker ID in the range of [1, … , num] (num as configured in append_worker(num)) to differentiate workers in the same stage.

serialize(data)[source]#

Serialize the last stage (egress).

Default response serialization method: JSON.

Check mosec.mixin for more information.

Parameters:

data (Any) – the same type as the output of the forward()

Return type:

bytes

Returns:

the bytes you want to put into the response body

Raises:

EncodingError – if the data cannot be serialized with JSON

deserialize(data)[source]#

Deserialize the first stage (ingress).

Default request deserialization method: JSON.

Check mosec.mixin for more information.

Parameters:

data (bytes) – the raw bytes extracted from the request body

Return type:

Any

Returns:

the same type as the input of the forward()

Raises:

DecodingError – if the data cannot be deserialized with JSON

abstract forward(data)[source]#

Model inference, data processing or computation logic.

Parameters:

data (Any) – input data to be processed

Return type:

Any

Must be overridden by the subclass.

If any code in this forward() needs to access other resources (e.g. a model, a memory cache, etc.), the user should initialize these resources as attributes of the class in the __init__.

Note

For a stage that enables dynamic batching, please return the results that have the same length and the same order of the input data.

Note

  • for a single-stage worker, data will go through

    <deserialize> -> <forward> -> <serialize>

  • for a multi-stage worker that is neither ingress not egress, data

    will go through <deserialize_ipc> -> <forward> -> <serialize_ipc>

classmethod get_forward_json_schema(target, ref_template)[source]#

Retrieve the JSON schema for the forward method of the class.

Parameters:
  • cls – The class object.

  • target (ParseTarget) – The target variable to parse the schema for.

  • ref_template (str) – A template to use when generating "$ref" fields.

Return type:

Tuple[Dict[str, Any], Dict[str, Any]]

Returns:

A tuple containing the schema and the component schemas.

The get_forward_json_schema() method is a class method that returns the JSON schema for the forward() method of the cls class. It takes a target param specifying the target to parse the schema for.

The returned value is a tuple containing the schema and the component schema.

Note

Developer must implement this function to retrieve the JSON schema to enable openapi spec.

Note

The MOSEC_REF_TEMPLATE constant should be used as a reference template according to openapi standards.

class mosec.worker.SSEWorker[source]#

MOSEC worker with Server-Sent Events (SSE) support.

send_stream_event(text, index=0)[source]#

Send a stream event to the client.

Parameters:
  • text (str) – the text to be sent, needs to be UTF-8 compatible

  • index (int) – the index of the stream event. For the single request, this will always be 0. For dynamic batch request, this should be the index of the request in this batch.

Runtime#

Managers to control Coordinator and Mosec process.

class mosec.runtime.Runtime(worker, num=1, max_batch_size=1, max_wait_time=10, timeout=3, start_method='spawn', env=None)[source]#

The wrapper with one worker and its arguments.

__init__(worker, num=1, max_batch_size=1, max_wait_time=10, timeout=3, start_method='spawn', env=None)[source]#

Initialize the mosec coordinator.

Parameters:
  • worker (Worker) – subclass of mosec.Worker implemented by users.

  • num (int) – number of workers

  • max_batch_size (int) – the maximum batch size allowed (>=1), will enable the dynamic batching if it > 1

  • max_wait_time (int) – the maximum wait time (millisecond) for dynamic batching, needs to be used with max_batch_size to enable the feature. If not configure, will use the CLI argument –wait (default=10ms)

  • timeout (int) – timeout (second) for the forward function.

  • start_method (str) – the process starting method (“spawn” or “fork”)

  • env (Optional[List[Dict[str, str]]]) – the environment variables to set before starting the process

Errors#

Exceptions used in the Worker.

Suppose the input dataflow of our model server is as follows:

bytes -> deserialize -> data -> parse -> valid data

If the raw bytes cannot be successfully deserialized, the DecodingError is raised; if the decoded data cannot pass the validation check (usually implemented by users), the ValidationError should be raised.

exception mosec.errors.MosecError[source]#

Bases: Exception

Mosec basic exception.

exception mosec.errors.ClientError[source]#

Bases: MosecError

Client side error.

This error indicates that the server cannot or will not process the request due to something that is perceived to be a client error. It will return the details to the client side with HTTP 400.

exception mosec.errors.ServerError[source]#

Bases: MosecError

Server side error.

This error indicates that the server encountered an unexpected condition that prevented it from fulfilling the request. It will return the details to the client side with HTTP 500.

Attention: be careful about the returned message since it may contain some sensitive information. If you don’t want to return the details, just raise an exception that is not inherited from mosec.errors.MosecError.

exception mosec.errors.EncodingError[source]#

Bases: ServerError

Serialization error.

The EncodingError should be raised in user-implemented codes when the serialization for the response bytes fails. This error will set to status code to HTTP 500 and show the details in the response.

exception mosec.errors.DecodingError[source]#

Bases: ClientError

De-serialization error.

The DecodingError should be raised in user-implemented codes when the de-serialization for the request bytes fails. This error will set the status code to HTTP 400 in the response.

exception mosec.errors.ValidationError[source]#

Bases: MosecError

Request data validation error.

The ValidationError should be raised in user-implemented codes, where the validation for the input data fails. Usually, it should be put after the data de-serialization, which converts the raw bytes into structured data. This error will set the status code to HTTP 422 in the response.

exception mosec.errors.MosecTimeoutError[source]#

Bases: BaseException

Exception raised when a MOSEC worker operation times out.

If a bug in the forward code causes the worker to hang indefinitely, a timeout can be used to ensure that the worker eventually returns control to the main thread program. When a timeout occurs, the MosecTimeout exception is raised. This exception can be caught and handled appropriately to perform any necessary cleanup tasks or return a response indicating that the operation timed out.

Note that MosecTimeout is a subclass of BaseException, not Exception. This is because timeouts should not be caught and handled in the same way as other exceptions. Instead, they should be handled in a separate except block which isn’t designed to break the working loop.

Mixins#

Provide useful mixin to extend MOSEC.

class mosec.mixin.MsgpackMixin[source]#

Bases: object

Msgpack worker mixin interface.

serialize(data)[source]#

Serialize with msgpack for the last stage (egress).

Parameters:

data (Any) – the same type as returned by Worker.forward

Return type:

bytes

Returns:

the bytes you want to put into the response body

Raises:

EncodingError – if the data cannot be serialized with msgpack

deserialize(data)[source]#

Deserialize method for the first stage (ingress).

Parameters:

data (bytes) – the raw bytes extracted from the request body

Return type:

Any

Returns:

the same type as the input of Worker.forward

Raises:

DecodingError – if the data cannot be deserialized with msgpack

class mosec.mixin.NumBinIPCMixin[source]#

Bases: object

NumBin IPC worker mixin interface.

serialize_ipc(data)[source]#

Serialize with NumBin for the IPC.

Return type:

bytes

deserialize_ipc(data)[source]#

Deserialize with NumBin for the IPC.

Return type:

Any

class mosec.mixin.PlasmaShmIPCMixin[source]#

Bases: Worker

Plasma shared memory worker mixin interface.

classmethod set_plasma_path(path)[source]#

Set the plasma service path.

serialize_ipc(data)[source]#

Save the data to the plasma server and return the id.

Return type:

bytes

deserialize_ipc(data)[source]#

Get the data from the plasma server and delete it.

Return type:

Any

class mosec.mixin.TypedMsgPackMixin[source]#

Bases: Worker

Enable request type validation with msgspec and serde with msgpack.

deserialize(data)[source]#

Deserialize and validate request with msgspec.

Return type:

Any

serialize(data)[source]#

Serialize with msgpack.

Return type:

bytes

classmethod get_forward_json_schema(target, ref_template)[source]#

Get the JSON schema of the forward function.

Return type:

Tuple[Dict[str, Any], Dict[str, Any]]

class mosec.mixin.RedisShmIPCMixin[source]#

Bases: Worker

Redis shared memory worker mixin interface.

classmethod set_redis_url(url)[source]#

Set the redis service url.

serialize_ipc(data)[source]#

Save the data to the redis server and return the id.

Return type:

bytes

deserialize_ipc(data)[source]#

Get the data from the redis server and delete it.

Return type:

Any