"""
Custom caches and utilities
"""
# Copyright CFHT/CNRS/SorbonneU
# Licensed under the MIT licence
import atexit
from collections import OrderedDict
from hashlib import md5
import logging
from multiprocessing.shared_memory import SharedMemory
from os import getpid, getppid
from sys import exit, platform
from time import time_ns
from typing import Any, Callable, OrderedDict, Union
import numpy as np
from .. import package
# Shared version of cache only available on Linux
if package.isonlinux:
from posix_ipc import Semaphore, O_CREAT #type: ignore
from UltraDict import UltraDict #type: ignore
[docs]
class LRUCache:
"""
Custom Least Recently Used (LRU) memory cache.
Parameters
----------
func: class or func
Function result or class instantiation to be cached.
maxsize: int, optional
Maximum size of the cache.
"""
def __init__(self, func: Callable, maxsize: int=8):
self.cache: OrderedDict = OrderedDict()
self.func = func
self.maxsize = maxsize
def __call__(self, *args, **kwargs) -> Any:
"""
Cache or recover earlier cached result/object.
If the number of cached items exceeds maxsize then the least recently
used item is dropped from the cache.
Parameters
----------
args: any
Hashable arguments to the function/constructor.
Returns
-------
result: any
Cached item.
:meta public:
"""
if args in self.cache:
self.cache.move_to_end(args)
return self.cache[args]
if len(self.cache) > self.maxsize:
self.cache.popitem(last=False)
result = self.func(*args, **kwargs)
self.cache[args] = result
return result
def __delitem__(self, key) -> None:
"""
Delete a specific cache entry.
:meta public:
"""
if key in self.cache:
del self.cache[key]
[docs]
class LRUSharedRWCache:
"""
Custom, dictionary-based, Least Recently Used (LRU) reader-writer lockable
cache, shareable across processes.
Parameters
----------
func: Callable
Function to be cached.
name: str, optional
Name of the lock cache.
maxsize: int, optional
Maximum size of the cache.
shared: bool, optional
Share dictionary across processes (requires POSIX IPC support)
removecall: func, optional
Callback function for removing cached data.
logger: logging.Logger, optional
Logger object to display cache handling information.
"""
def __init__(
self,
func: Callable,
name: Union[str, None] = None,
maxsize: int = 8,
shared: bool = True,
removecall: Union[Callable, None] = None,
logger: Union[logging.Logger, None] = None):
self.func = func
# Shared cache option only available on Linux
self.shared = shared and package.isonlinux
self.name = name if name else f"rwlockcache_{getppid()}"
self._lock_name = self.name + ".lock"
self.logger = logger
if self.shared:
with Semaphore(self._lock_name, O_CREAT, initial_value=1) as lock:
self.cache = UltraDict(name=self.name, create=None, shared_lock=True)
self.locks = LRUCache(func=SharedRWLock, maxsize=maxsize)
else:
self.cache = {}
self.results = LRUCache(func=func, maxsize=maxsize)
# Delete semaphores when process is aborted.
if self.shared:
atexit.register(self.remove)
self.removecall = removecall
self.maxsize = maxsize
def __call__(self, *args, **kwargs):
"""
Get or recover a cached entry.
Parameters
----------
args: any
Hashable arguments to the cache dictionary.
kwargs: any
Hashable keyworded arguments to the cache dictionary.
Returns
-------
result:
Cached output.
lock: :class:`SharedRWLock` or None
Reader-Writer lock associated with args, or None if cache is
unshared.
msg:
"OK" or error string in case of an exception.
:meta public:
"""
return self.call_shared(*args, **kwargs) if self.shared \
else self.call_unshared(*args, **kwargs)
[docs]
def call_unshared(self, *args, **kwargs):
"""
Get or recover a cached entry, not shared with other processes.
Parameters
----------
args: any
Hashable arguments to the cache dictionary.
kwargs: any
Hashable keyworded arguments to the cache dictionary.
Returns
-------
result:
Cached output.
lock:
Reader-Writer lock associated with args, set to None.
msg:
"OK" or error string in case of an exception.
:meta public:
"""
# We use MD5 instead of hash() as output is consistent across processes
m = md5()
for s in args:
m.update(s.encode())
hargs = m.hexdigest()
write = False
remove = False
if hargs in self.cache:
# Reference already in cache: get time
self.cache[hargs] = [None, time_ns()]
else:
# Reference not in cache: test if we're reaching the cache limit
if len(self.cache) >= self.maxsize:
# Find LRU cache entry
oldest = min(self.cache, key=lambda k: self.cache[k][1])
ofirstarg = self.cache[oldest][0]
# Delete cache entry
del self.cache[oldest]
# Will have to do remove call if callback function available
remove = self.removecall
# Cache an empty content just to mark territory
write = True
firstarg = args[0]
self.cache[hargs] = [firstarg, time_ns()]
# Prepare write operation on new data
if remove:
# Apply remove callback to first argument stored in cache (filename)
self.removecall(ofirstarg)
# Finally try to update the shared version
if write:
try:
result = self.func(*args, **kwargs)
except Exception as e:
# Uh-oh! things went wrong: remove cache entry and lock
# and return None
del self.cache[hargs]
result = None
msg = e.args[0]
else:
self.cache[hargs] = [firstarg, time_ns()]
msg = "OK"
else:
try:
result = self.results(*args, **kwargs)
except Exception as e:
result = None
msg = e.args[0]
else:
self.cache[hargs] = [None, time_ns()]
msg = "OK"
return result, msg, None
[docs]
def call_shared(self, *args, **kwargs):
"""
Get or recover a cached entry, not shared with other processes.
Parameters
----------
args: any
Hashable arguments to the cache dictionary.
kwargs: any
Hashable keyworded arguments to the cache dictionary.
Returns
-------
result:
Cached output.
lock:
Reader-Writer lock associated with args.
msg:
"OK" or error string in case of an exception.
:meta public:
"""
# We use MD5 instead of hash() as output is consistent across processes
m = md5()
for s in args:
m.update(s.encode())
hargs = m.hexdigest()
write = False
remove = False
lock = self.locks(hargs)
with self.cache.lock:
if hargs in self.cache:
# Reference already in cache: lock in read mode
self.cache[hargs][1] = time_ns()
else:
# Reference not in cache: lock in write mode
# after testing if we're reaching the cache limit
if len(self.cache) >= self.maxsize:
# Find LRU cache entry
oldest = min(self.cache, key=lambda k: self.cache[k][1])
olock = self.locks(oldest[1])
ofirstarg = self.cache[oldest][0]
# Acquire write lock on (now defunct) LRU cache entry
olock.acquire_write()
# Delete cache entry
del self.cache[oldest]
# Will have to do remove call if callback function available
remove = self.removecall
lock.acquire_write()
write = True
# Cache an empty content just to mark territory
firstarg = args[0]
self.cache[hargs] = [firstarg, time_ns()]
# Prepare write operation on new data
if remove:
# Apply remove callback to first argument stored in cache (filename)
self.removecall(ofirstarg)
olock.release_write()
del self.locks[oldest]
del olock
# Finally try to update the shared version
if write:
try:
result = self.func(*args, **kwargs)
except Exception as e:
# Uh-oh! things went wrong: remove cache entry and lock
# and return None
with self.cache.lock:
del self.cache[hargs]
del self.locks[hargs]
result = None
msg = str(e)
else:
with self.cache.lock:
self.cache[hargs] = [firstarg, time_ns()]
msg = "OK"
lock.release_write()
lock.acquire_read()
else:
lock.acquire_read()
try:
result = self.results(*args, **kwargs)
except Exception as e:
result = None
msg = str(e)
else:
with self.cache.lock:
self.cache[hargs][1] = time_ns()
msg = "OK"
return result, msg, lock
[docs]
def remove(self, *args) -> None:
"""
Remove semaphores.
"""
if self.logger:
self.logger.info(f"Cleaning up semaphores")
try:
Semaphore(self._lock_name).close()
except:
pass
try:
Semaphore(self._lock_name).unlink()
except:
pass
[docs]
class SharedRWLock:
"""
Custom reader-writer lock shareable across processes
See Raynal, Michel (2012), Concurrent Programming: Algorithms, Principles,
and Foundations, Springer.
Parameters
----------
name: str, optional
Name of the lock.
"""
def __init__(self, name: str = "RWLock"):
self.name = name
self._rlock_name = f"{package.name}_{name}.r.lock"
self._glock_name = f"{package.name}_{name}.g.lock"
rlock = Semaphore(self._rlock_name, O_CREAT, initial_value=1)
glock = Semaphore(self._glock_name, O_CREAT, initial_value=1)
with Semaphore(self._rlock_name) as rlock:
# We use a try block and SharedMemory flags O_CREX
# to initialize the shared variable only once
try:
self.shared_mem = SharedMemory(
name=f"{package.name}_{name}.b.shm",
create=True,
size=np.array([0], dtype=np.int32).nbytes
)
self.b : np.ndarray = np.ndarray(
[1],
dtype=np.int32,
buffer=self.shared_mem.buf
)
except:
self.shared_mem = SharedMemory(
name=f"{package.name}_{name}.b.shm",
create=False,
size=np.array([0], dtype=np.int32).nbytes
)
self.b = np.ndarray(
[1],
dtype=np.int32,
buffer=self.shared_mem.buf
)
atexit.register(self.remove)
[docs]
def acquire_read(self) -> None:
"""
Acquire lock in read mode.
"""
with Semaphore(self._rlock_name) as rlock:
self.b[0] += 1
if self.b[0] == 1:
Semaphore(self._glock_name).acquire()
[docs]
def acquire_write(self) -> None:
"""
Acquire lock in write mode.
"""
Semaphore(self._glock_name).acquire()
[docs]
def release_read(self) -> None:
"""
Release acquired read lock.
"""
with Semaphore(self._rlock_name) as rlock:
self.b[0] -= 1
if self.b[0] == 0:
Semaphore(self._glock_name).release()
[docs]
def release_write(self) -> None:
"""
Release acquired write lock.
"""
Semaphore(self._glock_name).release()
def __delete__(self, instance) -> None:
"""
Clean up leftovers from the RW lock.
:meta public:
"""
self.remove()
[docs]
def remove(self, *args) -> None:
"""
Remove files used by the RW lock semaphores and shared memory.
"""
try:
Semaphore(self._glock_name).unlink()
Semaphore(self._rlock_name).unlink()
self.shared_mem.close()
self.shared_mem.unlink()
except:
pass