Interface¶
Server¶
MOSEC server interface.
This module provides a way to define the service components for machine learning model serving.
Dynamic Batching
The user may enable the dynamic batching feature for any stage when the corresponding worker is appended, by setting the
append_worker(max_batch_size)
.
Multiprocessing
The user may spawn multiple processes for any stage when the corresponding worker is appended, by setting the
append_worker(num)
.
- class mosec.server.Server[source]¶
MOSEC server interface.
It allows users to sequentially append workers they implemented, builds the workflow pipeline automatically and starts up the server.
- register_daemon(name, proc)[source]¶
Register a daemon to be monitored.
- Parameters:
name (
str
) – the name of this daemonproc (
Popen
) – the process handle of the daemon
- append_worker(worker, num=1, max_batch_size=1, max_wait_time=0, start_method='spawn', env=None, timeout=0, route='/inference')[source]¶
Sequentially appends workers to the workflow pipeline.
- Parameters:
worker (
Type
[Worker
]) – the class you inherit fromWorker
which implements theforward
num (
int
) – the number of processes for parallel computing (>=1)max_batch_size (
int
) – the maximum batch size allowed (>=1), will enable the dynamic batching if it > 1max_wait_time (
int
) – the maximum wait time (millisecond) for dynamic batching, needs to be used with max_batch_size to enable the feature. If not configure, will use the CLI argument –wait (default=10ms)start_method (
str
) – the process starting method (“spawn” or “fork”). (DO NOT change this unless you understand the difference between them)env (
Optional
[List
[Dict
[str
,str
]]]) – the environment variables to set before starting the processtimeout (
int
) – the timeout (second) for each worker forward processing (>=1)route (
Union
[str
,List
[str
]]) – the route path for this worker. If not configured, will use the default route path /inference. If a list is provided, different route paths will share the same worker.
Worker¶
MOSEC worker interface.
This module provides the interface to define a worker with such behaviors:
initialize
serialize/deserialize data to/from another worker
serialize/deserialize data to/from the client side
data processing
- class mosec.worker.Worker[source]¶
MOSEC worker interface.
It provides default IPC (de)serialization methods, stores the worker meta data including its stage and maximum batch size, and leaves the
forward
method to be implemented by the users.By default, we use JSON encoding. But users are free to customize via simply overriding the
deserialize
method in the first stage (we term it as ingress stage) and/or theserialize
method in the last stage (we term it as egress stage).For the encoding customization, there are many choices including MessagePack, Protocol Buffer and many other out-of-the-box protocols. Users can even define their own protocol and use it to manipulate the raw bytes! A naive customization can be found in this PyTorch example.
- serialize_ipc(data)[source]¶
Define IPC serialization method.
- Parameters:
data (
Any
) – returned data fromforward()
- Return type:
bytes
- deserialize_ipc(data)[source]¶
Define IPC deserialization method.
- Parameters:
data (
bytes
) – input data forforward()
- Return type:
Any
- property stage: str¶
Return the stage name.
- property max_batch_size: int¶
Return the maximum batch size.
- property worker_id: int¶
Return the ID of this worker instance.
This property returns the worker ID in the range of [1, … ,
num
] (num
as configured inappend_worker(num)
) to differentiate workers in the same stage.
- serialize(data)[source]¶
Serialize the last stage (egress).
Default response serialization method: JSON.
Check
mosec.mixin
for more information.- Parameters:
data (
Any
) – the same type as the output of theforward()
- Return type:
bytes
- Returns:
the bytes you want to put into the response body
- Raises:
EncodingError – if the data cannot be serialized with JSON
- deserialize(data)[source]¶
Deserialize the first stage (ingress).
Default request deserialization method: JSON.
Check
mosec.mixin
for more information.- Parameters:
data (
bytes
) – the raw bytes extracted from the request body- Return type:
Any
- Returns:
the same type as the input of the
forward()
- Raises:
DecodingError – if the data cannot be deserialized with JSON
- abstract forward(data)[source]¶
Model inference, data processing or computation logic.
- Parameters:
data (
Any
) – input data to be processed- Return type:
Any
Must be overridden by the subclass.
If any code in this
forward()
needs to access other resources (e.g. a model, a memory cache, etc.), the user should initialize these resources as attributes of the class in the__init__
.Note
For a stage that enables dynamic batching, please return the results that have the same length and the same order of the input data.
Note
- for a single-stage worker, data will go through
<deserialize> -> <forward> -> <serialize>
- for a multi-stage worker that is neither ingress not egress, data
will go through
<deserialize_ipc> -> <forward> -> <serialize_ipc>
- classmethod get_forward_json_schema(target, ref_template)[source]¶
Retrieve the JSON schema for the forward method of the class.
- Parameters:
cls – The class object.
target (
ParseTarget
) – The target variable to parse the schema for.ref_template (
str
) – A template to use when generating"$ref"
fields.
- Return type:
Tuple
[Dict
[str
,Any
],Dict
[str
,Any
]]- Returns:
A tuple containing the schema and the component schemas.
The
get_forward_json_schema()
method is a class method that returns the JSON schema for theforward()
method of thecls
class. It takes atarget
param specifying the target to parse the schema for.The returned value is a tuple containing the schema and the component schema.
Note
Developer must implement this function to retrieve the JSON schema to enable openapi spec.
Note
The
MOSEC_REF_TEMPLATE
constant should be used as a reference template according to openapi standards.
- class mosec.worker.SSEWorker[source]¶
MOSEC worker with Server-Sent Events (SSE) support.
- send_stream_event(text, index=0)[source]¶
Send a stream event to the client.
- Parameters:
text (
str
) – the text to be sent, needs to be UTF-8 compatibleindex (
int
) – the index of the stream event. For the single request, this will always be 0. For dynamic batch request, this should be the index of the request in this batch.
Runtime¶
Managers to control Coordinator and Mosec process.
- class mosec.runtime.Runtime(worker, num=1, max_batch_size=1, max_wait_time=10, timeout=3, start_method='spawn', env=None)[source]¶
The wrapper with one worker and its arguments.
- __init__(worker, num=1, max_batch_size=1, max_wait_time=10, timeout=3, start_method='spawn', env=None)[source]¶
Initialize the mosec coordinator.
- Parameters:
worker (Worker) – subclass of mosec.Worker implemented by users.
num (int) – number of workers
max_batch_size (
int
) – the maximum batch size allowed (>=1), will enable the dynamic batching if it > 1max_wait_time (int) – the maximum wait time (millisecond) for dynamic batching, needs to be used with max_batch_size to enable the feature. If not configure, will use the CLI argument –wait (default=10ms)
timeout (int) – timeout (second) for the forward function.
start_method (
str
) – the process starting method (“spawn” or “fork”)env (
Optional
[List
[Dict
[str
,str
]]]) – the environment variables to set before starting the process
Errors¶
Exceptions used in the Worker.
Suppose the input dataflow of our model server is as follows:
bytes ->
deserialize ->
data ->
parse ->
valid data
If the raw bytes cannot be successfully deserialized, the DecodingError is raised; if the decoded data cannot pass the validation check (usually implemented by users), the ValidationError should be raised.
- exception mosec.errors.ClientError[source]¶
Bases:
MosecError
Client side error.
This error indicates that the server cannot or will not process the request due to something that is perceived to be a client error. It will return the details to the client side with HTTP 400.
- exception mosec.errors.ServerError[source]¶
Bases:
MosecError
Server side error.
This error indicates that the server encountered an unexpected condition that prevented it from fulfilling the request. It will return the details to the client side with HTTP 500.
Attention: be careful about the returned message since it may contain some sensitive information. If you don’t want to return the details, just raise an exception that is not inherited from mosec.errors.MosecError.
- exception mosec.errors.EncodingError[source]¶
Bases:
ServerError
Serialization error.
The EncodingError should be raised in user-implemented codes when the serialization for the response bytes fails. This error will set to status code to HTTP 500 and show the details in the response.
- exception mosec.errors.DecodingError[source]¶
Bases:
ClientError
De-serialization error.
The DecodingError should be raised in user-implemented codes when the de-serialization for the request bytes fails. This error will set the status code to HTTP 400 in the response.
- exception mosec.errors.ValidationError[source]¶
Bases:
MosecError
Request data validation error.
The ValidationError should be raised in user-implemented codes, where the validation for the input data fails. Usually, it should be put after the data de-serialization, which converts the raw bytes into structured data. This error will set the status code to HTTP 422 in the response.
- exception mosec.errors.MosecTimeoutError[source]¶
Bases:
BaseException
Exception raised when a MOSEC worker operation times out.
If a bug in the forward code causes the worker to hang indefinitely, a timeout can be used to ensure that the worker eventually returns control to the main thread program. When a timeout occurs, the MosecTimeout exception is raised. This exception can be caught and handled appropriately to perform any necessary cleanup tasks or return a response indicating that the operation timed out.
Note that MosecTimeout is a subclass of BaseException, not Exception. This is because timeouts should not be caught and handled in the same way as other exceptions. Instead, they should be handled in a separate except block which isn’t designed to break the working loop.
Mixins¶
Provide useful mixin to extend MOSEC.
- class mosec.mixin.MsgpackMixin[source]¶
Bases:
object
Msgpack worker mixin interface.
- serialize(data)[source]¶
Serialize with msgpack for the last stage (egress).
- Parameters:
data (
Any
) – the same type as returned byWorker.forward
- Return type:
bytes
- Returns:
the bytes you want to put into the response body
- Raises:
EncodingError – if the data cannot be serialized with msgpack
- deserialize(data)[source]¶
Deserialize method for the first stage (ingress).
- Parameters:
data (
bytes
) – the raw bytes extracted from the request body- Return type:
Any
- Returns:
the same type as the input of
Worker.forward
- Raises:
DecodingError – if the data cannot be deserialized with msgpack
- class mosec.mixin.PlasmaShmIPCMixin[source]¶
Bases:
Worker
Plasma shared memory worker mixin interface.
- class mosec.mixin.RedisShmIPCMixin[source]¶
Bases:
Worker
Redis shared memory worker mixin interface.