Source code for BioSimSpace.Process._openmm

######################################################################
# BioSimSpace: Making biomolecular simulation a breeze!
#
# Copyright: 2017-2024
#
# Authors: Lester Hedges <lester.hedges@gmail.com>
#
# BioSimSpace is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with BioSimSpace. If not, see <http://www.gnu.org/licenses/>.
#####################################################################

"""Functionality for running simulations with OpenMM."""

__author__ = "Lester Hedges"
__email__ = "lester.hedges@gmail.com"

__all__ = ["OpenMM"]

from .._Utils import _try_import

import math as _math
import os as _os

_pygtail = _try_import("pygtail")
import sys as _sys
import shutil as _shutil
import timeit as _timeit
import warnings as _warnings

from sire.legacy import Base as _SireBase
from sire.legacy import IO as _SireIO
from sire.legacy import Units as _SireUnits


from .. import _isVerbose
from .._Exceptions import IncompatibleError as _IncompatibleError
from .._Exceptions import MissingSoftwareError as _MissingSoftwareError
from .._SireWrappers import System as _System
from ..Metadynamics import CollectiveVariable as _CollectiveVariable
from ..Protocol._position_restraint_mixin import _PositionRestraintMixin
from ..Types._type import Type as _Type

from .. import IO as _IO
from .. import Protocol as _Protocol
from .. import Trajectory as _Trajectory
from .. import Types as _Types
from .. import Units as _Units
from .. import _Utils

from . import _process

from ._plumed import Plumed as _Plumed


[docs] class OpenMM(_process.Process): """A class for running simulations using OpenMM.""" # Dictionary of platforms and their OpenMM keyword. _platforms = {"CPU": "CPU", "CUDA": "CUDA", "OPENCL": "OpenCL"}
[docs] def __init__( self, system, protocol, reference_system=None, exe=None, name="openmm", platform="CPU", work_dir=None, seed=None, property_map={}, **kwargs, ): """ Constructor. Parameters ---------- system : :class:`System <BioSimSpace._SireWrappers.System>` The molecular system. protocol : :class:`Protocol <BioSimSpace.Protocol>` The protocol for the OpenMM process. reference_system : :class:`System <BioSimSpace._SireWrappers.System>` or None An optional system to use as a source of reference coordinates for position restraints. It is assumed that this system has the same topology as "system". If this is None, then "system" is used as a reference. exe : str The full path to the Python interpreter used to run OpenMM. name : str The name of the process. platform : str The platform for the simulation: "CPU", "CUDA", or "OPENCL". For CUDA use the CUDA_VISIBLE_DEVICES environment variable to set the GPUs on which to run, e.g. to run on two GPUs indexed 0 and 1 use: CUDA_VISIBLE_DEVICES=0,1. For OPENCL, instead use OPENCL_VISIBLE_DEVICES. work_dir : The working directory for the process. seed : int A random number seed. property_map : dict A dictionary that maps system "properties" to their user defined values. This allows the user to refer to properties with their own naming scheme, e.g. { "charge" : "my-charge" } kwargs : dict Additional keyword arguments. """ # Call the base class constructor. super().__init__( system, protocol, reference_system=reference_system, name=name, work_dir=work_dir, seed=seed, property_map=property_map, ) # Set the package name. self._package_name = "OPENMM" # This process can generate trajectory data. self._has_trajectory = True # If the path a Python interpreter wasn't specified, then use the bundled sire_python. if exe is None: bin_dir = _SireBase.getBinDir() # Generate the name of the sire_python interpreter. if _sys.platform == "win32": self._exe = _os.path.join(_os.path.normpath(bin_dir), "sire_python.exe") else: self._exe = _os.path.join(_os.path.normpath(bin_dir), "sire_python") else: # Make sure executable exists. if _os.path.isfile(exe): self._exe = exe else: raise IOError("OpenMM Python interpreter doesn't exist: '%s'" % exe) if not isinstance(platform, str): raise TypeError("'platform' must be of type 'str'.") else: # Strip all whitespace and convert to upper case. platform = platform.replace(" ", "").upper() # Check for platform support. if platform not in self._platforms: raise ValueError("Supported platforms are: %s" % self._platforms.keys()) else: self._platform = self._platforms[platform] # Initialise the stdout dictionary and title header. self._stdout_dict = _process._MultiDict() # Store the name of the OpenMM log file. self._log_file = _os.path.join(str(self._work_dir), f"{name}.log") # Initialise the log file separator. self._record_separator = None # Initialise a dictionary to map log file records to their column order. self._record_mapping = {} # The names of the input files. We choose to use AMBER files since they # are self-contained, but could equally work with GROMACS files. self._rst_file = _os.path.join(str(self._work_dir), f"{name}.rst7") self._top_file = _os.path.join(str(self._work_dir), f"{name}.prm7") self._ref_file = _os.path.join(str(self._work_dir), f"{name}_ref.rst7") # The name of the trajectory file. self._traj_file = _os.path.join(str(self._work_dir), f"{name}.dcd") # Set the path for the OpenMM Python script. (We use the concept of a # config file for consistency with other Process classes.) self._config_file = _os.path.join(str(self._work_dir), f"{name}_script.py") # Create the list of input files. self._input_files = [self._config_file, self._rst_file, self._top_file] # Add the reference file if there are position restraints. if self._protocol.getRestraint() is not None: self._input_files.append(self._ref_file) # Initialise the log file header. self._header = None # Now set up the working directory for the process. self._setup()
def __str__(self): """Return a human readable string representation of the object.""" return ( "<BioSimSpace.Process.%s: system=%s, protocol=%s, exe='%s', name='%s', platform='%s', work_dir='%s' seed=%s>" % ( self.__class__.__name__, str(self._system), self._protocol.__repr__(), self._exe + ("%s " % self._script if self._script else ""), self._name, self._platform, self._work_dir, self._seed, ) ) def __repr__(self): """Return a string showing how to instantiate the object.""" return ( "BioSimSpace.Process.%s(%s, %s, exe='%s', name='%s', platform='%s', work_dir='%s', seed=%s)" % ( self.__class__.__name__, str(self._system), self._protocol.__repr__(), self._exe + ("%s " % self._script if self._script else ""), self._name, self._platform, self._work_dir, self._seed, ) ) def _setup(self): """Setup the input files and working directory ready for simulation.""" # Create a copy of the system. system = self._system.copy() # Convert the water model topology so that it matches the AMBER naming convention. system._set_water_topology("AMBER", property_map=self._property_map) self._reference_system._set_water_topology( "AMBER", property_map=self._property_map ) # Check for perturbable molecules and convert to the chosen end state. system = self._checkPerturbable(system) # Create the input files... # RST file (coordinates). try: file = _os.path.splitext(self._rst_file)[0] _IO.saveMolecules(file, system, "rst7", property_map=self._property_map) except Exception as e: msg = "Failed to write system to 'RST7' format." if _isVerbose(): raise IOError(msg) from e else: raise IOError(msg) from None # Reference coordinate file for position restraints. if self._protocol.getRestraint() is not None: try: file = _os.path.splitext(self._ref_file)[0] _IO.saveMolecules( file, self._reference_system, "rst7", property_map=self._property_map, ) except Exception as e: msg = "Failed to write reference system to 'RST7' format." if _isVerbose(): raise IOError(msg) from e else: raise IOError(msg) from None # PRM file (topology). try: file = _os.path.splitext(self._top_file)[0] _IO.saveMolecules( file, system, "prm7", match_water=False, property_map=self._property_map ) except Exception as e: msg = "Failed to write system to 'PRM7' format." if _isVerbose(): raise IOError(msg) from e else: raise IOError(msg) from None # Skip if the user has passed a custom config. if isinstance(self._protocol, _Protocol.Custom): self.setConfig(self._protocol.getConfig()) else: self._generate_config() self.writeConfig(self._config_file) # Generate the dictionary of command-line arguments. self._generate_args() # Return the list of input files. return self._input_files def _generate_config(self): """Generate OpenMM Python script file strings.""" # Clear the existing configuration list. self._config = [] # Get the "space" property from the user mapping. prop = self._property_map.get("space", "space") # Check whether the system contains periodic box information. if prop in self._system._sire_object.propertyKeys(): try: # Make sure that we have a periodic box. The system will now have # a default cartesian space. box = self._system._sire_object.property(prop) has_box = box.isPeriodic() except: has_box = False else: _warnings.warn("No simulation box found. Assuming gas phase simulation.") has_box = False if isinstance(self._protocol, _Protocol.Minimisation): # Write the OpenMM import statements. self._add_config_imports() self._add_config_monkey_patches() # Load the input files. self.addToConfig("\n# Load the topology and coordinate files.") self.addToConfig( "\n# We use ParmEd due to issues with the built in AmberPrmtopFile for certain triclinic spaces." ) self.addToConfig( f"prm = parmed.load_file('{self._name}.prm7', '{self._name}.rst7')" ) # Don't use a cut-off if this is a vacuum simulation or if box information # is missing. self.addToConfig("\n# Initialise the molecular system.") if not has_box or not self._has_water: self.addToConfig("system = prm.createSystem(nonbondedMethod=NoCutoff,") else: self.addToConfig("system = prm.createSystem(nonbondedMethod=PME,") self.addToConfig(" nonbondedCutoff=1*nanometer,") self.addToConfig(" constraints=HBonds)") # Set the integrator. (Use zero-temperature as this is just a dummy step.) self.addToConfig("\n# Define the integrator.") self.addToConfig("integrator = LangevinMiddleIntegrator(0*kelvin,") self.addToConfig(" 1/picosecond,") self.addToConfig(" 0.002*picoseconds)") # Add the platform information. self._add_config_platform() # Add any position restraints. if self._protocol.getRestraint() is not None: self._add_config_restraints() # Set up the simulation object. self.addToConfig("\n# Initialise and configure the simulation object.") self.addToConfig("simulation = Simulation(prm.topology,") self.addToConfig(" system,") self.addToConfig(" integrator,") self.addToConfig(" platform,") self.addToConfig(" properties)") if self._protocol.getRestraint() is not None: self.addToConfig("simulation.context.setPositions(positions)") else: self.addToConfig("simulation.context.setPositions(prm.positions)") self.addToConfig("if prm.box_vectors is not None:") self.addToConfig( " box_vectors = reducePeriodicBoxVectors(prm.box_vectors)" ) self.addToConfig( " simulation.context.setPeriodicBoxVectors(*box_vectors)" ) self.addToConfig( f"simulation.minimizeEnergy(maxIterations={self._protocol.getSteps()})" ) # Add the reporters. self.addToConfig("\n# Add reporters.") self._add_config_reporters(state_interval=1, traj_interval=1) # Now run the simulation. self.addToConfig( "\n# Run a single simulation step to allow us to get the system and energy." ) self.addToConfig(f"simulation.step(1)") elif isinstance(self._protocol, _Protocol.Equilibration): # Write the OpenMM import statements and monkey-patches. self._add_config_imports() self._add_config_monkey_patches() # Load the input files. self.addToConfig("\n# Load the topology and coordinate files.") self.addToConfig( "\n# We use ParmEd due to issues with the built in AmberPrmtopFile for certain triclinic spaces." ) self.addToConfig( f"prm = parmed.load_file('{self._name}.prm7', '{self._name}.rst7')" ) # Don't use a cut-off if this is a vacuum simulation or if box information # is missing. is_periodic = True self.addToConfig("\n# Initialise the molecular system.") if not has_box or not self._has_water: is_periodic = False self.addToConfig("system = prm.createSystem(nonbondedMethod=NoCutoff,") else: self.addToConfig("system = prm.createSystem(nonbondedMethod=PME,") self.addToConfig(" nonbondedCutoff=1*nanometer,") self.addToConfig(" constraints=HBonds)") # Get the starting temperature and system pressure. temperature = self._protocol.getStartTemperature().kelvin().value() pressure = self._protocol.getPressure() # Add a Monte Carlo barostat if the simulation is at constant pressure. is_const_pressure = False if pressure is not None: # Cannot use a barostat with a non-periodic system. if not is_periodic: _warnings.warn( "Cannot use a barostat for a vacuum or non-periodic simulation" ) else: is_const_pressure = True # Convert to bar and get the value. pressure = pressure.bar().value() # Create the barostat and add its force to the system. self.addToConfig("\n# Add a barostat to run at constant pressure.") self.addToConfig( f"barostat = MonteCarloBarostat({pressure}*bar, {temperature}*kelvin)" ) if self._is_seeded: self.addToConfig(f"barostat.setRandomNumberSeed({self._seed})") self.addToConfig("system.addForce(barostat)") # Add any position restraints. if self._protocol.getRestraint() is not None: self._add_config_restraints() # Get the integration time step from the protocol. timestep = self._protocol.getTimeStep().picoseconds().value() # Set the integrator. self.addToConfig("\n# Define the integrator.") self.addToConfig( f"integrator = LangevinMiddleIntegrator({temperature}*kelvin," ) friction = ( 1 / self._protocol.getThermostatTimeConstant().picoseconds().value() ) self.addToConfig( f" {friction:.5f}/picosecond," ) self.addToConfig(f" {timestep}*picoseconds)") if self._is_seeded: self.addToConfig(f"integrator.setRandomNumberSeed({self._seed})") # Add the platform information. self._add_config_platform() # Set up the simulation object. self.addToConfig("\n# Initialise and configure the simulation object.") self.addToConfig("simulation = Simulation(prm.topology,") self.addToConfig(" system,") self.addToConfig(" integrator,") self.addToConfig(" platform,") self.addToConfig(" properties)") if self._protocol.getRestraint() is not None: self.addToConfig("simulation.context.setPositions(positions)") else: self.addToConfig("simulation.context.setPositions(prm.positions)") self.addToConfig("if prm.box_vectors is not None:") self.addToConfig( " box_vectors = reducePeriodicBoxVectors(prm.box_vectors)" ) self.addToConfig( " simulation.context.setPeriodicBoxVectors(*box_vectors)" ) # Set initial velocities from temperature distribution. self.addToConfig("\n# Setting initial system velocities.") self.addToConfig( f"simulation.context.setVelocitiesToTemperature({temperature})" ) # Work out the number of integration steps. steps = _math.ceil( self._protocol.getRunTime() / self._protocol.getTimeStep() ) # Get the report and restart intervals. report_interval = self._protocol.getReportInterval() restart_interval = self._protocol.getRestartInterval() # Cap the intervals at the total number of steps. if report_interval > steps: report_interval = steps if restart_interval > steps: restart_interval = steps # Add the reporters. self.addToConfig("\n# Add reporters.") self._add_config_reporters( state_interval=report_interval, traj_interval=restart_interval, is_restart=False, ) # Now run the simulation. self.addToConfig("\n# Run the simulation.") # Constant temperature equilibration. if self._protocol.isConstantTemp(): self.addToConfig(f"simulation.step({steps})") # Heating / cooling cycle. else: # Adjust temperature every 100 cycles, assuming that there at # least that many cycles. if steps > 100: # Work out the number of temperature cycles. temp_cycles = _math.ceil(steps / 100) # Work out the temperature change per cycle. delta_temp = ( self._protocol.getEndTemperature().kelvin().value() - self._protocol.getStartTemperature().kelvin().value() ) / temp_cycles self.addToConfig(f"start_temperature = {temperature}") self.addToConfig(f"for x in range(0, {temp_cycles}):") self.addToConfig( f" temperature = {temperature} + x*{delta_temp}" ) self.addToConfig( f" integrator.setTemperature(temperature*kelvin)" ) if is_const_pressure: self.addToConfig( f" barostat.setDefaultTemperature(temperature*kelvin)" ) self.addToConfig(" simulation.step(100)") else: # Work out the temperature change per step. delta_temp = ( self._protocol.getEndTemperature().kelvin().value() - self._protocol.getStartTemperature().kelvin().value() ) / steps self.addToConfig(f"start_temperature = {temperature}") self.addToConfig(f"for x in range(0, {steps}):") self.addToConfig( f" temperature = {temperature} + x*{delta_temp}" ) self.addToConfig( f" integrator.setTemperature(temperature*kelvin)" ) if is_const_pressure: self.addToConfig( f" barostat.setDefaultTemperature(temperature*kelvin)" ) self.addToConfig(" simulation.step(1)") elif isinstance(self._protocol, _Protocol.Production): # Write the OpenMM import statements. self._add_config_imports() self._add_config_monkey_patches() # Production specific import. self.addToConfig("import os") # Load the input files. self.addToConfig("\n# Load the topology and coordinate files.") self.addToConfig( "\n# We use ParmEd due to issues with the built in AmberPrmtopFile for certain triclinic spaces." ) self.addToConfig( f"prm = parmed.load_file('{self._name}.prm7', '{self._name}.rst7')" ) # Don't use a cut-off if this is a vacuum simulation or if box information # is missing. is_periodic = True self.addToConfig("\n# Initialise the molecular system.") if not has_box or not self._has_water: is_periodic = False self.addToConfig("system = prm.createSystem(nonbondedMethod=NoCutoff,") else: self.addToConfig("system = prm.createSystem(nonbondedMethod=PME,") self.addToConfig(" nonbondedCutoff=1*nanometer,") self.addToConfig(" constraints=HBonds)") # Get the starting temperature and system pressure. temperature = self._protocol.getTemperature().kelvin().value() pressure = self._protocol.getPressure() # Add a Monte Carlo barostat if the simulation is at constant pressure. is_const_pressure = False if pressure is not None: # Cannot use a barostat with a non-periodic system. if not is_periodic: _warnings.warn( "Cannot use a barostat for a vacuum or non-periodic simulation" ) else: is_const_pressure = True # Convert to bar and get the value. pressure = pressure.bar().value() # Create the barostat and add its force to the system. self.addToConfig("\n# Add a barostat to run at constant pressure.") self.addToConfig( f"barostat = MonteCarloBarostat({pressure}*bar, {temperature}*kelvin)" ) if self._is_seeded: self.addToConfig(f"barostat.setRandomNumberSeed({self._seed})") self.addToConfig("system.addForce(barostat)") # Add any position restraints. if self._protocol.getRestraint() is not None: self._add_config_restraints() # Get the integration time step from the protocol. timestep = self._protocol.getTimeStep().picoseconds().value() # Set the integrator. self.addToConfig("\n# Define the integrator.") self.addToConfig( f"integrator = LangevinMiddleIntegrator({temperature}*kelvin," ) friction = ( 1 / self._protocol.getThermostatTimeConstant().picoseconds().value() ) self.addToConfig( f" {friction:.5f}/picosecond," ) self.addToConfig(f" {timestep}*picoseconds)") if self._is_seeded: self.addToConfig(f"integrator.setRandomNumberSeed({self._seed})") # Add the platform information. self._add_config_platform() # Set up the simulation object. self.addToConfig("\n# Initialise and configure the simulation object.") self.addToConfig("simulation = Simulation(prm.topology,") self.addToConfig(" system,") self.addToConfig(" integrator,") self.addToConfig(" platform,") self.addToConfig(" properties)") if self._protocol.getRestraint() is not None: self.addToConfig("simulation.context.setPositions(positions)") else: self.addToConfig("simulation.context.setPositions(prm.positions)") self.addToConfig("if prm.box_vectors is not None:") self.addToConfig( " box_vectors = reducePeriodicBoxVectors(prm.box_vectors)" ) self.addToConfig( " simulation.context.setPeriodicBoxVectors(*box_vectors)" ) # Set initial velocities from temperature distribution. self.addToConfig("\n# Setting initial system velocities.") self.addToConfig( f"simulation.context.setVelocitiesToTemperature({temperature})" ) # Check for a restart file and load the simulation state. is_restart, step = self._add_config_restart() # Work out the number of integration steps. total_steps = _math.ceil( self._protocol.getRunTime() / self._protocol.getTimeStep() ) # Subtract the current number of steps. steps = total_steps - step # Exit if the simulation has already finished. if steps <= 0: print("The simulation has already finished!") return # Inform user that a restart was loaded. self.addToConfig("\n# Print restart information.") self.addToConfig("if is_restart:") self.addToConfig(f" steps = {total_steps}") self.addToConfig(" percent_complete = 100 * (step / steps)") self.addToConfig(" print('Loaded state from an existing simulation.')") self.addToConfig( " print(f'Simulation is {percent_complete}% complete.')" ) # Get the report and restart intervals. report_interval = self._protocol.getReportInterval() restart_interval = self._protocol.getRestartInterval() # Cap the intervals at the total number of steps. if report_interval > steps: report_interval = steps if restart_interval > steps: restart_interval = steps # Add the reporters. self.addToConfig("\n# Add reporters.") self._add_config_reporters( state_interval=report_interval, traj_interval=restart_interval, is_restart=is_restart, ) # Work out the total simulation time in picoseconds. run_time = steps * timestep # Work out the number of cycles in 100 picosecond intervals. cycles = _math.ceil(run_time / 100) # Work out the number of steps per cycle. steps_per_cycle = int(steps / cycles) # Now run the simulation. self.addToConfig("\n# Run the simulation in 100 picosecond cycles.") self.addToConfig(f"for x in range(0, {cycles}):") self.addToConfig(f" simulation.step({steps_per_cycle})") self.addToConfig(f" simulation.saveState('{self._name}.xml')") elif isinstance(self._protocol, _Protocol.Metadynamics): colvar = self._protocol.getCollectiveVariable() if len(colvar) != 1 or not isinstance( colvar[0], _CollectiveVariable.Funnel ): raise _IncompatibleError( "We currently only support '%s' collective variables for '%s' protocols" % ( _CollectiveVariable.Funnel.__name__, self._protocol.__class__.__name__, ) ) # Create the path to the patched OpenMM metadynamics module. aux_file = "metadynamics.py" path = ( _os.path.dirname(_CollectiveVariable.__file__).replace( "CollectiveVariable", "_aux" ) + "/" + aux_file ) # Copy the file into the working directory. _shutil.copyfile(path, _os.path.join(str(self._work_dir), aux_file)) # The following OpenMM native implementation of the funnel metadynamics protocol # is adapted from funnel_maker.py by Dominykas Lukauskis. # Extract the only collective variable. colvar = colvar[0] # Write the OpenMM import statements. self._add_config_imports() self.addToConfig( "from metadynamics import *" ) # Use local patched metadynamics module. self.addToConfig("from glob import glob") self.addToConfig("import math") self.addToConfig("import os") self.addToConfig("import shutil") # Load the input files. self.addToConfig("\n# Load the topology and coordinate files.") self.addToConfig( "\n# We use ParmEd due to issues with the built in AmberPrmtopFile for certain triclinic spaces." ) self.addToConfig( f"prm = parmed.load_file('{self._name}.prm7', '{self._name}.rst7')" ) # Don't use a cut-off if this is a vacuum simulation or if box information # is missing. is_periodic = True self.addToConfig("\n# Initialise the molecular system.") if not has_box or not self._has_water: is_periodic = False self.addToConfig("system = prm.createSystem(nonbondedMethod=NoCutoff,") else: self.addToConfig("system = prm.createSystem(nonbondedMethod=PME,") self.addToConfig(" nonbondedCutoff=1*nanometer,") self.addToConfig(" constraints=HBonds)") # Get the starting temperature and system pressure. temperature = self._protocol.getTemperature().kelvin().value() pressure = self._protocol.getPressure() # Add a Monte Carlo barostat if the simulation is at constant pressure. is_const_pressure = False if pressure is not None: # Cannot use a barostat with a non-periodic system. if not is_periodic: _warnings.warn( "Cannot use a barostat for a vacuum or non-periodic simulation" ) else: is_const_pressure = True # Convert to bar and get the value. pressure = pressure.bar().value() # Create the barostat and add its force to the system. self.addToConfig("\n# Add a barostat to run at constant pressure.") self.addToConfig( f"barostat = MonteCarloBarostat({pressure}*bar, {temperature}*kelvin)" ) if self._is_seeded: self.addToConfig(f"barostat.setRandomNumberSeed({self._seed})") self.addToConfig("system.addForce(barostat)") # Store the number of atoms in each molecule. atom_nums = [] for mol in self._system: atom_nums.append(mol.nAtoms()) # Sort the molecule indices by the number of atoms they contain. sorted_nums = sorted((value, idx) for idx, value in enumerate(atom_nums)) # Set the ligand to the index of the second largest molecule. ligand = sorted_nums[-2][1] # Work out the start/end indices of the ligand within the system. idx_start = self._system.getIndex(self._system[ligand].getAtoms()[0]) idx_end = self._system.getIndex(self._system[ligand + 1].getAtoms()[0]) # Add the funnel variables. p1_string = ",".join([str(x) for x in colvar.getAtoms0()]) p2_string = ",".join([str(x) for x in colvar.getAtoms1()]) self.addToConfig("\n# Set funnel variables.") self.addToConfig(f"p1 = [{p1_string}]") self.addToConfig(f"p2 = [{p2_string}]") self.addToConfig(f"lig = [x for x in range({idx_start}, {idx_end})]") sigma_proj = colvar.getHillWidth()[0].nanometers().value() self.addToConfig("\n# Create the bias variable for the funnel projection.") self.addToConfig( "projection = CustomCentroidBondForce(3, 'distance(g1,g2)*cos(angle(g1,g2,g3))')" ) self.addToConfig("projection.addGroup(lig)") self.addToConfig("projection.addGroup(p1)") self.addToConfig("projection.addGroup(p2)") self.addToConfig("projection.addBond([0,1,2])") self.addToConfig("projection.setUsesPeriodicBoundaryConditions(True)") self.addToConfig(f"sigma_proj = {sigma_proj}") if colvar.getLowerBound() is None and colvar.getUpperBound() is None: # Sane defaults if no bounds are set. lower_wall = 0.5 upper_wall = 4.5 else: lower_wall = colvar.getLowerBound().getValue().nanometers().value() upper_wall = colvar.getUpperBound().getValue().nanometers().value() self.addToConfig( f"proj = BiasVariable(projection, {lower_wall-0.2}, {upper_wall+0.2}, {sigma_proj}, False, gridWidth=200)" ) sigma_ext = colvar.getHillWidth()[1].nanometers().value() self.addToConfig("\n# Create the bias variable for the funnel extent.") self.addToConfig( "extent = CustomCentroidBondForce(3, 'distance(g1,g2)*sin(angle(g1,g2,g3))')" ) self.addToConfig("extent.addGroup(lig)") self.addToConfig("extent.addGroup(p1)") self.addToConfig("extent.addGroup(p2)") self.addToConfig("extent.addBond([0,1,2])") self.addToConfig("extent.setUsesPeriodicBoundaryConditions(True)") extent_max = ( colvar.getWidth().nanometers().value() + colvar.getBuffer().nanometers().value() + 0.2 ) self.addToConfig(f"sigma_ext = {sigma_ext}") self.addToConfig( f"ext = BiasVariable(extent, 0.0, {extent_max}, {sigma_ext}, False, gridWidth=200)" ) # Add restraints. self.addToConfig("\n# Add restraints.") self.addToConfig("k1 = 10000*kilojoules_per_mole") self.addToConfig("k2 = 1000*kilojoules_per_mole") self.addToConfig(f"lower_wall = {lower_wall}*nanometer") self.addToConfig(f"upper_wall = {upper_wall}*nanometer") self.addToConfig("\n# Upper wall.") self.addToConfig( "upper_wall_rest = CustomCentroidBondForce(3, '(k1/2)*max(distance(g1,g2)*cos(angle(g1,g2,g3)) - upper_wall, 0)^2')" ) self.addToConfig("upper_wall_rest.addGroup(lig)") self.addToConfig("upper_wall_rest.addGroup(p1)") self.addToConfig("upper_wall_rest.addGroup(p2)") self.addToConfig("upper_wall_rest.addBond([0,1,2])") self.addToConfig("upper_wall_rest.addGlobalParameter('k1', k1)") self.addToConfig( "upper_wall_rest.addGlobalParameter('upper_wall', upper_wall)" ) self.addToConfig("upper_wall_rest.setUsesPeriodicBoundaryConditions(True)") self.addToConfig("system.addForce(upper_wall_rest)") self.addToConfig("\n# Sides of the funnel.") self.addToConfig( f"wall_width = {colvar.getWidth().nanometers().value()}*nanometer" ) self.addToConfig( f"wall_buffer = {colvar.getBuffer().nanometers().value()}*nanometer" ) self.addToConfig(f"beta_cent = {colvar.getSteepness()}") self.addToConfig( f"s_cent = {colvar.getInflection().nanometers().value()}*nanometer" ) self.addToConfig( "dist_restraint = CustomCentroidBondForce(3, '(k2/2)*max(distance(g1,g2)*sin(angle(g1,g2,g3)) - (a/(1+exp(b*(distance(g1,g2)*cos(angle(g1,g2,g3))-c)))+d), 0)^2')" ) self.addToConfig("dist_restraint.addGroup(lig)") self.addToConfig("dist_restraint.addGroup(p1)") self.addToConfig("dist_restraint.addGroup(p2)") self.addToConfig("dist_restraint.addBond([0,1,2])") self.addToConfig("dist_restraint.addGlobalParameter('k2', k2)") self.addToConfig("dist_restraint.addGlobalParameter('a', wall_width)") self.addToConfig("dist_restraint.addGlobalParameter('b', beta_cent)") self.addToConfig("dist_restraint.addGlobalParameter('c', s_cent)") self.addToConfig("dist_restraint.addGlobalParameter('d', wall_buffer)") self.addToConfig("dist_restraint.setUsesPeriodicBoundaryConditions(True)") self.addToConfig("system.addForce(dist_restraint)") self.addToConfig("\n# Lower wall.") self.addToConfig( "lower_wall_rest = CustomCentroidBondForce(3, '(k1/2)*min(distance(g1,g2)*cos(angle(g1,g2,g3)) - lower_wall, 0)^2')" ) self.addToConfig("lower_wall_rest.addGroup(lig)") self.addToConfig("lower_wall_rest.addGroup(p1)") self.addToConfig("lower_wall_rest.addGroup(p2)") self.addToConfig("lower_wall_rest.addBond([0,1,2])") self.addToConfig("lower_wall_rest.addGlobalParameter('k1', k1)") self.addToConfig( "lower_wall_rest.addGlobalParameter('lower_wall', lower_wall)" ) self.addToConfig("lower_wall_rest.setUsesPeriodicBoundaryConditions(True)") self.addToConfig("system.addForce(lower_wall_rest)") # Get the number of steps to date. step = 0 if _os.path.isfile(_os.path.join(str(self._work_dir), f"{self._name}.xml")): if _os.path.isfile( _os.path.join(str(self._work_dir), f"{self._name}.log") ): with open( _os.path.join(str(self._work_dir), f"{self._name}.log"), "r" ) as f: lines = f.readlines() last_line = lines[-1].split() try: step = int(last_line[0]) except: raise _IncompatibleError( "Failed to read current integration " f"step from '{self._name}.log'" ) else: raise IOError(f"Missing log file: '{self._name}.log'") # Get the report and restart intervals. report_interval = self._protocol.getReportInterval() restart_interval = self._protocol.getRestartInterval() # Get the hill deposition frequency. hill_freq = self._protocol.getHillFrequency() # Work out the number of integration steps. total_steps = _math.ceil( self._protocol.getRunTime() / self._protocol.getTimeStep() ) # Work out the number of cycles. # (Record COLVAR and HILLS at hill deposition frequency.) total_cycles = _math.ceil(total_steps / hill_freq) # Subtract the current number of steps. remaining_steps = total_steps - step # Exit if the simulation has already finished. if remaining_steps <= 0: print("The simulation has already finished!") return # Cap the intervals at the total number of steps. if report_interval > remaining_steps: report_interval = remaining_steps if restart_interval > remaining_steps: restart_interval = remaining_steps self.addToConfig("\n# Initialise the metadynamics object.") if self._protocol.getBiasFactor() is None: bias = 1.0000001 else: bias = self._protocol.getBiasFactor() self.addToConfig(f"bias = {bias}") height = self._protocol.getHillHeight().kj_per_mol().value() self.addToConfig( f"meta = Metadynamics(system, [proj, ext], {temperature}*kelvin, bias, {height}*kilojoules_per_mole, {hill_freq}, biasDir = '.', saveFrequency = {report_interval})" ) # Get the integration time step from the protocol. timestep = self._protocol.getTimeStep().picoseconds().value() # Set the integrator. self.addToConfig("\n# Define the integrator.") self.addToConfig( f"integrator = LangevinMiddleIntegrator({temperature}*kelvin," ) self.addToConfig(" 1/picosecond,") self.addToConfig(f" {timestep}*picoseconds)") if self._is_seeded: self.addToConfig(f"integrator.setRandomNumberSeed({self._seed})") # Add the platform information. self._add_config_platform() # Set up the simulation object. self.addToConfig("\n# Initialise and configure the simulation object.") self.addToConfig("simulation = Simulation(prm.topology,") self.addToConfig(" system,") self.addToConfig(" integrator,") self.addToConfig(" platform,") self.addToConfig(" properties)") self.addToConfig("simulation.context.setPositions(prm.positions)") self.addToConfig("if prm.box_vectors is not None:") self.addToConfig( " box_vectors = reducePeriodicBoxVectors(prm.box_vectors)" ) self.addToConfig( " simulation.context.setPeriodicBoxVectors(*box_vectors)" ) # Set initial velocities from temperature distribution. self.addToConfig("\n# Setting initial system velocities.") self.addToConfig( f"simulation.context.setVelocitiesToTemperature({temperature})" ) # Check for a restart file and load the simulation state. is_restart, step = self._add_config_restart() # Add the reporters. self.addToConfig("\n# Add reporters.") self._add_config_reporters( state_interval=report_interval, traj_interval=restart_interval, is_restart=is_restart, ) # Create the HILLS file. self.addToConfig("\n# Create PLUMED compatible HILLS file.") self.addToConfig("if is_restart:") self.addToConfig(" file = open('HILLS','a')") self.addToConfig("else:") self.addToConfig(" file = open('HILLS','w')") self.addToConfig( " file.write('#! FIELDS time pp.proj pp.ext sigma_pp.proj sigma_pp.ext height biasf\\n')" ) self.addToConfig(" file.write('#! SET multivariate false\\n')") self.addToConfig(" file.write('#! SET kerneltype gaussian\\n')") # Get the initial collective variables. self.addToConfig("\n# Initialise the collective variable array.") self.addToConfig( "current_cvs = np.array(list(meta.getCollectiveVariables(simulation)) + [meta.getHillHeight(simulation)])" ) # Write the initial record. self.addToConfig("\n# Write the initial collective variable record.") self.addToConfig("if is_restart:") self.addToConfig(" if os.path.isfile('COLVAR.npy'):") self.addToConfig(" colvar_array = np.load('COLVAR.npy')") self.addToConfig( " colvar_array = np.append(colvar_array, [current_cvs], axis=0)" ) self.addToConfig(" else:") self.addToConfig(" raise IOError('Missing COLVAR file: COLVAR.npy')") self.addToConfig("else:") self.addToConfig(" colvar_array = np.array([current_cvs])") self.addToConfig(" line = colvar_array[0]") self.addToConfig(" time = 0") self.addToConfig( " write_line = f'{time:15} {line[0]:20.16f} {line[1]:20.16f} {sigma_proj} {sigma_ext} {line[2]:20.16f} {bias}\\n'" ) self.addToConfig(" file.write(write_line)") # Run the metadynamics simulation. self.addToConfig("\n# Run the simulation.") self.addToConfig(f"total_steps = {total_steps}") self.addToConfig(f"total_cycles = {total_cycles}") self.addToConfig(f"remaining_steps = {remaining_steps}") self.addToConfig("steps_per_cycle = math.ceil(total_steps / total_cycles)") self.addToConfig( "remaining_cycles = math.ceil(remaining_steps / steps_per_cycle)" ) self.addToConfig(f"start_cycles = total_cycles - remaining_cycles") self.addToConfig("checkpoint = 100") self.addToConfig("if is_restart:") self.addToConfig(" fraction_complete = step / total_steps") self.addToConfig(" percent_complete = 100 * fraction_complete") self.addToConfig(" print('Loaded state from an existing simulation.')") self.addToConfig( " print(f'Simulation is {percent_complete}% complete.')" ) self.addToConfig(" if fraction_complete > 1.0:") self.addToConfig( " start_cycles = math.ceil(fraction_complete * total_cycles)" ) self.addToConfig(" total_cycles = start_cycles + total_cycles") self.addToConfig("for x in range(start_cycles, total_cycles):") self.addToConfig(" meta.step(simulation, steps_per_cycle)") self.addToConfig( " current_cvs = np.array(list(meta.getCollectiveVariables(simulation)) + [meta.getHillHeight(simulation)])" ) self.addToConfig( " colvar_array = np.append(colvar_array, [current_cvs], axis=0)" ) self.addToConfig(" np.save('COLVAR.npy', colvar_array)") self.addToConfig(" line = colvar_array[x+1]") self.addToConfig(f" time = int((x+1) * {timestep}*steps_per_cycle)") self.addToConfig( " write_line = f'{time:15} {line[0]:20.16f} {line[1]:20.16f} {sigma_proj} {sigma_ext} {line[2]:20.16f} {bias}\\n'" ) self.addToConfig(" file.write(write_line)") self.addToConfig(" # Save the simulation state every 100 picoseconds.") self.addToConfig(" if time >= checkpoint:") self.addToConfig(f" simulation.saveState('{self._name}.xml')") self.addToConfig(" while time >= checkpoint:") self.addToConfig(" checkpoint += 100") # Create a dummy PLUMED input file so that we can bind PLUMED # analysis functions to this process. self._plumed = _Plumed(str(self._work_dir)) plumed_config, auxillary_files = self._plumed.createConfig( self._system, self._protocol, self._property_map ) # Expose the PLUMED specific member functions. setattr(self, "getFreeEnergy", self._getFreeEnergy) setattr(self, "getCollectiveVariable", self._getCollectiveVariable) setattr(self, "sampleConfigurations", self._sampleConfigurations) setattr(self, "getTime", self._getTime) else: raise _IncompatibleError( "Unsupported protocol: '%s'" % self._protocol.__class__.__name__ ) # Flag that this isn't a custom protocol. self._protocol._setCustomised(False)
[docs] def getConfig(self): """ Get the list of strings defining the OpenMM Python script. Returns ------- config : [str] The list of configuration strings. """ return super().getConfig()
def setConfig(self, config): """ Set the list of strings defining the OpenMM Python script. Parameters ---------- config : str, [str] The list of configuration strings, or a path to a configuration file. """ return super().setConfig()
[docs] def addToConfig(self, config): """ Add a string to the OpenMM Python script configuration. Parameters ---------- config : str, [str] A configuration string, a list of configuration strings, or a path to a configuration file. """ # Call the base class method. super().addToConfig(config)
[docs] def resetConfig(self): """Reset the OpenMM Python script configuration.""" self._generate_config()
[docs] def setConfig(self, config): """ Set the list of configuration file strings. Parameters ---------- config : str, [str] The list of configuration strings, or a path to a configuration file. """ # Call the base class method. super().setConfig(config)
[docs] def start(self): """ Start the OpenMM process. Returns ------- process : :class:`Process.OpenMM <BioSimSpace.Process.OpenMM>` A handle to the OpenMM process. """ # The process is currently queued. if self.isQueued(): return # Process is already running. if self._process is not None: if self._process.isRunning(): return # Clear any existing output. self._clear_output() # Run the process in the working directory. with _Utils.cd(self._work_dir): # Create the arguments string list. # The name of the Python script (config file) is the first argument. args = ["%s" % self._config_file] args.extend(self.getArgStringList()) # Write the command-line process to a README.txt file. with open("README.txt", "w") as f: # Set the command-line string. self._command = ( "%s %s " % (self._exe, self._config_file) + self.getArgString() ) # Write the command to file. f.write("# OpenMM was run with the following command:\n") f.write("%s\n" % self._command) # Start the timer. self._timer = _timeit.default_timer() # Start the simulation. self._process = _SireBase.Process.run( self._exe, args, "%s.out" % self._name, "%s.err" % self._name ) return self
[docs] def getSystem(self, block="AUTO"): """ Get the latest molecular system. Parameters ---------- block : bool Whether to block until the process has finished running. Returns ------- system : :class:`System <BioSimSpace._SireWrappers.System>` The latest molecular system. """ # Wait for the process to finish. if block is True: self.wait() elif block == "AUTO" and self._is_blocked: self.wait() # Try to get the most recent trajectory frame. try: # Handle minimisation protocols separately. if isinstance(self._protocol, _Protocol.Minimisation): # Do we need to get coordinates for the lambda=1 state. if "is_lambda1" in self._property_map: is_lambda1 = True else: is_lambda1 = False traj = self.getTrajectory() # If there is no trajectory, simply return None. if traj is None: return None # Get the last frame. new_system = traj.getFrames(-1)[0] # Make a copy of the existing molecular system. old_system = self._system.copy() # Update the coordinates and velocities and return a mapping between # the molecule indices in the two systems. sire_system, mapping = _SireIO.updateCoordinatesAndVelocities( old_system._sire_object, new_system._sire_object, self._mapping, is_lambda1, self._property_map, self._property_map, ) # Update the underlying Sire object. old_system._sire_object = sire_system # Store the mapping between the MolIdx in both systems so we don't # need to recompute it next time. self._mapping = mapping # Update the box information in the original system. if "space" in new_system._sire_object.propertyKeys(): box = new_system._sire_object.property("space") if box.isPeriodic(): old_system._sire_object.setProperty( self._property_map.get("space", "space"), box ) return old_system else: # Work out the total number of trajectory frames. num_frames = int( (self._protocol.getRunTime() / self._protocol.getTimeStep()) / self._protocol.getRestartInterval() ) # Work out the fraction of the simulation that has been completed. frac_complete = self._protocol.getRunTime() / self.getTime() # Make sure the fraction doesn't exceed one. OpenMM can report # time values that are larger than the number of integration steps # multiplied by the time step. if frac_complete > 1: frac_complete = 1 # Work out the trajectory frame index, rounding down. # Remember that frames in MDTraj are zero indexed, like Python. index = int(frac_complete * num_frames) if index > 0: index -= 1 # Return the most recent frame. return self.getFrame(index) except: return None
[docs] def getCurrentSystem(self): """ Get the latest molecular system. Returns ------- system : :class:`System <BioSimSpace._SireWrappers.System>` The latest molecular system. """ return self.getSystem(block=False)
[docs] def getTrajectory(self, backend="AUTO", block="AUTO"): """ Return a trajectory object. Parameters ---------- backend : str The backend to use for trajectory parsing. To see supported backends, run BioSimSpace.Trajectory.backends(). Using "AUTO" will try each in sequence. block : bool Whether to block until the process has finished running. Returns ------- trajectory : :class:`System <BioSimSpace.Trajectory.Trajectory>` The latest trajectory object. """ if not isinstance(backend, str): raise TypeError("'backend' must be of type 'str'") if not isinstance(block, (bool, str)): raise TypeError("'block' must be of type 'bool' or 'str'") # Wait for the process to finish. if block is True: self.wait() elif block == "AUTO" and self._is_blocked: self.wait() # Warn the user if the process has exited with an error. if self.isError(): _warnings.warn("The process exited with an error!") if not _os.path.isfile(self._traj_file): return None else: return _Trajectory.Trajectory(process=self, backend=backend)
[docs] def getFrame(self, index): """ Return a specific trajectory frame. Parameters ---------- index : int The index of the frame. Returns ------- frame : :class:`System <BioSimSpace._SireWrappers.System>` The System object of the corresponding frame. """ if not type(index) is int: raise TypeError("'index' must be of type 'int'") max_index = ( int( (self._protocol.getRunTime() / self._protocol.getTimeStep()) / self._protocol.getRestartInterval() ) - 1 ) if index < 0 or index > max_index: raise ValueError(f"'index' must be in range [0, {max_index}].") # Warn the user if the process has exited with an error. if self.isError(): _warnings.warn("The process exited with an error!") try: # Do we need to get coordinates for the lambda=1 state. if "is_lambda1" in self._property_map: is_lambda1 = True else: is_lambda1 = False # Create a copy of the existing system object. old_system = self._system.copy() # Get the latest trajectory frame. new_system = _Trajectory.getFrame(self._traj_file, self._top_file, index) # Update the coordinates and velocities and return a mapping between # the molecule indices in the two systems. sire_system, mapping = _SireIO.updateCoordinatesAndVelocities( old_system._sire_object, new_system._sire_object, self._mapping, is_lambda1, self._property_map, self._property_map, ) # Update the underlying Sire object. old_system._sire_object = sire_system # Store the mapping between the MolIdx in both systems so we don't # need to recompute it next time. self._mapping = mapping # Update the box information in the original system. if "space" in new_system._sire_object.propertyKeys(): box = new_system._sire_object.property("space") old_system._sire_object.setProperty( self._property_map.get("space", "space"), box ) return old_system except: return None
[docs] def getRecord(self, record, time_series=False, unit=None, block="AUTO"): """ Get a record from the stdout dictionary. Parameters ---------- record : str The record key. time_series : bool Whether to return a list of time series records. unit : :class:`Unit <BioSimSpace.Units>` The unit to convert the record to. block : bool Whether to block until the process has finished running. Returns ------- record : :class:`Type <BioSimSpace.Types>` The matching record. """ # Wait for the process to finish. if block is True: self.wait() elif block == "AUTO" and self._is_blocked: self.wait() # Warn the user if the process has exited with an error. if self.isError(): _warnings.warn("The process exited with an error!") self._update_stdout_dict() return self._get_stdout_record(record, time_series, unit)
[docs] def getCurrentRecord(self, record, time_series=False, unit=None): """ Get a current record from the stdout dictionary. Parameters ---------- record : str The record key. time_series : bool Whether to return a list of time series records. unit : :class:`Unit <BioSimSpace.Units>` The unit to convert the record to. Returns ------- record : :class:`Type <BioSimSpace.Types>` The matching record. """ # Warn the user if the process has exited with an error. if self.isError(): _warnings.warn("The process exited with an error!") self._update_stdout_dict() return self._get_stdout_record(record, time_series, unit)
[docs] def getRecords(self, block="AUTO"): """ Return the dictionary of stdout time-series records. Parameters ---------- block : bool Whether to block until the process has finished running. Returns ------- records : :class:`MultiDict <BioSimSpace.Process._process._MultiDict>` The dictionary of time-series records. """ # Wait for the process to finish. if block is True: self.wait() elif block == "AUTO" and self._is_blocked: self.wait() # Warn the user if the process has exited with an error. if self.isError(): _warnings.warn("The process exited with an error!") self._update_stdout_dict() return self._stdout_dict.copy()
[docs] def getCurrentRecords(self): """ Return the current dictionary of stdout time-series records. Parameters ---------- block : bool Whether to block until the process has finished running. Returns ------- records : :class:`MultiDict <BioSimSpace.Process._process._MultiDict>` The dictionary of time-series records. """ return self.getRecords(block=False)
[docs] def getTime(self, time_series=False, block="AUTO"): """ Get the simulation time. Parameters ---------- time_series : bool Whether to return a list of time series records. block : bool Whether to block until the process has finished running. Returns ------- time : :class:`Time <BioSimSpace.Types.Time>` The current simulation time in nanoseconds. """ return self.getRecord("TIME(PS)", time_series, _Units.Time.picosecond, block)
[docs] def getCurrentTime(self, time_series=False): """ Get the current simulation time. Parameters ---------- time_series : bool Whether to return a list of time series records. block : bool Whether to block until the process has finished running. Returns ------- time : :class:`Time <BioSimSpace.Types.Time>` The current simulation time in nanoseconds. """ return self.getTime(time_series, block=False)
[docs] def getStep(self, time_series=False, block="AUTO"): """ Get the number of integration steps. Parameters ---------- time_series : bool Whether to return a list of time series records. block : bool Whether to block until the process has finished running. Returns ------- step : int The current number of integration steps. """ return self.getRecord("STEP", time_series, None, block)
[docs] def getCurrentStep(self, time_series=False): """ Get the current number of integration steps. Parameters ---------- time_series : bool Whether to return a list of time series records. Returns ------- step : int The current number of integration steps. """ return self.getStep(time_series, block=False)
[docs] def getPotentialEnergy(self, time_series=False, block="AUTO"): """ Get the potential energy. Parameters ---------- time_series : bool Whether to return a list of time series records. block : bool Whether to block until the process has finished running. Returns ------- energy : :class:`Energy <BioSimSpace.Types.Energy>` The potential energy. """ return self.getRecord( "POTENTIALENERGY(KJ/MOLE)", time_series, _Units.Energy.kj_per_mol, block )
[docs] def getCurrentPotentialEnergy(self, time_series=False): """ Get the current potential energy. Parameters ---------- time_series : bool Whether to return a list of time series records. Returns ------- energy : :class:`Energy <BioSimSpace.Types.Energy>` The potential energy. """ return self.getPotentialEnergy(time_series, block=False)
[docs] def getKineticEnergy(self, time_series=False, block="AUTO"): """ Get the kinetic energy. Parameters ---------- time_series : bool Whether to return a list of time series records. block : bool Whether to block until the process has finished running. Returns ------- energy : :class:`Energy <BioSimSpace.Types.Energy>` The kinetic energy. """ return self.getRecord( "KINETICENERGY(KJ/MOLE)", time_series, _Units.Energy.kj_per_mol, block )
[docs] def getCurrentKineticEnergy(self, time_series=False): """ Get the current kinetic energy. Parameters ---------- time_series : bool Whether to return a list of time series records. Returns ------- energy : :class:`Energy <BioSimSpace.Types.Energy>` The kinetic energy. """ return self.getKineticEnergy(time_series, block=False)
[docs] def getTotalEnergy(self, time_series=False, block="AUTO"): """ Get the total energy. Parameters ---------- time_series : bool Whether to return a list of time series records. block : bool Whether to block until the process has finished running. Returns ------- energy : :class:`Energy <BioSimSpace.Types.Energy>` The total energy. """ return self.getRecord( "TOTALENERGY(KJ/MOLE)", time_series, _Units.Energy.kj_per_mol, block )
[docs] def getCurrentTotalEnergy(self, time_series=False): """ Get the current total energy. Parameters ---------- time_series : bool Whether to return a list of time series records. Returns ------- energy : :class:`Energy <BioSimSpace.Types.Energy>` The total energy. """ return self.getTotalEnergy(time_series, block=False)
[docs] def getTemperature(self, time_series=False, block="AUTO"): """ Get the temperature. Parameters ---------- time_series : bool Whether to return a list of time series records. block : bool Whether to block until the process has finished running. Returns ------- temperature : :class:`Temperature <BioSimSpace.Types.Temperature>` The temperature. """ return self.getRecord( "TEMPERATURE(K)", time_series, _Units.Temperature.kelvin, block )
[docs] def getCurrentTemperature(self, time_series=False): """ Get the current temperature. Parameters ---------- time_series : bool Whether to return a list of time series records. Returns ------- temperature : :class:`Temperature <BioSimSpace.Types.Temperature>` The current temperature. """ return self.getTemperature(time_series, block=False)
[docs] def getVolume(self, time_series=False, block="AUTO"): """ Get the volume. Parameters ---------- time_series : bool Whether to return a list of time series records. block : bool Whether to block until the process has finished running. Returns ------- volume : :class:`Volume <BioSimSpace.Types.Volume>` The volume. """ return self.getRecord( "BOXVOLUME(NM^3)", time_series, _Units.Volume.nanometer3, block )
[docs] def getCurrentVolume(self, time_series=False): """ Get the current volume. Parameters ---------- time_series : bool Whether to return a list of time series records. Returns ------- volume : :class:`Volume <BioSimSpace.Types.Volume>` The volume. """ return self.getVolume(time_series, block=False)
[docs] def stdout(self, n=10): """ Print the last n lines of the stdout buffer. Parameters ---------- n : int The number of lines to print. """ # Note that thermodynamic records, e.g. energy, pressure, temperature, # are redirected to a log file. # Ensure that the number of lines is positive. if n < 0: raise ValueError("The number of lines must be positive!") # Append any new lines to the stdout list. for line in _pygtail.Pygtail(self._stdout_file): self._stdout.append(line.rstrip()) # Get the current number of lines. num_lines = len(self._stdout) # Set the line from which to start printing. if num_lines < n: start = 0 else: start = num_lines - n # Print the lines. for x in range(start, num_lines): print(self._stdout[x])
def _add_config_imports(self): """ Helper function to write the header (import statements) to the OpenMM Python script (config file). """ # We should verify that openmm is available to prevent # difficult-to-debug errors in the run script from BioSimSpace._Utils import _try_import, _assert_imported _openmm = _try_import("openmm") _assert_imported(_openmm) self.addToConfig("from openmm import *") self.addToConfig("from openmm.app import *") self.addToConfig("from openmm.unit import *") self.addToConfig( "from openmm.app.internal.unitcell import reducePeriodicBoxVectors" ) self.addToConfig("import parmed") def _add_config_platform(self): """ Helper function to add platform information to the OpenMM Python script. """ # Set the simulation platform. self.addToConfig("\n# Set the simulation platform.") self.addToConfig(f"platform = Platform.getPlatformByName('{self._platform}')") if self._platform == "CPU": self.addToConfig("properties = {}") elif self._platform == "CUDA": cuda_devices = _os.environ.get("CUDA_VISIBLE_DEVICES") if cuda_devices is None: cuda_devices = "0" _warnings.warn( "'CUDA' platform selected but 'CUDA_VISIBLE_DEVICES' " "environment variable is unset. Defaulting to '0'." ) self.addToConfig("properties = {'CudaDeviceIndex': '%s'}" % cuda_devices) elif self._platform == "OPENCL": opencl_devices = _os.environ.get("OPENCL_VISIBLE_DEVICES") if opencl_devices is None: opencl_devices = "0" _warnings.warn( "'OpenCL' platform selected but 'OPENCL_VISIBLE_DEVICES' " "environment variable is unset. Defaulting to '0'." ) self.addToConfig( "properties = {'OpenCLDeviceIndex': '%s'}" % opencl_devices ) def _add_config_restart(self): """Helper function to check for a restart file and load state information.""" self.addToConfig("\n# Check for a restart file.") self.addToConfig(f"if os.path.isfile('{self._name}.xml'):") self.addToConfig(" is_restart = True") self.addToConfig(f" simulation.loadState('{self._name}.xml')") self.addToConfig(f" if not os.path.isfile('{self._name}.log'):") self.addToConfig(f" raise IOError('Missing log file: {self._name}.log')") self.addToConfig(f" with open('{self._name}.log', 'r') as f:") self.addToConfig(" lines = f.readlines()") self.addToConfig(" last_line = lines[-1].split()") self.addToConfig(" try:") self.addToConfig(" step = int(last_line[0])") self.addToConfig(" except:") self.addToConfig( f" raise IOError('Failed to read current integration step from {self._name}.log')" ) self.addToConfig(" simulation.currentStep = step") self.addToConfig("else:") self.addToConfig(" is_restart = False") if _os.path.isfile(_os.path.join(str(self._work_dir), f"{self._name}.xml")): with open( _os.path.join(str(self._work_dir), f"{self._name}.log"), "r" ) as f: lines = f.readlines() last_line = lines[-1].split() step = int(last_line[0]) return True, step else: return False, 0 def _add_config_monkey_patches(self): """ Helper function to write any monkey-patches to the OpenMM Python script (config file). """ # We monkey-patch the OpenMM DCDFile.writeModel method to avoid writing the # positions of any dummy atoms that are used as restraints to trajectory files. # This avoids the need to delete the dummies from the molecular system on read, # allowing us to make use of System._updateCoordinates which requires that the # number of atoms are consistent between systems. (Deleting the dummies from the # system is slower than not writing them in the first place.) self.addToConfig( "\n# Monkey-patch the DCD.writeModel method to avoid writing dummy-atom positions." ) # Store the original writeModel method. self.addToConfig("writeModel = DCDFile.writeModel") # Create a monkey-patch where we slice the positions list to match the # number of atoms in the topology, then pass this through to the original # writeModel method. self.addToConfig( "def writeModelPatched(self, positions, unitCellDimensions=None, periodicBoxVectors=None):" ) self.addToConfig( " positions = positions[:len(list(self._topology.atoms()))]" ) self.addToConfig(" writeModel(self,") self.addToConfig(" positions,") self.addToConfig(" unitCellDimensions=unitCellDimensions,") self.addToConfig(" periodicBoxVectors=periodicBoxVectors)") # Replace the writeModel method with the monkey-patch. self.addToConfig("DCDFile.writeModel = writeModelPatched") def _add_config_reporters( self, state_interval=100, traj_interval=500, is_restart=False ): """ Helper function to write the reporter (output statements) section to the OpenMM Python script (config file). Parameters ---------- state_interval : int The frequency at which to write state information in integration steps. traj_interval : int The frequency at which to write trajectory frames in integration steps. is_restart : bool Whether the simulation is a restart. """ if not type(state_interval) is int: raise TypeError("'state_interval' must be of type 'int'.") if state_interval <= 0: raise ValueError("'state_interval' must be a positive integer.") if not type(traj_interval) is int: raise TypeError("'traj_interval' must be of type 'int'.") if traj_interval <= 0: raise ValueError("'traj_interval' must be a positive integer.") if not isinstance(is_restart, bool): raise TypeError("'is_restart' must be of type 'bool'.") # Append to a trajectory file every 500 steps. self.addToConfig( f"simulation.reporters.append(DCDReporter('{self._name}.dcd', {traj_interval}, append={is_restart}))" ) # Disable specific state information for minimisation protocols. if isinstance(self._protocol, _Protocol.Minimisation): is_step = False is_time = False is_temperature = False else: is_step = True is_time = True is_temperature = True # Work out the total number of steps. if isinstance(self._protocol, _Protocol.Minimisation): total_steps = 1 else: total_steps = _math.ceil( self._protocol.getRunTime() / self._protocol.getTimeStep() ) # Write state information to file every 100 steps. self.addToConfig(f"log_file = open('{self._name}.log', 'a')") self.addToConfig(f"simulation.reporters.append(StateDataReporter(log_file,") self.addToConfig( f" {state_interval}," ) self.addToConfig( f" step={is_step}," ) self.addToConfig( f" time={is_time}," ) self.addToConfig( " potentialEnergy=True," ) self.addToConfig( " kineticEnergy=True," ) self.addToConfig( " totalEnergy=True," ) self.addToConfig( f" temperature={is_temperature}," ) self.addToConfig(" volume=True,") self.addToConfig( f" totalSteps={total_steps}," ) self.addToConfig(" speed=True,") self.addToConfig( " remainingTime=True," ) self.addToConfig( " separator=' '))" ) def _add_config_restraints(self): """Helper function to add position restraints to the OpenMM script (config file).""" # Add position restraints. This uses the approach from: # https://github.com/openmm/openmm/issues/2262#issuecomment-464157489 # Here zero-mass dummy atoms are bonded to the restrained atoms to avoid # issues with position rescaling during barostat updates. restraint = self._protocol.getRestraint() if restraint is not None: # Search for the atoms to restrain by keyword. if isinstance(restraint, str): restrained_atoms = self._reference_system.getRestraintAtoms(restraint) # Use the user-defined list of indices. else: restrained_atoms = restraint self.addToConfig( f"ref_prm = parmed.load_file('{self._name}.prm7', '{self._name}_ref.rst7')" ) # Get the force constant in units of kJ_per_mol/nanometer**2 force_constant = self._protocol.getForceConstant()._sire_unit force_constant = force_constant.to( _SireUnits.kJ_per_mol / _SireUnits.nanometer2 ) self.addToConfig( "\n# Restrain the position of atoms using zero-mass dummy atoms." ) self.addToConfig("restraint = HarmonicBondForce()") self.addToConfig("restraint.setUsesPeriodicBoundaryConditions(True)") self.addToConfig("system.addForce(restraint)") self.addToConfig( "nonbonded = [f for f in system.getForces() if isinstance(f, NonbondedForce)][0]" ) self.addToConfig("dummy_indices = []") self.addToConfig("positions = ref_prm.positions") self.addToConfig(f"restrained_atoms = {restrained_atoms}") self.addToConfig("for i in restrained_atoms:") self.addToConfig(" j = system.addParticle(0)") self.addToConfig(" nonbonded.addParticle(0, 1, 0)") self.addToConfig(" nonbonded.addException(i, j, 0, 1, 0)") self.addToConfig( f" restraint.addBond(i, j, 0*nanometers, {force_constant}*kilojoules_per_mole/nanometer**2)" ) self.addToConfig(" dummy_indices.append(j)") self.addToConfig(" positions.append(positions[i])") def _update_stdout_dict(self): """Update the dictionary of thermodynamic records.""" # Exit if log file hasn't been created. if not _os.path.isfile(self._log_file): return # A list of the new record lines. lines = [] # Append any new lines. for line in _pygtail.Pygtail(self._log_file): lines.append(line) # Append any new records to the stdout dictionary. for line in lines: # Strip leading/trailing whitespace. line = line.strip() # This is the header record. if line[0] == "#": if self._header is None: # Store the header. self._header = line else: # If this is a restart, make sure the information in the file # is consistent. if line != self._header: raise _IncompatibleError( "Mismatch in the log file header.! " f"Original header: '{self._header}', " f"Current header: '{line}'" ) # Work out what records are in the file and the separator # that is used. While we use a standard format, this makes # sure that we can still parse the log file if the user # happens to have changed the formatting. # A tally counter for the number of quotes that we've seen # in the line so far. num_quotes = 0 # Initialise the separator. sep = "" # Loop over the characters in the line. for c in line: # Increment the number of quotes. if c == '"': num_quotes += 1 # This is the second quote we've seen, start adding # characters to the separator. if num_quotes == 2: sep += c # Break when we've reached the next quote. elif num_quotes == 3: break # The separator includes a leading " character, so delete it. self._record_separator = sep[1:] # Now split the line on the separator to work out the records. # We ignore the first character since it is a comment. # Here we use the full separator, i.e. including the both quotes, # so that we can correctly split record names with spaces in them. records = line[1:].split(sep + '"') # Store the number of records. num_records = len(records) # Map each record string to its position in the array (column order). for idx, record in enumerate(records): # Strip the extra quotes from the record. if idx == 0: record = record[1:] elif idx == num_records - 1: record = record[:-1] # Map the index to the record. Store records in upper # case without whitespace to help catch typos from # the user. self._record_mapping[idx] = record.upper().replace(" ", "") # Extract the records and add them to the dictionary. else: # Split the line on the separator. records = line.split(self._record_separator) # Add each record to the appropriate key in the MultiDict. for idx, record in enumerate(records): # Get the key for this record. key = self._record_mapping[idx] # Update the record dictionary for this key. self._stdout_dict[key] = record def _get_stdout_record(self, key, time_series=False, unit=None): """ Helper function to get a stdout record from the dictionary. Parameters ---------- key : str The record key. time_series : bool Whether to return a time series of records. unit : BioSimSpace.Types._type.Type The unit to convert the record to. Returns ------- record : The matching stdout record. """ # No data! if len(self._stdout_dict) == 0: return None if not isinstance(time_series, bool): _warnings.warn("Non-boolean time-series flag. Defaulting to False!") time_series = False # Validate the unit. if unit is not None: if not isinstance(unit, _Type): raise TypeError("'unit' must be of type 'BioSimSpace.Types'") # Return the list of dictionary values. if time_series: try: if unit is None: return [float(x) for x in self._stdout_dict[key]] else: return [ (float(x) * unit)._to_default_unit() for x in self._stdout_dict[key] ] except KeyError: return None # Return the most recent dictionary value. else: try: if unit is None: return float(self._stdout_dict[key][-1]) else: return (float(self._stdout_dict[key][-1]) * unit)._to_default_unit() except KeyError: return None