Shared Memory IPC
This example demonstrates how to enable a shared memory store (plasma or Redis) for IPC, or how to customize your own IPC wrapper.
Mosec’s multi-stage pipeline requires the output data of each stage to be transferred to the next stage across Python processes. This is coordinated via Unix domain sockets between each Python worker process (from every stage) and the Rust controller process.
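To make the default path concrete, here is a rough illustration of sending serialized bytes over a Unix domain socket pair, with one end playing a worker and the other the controller. The length-prefix framing and the helper names `send_frame`, `recv_exact` and `recv_frame` are assumptions for this sketch; it is not mosec's actual wire protocol.

```python
import pickle
import socket
import struct

# Assumed framing for this sketch: a 4-byte big-endian length prefix.
# This is NOT mosec's real protocol, only an illustration of the idea.
HEADER = struct.Struct("!I")


def send_frame(sock: socket.socket, payload: bytes) -> None:
    """Send one length-prefixed message."""
    sock.sendall(HEADER.pack(len(payload)) + payload)


def recv_exact(sock: socket.socket, n: int) -> bytes:
    """Read exactly n bytes, looping over partial reads."""
    buf = bytearray()
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("socket closed mid-frame")
        buf.extend(chunk)
    return bytes(buf)


def recv_frame(sock: socket.socket) -> bytes:
    """Read one length-prefixed message."""
    (length,) = HEADER.unpack(recv_exact(sock, HEADER.size))
    return recv_exact(sock, length)


if __name__ == "__main__":
    # socketpair() returns two connected AF_UNIX sockets on Unix.
    worker_end, controller_end = socket.socketpair()
    # The worker serializes its output and writes the raw bytes.
    send_frame(worker_end, pickle.dumps([0.1, 0.2, 0.3]))
    # The other end receives the full payload and deserializes it.
    print(pickle.loads(recv_frame(controller_end)))
    worker_end.close()
    controller_end.close()
```

Every byte of the payload crosses the socket here, which is the cost that a shared memory wrapper tries to avoid for large data.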
By default, we serialize the data and transfer the bytes directly over the socket. However, for some use cases, such as passing large arrays between stages, wrapping this IPC can be useful or more efficient. Therefore, we provide two example implementations: PlasmaShmIPCMixin, based on pyarrow.plasma, and RedisShmIPCMixin, based on redis. We recommend RedisShmIPCMixin for better performance and longer-term maintenance.
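The wrapping idea can be sketched as follows: instead of sending the serialized payload, a worker stores it in a shared store and sends only a short key, and the next stage uses that key to fetch the payload. In this sketch, `BaseWorker` and `SharedStoreIPCMixin` are hypothetical stand-ins, and a plain dict takes the place of plasma or Redis, so it only works within one process. The hook names `serialize_ipc` and `deserialize_ipc` are assumed to mirror the ones mosec's `Worker` exposes; check the mosec API reference before relying on them.

```python
import pickle
import uuid
from typing import Any, Dict


class BaseWorker:
    """Hypothetical stand-in for a worker whose default IPC sends pickled bytes."""

    def serialize_ipc(self, data: Any) -> bytes:
        return pickle.dumps(data)

    def deserialize_ipc(self, data: bytes) -> Any:
        return pickle.loads(data)


class SharedStoreIPCMixin(BaseWorker):
    """Hypothetical mixin: park the payload in a store, send only its key.

    The class-level dict is shared by all instances and stands in for
    plasma or Redis; a real external store is what makes the payload
    visible to other processes.
    """

    _store: Dict[bytes, bytes] = {}

    def serialize_ipc(self, data: Any) -> bytes:
        key = uuid.uuid4().bytes  # 16-byte id is all that crosses the socket
        self._store[key] = super().serialize_ipc(data)
        return key

    def deserialize_ipc(self, data: bytes) -> Any:
        # pop() frees the entry once the next stage has consumed it
        return super().deserialize_ipc(self._store.pop(data))


# Same mixin-first ordering as DataProducer(RedisShmIPCMixin, Worker).
class Producer(SharedStoreIPCMixin, BaseWorker):
    pass


class Consumer(SharedStoreIPCMixin, BaseWorker):
    pass
```

Putting the mixin first in the base list means its `serialize_ipc` and `deserialize_ipc` take precedence, while `super()` still reaches the default pickling.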
Warning
plasma is deprecated. Please use Redis instead.
The additional subprocess (the plasma store or the Redis server) can be registered as a daemon, so that mosec checks it regularly and triggers a graceful shutdown when the daemon exits.
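Mosec's monitoring runs in its Rust controller, so the following Python code only illustrates the polling idea: check each registered process periodically and start a shutdown once one of them has exited. The function `watch_daemons` and its parameters are hypothetical.

```python
import subprocess
import sys
import time
from typing import Callable, Dict


def watch_daemons(
    daemons: Dict[str, subprocess.Popen],
    on_exit: Callable[[str, int], None],
    interval: float = 0.1,
    max_checks: int = 100,
) -> bool:
    """Poll each daemon and call on_exit for the first one that has exited.

    Returns True if an exit was detected, False if max_checks ran out.
    """
    for _ in range(max_checks):
        for name, proc in daemons.items():
            code = proc.poll()  # None while the process is still running
            if code is not None:
                on_exit(name, code)  # e.g. begin a graceful shutdown
                return True
        time.sleep(interval)
    return False


if __name__ == "__main__":
    # A short-lived child process stands in for a crashed store.
    proc = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(3)"])
    watch_daemons(
        {"fake-store": proc},
        lambda name, code: print(f"{name} exited with {code}; shutting down"),
    )
```

Polling with `Popen.poll()` never blocks, so a single loop can watch several daemons at once.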
plasma_legacy.py
# Copyright 2022 MOSEC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Example: Using Plasma store with mosec mixin PlasmaShmIPCMixin.
We start a subprocess for the plasma server, and pass the path
to the plasma client which serves as the shm mixin.
We also register the plasma server process as a daemon, so
that when it exits the service is able to gracefully shut down
and be restarted by the orchestrator.
"""
import numpy as np
from pyarrow import plasma # type: ignore
from mosec import Server, ValidationError, Worker
from mosec.mixin import PlasmaShmIPCMixin


class DataProducer(PlasmaShmIPCMixin, Worker):
    """Sample Data Producer."""

    def forward(self, data: dict) -> np.ndarray:
        # pylint: disable=duplicate-code
        try:
            nums = np.random.rand(int(data["size"]))
        except KeyError as err:
            raise ValidationError(err) from err
        return nums


class DataConsumer(PlasmaShmIPCMixin, Worker):
    """Sample Data Consumer."""

    def forward(self, data: np.ndarray) -> dict:
        return {"ipc test data": data.tolist()}


if __name__ == "__main__":
    # 200 MB store, adjust the size according to your requirement
    with plasma.start_plasma_store(plasma_store_memory=200 * 1000 * 1000) as (
        shm_path,
        shm_process,
    ):
        # configure the plasma service path
        PlasmaShmIPCMixin.set_plasma_path(shm_path)

        server = Server()
        # register this process to be monitored
        server.register_daemon("plasma_server", shm_process)
        server.append_worker(DataProducer, num=2)
        server.append_worker(DataConsumer, num=2)
        server.run()
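The plasma store size is fixed when the store starts, so it has to hold every payload that may be in flight at once. A back-of-the-envelope helper could look like this; `estimate_store_bytes`, the in-flight count and the 1.5x headroom for serialization overhead are all assumptions for illustration, not values taken from mosec.

```python
import math


def estimate_store_bytes(
    max_elements: int,
    in_flight: int,
    bytes_per_element: int = 8,  # float64, as produced by np.random.rand
    headroom: float = 1.5,  # assumed safety factor for serialization overhead
) -> int:
    """Rough store size for `in_flight` arrays of up to `max_elements` floats."""
    raw = max_elements * bytes_per_element * in_flight
    return math.ceil(raw * headroom)


if __name__ == "__main__":
    # 1,000,000 floats * 8 bytes * 16 arrays * 1.5 = 192,000,000 bytes,
    # which fits in the 200 * 1000 * 1000 byte store used above.
    print(estimate_store_bytes(1_000_000, 16))
```

How many payloads are actually in flight depends on the number of workers, batching and request concurrency, so treat the result as a starting point.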
redis.py
# Copyright 2023 MOSEC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Example: Using Redis store with mosec mixin RedisShmIPCMixin.
We start a subprocess for the Redis server, and pass the url
to the redis client which serves as the shm mixin.
We also register the redis server process as a daemon, so
that when it exits the service is able to gracefully shut down
and be restarted by the orchestrator.
"""
import subprocess
import numpy as np
from mosec import Server, ValidationError, Worker
from mosec.mixin import RedisShmIPCMixin


class DataProducer(RedisShmIPCMixin, Worker):
    """Sample Data Producer."""

    def forward(self, data: dict) -> np.ndarray:
        # pylint: disable=duplicate-code
        try:
            nums = np.random.rand(int(data["size"]))
        except KeyError as err:
            raise ValidationError(err) from err
        return nums


class DataConsumer(RedisShmIPCMixin, Worker):
    """Sample Data Consumer."""

    def forward(self, data: np.ndarray) -> dict:
        return {"ipc test data": data.tolist()}


if __name__ == "__main__":
    with subprocess.Popen(["redis-server"]) as p:  # start the redis server
        # configure the redis url
        RedisShmIPCMixin.set_redis_url("redis://localhost:6379/0")

        server = Server()
        # register this process to be monitored
        server.register_daemon("redis-server", p)
        server.append_worker(DataProducer, num=2)
        server.append_worker(DataConsumer, num=2)
        server.run()
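The Redis example configures the URL right after launching `redis-server`, so a worker could in principle try to connect before Redis is listening. If that turns out to be a problem, one option is to wait for the port before calling `server.run()`. The helper `wait_for_port` below is hypothetical and only checks that a TCP connection succeeds.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=0.5):
                return True
        except OSError:
            time.sleep(0.1)  # not accepting connections yet; retry shortly
    return False


if __name__ == "__main__":
    # e.g. right after subprocess.Popen(["redis-server"]) in the example above
    print(wait_for_port("localhost", 6379, timeout=1.0))
```

An accepted TCP connection does not guarantee that Redis has finished loading its data, but for a fresh, empty instance it is usually a sufficient signal.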
Start
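Using plasma (requires a pyarrow release that still includes pyarrow.plasma):
python examples/shm_ipc/plasma_legacy.py
or using Redis (requires redis-server on your PATH):
python examples/shm_ipc/redis.py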
Test
http :8000/inference size=100
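The HTTPie command sends a JSON body in which `size` is the string `"100"`, which is why `forward` converts it with `int()`. An equivalent request from Python's standard library might look like this, assuming the server from either example is listening on `localhost:8000`; `build_request` and `infer` are hypothetical helper names.

```python
import json
import urllib.request

# Assumption: the example server listens on localhost:8000.
INFERENCE_URL = "http://localhost:8000/inference"


def build_request(size: int, url: str = INFERENCE_URL) -> urllib.request.Request:
    """Build a JSON POST equivalent to `http :8000/inference size=100`."""
    body = json.dumps({"size": size}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def infer(size: int) -> list:
    """Send the request and return the list produced by the two stages."""
    with urllib.request.urlopen(build_request(size), timeout=10) as resp:
        return json.loads(resp.read())["ipc test data"]


if __name__ == "__main__":
    print(len(infer(100)))
```

A missing `size` field makes `forward` raise `ValidationError`, so the server rejects the request instead of returning data.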