# Copyright 2023 MOSEC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Managers to control Coordinator and Mosec process."""
import multiprocessing as mp
import subprocess
from multiprocessing.context import ForkContext, SpawnContext
from multiprocessing.process import BaseProcess
from multiprocessing.synchronize import Event
from pathlib import Path
from time import monotonic, sleep
from typing import Callable, Dict, Iterable, List, Optional, Type, Union, cast
from mosec.coordinator import Coordinator
from mosec.env import env_var_context, validate_env, validate_int_ge
from mosec.log import get_internal_logger
from mosec.utils import get_mosec_path
from mosec.worker import Worker
logger = get_internal_logger()
NEW_PROCESS_METHOD = {"spawn", "fork"}
# pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-arguments
# pylint: disable=too-few-public-methods
[docs]
class Runtime:
"""The wrapper with one worker and its arguments."""
# count how many runtime instances have been created
_stage_id: int = 0
[docs]
def __init__(
self,
worker: Type[Worker],
num: int = 1,
max_batch_size: int = 1,
max_wait_time: int = 10,
timeout: int = 3,
start_method: str = "spawn",
env: Union[None, List[Dict[str, str]]] = None,
):
"""Initialize the mosec coordinator.
Args:
worker (Worker): subclass of `mosec.Worker` implemented by users.
num (int): number of workers
max_batch_size: the maximum batch size allowed (>=1), will enable the
dynamic batching if it > 1
max_wait_time (int): the maximum wait time (millisecond)
for dynamic batching, needs to be used with `max_batch_size`
to enable the feature. If not configure, will use the CLI
argument `--wait` (default=10ms)
timeout (int): timeout (second) for the `forward` function.
start_method: the process starting method ("spawn" or "fork")
env: the environment variables to set before starting the process
"""
self.worker = worker
self.num = num
self.max_batch_size = max_batch_size
self.max_wait_time = max_wait_time
self.timeout = timeout
self.start_method = start_method
self.env = env
Runtime._stage_id += 1
# adding the stage id in case the worker class is added to multiple stages
self.name = f"{self.worker.__name__}_{self._stage_id}"
self._pool: List[Union[BaseProcess, None]] = [None for _ in range(self.num)]
self._validate()
@staticmethod
def _process_healthy(process: Union[BaseProcess, None]) -> bool:
"""Check if the child process is healthy.
ref: https://docs.python.org/3/library/multiprocessing.html
The exit code will be None if the process has not yet terminated.
"""
return process is not None and process.exitcode is None
def _healthy(self, method: Callable[[Iterable[object]], bool]) -> bool:
"""Check if all/any of the child processes are healthy."""
return method(self._pool)
def _start_process(
self,
worker_id: int,
work_path: str,
shutdown: Event,
shutdown_notify: Event,
):
"""Start a worker process in the context."""
context = mp.get_context(self.start_method)
context = cast(Union[SpawnContext, ForkContext], context)
coordinator_process = context.Process(
target=Coordinator,
args=(
self.worker,
self.max_batch_size,
shutdown,
shutdown_notify,
work_path,
self.name,
worker_id + 1,
self.timeout,
),
daemon=True,
)
with env_var_context(self.env, worker_id):
coordinator_process.start()
self._pool[worker_id] = coordinator_process
def _check(
self,
work_path: str,
shutdown: Event,
shutdown_notify: Event,
init: bool,
) -> bool:
"""Check and start the worker process if it has not started yet.
Args:
work_path: path of working directory
shutdown: Event of server shutdown
shutdown_notify: Event of server will shutdown
init: whether the worker is tried to start at the first time
Returns:
Whether the worker is started successfully
"""
# for every sequential stage
self._pool = [p if self._process_healthy(p) else None for p in self._pool]
if self._healthy(all):
# this stage is healthy
return True
if not init and not self._healthy(any):
# this stage might contain bugs
return False
need_start_id = [
worker_id for worker_id in range(self.num) if self._pool[worker_id] is None
]
for worker_id in need_start_id:
# for every worker in each stage
self._start_process(worker_id, work_path, shutdown, shutdown_notify)
return True
def _validate(self):
"""Validate arguments of worker runtime."""
validate_env(self.env, self.num)
assert issubclass(
self.worker, Worker
), "worker must be inherited from mosec.Worker"
validate_int_ge(self.num, "worker number")
validate_int_ge(self.max_batch_size, "maximum batch size")
validate_int_ge(self.max_wait_time, "maximum wait time", 0)
validate_int_ge(self.timeout, "forward timeout", 0)
assert (
self.start_method in NEW_PROCESS_METHOD
), f"start method must be one of {NEW_PROCESS_METHOD}"
class PyRuntimeManager:
"""The manager to control coordinator process."""
def __init__(self, work_path: str, shutdown: Event, shutdown_notify: Event):
"""Initialize a coordinator manager.
Args:
work_path: path of working directory
shutdown: Event of server shutdown
shutdown_notify: Event of server will shutdown
"""
self.runtimes: List[Runtime] = []
self._work_path = work_path
self.shutdown = shutdown
self.shutdown_notify = shutdown_notify
@property
def worker_count(self) -> int:
"""Get number of workers."""
return len(self.runtimes)
@property
def workers(self) -> List[Type[Worker]]:
"""Get List of workers."""
return [r.worker for r in self.runtimes]
def append(self, runtime: Runtime):
"""Sequentially appends workers to the workflow pipeline."""
self.runtimes.append(runtime)
def check_and_start(self, init: bool) -> Union[Runtime, None]:
"""Check all worker processes and try to start failed ones.
Args:
init: whether the worker is tried to start at the first time
"""
for worker_runtime in self.runtimes:
if not worker_runtime._check( # pylint: disable=protected-access
self._work_path, self.shutdown, self.shutdown_notify, init
):
return worker_runtime
return None
class RsRuntimeManager:
"""The manager to control Mosec process."""
def __init__(self, timeout: int):
"""Initialize a Mosec manager.
Args:
timeout: service timeout in milliseconds
"""
self.process: Optional[subprocess.Popen] = None
self.timeout = timeout
def halt(self):
"""Graceful shutdown."""
# terminate controller first and wait for a graceful period
if self.process is None:
return
self.process.terminate()
graceful_period = monotonic() + self.timeout / 1000
while monotonic() < graceful_period:
ctr_exitcode = self.process.poll()
if ctr_exitcode is None:
sleep(0.1)
continue
# exited
if ctr_exitcode: # on error
logger.error("mosec service halted on error [%d]", ctr_exitcode)
else:
logger.info("mosec service halted normally [%d]", ctr_exitcode)
break
if monotonic() > graceful_period:
logger.error("failed to terminate mosec service, will try to kill it")
self.process.kill()
def start(self, config_path: Path) -> subprocess.Popen:
"""Start the Mosec process.
Args:
config_path: configuration path of mosec
"""
# pylint: disable=consider-using-with
self.process = subprocess.Popen([str(get_mosec_path()), config_path])
return self.process