Source code for BioSimSpace.IO._file_cache

######################################################################
# BioSimSpace: Making biomolecular simulation a breeze!
#
# Copyright: 2017-2024
#
# Authors: Lester Hedges <lester.hedges@gmail.com>
#
# BioSimSpace is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# BioSimSpace is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with BioSimSpace. If not, see <http://www.gnu.org/licenses/>.
#####################################################################

"""Functionality for caching molecular files to avoid re-writing."""

__author__ = "Lester Hedges"
__email__ = "lester.hedges@gmail.com"

__all__ = ["clearCache", "disableCache", "enableCache"]

import collections as _collections
import hashlib as _hashlib
import os as _os
import shutil as _shutil
import sys as _sys

from .._SireWrappers import System as _System


class _FixedSizeOrderedDict(_collections.OrderedDict):
    """A utility class to implement a fixed-sized cache."""

    def __init__(self, *args, max=2, **kwargs):
        """
        Constructor.

        Parameters
        ----------

        max : float
            The maximum size in GB.
        """

        # Work out the approximate maximum number of atoms.
        if max > 0:
            self._max_atoms = int((max * 1e9) / (9 * _sys.getsizeof(float())))
        else:
            self._max_atoms = 0

        # Store the total number of atoms.
        self._num_atoms = 0

        super().__init__(*args, **kwargs)

    def __setitem__(self, key, value):
        _collections.OrderedDict.__setitem__(self, key, value)
        self._num_atoms += value[0].nAtoms()
        if self._max_atoms > 0:
            if self._num_atoms > self._max_atoms:
                key, value = self.popitem(False)
                self._num_atoms -= value[0].nAtoms()

    def __delitem__(self, key):
        value = self[key]
        self._num_atoms -= value[0].nAtoms()
        _collections.OrderedDict.__delitem__(self, key)


# Initialise a "cache" dictionary. This maps a key of the system UID, file format
# and excluded properties a value of the system and file path. When saving to a
# given format, we can then to see if a matching system has previously been written
# to the same format, allowing us to re-use the existing file.
_cache = _FixedSizeOrderedDict()

# Whether to use the cache.
_use_cache = True


[docs] def clearCache(): """ Clear the file cache. """ global _cache _cache = _FixedSizeOrderedDict()
[docs] def disableCache(): """ Disable the file cache. """ global _use_cache _use_cache = False
[docs] def enableCache(): """ Enable the file cache. """ global _use_cache _use_cache = True
def _cache_active(): """ Internal helper function to check whether the cache is active. """ global _use_cache return _use_cache def _check_cache( system, format, filebase, match_water=True, property_map={}, excluded_properties=[], skip_water=True, **kwargs, ): """ Check whether a Sire system has previously been written to the specified format. Parameters ---------- system : :class:`System <BioSimSpace._SireWrappers.System>` The system. format : str The molecular file format. filebase : str The file base to copy the file to. match_water : bool Whether to update the naming of water molecules to match the expected convention for the chosen file format. This is useful when a system is being saved to a different file format to that from which it was loaded. property_map : dict A dictionary that maps system "properties" to their user defined values. This allows the user to refer to properties with their own naming scheme, e.g. { "charge" : "my-charge" } excluded_properties : [str] A list of properties to exclude when comparing systems when checking the file cache. skip_water : bool Whether to skip water molecules when comparing systems. Returns ------- extension : str The extension for cached file. False if no file was found. """ # Validate input. if not isinstance(system, _System): raise TypeError("'system' must be of type 'BioSimSpace._SireWrappers.System'") if not isinstance(format, str): raise TypeError("'format' must be of type 'str'") if not isinstance(filebase, str): raise TypeError("'filebase' must be of type 'str'") if not isinstance(match_water, bool): raise TypeError("'match_water' must be of type 'bool'") if not isinstance(excluded_properties, (list, tuple)): raise TypeError("'excluded_properties' must be a list of 'str' types.") if not all(isinstance(x, str) for x in excluded_properties): raise TypeError("'excluded_properties' must be a list of 'str' types.") if not isinstance(property_map, dict): raise TypeError("'property_map' must be of type 'dict'.") if not isinstance(skip_water, bool): raise TypeError("'skip_water' must be of type 'bool'.") global _cache # Create the key. key = ( system._sire_object.uid().toString(), format, _compress_molnum_key(str(system._mol_nums)), str(set(excluded_properties)), str(match_water), str(skip_water), ) # Get the existing file path and MD5 hash from the cache. try: (prev_system, path, original_hash) = _cache[key] except: return False # Whether the cache entry is still valid. cache_valid = True # Is this system the same as the previous? if not system.isSame( prev_system, excluded_properties=excluded_properties, property_map0=property_map, property_map1=property_map, skip_water=skip_water, ): cache_valid = False # Make sure the file still exists. if not _os.path.exists(path): cache_valid = False # Make sure the MD5 sum is still the same. else: current_hash = _get_md5_hash(path) if current_hash != original_hash: cache_valid = False # If the cache isn't valid, delete the entry and return False. if not cache_valid: if key in _cache: del _cache[key] return False # Copy the old file to the new location. else: # Get the file extension. ext = _os.path.splitext(path)[1] # Add the extension to the file base. new_path = filebase + ext # Copy the file to the new location. try: _shutil.copyfile(path, new_path) except _shutil.SameFileError: pass except: del _cache[key] return False return ext def _update_cache( system, format, path, excluded_properties=[], match_water=True, skip_water=True, **kwargs, ): """ Update the file cache when a new system is written to a specified format. Parameters ---------- system : :class:`System <BioSimSpace._SireWrappers.System>` The system. format : str The molecular file format. path : str The path to the file. excluded_properties : [str] A list of properties to exclude when comparing systems when checking match_water : bool Whether to update the naming of water molecules to match the expected convention for the chosen file format. This is useful when a system is being saved to a different file format to that from which it was loaded. skip_water : bool Whether to skip water molecules when comparing systems. """ # Validate input. if not isinstance(system, _System): raise TypeError("'system' must be of type 'BioSimSpace._SireWrappers.System'") if not isinstance(format, str): raise TypeError("'format' must be of type 'str'") if not isinstance(excluded_properties, (list, tuple)): raise TypeError("'excluded_properties' must be a list of 'str' types.") if not isinstance(path, str): raise TypeError("'path' must be of type 'str'") if not _os.path.exists(path): raise IOError(f"File does not exist: '{path}'") if not all(isinstance(x, str) for x in excluded_properties): raise TypeError("'excluded_properties' must be a list of 'str' types.") if not isinstance(match_water, bool): raise TypeError("'match_water' must be of type 'bool'") if not isinstance(skip_water, bool): raise TypeError("'skip_water' must be of type 'bool'.") global _cache # Convert to an absolute path. path = _os.path.abspath(path) # Get the MD5 checksum for the file. hash = _get_md5_hash(path) # Create the key. key = ( system._sire_object.uid().toString(), format, _compress_molnum_key(str(system._mol_nums)), str(set(excluded_properties)), str(match_water), str(skip_water), ) # Update the cache. _cache[key] = (system.copy(), path, hash) def _get_md5_hash(path): """ Internal helper function to return the MD5 checksum for a file. Returns ------- hash : hashlib.HASH """ # Get the MD5 hash of the file. Process in chunks in case the file is too # large to process. hash = _hashlib.md5() with open(path, "rb") as f: for chunk in iter(lambda: f.read(4096), b""): hash.update(chunk) return hash.hexdigest() def _compress_molnum_key(str): """ Internal helper function to compress the MolNum list section of the key. """ return str.replace("MolNum(", "").replace(")", "")