# Copyright 2023 MOSEC Authors## Licensed under the Apache License, Version 2.0 (the "License");# you may not use this file except in compliance with the License.# You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License."""MOSEC redis worker mixin.Provide another data transfer way between workers.The data will be stored in redis shared memory, while the object ID will besent via the original way. use case: large image tensors, cluster-shared data benefits: more stable P99 latency"""# pylint: disable=import-outside-toplevelfromosimportenvironfromtypingimportAnyfrommosec.workerimportWorker_REDIS_URL_ENV="MOSEC_INTERNAL_REDIS_URL"_DEFAULT_KEY="REDIS_SHM_IPC_KEY"
@classmethod
def set_redis_url(cls, url: str):
    """Record the redis service url used when the client is created.

    The url is written into the process environment under the module's
    redis-url variable name; nothing is stored on the class itself.

    Args:
        url: connection url of the redis service.
    """
    env_name = _REDIS_URL_ENV
    environ[env_name] = url
def _get_client(self) -> Any:
    """Return the redis client, building and caching it on first use.

    Returns:
        The client held in ``self._redis_client``.

    Raises:
        RuntimeError: if no redis url is present in the environment.
    """
    # function-scope import; the module disables pylint's
    # import-outside-toplevel check for this
    import redis

    cached = self._redis_client
    if cached is not None:
        return cached

    redis_url = environ.get(_REDIS_URL_ENV)
    if not redis_url:
        raise RuntimeError(
            "please set the redis url with `RedisShmIPCMixin.set_redis_url()`"
        )
    self._redis_client = redis.from_url(redis_url)
    return self._redis_client


def _prepare_next_id(self) -> None:
    """Ensure ``self._next_id`` holds an id, reserving one if it is unset.

    A new id is obtained by incrementing the counter stored under
    ``self._redis_key`` and is kept as the UTF-8 bytes of that number.
    """
    if self._next_id is not None:
        return
    counter = self._get_client().incr(self._redis_key)
    self._next_id = str(counter).encode("utf-8")
def serialize_ipc(self, data: Any) -> bytes:
    """Store the serialized data in redis and return the id it is kept under.

    The payload produced by the parent ``serialize_ipc`` is written under
    the id reserved beforehand. The same pipeline also increments the
    counter, and that result becomes the id for the next call.

    Args:
        data: object to hand over to the next worker.

    Returns:
        The redis key (as bytes) under which the payload was stored.
    """
    self._prepare_next_id()
    redis_client = self._get_client()
    with redis_client.pipeline() as pipe:
        object_id = self._next_id
        payload = super().serialize_ipc(data)
        pipe.set(object_id, payload)  # type: ignore
        pipe.incr(self._redis_key)
        replies = pipe.execute()
        # the last reply belongs to the INCR queued above
        upcoming = replies[-1]
    self._next_id = str(upcoming).encode("utf-8")
    return object_id  # type: ignore
def deserialize_ipc(self, data: bytes) -> Any:
    """Fetch the payload stored under the given id, delete it, and decode it.

    Args:
        data: the id bytes returned by ``serialize_ipc``.

    Returns:
        Whatever the parent ``deserialize_ipc`` produces from the payload.
    """
    redis_client = self._get_client()
    key = bytes(data)
    with redis_client.pipeline() as pipe:
        pipe.get(key)
        pipe.delete(key)
        replies = pipe.execute()
        # the first reply belongs to the GET; the DELETE reply is unused
        stored = replies[0]
    return super().deserialize_ipc(stored)