######################################################################
# BioSimSpace: Making biomolecular simulation a breeze!
#
# Copyright: 2017-2024
#
# Authors: Lester Hedges <lester.hedges@gmail.com>
#
# BioSimSpace is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with BioSimSpace. If not, see <http://www.gnu.org/licenses/>.
#####################################################################
"""Functionality for running simulations with OpenMM."""
__author__ = "Lester Hedges"
__email__ = "lester.hedges@gmail.com"
__all__ = ["OpenMM"]
from .._Utils import _try_import
import math as _math
import os as _os
_pygtail = _try_import("pygtail")
import sys as _sys
import shutil as _shutil
import timeit as _timeit
import warnings as _warnings
from sire.legacy import Base as _SireBase
from sire.legacy import IO as _SireIO
from sire.legacy import Units as _SireUnits
from .. import _isVerbose
from .._Exceptions import IncompatibleError as _IncompatibleError
from .._Exceptions import MissingSoftwareError as _MissingSoftwareError
from .._SireWrappers import System as _System
from ..Metadynamics import CollectiveVariable as _CollectiveVariable
from ..Protocol._position_restraint_mixin import _PositionRestraintMixin
from ..Types._type import Type as _Type
from .. import IO as _IO
from .. import Protocol as _Protocol
from .. import Trajectory as _Trajectory
from .. import Types as _Types
from .. import Units as _Units
from .. import _Utils
from . import _process
from ._plumed import Plumed as _Plumed
[docs]
class OpenMM(_process.Process):
"""A class for running simulations using OpenMM."""
# Dictionary of platforms and their OpenMM keyword.
_platforms = {"CPU": "CPU", "CUDA": "CUDA", "OPENCL": "OpenCL"}
[docs]
def __init__(
self,
system,
protocol,
reference_system=None,
exe=None,
name="openmm",
platform="CPU",
work_dir=None,
seed=None,
property_map={},
**kwargs,
):
"""
Constructor.
Parameters
----------
system : :class:`System <BioSimSpace._SireWrappers.System>`
The molecular system.
protocol : :class:`Protocol <BioSimSpace.Protocol>`
The protocol for the OpenMM process.
reference_system : :class:`System <BioSimSpace._SireWrappers.System>` or None
An optional system to use as a source of reference coordinates for position
restraints. It is assumed that this system has the same topology as "system".
If this is None, then "system" is used as a reference.
exe : str
The full path to the Python interpreter used to run OpenMM.
name : str
The name of the process.
platform : str
The platform for the simulation: "CPU", "CUDA", or "OPENCL".
For CUDA use the CUDA_VISIBLE_DEVICES environment variable to
set the GPUs on which to run, e.g. to run on two GPUs indexed
0 and 1 use: CUDA_VISIBLE_DEVICES=0,1. For OPENCL, instead use
OPENCL_VISIBLE_DEVICES.
work_dir :
The working directory for the process.
seed : int
A random number seed.
property_map : dict
A dictionary that maps system "properties" to their user defined
values. This allows the user to refer to properties with their
own naming scheme, e.g. { "charge" : "my-charge" }
kwargs : dict
Additional keyword arguments.
"""
# Call the base class constructor.
super().__init__(
system,
protocol,
reference_system=reference_system,
name=name,
work_dir=work_dir,
seed=seed,
property_map=property_map,
)
# Set the package name.
self._package_name = "OPENMM"
# This process can generate trajectory data.
self._has_trajectory = True
# If the path a Python interpreter wasn't specified, then use the bundled sire_python.
if exe is None:
bin_dir = _SireBase.getBinDir()
# Generate the name of the sire_python interpreter.
if _sys.platform == "win32":
self._exe = _os.path.join(_os.path.normpath(bin_dir), "sire_python.exe")
else:
self._exe = _os.path.join(_os.path.normpath(bin_dir), "sire_python")
else:
# Make sure executable exists.
if _os.path.isfile(exe):
self._exe = exe
else:
raise IOError("OpenMM Python interpreter doesn't exist: '%s'" % exe)
if not isinstance(platform, str):
raise TypeError("'platform' must be of type 'str'.")
else:
# Strip all whitespace and convert to upper case.
platform = platform.replace(" ", "").upper()
# Check for platform support.
if platform not in self._platforms:
raise ValueError("Supported platforms are: %s" % self._platforms.keys())
else:
self._platform = self._platforms[platform]
# Initialise the stdout dictionary and title header.
self._stdout_dict = _process._MultiDict()
# Store the name of the OpenMM log file.
self._log_file = _os.path.join(str(self._work_dir), f"{name}.log")
# Initialise the log file separator.
self._record_separator = None
# Initialise a dictionary to map log file records to their column order.
self._record_mapping = {}
# The names of the input files. We choose to use AMBER files since they
# are self-contained, but could equally work with GROMACS files.
self._rst_file = _os.path.join(str(self._work_dir), f"{name}.rst7")
self._top_file = _os.path.join(str(self._work_dir), f"{name}.prm7")
self._ref_file = _os.path.join(str(self._work_dir), f"{name}_ref.rst7")
# The name of the trajectory file.
self._traj_file = _os.path.join(str(self._work_dir), f"{name}.dcd")
# Set the path for the OpenMM Python script. (We use the concept of a
# config file for consistency with other Process classes.)
self._config_file = _os.path.join(str(self._work_dir), f"{name}_script.py")
# Create the list of input files.
self._input_files = [self._config_file, self._rst_file, self._top_file]
# Add the reference file if there are position restraints.
if self._protocol.getRestraint() is not None:
self._input_files.append(self._ref_file)
# Initialise the log file header.
self._header = None
# Now set up the working directory for the process.
self._setup()
def __str__(self):
"""Return a human readable string representation of the object."""
return (
"<BioSimSpace.Process.%s: system=%s, protocol=%s, exe='%s', name='%s', platform='%s', work_dir='%s' seed=%s>"
% (
self.__class__.__name__,
str(self._system),
self._protocol.__repr__(),
self._exe + ("%s " % self._script if self._script else ""),
self._name,
self._platform,
self._work_dir,
self._seed,
)
)
def __repr__(self):
"""Return a string showing how to instantiate the object."""
return (
"BioSimSpace.Process.%s(%s, %s, exe='%s', name='%s', platform='%s', work_dir='%s', seed=%s)"
% (
self.__class__.__name__,
str(self._system),
self._protocol.__repr__(),
self._exe + ("%s " % self._script if self._script else ""),
self._name,
self._platform,
self._work_dir,
self._seed,
)
)
def _setup(self):
"""Setup the input files and working directory ready for simulation."""
# Create a copy of the system.
system = self._system.copy()
# Convert the water model topology so that it matches the AMBER naming convention.
system._set_water_topology("AMBER", property_map=self._property_map)
self._reference_system._set_water_topology(
"AMBER", property_map=self._property_map
)
# Check for perturbable molecules and convert to the chosen end state.
system = self._checkPerturbable(system)
# Create the input files...
# RST file (coordinates).
try:
file = _os.path.splitext(self._rst_file)[0]
_IO.saveMolecules(file, system, "rst7", property_map=self._property_map)
except Exception as e:
msg = "Failed to write system to 'RST7' format."
if _isVerbose():
raise IOError(msg) from e
else:
raise IOError(msg) from None
# Reference coordinate file for position restraints.
if self._protocol.getRestraint() is not None:
try:
file = _os.path.splitext(self._ref_file)[0]
_IO.saveMolecules(
file,
self._reference_system,
"rst7",
property_map=self._property_map,
)
except Exception as e:
msg = "Failed to write reference system to 'RST7' format."
if _isVerbose():
raise IOError(msg) from e
else:
raise IOError(msg) from None
# PRM file (topology).
try:
file = _os.path.splitext(self._top_file)[0]
_IO.saveMolecules(
file, system, "prm7", match_water=False, property_map=self._property_map
)
except Exception as e:
msg = "Failed to write system to 'PRM7' format."
if _isVerbose():
raise IOError(msg) from e
else:
raise IOError(msg) from None
# Skip if the user has passed a custom config.
if isinstance(self._protocol, _Protocol.Custom):
self.setConfig(self._protocol.getConfig())
else:
self._generate_config()
self.writeConfig(self._config_file)
# Generate the dictionary of command-line arguments.
self._generate_args()
# Return the list of input files.
return self._input_files
def _generate_config(self):
"""Generate OpenMM Python script file strings."""
# Clear the existing configuration list.
self._config = []
# Get the "space" property from the user mapping.
prop = self._property_map.get("space", "space")
# Check whether the system contains periodic box information.
if prop in self._system._sire_object.propertyKeys():
try:
# Make sure that we have a periodic box. The system will now have
# a default cartesian space.
box = self._system._sire_object.property(prop)
has_box = box.isPeriodic()
except:
has_box = False
else:
_warnings.warn("No simulation box found. Assuming gas phase simulation.")
has_box = False
if isinstance(self._protocol, _Protocol.Minimisation):
# Write the OpenMM import statements.
self._add_config_imports()
self._add_config_monkey_patches()
# Load the input files.
self.addToConfig("\n# Load the topology and coordinate files.")
self.addToConfig(
"\n# We use ParmEd due to issues with the built in AmberPrmtopFile for certain triclinic spaces."
)
self.addToConfig(
f"prm = parmed.load_file('{self._name}.prm7', '{self._name}.rst7')"
)
# Don't use a cut-off if this is a vacuum simulation or if box information
# is missing.
self.addToConfig("\n# Initialise the molecular system.")
if not has_box or not self._has_water:
self.addToConfig("system = prm.createSystem(nonbondedMethod=NoCutoff,")
else:
self.addToConfig("system = prm.createSystem(nonbondedMethod=PME,")
self.addToConfig(" nonbondedCutoff=1*nanometer,")
self.addToConfig(" constraints=HBonds)")
# Set the integrator. (Use zero-temperature as this is just a dummy step.)
self.addToConfig("\n# Define the integrator.")
self.addToConfig("integrator = LangevinMiddleIntegrator(0*kelvin,")
self.addToConfig(" 1/picosecond,")
self.addToConfig(" 0.002*picoseconds)")
# Add the platform information.
self._add_config_platform()
# Add any position restraints.
if self._protocol.getRestraint() is not None:
self._add_config_restraints()
# Set up the simulation object.
self.addToConfig("\n# Initialise and configure the simulation object.")
self.addToConfig("simulation = Simulation(prm.topology,")
self.addToConfig(" system,")
self.addToConfig(" integrator,")
self.addToConfig(" platform,")
self.addToConfig(" properties)")
if self._protocol.getRestraint() is not None:
self.addToConfig("simulation.context.setPositions(positions)")
else:
self.addToConfig("simulation.context.setPositions(prm.positions)")
self.addToConfig("if prm.box_vectors is not None:")
self.addToConfig(
" box_vectors = reducePeriodicBoxVectors(prm.box_vectors)"
)
self.addToConfig(
" simulation.context.setPeriodicBoxVectors(*box_vectors)"
)
self.addToConfig(
f"simulation.minimizeEnergy(maxIterations={self._protocol.getSteps()})"
)
# Add the reporters.
self.addToConfig("\n# Add reporters.")
self._add_config_reporters(state_interval=1, traj_interval=1)
# Now run the simulation.
self.addToConfig(
"\n# Run a single simulation step to allow us to get the system and energy."
)
self.addToConfig(f"simulation.step(1)")
elif isinstance(self._protocol, _Protocol.Equilibration):
# Write the OpenMM import statements and monkey-patches.
self._add_config_imports()
self._add_config_monkey_patches()
# Load the input files.
self.addToConfig("\n# Load the topology and coordinate files.")
self.addToConfig(
"\n# We use ParmEd due to issues with the built in AmberPrmtopFile for certain triclinic spaces."
)
self.addToConfig(
f"prm = parmed.load_file('{self._name}.prm7', '{self._name}.rst7')"
)
# Don't use a cut-off if this is a vacuum simulation or if box information
# is missing.
is_periodic = True
self.addToConfig("\n# Initialise the molecular system.")
if not has_box or not self._has_water:
is_periodic = False
self.addToConfig("system = prm.createSystem(nonbondedMethod=NoCutoff,")
else:
self.addToConfig("system = prm.createSystem(nonbondedMethod=PME,")
self.addToConfig(" nonbondedCutoff=1*nanometer,")
self.addToConfig(" constraints=HBonds)")
# Get the starting temperature and system pressure.
temperature = self._protocol.getStartTemperature().kelvin().value()
pressure = self._protocol.getPressure()
# Add a Monte Carlo barostat if the simulation is at constant pressure.
is_const_pressure = False
if pressure is not None:
# Cannot use a barostat with a non-periodic system.
if not is_periodic:
_warnings.warn(
"Cannot use a barostat for a vacuum or non-periodic simulation"
)
else:
is_const_pressure = True
# Convert to bar and get the value.
pressure = pressure.bar().value()
# Create the barostat and add its force to the system.
self.addToConfig("\n# Add a barostat to run at constant pressure.")
self.addToConfig(
f"barostat = MonteCarloBarostat({pressure}*bar, {temperature}*kelvin)"
)
if self._is_seeded:
self.addToConfig(f"barostat.setRandomNumberSeed({self._seed})")
self.addToConfig("system.addForce(barostat)")
# Add any position restraints.
if self._protocol.getRestraint() is not None:
self._add_config_restraints()
# Get the integration time step from the protocol.
timestep = self._protocol.getTimeStep().picoseconds().value()
# Set the integrator.
self.addToConfig("\n# Define the integrator.")
self.addToConfig(
f"integrator = LangevinMiddleIntegrator({temperature}*kelvin,"
)
friction = (
1 / self._protocol.getThermostatTimeConstant().picoseconds().value()
)
self.addToConfig(
f" {friction:.5f}/picosecond,"
)
self.addToConfig(f" {timestep}*picoseconds)")
if self._is_seeded:
self.addToConfig(f"integrator.setRandomNumberSeed({self._seed})")
# Add the platform information.
self._add_config_platform()
# Set up the simulation object.
self.addToConfig("\n# Initialise and configure the simulation object.")
self.addToConfig("simulation = Simulation(prm.topology,")
self.addToConfig(" system,")
self.addToConfig(" integrator,")
self.addToConfig(" platform,")
self.addToConfig(" properties)")
if self._protocol.getRestraint() is not None:
self.addToConfig("simulation.context.setPositions(positions)")
else:
self.addToConfig("simulation.context.setPositions(prm.positions)")
self.addToConfig("if prm.box_vectors is not None:")
self.addToConfig(
" box_vectors = reducePeriodicBoxVectors(prm.box_vectors)"
)
self.addToConfig(
" simulation.context.setPeriodicBoxVectors(*box_vectors)"
)
# Set initial velocities from temperature distribution.
self.addToConfig("\n# Setting initial system velocities.")
self.addToConfig(
f"simulation.context.setVelocitiesToTemperature({temperature})"
)
# Work out the number of integration steps.
steps = _math.ceil(
self._protocol.getRunTime() / self._protocol.getTimeStep()
)
# Get the report and restart intervals.
report_interval = self._protocol.getReportInterval()
restart_interval = self._protocol.getRestartInterval()
# Cap the intervals at the total number of steps.
if report_interval > steps:
report_interval = steps
if restart_interval > steps:
restart_interval = steps
# Add the reporters.
self.addToConfig("\n# Add reporters.")
self._add_config_reporters(
state_interval=report_interval,
traj_interval=restart_interval,
is_restart=False,
)
# Now run the simulation.
self.addToConfig("\n# Run the simulation.")
# Constant temperature equilibration.
if self._protocol.isConstantTemp():
self.addToConfig(f"simulation.step({steps})")
# Heating / cooling cycle.
else:
# Adjust temperature every 100 cycles, assuming that there at
# least that many cycles.
if steps > 100:
# Work out the number of temperature cycles.
temp_cycles = _math.ceil(steps / 100)
# Work out the temperature change per cycle.
delta_temp = (
self._protocol.getEndTemperature().kelvin().value()
- self._protocol.getStartTemperature().kelvin().value()
) / temp_cycles
self.addToConfig(f"start_temperature = {temperature}")
self.addToConfig(f"for x in range(0, {temp_cycles}):")
self.addToConfig(
f" temperature = {temperature} + x*{delta_temp}"
)
self.addToConfig(
f" integrator.setTemperature(temperature*kelvin)"
)
if is_const_pressure:
self.addToConfig(
f" barostat.setDefaultTemperature(temperature*kelvin)"
)
self.addToConfig(" simulation.step(100)")
else:
# Work out the temperature change per step.
delta_temp = (
self._protocol.getEndTemperature().kelvin().value()
- self._protocol.getStartTemperature().kelvin().value()
) / steps
self.addToConfig(f"start_temperature = {temperature}")
self.addToConfig(f"for x in range(0, {steps}):")
self.addToConfig(
f" temperature = {temperature} + x*{delta_temp}"
)
self.addToConfig(
f" integrator.setTemperature(temperature*kelvin)"
)
if is_const_pressure:
self.addToConfig(
f" barostat.setDefaultTemperature(temperature*kelvin)"
)
self.addToConfig(" simulation.step(1)")
elif isinstance(self._protocol, _Protocol.Production):
# Write the OpenMM import statements.
self._add_config_imports()
self._add_config_monkey_patches()
# Production specific import.
self.addToConfig("import os")
# Load the input files.
self.addToConfig("\n# Load the topology and coordinate files.")
self.addToConfig(
"\n# We use ParmEd due to issues with the built in AmberPrmtopFile for certain triclinic spaces."
)
self.addToConfig(
f"prm = parmed.load_file('{self._name}.prm7', '{self._name}.rst7')"
)
# Don't use a cut-off if this is a vacuum simulation or if box information
# is missing.
is_periodic = True
self.addToConfig("\n# Initialise the molecular system.")
if not has_box or not self._has_water:
is_periodic = False
self.addToConfig("system = prm.createSystem(nonbondedMethod=NoCutoff,")
else:
self.addToConfig("system = prm.createSystem(nonbondedMethod=PME,")
self.addToConfig(" nonbondedCutoff=1*nanometer,")
self.addToConfig(" constraints=HBonds)")
# Get the starting temperature and system pressure.
temperature = self._protocol.getTemperature().kelvin().value()
pressure = self._protocol.getPressure()
# Add a Monte Carlo barostat if the simulation is at constant pressure.
is_const_pressure = False
if pressure is not None:
# Cannot use a barostat with a non-periodic system.
if not is_periodic:
_warnings.warn(
"Cannot use a barostat for a vacuum or non-periodic simulation"
)
else:
is_const_pressure = True
# Convert to bar and get the value.
pressure = pressure.bar().value()
# Create the barostat and add its force to the system.
self.addToConfig("\n# Add a barostat to run at constant pressure.")
self.addToConfig(
f"barostat = MonteCarloBarostat({pressure}*bar, {temperature}*kelvin)"
)
if self._is_seeded:
self.addToConfig(f"barostat.setRandomNumberSeed({self._seed})")
self.addToConfig("system.addForce(barostat)")
# Add any position restraints.
if self._protocol.getRestraint() is not None:
self._add_config_restraints()
# Get the integration time step from the protocol.
timestep = self._protocol.getTimeStep().picoseconds().value()
# Set the integrator.
self.addToConfig("\n# Define the integrator.")
self.addToConfig(
f"integrator = LangevinMiddleIntegrator({temperature}*kelvin,"
)
friction = (
1 / self._protocol.getThermostatTimeConstant().picoseconds().value()
)
self.addToConfig(
f" {friction:.5f}/picosecond,"
)
self.addToConfig(f" {timestep}*picoseconds)")
if self._is_seeded:
self.addToConfig(f"integrator.setRandomNumberSeed({self._seed})")
# Add the platform information.
self._add_config_platform()
# Set up the simulation object.
self.addToConfig("\n# Initialise and configure the simulation object.")
self.addToConfig("simulation = Simulation(prm.topology,")
self.addToConfig(" system,")
self.addToConfig(" integrator,")
self.addToConfig(" platform,")
self.addToConfig(" properties)")
if self._protocol.getRestraint() is not None:
self.addToConfig("simulation.context.setPositions(positions)")
else:
self.addToConfig("simulation.context.setPositions(prm.positions)")
self.addToConfig("if prm.box_vectors is not None:")
self.addToConfig(
" box_vectors = reducePeriodicBoxVectors(prm.box_vectors)"
)
self.addToConfig(
" simulation.context.setPeriodicBoxVectors(*box_vectors)"
)
# Set initial velocities from temperature distribution.
self.addToConfig("\n# Setting initial system velocities.")
self.addToConfig(
f"simulation.context.setVelocitiesToTemperature({temperature})"
)
# Check for a restart file and load the simulation state.
is_restart, step = self._add_config_restart()
# Work out the number of integration steps.
total_steps = _math.ceil(
self._protocol.getRunTime() / self._protocol.getTimeStep()
)
# Subtract the current number of steps.
steps = total_steps - step
# Exit if the simulation has already finished.
if steps <= 0:
print("The simulation has already finished!")
return
# Inform user that a restart was loaded.
self.addToConfig("\n# Print restart information.")
self.addToConfig("if is_restart:")
self.addToConfig(f" steps = {total_steps}")
self.addToConfig(" percent_complete = 100 * (step / steps)")
self.addToConfig(" print('Loaded state from an existing simulation.')")
self.addToConfig(
" print(f'Simulation is {percent_complete}% complete.')"
)
# Get the report and restart intervals.
report_interval = self._protocol.getReportInterval()
restart_interval = self._protocol.getRestartInterval()
# Cap the intervals at the total number of steps.
if report_interval > steps:
report_interval = steps
if restart_interval > steps:
restart_interval = steps
# Add the reporters.
self.addToConfig("\n# Add reporters.")
self._add_config_reporters(
state_interval=report_interval,
traj_interval=restart_interval,
is_restart=is_restart,
)
# Work out the total simulation time in picoseconds.
run_time = steps * timestep
# Work out the number of cycles in 100 picosecond intervals.
cycles = _math.ceil(run_time / 100)
# Work out the number of steps per cycle.
steps_per_cycle = int(steps / cycles)
# Now run the simulation.
self.addToConfig("\n# Run the simulation in 100 picosecond cycles.")
self.addToConfig(f"for x in range(0, {cycles}):")
self.addToConfig(f" simulation.step({steps_per_cycle})")
self.addToConfig(f" simulation.saveState('{self._name}.xml')")
elif isinstance(self._protocol, _Protocol.Metadynamics):
colvar = self._protocol.getCollectiveVariable()
if len(colvar) != 1 or not isinstance(
colvar[0], _CollectiveVariable.Funnel
):
raise _IncompatibleError(
"We currently only support '%s' collective variables for '%s' protocols"
% (
_CollectiveVariable.Funnel.__name__,
self._protocol.__class__.__name__,
)
)
# Create the path to the patched OpenMM metadynamics module.
aux_file = "metadynamics.py"
path = (
_os.path.dirname(_CollectiveVariable.__file__).replace(
"CollectiveVariable", "_aux"
)
+ "/"
+ aux_file
)
# Copy the file into the working directory.
_shutil.copyfile(path, _os.path.join(str(self._work_dir), aux_file))
# The following OpenMM native implementation of the funnel metadynamics protocol
# is adapted from funnel_maker.py by Dominykas Lukauskis.
# Extract the only collective variable.
colvar = colvar[0]
# Write the OpenMM import statements.
self._add_config_imports()
self.addToConfig(
"from metadynamics import *"
) # Use local patched metadynamics module.
self.addToConfig("from glob import glob")
self.addToConfig("import math")
self.addToConfig("import os")
self.addToConfig("import shutil")
# Load the input files.
self.addToConfig("\n# Load the topology and coordinate files.")
self.addToConfig(
"\n# We use ParmEd due to issues with the built in AmberPrmtopFile for certain triclinic spaces."
)
self.addToConfig(
f"prm = parmed.load_file('{self._name}.prm7', '{self._name}.rst7')"
)
# Don't use a cut-off if this is a vacuum simulation or if box information
# is missing.
is_periodic = True
self.addToConfig("\n# Initialise the molecular system.")
if not has_box or not self._has_water:
is_periodic = False
self.addToConfig("system = prm.createSystem(nonbondedMethod=NoCutoff,")
else:
self.addToConfig("system = prm.createSystem(nonbondedMethod=PME,")
self.addToConfig(" nonbondedCutoff=1*nanometer,")
self.addToConfig(" constraints=HBonds)")
# Get the starting temperature and system pressure.
temperature = self._protocol.getTemperature().kelvin().value()
pressure = self._protocol.getPressure()
# Add a Monte Carlo barostat if the simulation is at constant pressure.
is_const_pressure = False
if pressure is not None:
# Cannot use a barostat with a non-periodic system.
if not is_periodic:
_warnings.warn(
"Cannot use a barostat for a vacuum or non-periodic simulation"
)
else:
is_const_pressure = True
# Convert to bar and get the value.
pressure = pressure.bar().value()
# Create the barostat and add its force to the system.
self.addToConfig("\n# Add a barostat to run at constant pressure.")
self.addToConfig(
f"barostat = MonteCarloBarostat({pressure}*bar, {temperature}*kelvin)"
)
if self._is_seeded:
self.addToConfig(f"barostat.setRandomNumberSeed({self._seed})")
self.addToConfig("system.addForce(barostat)")
# Store the number of atoms in each molecule.
atom_nums = []
for mol in self._system:
atom_nums.append(mol.nAtoms())
# Sort the molecule indices by the number of atoms they contain.
sorted_nums = sorted((value, idx) for idx, value in enumerate(atom_nums))
# Set the ligand to the index of the second largest molecule.
ligand = sorted_nums[-2][1]
# Work out the start/end indices of the ligand within the system.
idx_start = self._system.getIndex(self._system[ligand].getAtoms()[0])
idx_end = self._system.getIndex(self._system[ligand + 1].getAtoms()[0])
# Add the funnel variables.
p1_string = ",".join([str(x) for x in colvar.getAtoms0()])
p2_string = ",".join([str(x) for x in colvar.getAtoms1()])
self.addToConfig("\n# Set funnel variables.")
self.addToConfig(f"p1 = [{p1_string}]")
self.addToConfig(f"p2 = [{p2_string}]")
self.addToConfig(f"lig = [x for x in range({idx_start}, {idx_end})]")
sigma_proj = colvar.getHillWidth()[0].nanometers().value()
self.addToConfig("\n# Create the bias variable for the funnel projection.")
self.addToConfig(
"projection = CustomCentroidBondForce(3, 'distance(g1,g2)*cos(angle(g1,g2,g3))')"
)
self.addToConfig("projection.addGroup(lig)")
self.addToConfig("projection.addGroup(p1)")
self.addToConfig("projection.addGroup(p2)")
self.addToConfig("projection.addBond([0,1,2])")
self.addToConfig("projection.setUsesPeriodicBoundaryConditions(True)")
self.addToConfig(f"sigma_proj = {sigma_proj}")
if colvar.getLowerBound() is None and colvar.getUpperBound() is None:
# Sane defaults if no bounds are set.
lower_wall = 0.5
upper_wall = 4.5
else:
lower_wall = colvar.getLowerBound().getValue().nanometers().value()
upper_wall = colvar.getUpperBound().getValue().nanometers().value()
self.addToConfig(
f"proj = BiasVariable(projection, {lower_wall-0.2}, {upper_wall+0.2}, {sigma_proj}, False, gridWidth=200)"
)
sigma_ext = colvar.getHillWidth()[1].nanometers().value()
self.addToConfig("\n# Create the bias variable for the funnel extent.")
self.addToConfig(
"extent = CustomCentroidBondForce(3, 'distance(g1,g2)*sin(angle(g1,g2,g3))')"
)
self.addToConfig("extent.addGroup(lig)")
self.addToConfig("extent.addGroup(p1)")
self.addToConfig("extent.addGroup(p2)")
self.addToConfig("extent.addBond([0,1,2])")
self.addToConfig("extent.setUsesPeriodicBoundaryConditions(True)")
extent_max = (
colvar.getWidth().nanometers().value()
+ colvar.getBuffer().nanometers().value()
+ 0.2
)
self.addToConfig(f"sigma_ext = {sigma_ext}")
self.addToConfig(
f"ext = BiasVariable(extent, 0.0, {extent_max}, {sigma_ext}, False, gridWidth=200)"
)
# Add restraints.
self.addToConfig("\n# Add restraints.")
self.addToConfig("k1 = 10000*kilojoules_per_mole")
self.addToConfig("k2 = 1000*kilojoules_per_mole")
self.addToConfig(f"lower_wall = {lower_wall}*nanometer")
self.addToConfig(f"upper_wall = {upper_wall}*nanometer")
self.addToConfig("\n# Upper wall.")
self.addToConfig(
"upper_wall_rest = CustomCentroidBondForce(3, '(k1/2)*max(distance(g1,g2)*cos(angle(g1,g2,g3)) - upper_wall, 0)^2')"
)
self.addToConfig("upper_wall_rest.addGroup(lig)")
self.addToConfig("upper_wall_rest.addGroup(p1)")
self.addToConfig("upper_wall_rest.addGroup(p2)")
self.addToConfig("upper_wall_rest.addBond([0,1,2])")
self.addToConfig("upper_wall_rest.addGlobalParameter('k1', k1)")
self.addToConfig(
"upper_wall_rest.addGlobalParameter('upper_wall', upper_wall)"
)
self.addToConfig("upper_wall_rest.setUsesPeriodicBoundaryConditions(True)")
self.addToConfig("system.addForce(upper_wall_rest)")
self.addToConfig("\n# Sides of the funnel.")
self.addToConfig(
f"wall_width = {colvar.getWidth().nanometers().value()}*nanometer"
)
self.addToConfig(
f"wall_buffer = {colvar.getBuffer().nanometers().value()}*nanometer"
)
self.addToConfig(f"beta_cent = {colvar.getSteepness()}")
self.addToConfig(
f"s_cent = {colvar.getInflection().nanometers().value()}*nanometer"
)
self.addToConfig(
"dist_restraint = CustomCentroidBondForce(3, '(k2/2)*max(distance(g1,g2)*sin(angle(g1,g2,g3)) - (a/(1+exp(b*(distance(g1,g2)*cos(angle(g1,g2,g3))-c)))+d), 0)^2')"
)
self.addToConfig("dist_restraint.addGroup(lig)")
self.addToConfig("dist_restraint.addGroup(p1)")
self.addToConfig("dist_restraint.addGroup(p2)")
self.addToConfig("dist_restraint.addBond([0,1,2])")
self.addToConfig("dist_restraint.addGlobalParameter('k2', k2)")
self.addToConfig("dist_restraint.addGlobalParameter('a', wall_width)")
self.addToConfig("dist_restraint.addGlobalParameter('b', beta_cent)")
self.addToConfig("dist_restraint.addGlobalParameter('c', s_cent)")
self.addToConfig("dist_restraint.addGlobalParameter('d', wall_buffer)")
self.addToConfig("dist_restraint.setUsesPeriodicBoundaryConditions(True)")
self.addToConfig("system.addForce(dist_restraint)")
self.addToConfig("\n# Lower wall.")
self.addToConfig(
"lower_wall_rest = CustomCentroidBondForce(3, '(k1/2)*min(distance(g1,g2)*cos(angle(g1,g2,g3)) - lower_wall, 0)^2')"
)
self.addToConfig("lower_wall_rest.addGroup(lig)")
self.addToConfig("lower_wall_rest.addGroup(p1)")
self.addToConfig("lower_wall_rest.addGroup(p2)")
self.addToConfig("lower_wall_rest.addBond([0,1,2])")
self.addToConfig("lower_wall_rest.addGlobalParameter('k1', k1)")
self.addToConfig(
"lower_wall_rest.addGlobalParameter('lower_wall', lower_wall)"
)
self.addToConfig("lower_wall_rest.setUsesPeriodicBoundaryConditions(True)")
self.addToConfig("system.addForce(lower_wall_rest)")
# Get the number of steps to date.
step = 0
if _os.path.isfile(_os.path.join(str(self._work_dir), f"{self._name}.xml")):
if _os.path.isfile(
_os.path.join(str(self._work_dir), f"{self._name}.log")
):
with open(
_os.path.join(str(self._work_dir), f"{self._name}.log"), "r"
) as f:
lines = f.readlines()
last_line = lines[-1].split()
try:
step = int(last_line[0])
except:
raise _IncompatibleError(
"Failed to read current integration "
f"step from '{self._name}.log'"
)
else:
raise IOError(f"Missing log file: '{self._name}.log'")
# Get the report and restart intervals.
report_interval = self._protocol.getReportInterval()
restart_interval = self._protocol.getRestartInterval()
# Get the hill deposition frequency.
hill_freq = self._protocol.getHillFrequency()
# Work out the number of integration steps.
total_steps = _math.ceil(
self._protocol.getRunTime() / self._protocol.getTimeStep()
)
# Work out the number of cycles.
# (Record COLVAR and HILLS at hill deposition frequency.)
total_cycles = _math.ceil(total_steps / hill_freq)
# Subtract the current number of steps.
remaining_steps = total_steps - step
# Exit if the simulation has already finished.
if remaining_steps <= 0:
print("The simulation has already finished!")
return
# Cap the intervals at the total number of steps.
if report_interval > remaining_steps:
report_interval = remaining_steps
if restart_interval > remaining_steps:
restart_interval = remaining_steps
self.addToConfig("\n# Initialise the metadynamics object.")
if self._protocol.getBiasFactor() is None:
bias = 1.0000001
else:
bias = self._protocol.getBiasFactor()
self.addToConfig(f"bias = {bias}")
height = self._protocol.getHillHeight().kj_per_mol().value()
self.addToConfig(
f"meta = Metadynamics(system, [proj, ext], {temperature}*kelvin, bias, {height}*kilojoules_per_mole, {hill_freq}, biasDir = '.', saveFrequency = {report_interval})"
)
# Get the integration time step from the protocol.
timestep = self._protocol.getTimeStep().picoseconds().value()
# Set the integrator.
self.addToConfig("\n# Define the integrator.")
self.addToConfig(
f"integrator = LangevinMiddleIntegrator({temperature}*kelvin,"
)
self.addToConfig(" 1/picosecond,")
self.addToConfig(f" {timestep}*picoseconds)")
if self._is_seeded:
self.addToConfig(f"integrator.setRandomNumberSeed({self._seed})")
# Add the platform information.
self._add_config_platform()
# Set up the simulation object.
self.addToConfig("\n# Initialise and configure the simulation object.")
self.addToConfig("simulation = Simulation(prm.topology,")
self.addToConfig(" system,")
self.addToConfig(" integrator,")
self.addToConfig(" platform,")
self.addToConfig(" properties)")
self.addToConfig("simulation.context.setPositions(prm.positions)")
self.addToConfig("if prm.box_vectors is not None:")
self.addToConfig(
" box_vectors = reducePeriodicBoxVectors(prm.box_vectors)"
)
self.addToConfig(
" simulation.context.setPeriodicBoxVectors(*box_vectors)"
)
# Set initial velocities from temperature distribution.
self.addToConfig("\n# Setting initial system velocities.")
self.addToConfig(
f"simulation.context.setVelocitiesToTemperature({temperature})"
)
# Check for a restart file and load the simulation state.
is_restart, step = self._add_config_restart()
# Add the reporters.
self.addToConfig("\n# Add reporters.")
self._add_config_reporters(
state_interval=report_interval,
traj_interval=restart_interval,
is_restart=is_restart,
)
# Create the HILLS file.
self.addToConfig("\n# Create PLUMED compatible HILLS file.")
self.addToConfig("if is_restart:")
self.addToConfig(" file = open('HILLS','a')")
self.addToConfig("else:")
self.addToConfig(" file = open('HILLS','w')")
self.addToConfig(
" file.write('#! FIELDS time pp.proj pp.ext sigma_pp.proj sigma_pp.ext height biasf\\n')"
)
self.addToConfig(" file.write('#! SET multivariate false\\n')")
self.addToConfig(" file.write('#! SET kerneltype gaussian\\n')")
# Get the initial collective variables.
self.addToConfig("\n# Initialise the collective variable array.")
self.addToConfig(
"current_cvs = np.array(list(meta.getCollectiveVariables(simulation)) + [meta.getHillHeight(simulation)])"
)
# Write the initial record.
self.addToConfig("\n# Write the initial collective variable record.")
self.addToConfig("if is_restart:")
self.addToConfig(" if os.path.isfile('COLVAR.npy'):")
self.addToConfig(" colvar_array = np.load('COLVAR.npy')")
self.addToConfig(
" colvar_array = np.append(colvar_array, [current_cvs], axis=0)"
)
self.addToConfig(" else:")
self.addToConfig(" raise IOError('Missing COLVAR file: COLVAR.npy')")
self.addToConfig("else:")
self.addToConfig(" colvar_array = np.array([current_cvs])")
self.addToConfig(" line = colvar_array[0]")
self.addToConfig(" time = 0")
self.addToConfig(
" write_line = f'{time:15} {line[0]:20.16f} {line[1]:20.16f} {sigma_proj} {sigma_ext} {line[2]:20.16f} {bias}\\n'"
)
self.addToConfig(" file.write(write_line)")
# Run the metadynamics simulation.
self.addToConfig("\n# Run the simulation.")
self.addToConfig(f"total_steps = {total_steps}")
self.addToConfig(f"total_cycles = {total_cycles}")
self.addToConfig(f"remaining_steps = {remaining_steps}")
self.addToConfig("steps_per_cycle = math.ceil(total_steps / total_cycles)")
self.addToConfig(
"remaining_cycles = math.ceil(remaining_steps / steps_per_cycle)"
)
self.addToConfig(f"start_cycles = total_cycles - remaining_cycles")
self.addToConfig("checkpoint = 100")
self.addToConfig("if is_restart:")
self.addToConfig(" fraction_complete = step / total_steps")
self.addToConfig(" percent_complete = 100 * fraction_complete")
self.addToConfig(" print('Loaded state from an existing simulation.')")
self.addToConfig(
" print(f'Simulation is {percent_complete}% complete.')"
)
self.addToConfig(" if fraction_complete > 1.0:")
self.addToConfig(
" start_cycles = math.ceil(fraction_complete * total_cycles)"
)
self.addToConfig(" total_cycles = start_cycles + total_cycles")
self.addToConfig("for x in range(start_cycles, total_cycles):")
self.addToConfig(" meta.step(simulation, steps_per_cycle)")
self.addToConfig(
" current_cvs = np.array(list(meta.getCollectiveVariables(simulation)) + [meta.getHillHeight(simulation)])"
)
self.addToConfig(
" colvar_array = np.append(colvar_array, [current_cvs], axis=0)"
)
self.addToConfig(" np.save('COLVAR.npy', colvar_array)")
self.addToConfig(" line = colvar_array[x+1]")
self.addToConfig(f" time = int((x+1) * {timestep}*steps_per_cycle)")
self.addToConfig(
" write_line = f'{time:15} {line[0]:20.16f} {line[1]:20.16f} {sigma_proj} {sigma_ext} {line[2]:20.16f} {bias}\\n'"
)
self.addToConfig(" file.write(write_line)")
self.addToConfig(" # Save the simulation state every 100 picoseconds.")
self.addToConfig(" if time >= checkpoint:")
self.addToConfig(f" simulation.saveState('{self._name}.xml')")
self.addToConfig(" while time >= checkpoint:")
self.addToConfig(" checkpoint += 100")
# Create a dummy PLUMED input file so that we can bind PLUMED
# analysis functions to this process.
self._plumed = _Plumed(str(self._work_dir))
plumed_config, auxillary_files = self._plumed.createConfig(
self._system, self._protocol, self._property_map
)
# Expose the PLUMED specific member functions.
setattr(self, "getFreeEnergy", self._getFreeEnergy)
setattr(self, "getCollectiveVariable", self._getCollectiveVariable)
setattr(self, "sampleConfigurations", self._sampleConfigurations)
setattr(self, "getTime", self._getTime)
else:
raise _IncompatibleError(
"Unsupported protocol: '%s'" % self._protocol.__class__.__name__
)
# Flag that this isn't a custom protocol.
self._protocol._setCustomised(False)
[docs]
def getConfig(self):
"""
Get the list of strings defining the OpenMM Python script.
Returns
-------
config : [str]
The list of configuration strings.
"""
return super().getConfig()
def setConfig(self, config):
"""
Set the list of strings defining the OpenMM Python script.
Parameters
----------
config : str, [str]
The list of configuration strings, or a path to a configuration
file.
"""
return super().setConfig()
[docs]
def addToConfig(self, config):
"""
Add a string to the OpenMM Python script configuration.
Parameters
----------
config : str, [str]
A configuration string, a list of configuration strings, or a
path to a configuration file.
"""
# Call the base class method.
super().addToConfig(config)
[docs]
def resetConfig(self):
"""Reset the OpenMM Python script configuration."""
self._generate_config()
[docs]
def setConfig(self, config):
"""
Set the list of configuration file strings.
Parameters
----------
config : str, [str]
The list of configuration strings, or a path to a configuration
file.
"""
# Call the base class method.
super().setConfig(config)
[docs]
def start(self):
"""
Start the OpenMM process.
Returns
-------
process : :class:`Process.OpenMM <BioSimSpace.Process.OpenMM>`
A handle to the OpenMM process.
"""
# The process is currently queued.
if self.isQueued():
return
# Process is already running.
if self._process is not None:
if self._process.isRunning():
return
# Clear any existing output.
self._clear_output()
# Run the process in the working directory.
with _Utils.cd(self._work_dir):
# Create the arguments string list.
# The name of the Python script (config file) is the first argument.
args = ["%s" % self._config_file]
args.extend(self.getArgStringList())
# Write the command-line process to a README.txt file.
with open("README.txt", "w") as f:
# Set the command-line string.
self._command = (
"%s %s " % (self._exe, self._config_file) + self.getArgString()
)
# Write the command to file.
f.write("# OpenMM was run with the following command:\n")
f.write("%s\n" % self._command)
# Start the timer.
self._timer = _timeit.default_timer()
# Start the simulation.
self._process = _SireBase.Process.run(
self._exe, args, "%s.out" % self._name, "%s.err" % self._name
)
return self
[docs]
def getSystem(self, block="AUTO"):
"""
Get the latest molecular system.
Parameters
----------
block : bool
Whether to block until the process has finished running.
Returns
-------
system : :class:`System <BioSimSpace._SireWrappers.System>`
The latest molecular system.
"""
# Wait for the process to finish.
if block is True:
self.wait()
elif block == "AUTO" and self._is_blocked:
self.wait()
# Try to get the most recent trajectory frame.
try:
# Handle minimisation protocols separately.
if isinstance(self._protocol, _Protocol.Minimisation):
# Do we need to get coordinates for the lambda=1 state.
if "is_lambda1" in self._property_map:
is_lambda1 = True
else:
is_lambda1 = False
traj = self.getTrajectory()
# If there is no trajectory, simply return None.
if traj is None:
return None
# Get the last frame.
new_system = traj.getFrames(-1)[0]
# Make a copy of the existing molecular system.
old_system = self._system.copy()
# Update the coordinates and velocities and return a mapping between
# the molecule indices in the two systems.
sire_system, mapping = _SireIO.updateCoordinatesAndVelocities(
old_system._sire_object,
new_system._sire_object,
self._mapping,
is_lambda1,
self._property_map,
self._property_map,
)
# Update the underlying Sire object.
old_system._sire_object = sire_system
# Store the mapping between the MolIdx in both systems so we don't
# need to recompute it next time.
self._mapping = mapping
# Update the box information in the original system.
if "space" in new_system._sire_object.propertyKeys():
box = new_system._sire_object.property("space")
if box.isPeriodic():
old_system._sire_object.setProperty(
self._property_map.get("space", "space"), box
)
return old_system
else:
# Work out the total number of trajectory frames.
num_frames = int(
(self._protocol.getRunTime() / self._protocol.getTimeStep())
/ self._protocol.getRestartInterval()
)
# Work out the fraction of the simulation that has been completed.
frac_complete = self._protocol.getRunTime() / self.getTime()
# Make sure the fraction doesn't exceed one. OpenMM can report
# time values that are larger than the number of integration steps
# multiplied by the time step.
if frac_complete > 1:
frac_complete = 1
# Work out the trajectory frame index, rounding down.
# Remember that frames in MDTraj are zero indexed, like Python.
index = int(frac_complete * num_frames)
if index > 0:
index -= 1
# Return the most recent frame.
return self.getFrame(index)
except:
return None
[docs]
def getCurrentSystem(self):
"""
Get the latest molecular system.
Returns
-------
system : :class:`System <BioSimSpace._SireWrappers.System>`
The latest molecular system.
"""
return self.getSystem(block=False)
[docs]
def getTrajectory(self, backend="AUTO", block="AUTO"):
"""
Return a trajectory object.
Parameters
----------
backend : str
The backend to use for trajectory parsing. To see supported backends,
run BioSimSpace.Trajectory.backends(). Using "AUTO" will try each in
sequence.
block : bool
Whether to block until the process has finished running.
Returns
-------
trajectory : :class:`System <BioSimSpace.Trajectory.Trajectory>`
The latest trajectory object.
"""
if not isinstance(backend, str):
raise TypeError("'backend' must be of type 'str'")
if not isinstance(block, (bool, str)):
raise TypeError("'block' must be of type 'bool' or 'str'")
# Wait for the process to finish.
if block is True:
self.wait()
elif block == "AUTO" and self._is_blocked:
self.wait()
# Warn the user if the process has exited with an error.
if self.isError():
_warnings.warn("The process exited with an error!")
if not _os.path.isfile(self._traj_file):
return None
else:
return _Trajectory.Trajectory(process=self, backend=backend)
[docs]
def getFrame(self, index):
"""
Return a specific trajectory frame.
Parameters
----------
index : int
The index of the frame.
Returns
-------
frame : :class:`System <BioSimSpace._SireWrappers.System>`
The System object of the corresponding frame.
"""
if not type(index) is int:
raise TypeError("'index' must be of type 'int'")
max_index = (
int(
(self._protocol.getRunTime() / self._protocol.getTimeStep())
/ self._protocol.getRestartInterval()
)
- 1
)
if index < 0 or index > max_index:
raise ValueError(f"'index' must be in range [0, {max_index}].")
# Warn the user if the process has exited with an error.
if self.isError():
_warnings.warn("The process exited with an error!")
try:
# Do we need to get coordinates for the lambda=1 state.
if "is_lambda1" in self._property_map:
is_lambda1 = True
else:
is_lambda1 = False
# Create a copy of the existing system object.
old_system = self._system.copy()
# Get the latest trajectory frame.
new_system = _Trajectory.getFrame(self._traj_file, self._top_file, index)
# Update the coordinates and velocities and return a mapping between
# the molecule indices in the two systems.
sire_system, mapping = _SireIO.updateCoordinatesAndVelocities(
old_system._sire_object,
new_system._sire_object,
self._mapping,
is_lambda1,
self._property_map,
self._property_map,
)
# Update the underlying Sire object.
old_system._sire_object = sire_system
# Store the mapping between the MolIdx in both systems so we don't
# need to recompute it next time.
self._mapping = mapping
# Update the box information in the original system.
if "space" in new_system._sire_object.propertyKeys():
box = new_system._sire_object.property("space")
old_system._sire_object.setProperty(
self._property_map.get("space", "space"), box
)
return old_system
except:
return None
[docs]
def getRecord(self, record, time_series=False, unit=None, block="AUTO"):
"""
Get a record from the stdout dictionary.
Parameters
----------
record : str
The record key.
time_series : bool
Whether to return a list of time series records.
unit : :class:`Unit <BioSimSpace.Units>`
The unit to convert the record to.
block : bool
Whether to block until the process has finished running.
Returns
-------
record : :class:`Type <BioSimSpace.Types>`
The matching record.
"""
# Wait for the process to finish.
if block is True:
self.wait()
elif block == "AUTO" and self._is_blocked:
self.wait()
# Warn the user if the process has exited with an error.
if self.isError():
_warnings.warn("The process exited with an error!")
self._update_stdout_dict()
return self._get_stdout_record(record, time_series, unit)
[docs]
def getCurrentRecord(self, record, time_series=False, unit=None):
"""
Get a current record from the stdout dictionary.
Parameters
----------
record : str
The record key.
time_series : bool
Whether to return a list of time series records.
unit : :class:`Unit <BioSimSpace.Units>`
The unit to convert the record to.
Returns
-------
record : :class:`Type <BioSimSpace.Types>`
The matching record.
"""
# Warn the user if the process has exited with an error.
if self.isError():
_warnings.warn("The process exited with an error!")
self._update_stdout_dict()
return self._get_stdout_record(record, time_series, unit)
[docs]
def getRecords(self, block="AUTO"):
"""
Return the dictionary of stdout time-series records.
Parameters
----------
block : bool
Whether to block until the process has finished running.
Returns
-------
records : :class:`MultiDict <BioSimSpace.Process._process._MultiDict>`
The dictionary of time-series records.
"""
# Wait for the process to finish.
if block is True:
self.wait()
elif block == "AUTO" and self._is_blocked:
self.wait()
# Warn the user if the process has exited with an error.
if self.isError():
_warnings.warn("The process exited with an error!")
self._update_stdout_dict()
return self._stdout_dict.copy()
[docs]
def getCurrentRecords(self):
"""
Return the current dictionary of stdout time-series records.
Parameters
----------
block : bool
Whether to block until the process has finished running.
Returns
-------
records : :class:`MultiDict <BioSimSpace.Process._process._MultiDict>`
The dictionary of time-series records.
"""
return self.getRecords(block=False)
[docs]
def getTime(self, time_series=False, block="AUTO"):
"""
Get the simulation time.
Parameters
----------
time_series : bool
Whether to return a list of time series records.
block : bool
Whether to block until the process has finished running.
Returns
-------
time : :class:`Time <BioSimSpace.Types.Time>`
The current simulation time in nanoseconds.
"""
return self.getRecord("TIME(PS)", time_series, _Units.Time.picosecond, block)
[docs]
def getCurrentTime(self, time_series=False):
"""
Get the current simulation time.
Parameters
----------
time_series : bool
Whether to return a list of time series records.
block : bool
Whether to block until the process has finished running.
Returns
-------
time : :class:`Time <BioSimSpace.Types.Time>`
The current simulation time in nanoseconds.
"""
return self.getTime(time_series, block=False)
[docs]
def getStep(self, time_series=False, block="AUTO"):
"""
Get the number of integration steps.
Parameters
----------
time_series : bool
Whether to return a list of time series records.
block : bool
Whether to block until the process has finished running.
Returns
-------
step : int
The current number of integration steps.
"""
return self.getRecord("STEP", time_series, None, block)
[docs]
def getCurrentStep(self, time_series=False):
"""
Get the current number of integration steps.
Parameters
----------
time_series : bool
Whether to return a list of time series records.
Returns
-------
step : int
The current number of integration steps.
"""
return self.getStep(time_series, block=False)
[docs]
def getPotentialEnergy(self, time_series=False, block="AUTO"):
"""
Get the potential energy.
Parameters
----------
time_series : bool
Whether to return a list of time series records.
block : bool
Whether to block until the process has finished running.
Returns
-------
energy : :class:`Energy <BioSimSpace.Types.Energy>`
The potential energy.
"""
return self.getRecord(
"POTENTIALENERGY(KJ/MOLE)", time_series, _Units.Energy.kj_per_mol, block
)
[docs]
def getCurrentPotentialEnergy(self, time_series=False):
"""
Get the current potential energy.
Parameters
----------
time_series : bool
Whether to return a list of time series records.
Returns
-------
energy : :class:`Energy <BioSimSpace.Types.Energy>`
The potential energy.
"""
return self.getPotentialEnergy(time_series, block=False)
[docs]
def getKineticEnergy(self, time_series=False, block="AUTO"):
"""
Get the kinetic energy.
Parameters
----------
time_series : bool
Whether to return a list of time series records.
block : bool
Whether to block until the process has finished running.
Returns
-------
energy : :class:`Energy <BioSimSpace.Types.Energy>`
The kinetic energy.
"""
return self.getRecord(
"KINETICENERGY(KJ/MOLE)", time_series, _Units.Energy.kj_per_mol, block
)
[docs]
def getCurrentKineticEnergy(self, time_series=False):
"""
Get the current kinetic energy.
Parameters
----------
time_series : bool
Whether to return a list of time series records.
Returns
-------
energy : :class:`Energy <BioSimSpace.Types.Energy>`
The kinetic energy.
"""
return self.getKineticEnergy(time_series, block=False)
[docs]
def getTotalEnergy(self, time_series=False, block="AUTO"):
"""
Get the total energy.
Parameters
----------
time_series : bool
Whether to return a list of time series records.
block : bool
Whether to block until the process has finished running.
Returns
-------
energy : :class:`Energy <BioSimSpace.Types.Energy>`
The total energy.
"""
return self.getRecord(
"TOTALENERGY(KJ/MOLE)", time_series, _Units.Energy.kj_per_mol, block
)
[docs]
def getCurrentTotalEnergy(self, time_series=False):
"""
Get the current total energy.
Parameters
----------
time_series : bool
Whether to return a list of time series records.
Returns
-------
energy : :class:`Energy <BioSimSpace.Types.Energy>`
The total energy.
"""
return self.getTotalEnergy(time_series, block=False)
[docs]
def getTemperature(self, time_series=False, block="AUTO"):
"""
Get the temperature.
Parameters
----------
time_series : bool
Whether to return a list of time series records.
block : bool
Whether to block until the process has finished running.
Returns
-------
temperature : :class:`Temperature <BioSimSpace.Types.Temperature>`
The temperature.
"""
return self.getRecord(
"TEMPERATURE(K)", time_series, _Units.Temperature.kelvin, block
)
[docs]
def getCurrentTemperature(self, time_series=False):
"""
Get the current temperature.
Parameters
----------
time_series : bool
Whether to return a list of time series records.
Returns
-------
temperature : :class:`Temperature <BioSimSpace.Types.Temperature>`
The current temperature.
"""
return self.getTemperature(time_series, block=False)
[docs]
def getVolume(self, time_series=False, block="AUTO"):
"""
Get the volume.
Parameters
----------
time_series : bool
Whether to return a list of time series records.
block : bool
Whether to block until the process has finished running.
Returns
-------
volume : :class:`Volume <BioSimSpace.Types.Volume>`
The volume.
"""
return self.getRecord(
"BOXVOLUME(NM^3)", time_series, _Units.Volume.nanometer3, block
)
[docs]
def getCurrentVolume(self, time_series=False):
"""
Get the current volume.
Parameters
----------
time_series : bool
Whether to return a list of time series records.
Returns
-------
volume : :class:`Volume <BioSimSpace.Types.Volume>`
The volume.
"""
return self.getVolume(time_series, block=False)
[docs]
def stdout(self, n=10):
"""
Print the last n lines of the stdout buffer.
Parameters
----------
n : int
The number of lines to print.
"""
# Note that thermodynamic records, e.g. energy, pressure, temperature,
# are redirected to a log file.
# Ensure that the number of lines is positive.
if n < 0:
raise ValueError("The number of lines must be positive!")
# Append any new lines to the stdout list.
for line in _pygtail.Pygtail(self._stdout_file):
self._stdout.append(line.rstrip())
# Get the current number of lines.
num_lines = len(self._stdout)
# Set the line from which to start printing.
if num_lines < n:
start = 0
else:
start = num_lines - n
# Print the lines.
for x in range(start, num_lines):
print(self._stdout[x])
def _add_config_imports(self):
"""
Helper function to write the header (import statements) to the
OpenMM Python script (config file).
"""
# We should verify that openmm is available to prevent
# difficult-to-debug errors in the run script
from BioSimSpace._Utils import _try_import, _assert_imported
_openmm = _try_import("openmm")
_assert_imported(_openmm)
self.addToConfig("from openmm import *")
self.addToConfig("from openmm.app import *")
self.addToConfig("from openmm.unit import *")
self.addToConfig(
"from openmm.app.internal.unitcell import reducePeriodicBoxVectors"
)
self.addToConfig("import parmed")
def _add_config_platform(self):
"""
Helper function to add platform information to the OpenMM
Python script.
"""
# Set the simulation platform.
self.addToConfig("\n# Set the simulation platform.")
self.addToConfig(f"platform = Platform.getPlatformByName('{self._platform}')")
if self._platform == "CPU":
self.addToConfig("properties = {}")
elif self._platform == "CUDA":
cuda_devices = _os.environ.get("CUDA_VISIBLE_DEVICES")
if cuda_devices is None:
cuda_devices = "0"
_warnings.warn(
"'CUDA' platform selected but 'CUDA_VISIBLE_DEVICES' "
"environment variable is unset. Defaulting to '0'."
)
self.addToConfig("properties = {'CudaDeviceIndex': '%s'}" % cuda_devices)
elif self._platform == "OPENCL":
opencl_devices = _os.environ.get("OPENCL_VISIBLE_DEVICES")
if opencl_devices is None:
opencl_devices = "0"
_warnings.warn(
"'OpenCL' platform selected but 'OPENCL_VISIBLE_DEVICES' "
"environment variable is unset. Defaulting to '0'."
)
self.addToConfig(
"properties = {'OpenCLDeviceIndex': '%s'}" % opencl_devices
)
def _add_config_restart(self):
"""Helper function to check for a restart file and load state information."""
self.addToConfig("\n# Check for a restart file.")
self.addToConfig(f"if os.path.isfile('{self._name}.xml'):")
self.addToConfig(" is_restart = True")
self.addToConfig(f" simulation.loadState('{self._name}.xml')")
self.addToConfig(f" if not os.path.isfile('{self._name}.log'):")
self.addToConfig(f" raise IOError('Missing log file: {self._name}.log')")
self.addToConfig(f" with open('{self._name}.log', 'r') as f:")
self.addToConfig(" lines = f.readlines()")
self.addToConfig(" last_line = lines[-1].split()")
self.addToConfig(" try:")
self.addToConfig(" step = int(last_line[0])")
self.addToConfig(" except:")
self.addToConfig(
f" raise IOError('Failed to read current integration step from {self._name}.log')"
)
self.addToConfig(" simulation.currentStep = step")
self.addToConfig("else:")
self.addToConfig(" is_restart = False")
if _os.path.isfile(_os.path.join(str(self._work_dir), f"{self._name}.xml")):
with open(
_os.path.join(str(self._work_dir), f"{self._name}.log"), "r"
) as f:
lines = f.readlines()
last_line = lines[-1].split()
step = int(last_line[0])
return True, step
else:
return False, 0
def _add_config_monkey_patches(self):
"""
Helper function to write any monkey-patches to the OpenMM Python
script (config file).
"""
# We monkey-patch the OpenMM DCDFile.writeModel method to avoid writing the
# positions of any dummy atoms that are used as restraints to trajectory files.
# This avoids the need to delete the dummies from the molecular system on read,
# allowing us to make use of System._updateCoordinates which requires that the
# number of atoms are consistent between systems. (Deleting the dummies from the
# system is slower than not writing them in the first place.)
self.addToConfig(
"\n# Monkey-patch the DCD.writeModel method to avoid writing dummy-atom positions."
)
# Store the original writeModel method.
self.addToConfig("writeModel = DCDFile.writeModel")
# Create a monkey-patch where we slice the positions list to match the
# number of atoms in the topology, then pass this through to the original
# writeModel method.
self.addToConfig(
"def writeModelPatched(self, positions, unitCellDimensions=None, periodicBoxVectors=None):"
)
self.addToConfig(
" positions = positions[:len(list(self._topology.atoms()))]"
)
self.addToConfig(" writeModel(self,")
self.addToConfig(" positions,")
self.addToConfig(" unitCellDimensions=unitCellDimensions,")
self.addToConfig(" periodicBoxVectors=periodicBoxVectors)")
# Replace the writeModel method with the monkey-patch.
self.addToConfig("DCDFile.writeModel = writeModelPatched")
def _add_config_reporters(
self, state_interval=100, traj_interval=500, is_restart=False
):
"""
Helper function to write the reporter (output statements) section
to the OpenMM Python script (config file).
Parameters
----------
state_interval : int
The frequency at which to write state information in
integration steps.
traj_interval : int
The frequency at which to write trajectory frames in
integration steps.
is_restart : bool
Whether the simulation is a restart.
"""
if not type(state_interval) is int:
raise TypeError("'state_interval' must be of type 'int'.")
if state_interval <= 0:
raise ValueError("'state_interval' must be a positive integer.")
if not type(traj_interval) is int:
raise TypeError("'traj_interval' must be of type 'int'.")
if traj_interval <= 0:
raise ValueError("'traj_interval' must be a positive integer.")
if not isinstance(is_restart, bool):
raise TypeError("'is_restart' must be of type 'bool'.")
# Append to a trajectory file every 500 steps.
self.addToConfig(
f"simulation.reporters.append(DCDReporter('{self._name}.dcd', {traj_interval}, append={is_restart}))"
)
# Disable specific state information for minimisation protocols.
if isinstance(self._protocol, _Protocol.Minimisation):
is_step = False
is_time = False
is_temperature = False
else:
is_step = True
is_time = True
is_temperature = True
# Work out the total number of steps.
if isinstance(self._protocol, _Protocol.Minimisation):
total_steps = 1
else:
total_steps = _math.ceil(
self._protocol.getRunTime() / self._protocol.getTimeStep()
)
# Write state information to file every 100 steps.
self.addToConfig(f"log_file = open('{self._name}.log', 'a')")
self.addToConfig(f"simulation.reporters.append(StateDataReporter(log_file,")
self.addToConfig(
f" {state_interval},"
)
self.addToConfig(
f" step={is_step},"
)
self.addToConfig(
f" time={is_time},"
)
self.addToConfig(
" potentialEnergy=True,"
)
self.addToConfig(
" kineticEnergy=True,"
)
self.addToConfig(
" totalEnergy=True,"
)
self.addToConfig(
f" temperature={is_temperature},"
)
self.addToConfig(" volume=True,")
self.addToConfig(
f" totalSteps={total_steps},"
)
self.addToConfig(" speed=True,")
self.addToConfig(
" remainingTime=True,"
)
self.addToConfig(
" separator=' '))"
)
def _add_config_restraints(self):
"""Helper function to add position restraints to the OpenMM script (config file)."""
# Add position restraints. This uses the approach from:
# https://github.com/openmm/openmm/issues/2262#issuecomment-464157489
# Here zero-mass dummy atoms are bonded to the restrained atoms to avoid
# issues with position rescaling during barostat updates.
restraint = self._protocol.getRestraint()
if restraint is not None:
# Search for the atoms to restrain by keyword.
if isinstance(restraint, str):
restrained_atoms = self._reference_system.getRestraintAtoms(restraint)
# Use the user-defined list of indices.
else:
restrained_atoms = restraint
self.addToConfig(
f"ref_prm = parmed.load_file('{self._name}.prm7', '{self._name}_ref.rst7')"
)
# Get the force constant in units of kJ_per_mol/nanometer**2
force_constant = self._protocol.getForceConstant()._sire_unit
force_constant = force_constant.to(
_SireUnits.kJ_per_mol / _SireUnits.nanometer2
)
self.addToConfig(
"\n# Restrain the position of atoms using zero-mass dummy atoms."
)
self.addToConfig("restraint = HarmonicBondForce()")
self.addToConfig("restraint.setUsesPeriodicBoundaryConditions(True)")
self.addToConfig("system.addForce(restraint)")
self.addToConfig(
"nonbonded = [f for f in system.getForces() if isinstance(f, NonbondedForce)][0]"
)
self.addToConfig("dummy_indices = []")
self.addToConfig("positions = ref_prm.positions")
self.addToConfig(f"restrained_atoms = {restrained_atoms}")
self.addToConfig("for i in restrained_atoms:")
self.addToConfig(" j = system.addParticle(0)")
self.addToConfig(" nonbonded.addParticle(0, 1, 0)")
self.addToConfig(" nonbonded.addException(i, j, 0, 1, 0)")
self.addToConfig(
f" restraint.addBond(i, j, 0*nanometers, {force_constant}*kilojoules_per_mole/nanometer**2)"
)
self.addToConfig(" dummy_indices.append(j)")
self.addToConfig(" positions.append(positions[i])")
def _update_stdout_dict(self):
"""Update the dictionary of thermodynamic records."""
# Exit if log file hasn't been created.
if not _os.path.isfile(self._log_file):
return
# A list of the new record lines.
lines = []
# Append any new lines.
for line in _pygtail.Pygtail(self._log_file):
lines.append(line)
# Append any new records to the stdout dictionary.
for line in lines:
# Strip leading/trailing whitespace.
line = line.strip()
# This is the header record.
if line[0] == "#":
if self._header is None:
# Store the header.
self._header = line
else:
# If this is a restart, make sure the information in the file
# is consistent.
if line != self._header:
raise _IncompatibleError(
"Mismatch in the log file header.! "
f"Original header: '{self._header}', "
f"Current header: '{line}'"
)
# Work out what records are in the file and the separator
# that is used. While we use a standard format, this makes
# sure that we can still parse the log file if the user
# happens to have changed the formatting.
# A tally counter for the number of quotes that we've seen
# in the line so far.
num_quotes = 0
# Initialise the separator.
sep = ""
# Loop over the characters in the line.
for c in line:
# Increment the number of quotes.
if c == '"':
num_quotes += 1
# This is the second quote we've seen, start adding
# characters to the separator.
if num_quotes == 2:
sep += c
# Break when we've reached the next quote.
elif num_quotes == 3:
break
# The separator includes a leading " character, so delete it.
self._record_separator = sep[1:]
# Now split the line on the separator to work out the records.
# We ignore the first character since it is a comment.
# Here we use the full separator, i.e. including the both quotes,
# so that we can correctly split record names with spaces in them.
records = line[1:].split(sep + '"')
# Store the number of records.
num_records = len(records)
# Map each record string to its position in the array (column order).
for idx, record in enumerate(records):
# Strip the extra quotes from the record.
if idx == 0:
record = record[1:]
elif idx == num_records - 1:
record = record[:-1]
# Map the index to the record. Store records in upper
# case without whitespace to help catch typos from
# the user.
self._record_mapping[idx] = record.upper().replace(" ", "")
# Extract the records and add them to the dictionary.
else:
# Split the line on the separator.
records = line.split(self._record_separator)
# Add each record to the appropriate key in the MultiDict.
for idx, record in enumerate(records):
# Get the key for this record.
key = self._record_mapping[idx]
# Update the record dictionary for this key.
self._stdout_dict[key] = record
def _get_stdout_record(self, key, time_series=False, unit=None):
"""
Helper function to get a stdout record from the dictionary.
Parameters
----------
key : str
The record key.
time_series : bool
Whether to return a time series of records.
unit : BioSimSpace.Types._type.Type
The unit to convert the record to.
Returns
-------
record :
The matching stdout record.
"""
# No data!
if len(self._stdout_dict) == 0:
return None
if not isinstance(time_series, bool):
_warnings.warn("Non-boolean time-series flag. Defaulting to False!")
time_series = False
# Validate the unit.
if unit is not None:
if not isinstance(unit, _Type):
raise TypeError("'unit' must be of type 'BioSimSpace.Types'")
# Return the list of dictionary values.
if time_series:
try:
if unit is None:
return [float(x) for x in self._stdout_dict[key]]
else:
return [
(float(x) * unit)._to_default_unit()
for x in self._stdout_dict[key]
]
except KeyError:
return None
# Return the most recent dictionary value.
else:
try:
if unit is None:
return float(self._stdout_dict[key][-1])
else:
return (float(self._stdout_dict[key][-1]) * unit)._to_default_unit()
except KeyError:
return None