######################################################################
# BioSimSpace: Making biomolecular simulation a breeze!
#
# Copyright: 2017-2024
#
# Authors: Lester Hedges <lester.hedges@gmail.com>
#
# BioSimSpace is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# BioSimSpace is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with BioSimSpace. If not, see <http://www.gnu.org/licenses/>.
#####################################################################
"""Functionality for relative free-energy simulations."""
__author__ = "Lester Hedges"
__email__ = "lester.hedges@gmail.com"
__all__ = ["Relative", "getData"]
import copy as _copy
import json as _json
import math as _math
import numpy as _np
import os as _os
import pandas as _pd
import pathlib as _pathlib
import pyarrow.parquet as _pq
import re as _re
import shutil as _shutil
import subprocess as _subprocess
import sys as _sys
import warnings as _warnings
import zipfile as _zipfile
from .._Utils import _assert_imported, _have_imported, _try_import
# alchemlyb isn't available for all variants of Python that we support, so we
# need to try_import it.
_alchemlyb = _try_import("alchemlyb")
if _have_imported(_alchemlyb):
import logging as _logging
# Silence pymbar warnings on startup.
_logger = _logging.getLogger("pymbar")
_logger.setLevel(_logging.ERROR)
# Handle alchemlyb MBAR API changes.
try:
from alchemlyb.estimators import AutoMBAR as _AutoMBAR
except ImportError:
from alchemlyb.estimators import MBAR as _AutoMBAR
from alchemlyb.estimators import TI as _TI
from alchemlyb.postprocessors.units import to_kcalmol as _to_kcalmol
from alchemlyb.parsing.amber import extract_dHdl as _amber_extract_dHdl
from alchemlyb.parsing.amber import extract_u_nk as _amber_extract_u_nk
from alchemlyb.parsing.gmx import extract_dHdl as _gmx_extract_dHdl
from alchemlyb.parsing.gmx import extract_u_nk as _gmx_extract_u_nk
from alchemlyb.preprocessing.subsampling import (
equilibrium_detection as _equilibrium_detection,
)
from alchemlyb.preprocessing.subsampling import (
statistical_inefficiency as _statistical_inefficiency,
)
from alchemlyb.preprocessing.subsampling import slicing as _slicing
from alchemlyb.preprocessing.subsampling import decorrelate_u_nk, decorrelate_dhdl
from alchemlyb.postprocessors.units import to_kcalmol as _to_kcalmol
from alchemlyb.postprocessors.units import kJ2kcal as _kJ2kcal
from alchemlyb.postprocessors.units import R_kJmol as _R_kJmol
from sire.legacy.Base import getBinDir as _getBinDir
from sire.legacy.Base import getShareDir as _getShareDir
from sire.legacy import IO as _SireIO
from sire.legacy import Mol as _SireMol
from .. import _gmx_exe
from .. import _is_notebook
from .. import _isVerbose
from .._Exceptions import AnalysisError as _AnalysisError
from .._Exceptions import MissingSoftwareError as _MissingSoftwareError
from .._SireWrappers import Molecules as _Molecules
from .._SireWrappers import System as _System
from .._Utils import cd as _cd
from .. import Process as _Process
from .. import Protocol as _Protocol
from .. import Types as _Types
from .. import Units as _Units
from .. import _Utils
if _is_notebook:
from IPython.display import FileLink as _FileLink
# Check that the analyse_freenrg script exists.
if _sys.platform != "win32":
_analyse_freenrg = _os.path.join(_getBinDir(), "analyse_freenrg")
else:
_analyse_freenrg = _os.path.join(
_os.path.normpath(_getShareDir()), "scripts", "analyse_freenrg.py"
)
if not _os.path.isfile(_analyse_freenrg):
raise _MissingSoftwareError(
"Cannot find free energy analysis script in expected location: '%s'"
% _analyse_freenrg
)
if _sys.platform == "win32":
_analyse_freenrg = "%s %s" % (
_os.path.join(_os.path.normpath(_getBinDir()), "sire_python.exe"),
_analyse_freenrg,
)
[docs]
class Relative:
"""Class for configuring and running relative free-energy perturbation simulations."""
# Create a list of supported molecular dynamics engines. (For running simulations.)
_engines = ["AMBER", "GROMACS", "SOMD"]
# Create a list of supported molecular dynamics engines. (For analysis.)
_engines_analysis = ["AMBER", "GROMACS", "SOMD", "SOMD2"]
[docs]
def __init__(
self,
system,
protocol=None,
work_dir=None,
engine=None,
setup_only=False,
property_map={},
**kwargs,
):
"""
Constructor.
Parameters
----------
system : :class:`System <BioSimSpace._SireWrappers.System>`
The molecular system for the perturbation leg. This must contain
a single perturbable molecule and is assumed to have already
been equilibrated.
protocol : :class:`Protocol.FreeEnergyMinimisation <BioSimSpace.Protocol.FreeEnergyMinimisation>`, \
:class:`Protocol.FreeEnergyEquilibration <BioSimSpace.Protocol.FreeEnergyEquilibration>`, \
:class:`Protocol.FreeEnergyProduction <BioSimSpace.Protocol.FreeEnergyProduction>`
The simulation protocol.
reference_system : :class:`System <BioSimSpace._SireWrappers.System>`
A reference system to use for position restraints.
work_dir : str
The working directory for the free-energy perturbation
simulation.
engine : str
The molecular dynamics engine used to run the simulation. Available
options are "GROMACS", or "SOMD". If this argument is omitted then
BioSimSpace will choose an appropriate engine for you.
setup_only : bool
Whether to only support simulation setup. If True, then no
simulation processes objects will be created, only the directory
hierarchy and input files to run a simulation externally. This
can be useful when you don't intend to use BioSimSpace to run
the simulation. Note that a 'work_dir' must also be specified.
property_map : dict
A dictionary that maps system "properties" to their user defined
values. This allows the user to refer to properties with their
own naming scheme, e.g. { "charge" : "my-charge" }
kwargs : dict
Additional keyword arguments to pass to the underlying Process
objects.
"""
# Validate the input.
if not isinstance(system, _System):
raise TypeError(
"'system' must be of type 'BioSimSpace._SireWrappers.System'"
)
else:
# Store a copy of the system.
self._system = system.copy()
# Validate the user specified molecular dynamics engine.
if engine is not None:
if not isinstance(engine, str):
raise TypeError("'engine' must be of type 'str'.")
# Strip whitespace from engine and convert to upper case.
engine = engine.replace(" ", "").upper()
# Check that the engine is supported.
if engine not in self._engines:
raise ValueError(
f"Unsupported molecular dynamics engine {engine}. "
"Supported engines are: %r." % ", ".join(self._engines)
)
# The system must have a perturbable molecule.
if system.nPerturbableMolecules() == 0:
raise ValueError(
"The system must contain a perturbable molecule! "
"Use the 'BioSimSpace.Align' package to map and merge molecules."
)
else:
# Use SOMD as a default.
engine = "SOMD"
# The system must have a single perturbable molecule.
if system.nPerturbableMolecules() != 1:
raise ValueError(
"The system must contain a single perturbable molecule! "
"Use the 'BioSimSpace.Align' package to map and merge molecules."
)
# Set the engine.
self._engine = engine
# Validate the protocol.
if protocol is not None:
from ..Protocol._free_energy_mixin import _FreeEnergyMixin
if isinstance(protocol, _FreeEnergyMixin):
if engine == "SOMD" and not isinstance(
protocol, _Protocol.FreeEnergyProduction
):
raise ValueError(
"Currently SOMD only supports protocols of type 'BioSimSpace.Protocol.FreeEnergyProduction'"
)
self._protocol = protocol
else:
raise TypeError(
"'protocol' must be of type 'BioSimSpace.Protocol.FreeEnergy'"
)
else:
# Use a default protocol.
self._protocol = _Protocol.FreeEnergy()
# Check that multi-step perturbation isn't specified if GROMACS is the chosen engine.
if engine == "GROMACS":
if self._protocol.getPerturbationType() != "full":
raise NotImplementedError(
"GROMACS currently only supports the 'full' perturbation "
"type. Please use engine='SOMD' when running multistep "
"perturbation types."
)
if not isinstance(setup_only, bool):
raise TypeError("'setup_only' must be of type 'bool'.")
else:
self._setup_only = setup_only
if work_dir is None and setup_only:
raise ValueError(
"A 'work_dir' must be specified when 'setup_only' is True!"
)
# Create the working directory.
self._work_dir = _Utils.WorkDir(work_dir)
# Check that the map is valid.
if not isinstance(property_map, dict):
raise TypeError("'property_map' must be of type 'dict'")
self._property_map = property_map
# Validate the kwargs.
if not isinstance(kwargs, dict):
raise TypeError("'kwargs' must be of type 'dict'.")
self._kwargs = kwargs
# Create fake instance methods for 'analyse', 'checkOverlap',
# and 'difference'. These pass instance data through to the
# staticmethod versions.
self.analyse = self._analyse
self.checkOverlap = self._checkOverlap
self.difference = self._difference
# Initialise the process runner.
self._initialise_runner(self._system)
[docs]
def run(self, serial=True):
"""
Run the simulation.
Parameters
----------
serial : bool
Whether to run the individual processes for the lambda windows
in serial.
"""
if not isinstance(serial, bool):
raise TypeError("'serial' must be of type 'bool'.")
if self._setup_only:
_warnings.warn("No processes exist! Object created in 'setup_only' mode.")
else:
self._runner.startAll(serial=serial)
[docs]
def wait(self):
"""Wait for the simulation to finish."""
if self._setup_only:
_warnings.warn("No processes exist! Object created in 'setup_only' mode.")
else:
self._runner.wait()
[docs]
def kill(self, index):
"""
Kill a process for a specific lambda window.
Parameters
----------
index : int
The index of the lambda window.
"""
self._runner.kill(index)
[docs]
def killAll(self):
"""Kill any running processes for all lambda windows."""
self._runner.killAll()
[docs]
def workDir(self):
"""
Return the working directory.
Returns
-------
work_dir : str
The path of the working directory.
"""
return str(self._work_dir)
[docs]
def getData(self, name="data", file_link=False, work_dir=None):
"""
Return a link to a zip file containing the data files required for
post-simulation analysis.
Parameters
----------
name : str
The name of the zip file.
file_link : bool
Whether to return a FileLink when working in Jupyter.
work_dir : str
The working directory for the free-energy perturbation
simulation.
Returns
-------
output : str, IPython.display.FileLink
A path, or file link, to an archive of the process input.
"""
if self._work_dir is None:
raise ValueError("'work_dir' must be set!")
else:
if not isinstance(work_dir, str):
raise TypeError("'work_dir' must be of type 'str'.")
if not _os.path.isdir(work_dir):
raise ValueError("'work_dir' doesn't exist!")
if not isinstance(name, str):
raise TypeError("'name' must be of type 'str'")
# Generate the zip file name.
zipname = "%s.zip" % name
# Get the current working directory.
cwd = _os.getcwd()
# Change into the working directory.
with _cd(work_dir):
# Specify the path to glob.
glob_path = _pathlib.Path(work_dir)
# First try SOMD data.
files = glob_path.glob("**/gradients.dat")
if len(files) == 0:
files = glob_path.glob("**/[!bar]*.xvg")
if len(files) == 0:
raise ValueError(
f"Couldn't find any analysis files in '{work_dir}'"
)
# Write to the zip file.
with _zipfile.Zipfile(_os.join(cwd, zipname), "w") as zip:
for file in files:
zip.write(file)
# Return a link to the archive.
if _is_notebook:
if file_link:
# Create a FileLink to the archive.
f_link = _FileLink(zipname)
# Set the download attribute so that JupyterLab doesn't try to open the file.
f_link.html_link_str = (
f"<a href='%s' target='_blank' download='{zipname}'>%s</a>"
)
# Return a link to the archive.
return f_link
else:
return zipname
# Return the path to the archive.
else:
return zipname
[docs]
@staticmethod
def analyse(work_dir, estimator="MBAR", method="alchemlyb", **kwargs):
"""
Analyse existing free-energy data from a simulation working directory.
Parameters
----------
work_dir : str
The working directory for the simulation.
estimator : str
The estimator to use for the free-energy analysis. ("MBAR" or "TI")
method : str
The method to use for the free-energy analysis. ("alchemlyb" or "native")
Returns
-------
pmf : [(float, :class:`Energy <BioSimSpace.Types.Energy>`, :class:`Energy <BioSimSpace.Types.Energy>`)]
The potential of mean force (PMF). The data is a list of tuples,
where each tuple contains the lambda value, the PMF, and the
standard error.
overlap : np.matrix, None
The overlap matrix. This gives the overlap between each lambda
window. This parameter is only computed when available for the
specified estimator and engine, otherwise None will be returned.
"""
if not isinstance(work_dir, str):
raise TypeError("'work_dir' must be of type 'str'.")
if not _os.path.isdir(work_dir):
raise ValueError("'work_dir' doesn't exist!")
if not isinstance(estimator, str):
raise TypeError("'estimator' must be of type 'str'.")
if not isinstance(method, str):
raise TypeError("'method' must be of type 'str'.")
method = method.replace("-", "").upper()
if method == "ALCHEMLYB":
_assert_imported("alchemlyb")
function_glob_dict = {
"SOMD": (Relative._analyse_somd, "**/simfile.dat"),
"SOMD2": (Relative._analyse_somd2, "**/*.parquet"),
"GROMACS": (Relative._analyse_gromacs, "**/[!bar]*.xvg"),
"AMBER": (Relative._analyse_amber, "**/*.out"),
}
for engine, (func, mask) in function_glob_dict.items():
glob_path = _pathlib.Path(work_dir)
data = sorted(glob_path.glob(mask))
if data and engine == "AMBER":
if method != "ALCHEMLYB":
raise _AnalysisError(
f"AMBER can only use the 'alchemlyb' analysis method."
)
if data and engine == "SOMD" and estimator == "TI" and method == "native":
raise _AnalysisError(
"SOMD cannot use 'TI' estimator with 'native' analysis method."
)
if data and engine == "SOMD2":
if method != "ALCHEMLYB":
raise _AnalysisError(
f"SOMD2 can only use the 'alchemlyb' analysis method."
)
if data and engine == "GROMACS" and method == "native":
_warnings.warn(
"Native GROMACS analysis cannot do MBAR/TI. BAR will be used."
)
if data:
return func(work_dir, estimator, method, **kwargs)
raise ValueError(
"Couldn't find any SOMD, SOMD2, GROMACS or AMBER free-energy output?"
)
[docs]
@staticmethod
def checkOverlap(overlap, threshold=0.03):
"""
Check the overlap of an FEP leg.
Parameters
----------
overlap : numpy.matrix
The overlap matrix. This gives the overlap between lambda windows.
threshold : float
The threshold value used to check the off-diagonals.
Returns
-------
is_okay : boolean
True if the overlap is okay, False if any off-diagonals are less
than the threshold value.
num_low : int
The number of off-diagonals that are less than the threshold value.
"""
if not isinstance(overlap, _np.matrix):
raise TypeError("'overlap' must be of type 'numpy.matrix'.")
if not isinstance(threshold, float):
raise TypeError("'threshold' must be of type 'float'.")
if threshold < 0.0 or threshold > 1.0:
raise ValueError("'threshold' must be between 0 and 1.")
# Get all off diagonal elements.
off_diagonal = (_np.diagonal(overlap, 1)).tolist()
for x in (_np.diagonal(overlap, -1)).tolist():
off_diagonal.append(x)
# Check if any off diagonals are less than the threshold value.
num_low = 0
is_okay = False
for od in off_diagonal:
if od < threshold:
num_low += 1
if num_low > 0:
_warnings.warn(
f"Overlap is poor: {num_low} off-diagonals are less than {threshold}."
)
else:
is_okay = True
return is_okay, num_low
[docs]
@staticmethod
def difference(pmf, pmf_ref=None):
"""
Compute the relative free-energy difference between two perturbation
legs, or between the end states of a single leg.
Parameters
----------
pmf : [(float, :class:`Energy <BioSimSpace.Types.Energy>`, :class:`Energy <BioSimSpace.Types.Energy>`)]
The potential of mean force (PMF) of interest. The data is a list
of tuples, where each tuple contains the lambda value, the PMF,
and the standard error.
pmf_ref : [(float, :class:`Energy <BioSimSpace.Types.Energy>`, :class:`Energy <BioSimSpace.Types.Energy>`)]
The reference potential of mean force (PMF). The data is a list
of tuples, where each tuple contains the lambda value, the PMF,
and the standard error.
Returns
-------
free_energy : (:class:`Energy <BioSimSpace.Types.Energy>`, :class:`Energy <BioSimSpace.Types.Energy>`)
The relative free-energy difference and its associated error.
"""
if not isinstance(pmf, list):
raise TypeError("'pmf' must be of type 'list'.")
if pmf_ref is not None and not isinstance(pmf_ref, list):
raise TypeError("'pmf_ref' must be of type 'list'.")
for rec in pmf:
if not isinstance(rec, tuple):
raise TypeError(
"'pmf1' must contain tuples containing lambda "
"values and the associated free-energy and error."
)
else:
if len(rec) != 3:
raise ValueError(
"Each tuple in 'pmf1' must contain three items: "
"a lambda value and the associated free energy "
"and error."
)
for val in rec[1:]:
if not isinstance(val, _Types.Energy):
raise TypeError(
"'pmf' must contain 'BioSimSpace.Types.Energy' types."
)
if pmf_ref is not None:
for rec in pmf_ref:
if not isinstance(rec, tuple):
raise TypeError(
"'pmf_ref' must contain tuples containing lambda "
"values and the associated free-energy and error."
)
else:
if len(rec) != 3:
raise ValueError(
"Each tuple in 'pmf_ref' must contain three items: "
"a lambda value and the associated free energy "
"and error."
)
for val in rec[1:]:
if not isinstance(val, _Types.Energy):
raise TypeError(
"'pmf_ref' must contain 'BioSimSpace.Types.Energy' types."
)
if pmf_ref is not None:
# Work out the difference in free energy.
free_energy = (pmf[-1][1] - pmf[0][1]) - (pmf_ref[-1][1] - pmf_ref[0][1])
# Propagate the errors. (These add in quadrature.)
# Measure.
error0 = _math.sqrt(
(pmf[-1][2].value() * pmf[-1][2].value())
+ (pmf[0][2].value() * pmf[0][2].value())
)
# Reference.
error1 = _math.sqrt(
(pmf_ref[-1][2].value() * pmf_ref[-1][2].value())
+ (pmf_ref[0][2].value() * pmf_ref[0][2].value())
)
# Error for free-energy difference.
error = (
_math.sqrt((error0 * error0) + (error1 * error1))
* _Units.Energy.kcal_per_mol
)
else:
# Work out the difference in free energy.
free_energy = pmf[-1][1] - pmf[0][1]
# Work out the error.
error = (
_math.sqrt(
(pmf[-1][2].value() * pmf[-1][2].value())
+ (pmf[0][2].value() * pmf[0][2].value())
)
* _Units.Energy.kcal_per_mol
)
return (free_energy, error)
@staticmethod
def _get_data(files, temperatures, engine, estimator):
"""
files : list(pathlib.Path)
List of files for all lambda values to analyse. Should be sorted.
temperatures : list(float)
List of temperatures at which the simulation was carried out at for each lambda window.
Index of the temperature value should match it's corresponding lambda window index in files.
lambdas : list(float)
Sorted list of lambda values used for the simulation.
engine : str
Engine with which the simulation was run.
estimator : str
The estimator to use for the analysis. Options are "MBAR" or "TI".
Returns
-------
data : list(pandas.DataFrame)
A list of dataframes containing the data for each lambda window.
"""
if not isinstance(files, (tuple, list)):
raise TypeError("'files' must be of type 'list' or 'tuple'.")
if not all(isinstance(x, _pathlib.Path) for x in files):
raise TypeError("'files' must be a list of 'pathlib.Path' types.")
if not isinstance(temperatures, (tuple, list)):
raise TypeError("'temperatures' must be of type 'list' or 'tuple'.")
if not all(isinstance(x, float) for x in temperatures):
raise TypeError("'temperatures' must be a list of 'float' types.")
if not isinstance(engine, str):
raise TypeError("'engine' must be of type 'str'.")
engine = engine.replace(" ", "").upper()
if not engine in Relative._engines_analysis:
raise ValueError(
f"Unsupported engine '{engine}'. Options are: {', '.join(Relative._engines_analysis)}"
)
if not isinstance(estimator, str):
raise TypeError("'estimator' must be of type 'str'.")
estimator = estimator.replace(" ", "").upper()
if not estimator in ["MBAR", "TI"]:
raise ValueError("'estimator' must be either 'MBAR' or 'TI'.")
if estimator == "MBAR":
is_mbar = True
else:
is_mbar = False
from functools import partial
function_dict = {
"SOMD": partial(Relative._somd_extract, estimator=estimator),
"SOMD2": partial(Relative._somd2_extract, estimator=estimator),
"GROMACS": _gmx_extract_u_nk if is_mbar else _gmx_extract_dHdl,
"AMBER": _amber_extract_u_nk if is_mbar else _amber_extract_dHdl,
}
# Extract the data.
func = function_dict[engine]
try:
data = [func(file, T=temp) for file, temp in zip(files, temperatures)]
except Exception as e:
msg = "Could not extract the data from the provided files!"
if _isVerbose():
raise _AnalysisError(msg) from e
else:
raise _AnalysisError(msg) from None
return data
@staticmethod
def _get_u_nk(files, temperatures, engine):
"""
Get the u_nk dataframes for MBAR analysis.
Parameters
----------
files : list(pathlib.Path)
A list of data files.
temperatures : list(float)
A list of temperatures.
engine : str
The simulation engine used to generate the data.
Returns
-------
u_nk : list(pandas.DataFrame)
A list of dataframes containing the u_nk data for each lambda window.
"""
return Relative._get_data(files, temperatures, engine, "MBAR")
@staticmethod
def _get_dh_dl(files, temperatures, engine):
"""
Get the dh_dl dataframes for TI analysis.
Parameters
----------
files : list(pathlib.Path)
A list of data files.
temperatures : list(float)
A list of temperatures.
engine : str
The simulation engine used to generate the data.
Returns
-------
dh_dl : list(pandas.DataFrame)
A list of dataframes containing the u_nk data for each lambda window.
"""
return Relative._get_data(files, temperatures, engine, "TI")
def _analyse(self, estimator="MBAR"):
"""
Analyse free-energy data for this object.
Parameters
----------
estimator : str
The estimator to use for the free-energy analysis. ("MBAR" or "TI")
Returns
-------
pmf : [(float, :class:`Energy <BioSimSpace.Types.Energy>`, :class:`Energy <BioSimSpace.Types.Energy>`)]
The potential of mean force (PMF). The data is a list of tuples,
where each tuple contains the lambda value, the PMF, and the
standard error.
overlap : np.matrix, None
The overlap matrix. This gives the overlap between each lambda
window. This parameter is only computed when available for the
specified estimator and engine, otherwise None will be returned.
"""
# Return the result of calling the staticmethod, passing in the working
# directory of this object and the specified estimator.
return Relative.analyse(str(self._work_dir), estimator=estimator)
@staticmethod
def _somd_extract(simfile, T=None, estimator="MBAR"):
"""
Extract required data from a SOMD output file (simfile.dat).
Parameters
----------
simfile : str
Path to the simfile.dat file.
T : float
Temperature in Kelvin at which the simulations were performed;
needed to generated the reduced potential (in units of kT).
estimator : str
The estimator that the returned data will be used with. This can
be either 'MBAR' or 'TI'.
Returns
-------
data : pandas.DataFrame
Either: Reduced potential for each alchemical state (k) for each
frame (n) for MBAR, or dH/dl as a function of time for this lambda
window for TI.
"""
if not isinstance(simfile, _pathlib.Path):
raise TypeError("'simfile' must be of type 'pathlib.Path'.")
if not _os.path.isfile(simfile):
raise ValueError("'simfile' doesn't exist!")
if not isinstance(T, float):
raise TypeError("'T' must be of type 'float'.")
if not isinstance(estimator, str):
raise TypeError("'estimator' must be of type 'str'.")
if estimator.replace(" ", "").upper() not in ["MBAR", "TI"]:
raise ValueError("'estimator' must be either 'MBAR' or 'TI'.")
# Flag that we're using MBAR.
if estimator == "MBAR":
is_mbar = True
else:
is_mbar = False
# For dhdl need to consider the temperature, as the gradient is in
# kcal/mol in the simfile.dat.
if not is_mbar:
k_b = _R_kJmol * _kJ2kcal
beta = 1 / (k_b * T)
# Flags to check if we have found the required records.
found_lambda = False
found_array = False
found_time = False
# Process the file.
with open(simfile, "r") as f:
# Terms to search for in the record lines.
start_w = "#Generating lambda is"
start_a = "#Alchemical array is"
start_t = " and "
end_t = " ps"
# Read the file line-by-line.
for line in f.readlines():
if start_w in line:
lambda_win = float(line.replace(start_w, "").strip())
if lambda_win is not None:
found_lambda = True
if start_a in line:
lambda_array = (
(line.replace(start_a, ""))
.strip()
.replace("(", "")
.replace(")", "")
.replace(" ", "")
).split(",")
lambda_array = [float(lam) for lam in lambda_array]
if lambda_array is not None:
found_array = True
if start_t and end_t in line:
sim_length = float(
((line.split(start_t)[1]).split(end_t)[0]).strip()
)
if sim_length is not None:
found_time = True
# All records found, break the loop.
if found_lambda:
if found_array:
if found_time:
break
if not found_lambda:
raise ValueError(
f"The lambda window was not detected in the SOMD output file: {file}"
)
if not found_array:
raise ValueError(
f"The lambda array was not detected in the SOMD output file: {file}"
)
if not found_time:
raise ValueError(
f"The simulation time was not detected in the SOMD output file: {file}"
)
# TODO: get header from the file instead of like this.
header = [
"step",
"potential",
"gradient",
"forward_Metropolis",
"backward_Metropolis",
]
header.extend(lambda_array)
file_df = _pd.read_fwf(
simfile, skipinitialspace=True, skiprows=13, header=None, names=header
)
time_step = sim_length / len(file_df["step"])
time_rows = _np.arange(0, len(file_df["step"]), 1)
time = _np.arange(0, sim_length, time_step)
# For MBAR, results in list of lists where each list is the 0 to 1
# window values that lambda value. For TI, it is a list of gradients
# at that lambda.
if is_mbar:
results = (
file_df.iloc[:, 5:].subtract(file_df[lambda_win], axis=0).to_numpy()
)
else:
# This is actually in units of kT, but is reported incorrectly in
# the file originally written by SOMD.
results = file_df["gradient"].to_numpy()
# Turn into a dataframe that can be processed by alchemlyb.
if is_mbar:
df = _pd.DataFrame(
results,
columns=_np.array(lambda_array, dtype=_np.float64),
index=_pd.MultiIndex.from_arrays(
[time, _np.repeat(lambda_win, len(time))],
names=["time", "fep-lambda"],
),
)
else:
df = _pd.DataFrame(
results,
columns=["fep"],
index=_pd.MultiIndex.from_arrays(
[time, _np.repeat(lambda_win, len(time))],
names=["time", "fep-lambda"],
),
)
df.attrs["temperature"] = T
df.attrs["energy_unit"] = "kT"
return df
@staticmethod
def _somd2_extract(parquet_file, T=None, estimator="MBAR"):
"""
Extract required data from a SOMD2 output file (parquet file).
Parameters
----------
parquet_file : str
Path to the parquet file.
T : float
Temperature in Kelvin at which the simulations were performed;
needed to generated the reduced potential (in units of kT).
estimator : str
The estimator that the returned data will be used with. This can
be either 'MBAR' or 'TI'.
Returns
-------
data : pandas.DataFrame
Either: Reduced potential for each alchemical state (k) for each
frame (n) for MBAR, or dH/dl as a function of time for this lambda
window for TI.
"""
if not isinstance(parquet_file, _pathlib.Path):
raise TypeError("'parquet_file' must be of type 'pathlib.Path'.")
if not _os.path.isfile(parquet_file):
raise ValueError("'parquet_file' doesn't exist!")
if not isinstance(T, float):
raise TypeError("'T' must be of type 'float'.")
if not isinstance(estimator, str):
raise TypeError("'estimator' must be of type 'str'.")
if estimator.replace(" ", "").upper() not in ["MBAR", "TI"]:
raise ValueError("'estimator' must be either 'MBAR' or 'TI'.")
# Flag that we're using MBAR.
if estimator == "MBAR":
is_mbar = True
else:
is_mbar = False
# Try to read the file.
try:
table = _pq.read_table(parquet_file)
except:
raise IOError(f"Could not read the SOMD2 parquet file: {parquet_file}")
# Try to extract the metadata.
try:
metadata = _json.loads(table.schema.metadata["somd2".encode()])
except:
raise IOError(
f"Could not read the SOMD2 metadta from parquet file: {parquet_file}"
)
# Validate metadata required by all analysis methods.
try:
temperature = float(metadata["temperature"])
except:
raise ValueError("Parquet metadata does not contain 'temperature'.")
try:
attrs = dict(metadata["attrs"])
except:
raise ValueError("Parquet metadata does not contain 'attrs'.")
try:
lam = float(metadata["lambda"])
except:
raise ValueError("Parquet metadata does not contain 'lambda'.")
try:
lambda_array = metadata["lambda_array"]
except:
raise ValueError("Parquet metadata does not contain 'lambda array'")
if not is_mbar:
try:
lambda_grad = metadata["lambda_grad"]
except:
raise ValueError("Parquet metadata does not contain 'lambda grad'")
# Make sure that the temperature is correct.
if not T == temperature:
raise ValueError(
f"The temperature in the parquet metadata '{temperature:%.3f}' "
f"does not match the specified temperature '{T:%.3f}'."
)
# Convert to a pandas dataframe.
df = table.to_pandas()
if is_mbar:
# Extract the columns correspodning to the lambda array.
df = df[[x for x in lambda_array]]
# Subtract the potential at the simulated lambda.
df = df.subtract(df[lam], axis=0)
# Apply the existing attributes.
df.attrs = attrs
return df.dropna()
else:
# Forward or backward difference.
if len(lambda_grad) == 1:
lam_delta = lambda_grad[0]
# Forward difference.
if lam_delta > lam:
double_incr = (lam_delta - lam) * 2
grad = (df[lam_delta] - df[lam]) * 2 / double_incr
# Backward difference.
else:
double_incr = (lam - lam_delta) * 2
grad = (df[lam] - df[lam_delta]) * 2 / double_incr
# Central difference.
else:
lam_below, lam_above = lambda_grad
double_incr = lam_above - lam_below
grad = (df[lam_above] - df[lam_below]) / double_incr
# Create a DataFrame with the multi-index
df = _pd.DataFrame({"fep": grad}, index=df.index)
# Set the original attributes.
df.attrs = attrs
return df.dropna()
@staticmethod
def _preprocess_data(data, estimator, **kwargs):
"""
Preprocess FEP simulation data.
Parameters
----------
data : list
List of extracted dHdl or u_nk data.
estimator : str
The estimator ('MBAR' or 'TI') used.
Returns
-------
processed_data : pandas.DataFrame
Dataframe of dHdl or u_nk data processed using automated equilibration
detection followed by statistical inefficiency.
"""
if not isinstance(data, (list, _pd.DataFrame)):
raise TypeError("'data' must be of type 'list' or 'pandas.DataFrame'.")
if not isinstance(estimator, str):
raise TypeError("'estimator' must be of type 'str'.")
if not estimator.replace(" ", "").upper() in ["MBAR", "TI"]:
raise ValueError("'estimator' must be either 'MBAR' or 'TI'.")
# Assign defaults in case not passed via kwargs.
auto_eq = False
stat_ineff = False
truncate = False
truncate_keep = "start"
# Parse kwargs.
for key, value in kwargs.items():
key = key.replace(" ", "").replace("_", "").upper()
if key == "AUTOEQUILIBRATION":
auto_eq = value
if key == "STATISTICALINEFFICIENCY":
stat_ineff = value
if key == "TRUNCATEPERCENTAGE":
truncate = value
if key == "TRUNCATEKEEP":
truncate_keep = value
# First truncate data.
raw_data = data
if truncate:
# Get the upper and lower bounds for truncate.
data_len = len(data[0])
data_step = round((data[0].index[-1][0] - data[0].index[-2][0]), 1)
data_kept = data_len * (truncate / 100)
data_time = data_kept * data_step
if truncate_keep == "start":
truncate_lower = 0
truncate_upper = data_time - data_step
if truncate_keep == "end":
truncate_lower = (data_len * data_step) - data_time
truncate_upper = (data_len * data_step) - data_step
try:
data = [
_slicing(i, lower=truncate_lower, upper=truncate_upper)
for i in raw_data
]
except:
_warnings.warn("Could not truncate data.")
data = raw_data
else:
data = raw_data
# The decorrelate function calls either autoequilibration or statistical_inefficiency
# in alchemlyb, which both subsample the data. As such, auto equilibration (remove_burnin)
# can only be carried out if statistical inefficency detection is also carried out.
if stat_ineff:
if estimator == "MBAR":
decorrelated_data = [
decorrelate_u_nk(i, method="dE", remove_burnin=auto_eq)
for i in data
]
elif estimator == "TI":
decorrelated_data = [
decorrelate_dhdl(i, remove_burnin=auto_eq) for i in data
]
sampled_data = decorrelated_data
for i in decorrelated_data:
if len(i.iloc[:, 0]) < 50:
_warnings.warn(
"Less than 50 samples as a result of preprocessing. No preprocessing will be performed."
)
sampled_data = data
else:
# Need stats ineff for auto eq to run as well.
if auto_eq:
_warnings.warn(
"Auto equilibration can only be detected if statistical inefficiency is run as well."
)
sampled_data = data
# Concatanate in alchemlyb format.
processed_data = _alchemlyb.concat(sampled_data)
return processed_data
@staticmethod
def _analyse_internal(files, temperatures, lambdas, engine, estimator, **kwargs):
"""
Analyse existing free-energy data using MBAR and the alchemlyb library.
Parameters
----------
files : list(pathlib.Path)
List of files for all lambda values to analyse. Should be sorted.
temperatures : list(float)
List of temperatures at which the simulation was carried out at for each lambda window.
Index of the temperature value should match it's corresponding lambda window index in files.
lambdas : list(float)
Sorted list of lambda values used for the simulation.
engine : str
Engine with which the simulation was run.
estimator : str
The estimator to use for the analysis. Options are "MBAR" or "TI".
Returns
-------
pmf : [(float, :class:`Energy <BioSimSpace.Types.Energy>`, :class:`Energy <BioSimSpace.Types.Energy>`)]
The potential of mean force (PMF). The data is a list of tuples,
where each tuple contains the lambda value, the PMF, and the
standard error.
overlap : numpy.matrix, None
The overlap matrix. This gives the overlap between each lambda
window. Returns None if overlap isn't supported for the chosen
estimator or engine.
"""
if not isinstance(files, (tuple, list)):
raise TypeError("'files' must be of type 'list' or 'tuple'.")
if not all(isinstance(x, _pathlib.Path) for x in files):
raise TypeError("'files' must be a list of 'pathlib.Path' types.")
if not isinstance(temperatures, (tuple, list)):
raise TypeError("'temperatures' must be of type 'list' or 'tuple'.")
if not all(isinstance(x, float) for x in temperatures):
raise TypeError("'temperatures' must be a list of 'float' types.")
if not isinstance(lambdas, (tuple, list)):
raise TypeError("'lambdas' must be of type 'list' or 'tuple'.")
if not all(isinstance(x, float) for x in lambdas):
raise TypeError("'lambdas' must be a list of 'float' types.")
if not isinstance(engine, str):
raise TypeError("'engine' must be of type 'str'.")
if not engine.replace(" ", "").upper() in Relative._engines_analysis:
raise ValueError(
f"Unsupported engine '{engine}'. Options are: {', '.join(Relative._engines_analysis)}"
)
if not isinstance(estimator, str):
raise TypeError("'estimator' must be of type 'str'.")
estimator = estimator.replace(" ", "").upper()
if not estimator in ["MBAR", "TI"]:
raise ValueError("'estimator' must be either 'MBAR' or 'TI'.")
if estimator == "MBAR":
is_mbar = True
else:
is_mbar = False
# Extract the data.
try:
data = Relative._get_data(files, temperatures, engine, estimator)
except Exception as e:
msg = "Could not extract the data from the provided files!"
if _isVerbose():
raise _AnalysisError(msg) from e
else:
raise _AnalysisError(msg) from None
# Preprocess the data.
try:
processed_data = Relative._preprocess_data(data, estimator, **kwargs)
except:
_warnings.warn("Could not preprocess the data!")
processed_data = _alchemlyb.concat(data)
mbar_method = None
if is_mbar:
# Check kwargs in case there is an mbar_method and then use this
for key, value in kwargs.items():
key = key.replace(" ", "").replace("_", "").upper()
if key == "MBARMETHOD":
mbar_method = value
# MBAR analysis using a specified method.
if mbar_method:
try:
alchem = _AutoMBAR(method=mbar_method)
alchem.fit(processed_data)
except Exception as e:
msg = f"MBAR free-energy analysis failed with {mbar_method} as mbar_method!"
if _isVerbose():
raise _AnalysisError(msg) from e
else:
raise _AnalysisError(msg) from None
# Standard analysis.
else:
try:
if is_mbar:
alchem = _AutoMBAR().fit(processed_data)
else:
alchem = _TI().fit(processed_data)
except Exception as e:
msg = f"{estimator} free-energy analysis failed!"
if _isVerbose():
raise _AnalysisError(msg) from e
else:
raise _AnalysisError(msg) from None
# Extract the data from the results.
data = []
# Convert the data frames to kcal/mol.
delta_f_ = _to_kcalmol(alchem.delta_f_)
d_delta_f_ = _to_kcalmol(alchem.d_delta_f_)
for lambda_, t in zip(lambdas, temperatures):
x = lambdas.index(lambda_)
mbar_value = delta_f_.iloc[0, x]
mbar_error = d_delta_f_.iloc[0, x]
# Append the data.
data.append(
(
lambda_,
(mbar_value) * _Units.Energy.kcal_per_mol,
(mbar_error) * _Units.Energy.kcal_per_mol,
)
)
if is_mbar:
return (data, _np.matrix(alchem.overlap_matrix))
else:
return (data, None)
@staticmethod
def _analyse_amber(work_dir=None, estimator="MBAR", method="alchemlyb", **kwargs):
"""
Analyse the AMBER free energy data.
Parameters
----------
work_dir : str
The path to the working directory.
estimator : str
The estimator ('MBAR' or 'TI') used.
method : str
The method used to analyse the data. Options are "alchemlyb" or "native".
Returns
-------
pmf : [(float, :class:`Energy <BioSimSpace.Types.Energy>`, :class:`Energy <BioSimSpace.Types.Energy>`)]
The potential of mean force (PMF). The data is a list of tuples,
where each tuple contains the lambda value, the PMF, and the
standard error.
overlap or dHdl : numpy.matrix or alchemlyb.estimators.ti_.TI
For MBAR, this returns the overlap matrix for the overlap between
each lambda window. For TI, this returns None.
"""
if not isinstance(work_dir, str):
raise TypeError("'work_dir' must be of type 'str'.")
if not _os.path.isdir(work_dir):
raise ValueError("'work_dir' doesn't exist!")
if not isinstance(estimator, str):
raise TypeError("'estimator' must be of type 'str'.")
if estimator.replace(" ", "").upper() not in ["MBAR", "TI"]:
raise ValueError("'estimator' must be either 'MBAR' or 'TI'.")
if not isinstance(method, str):
raise TypeError("'method' must be of type 'str'.")
if method.replace(" ", "").upper() != "ALCHEMLYB":
raise ValueError(
"Only 'alchemyb' is supported as an analysis method for AMBER."
)
# Find the output files and work out the lambda windows from the directory names.
glob_path = _pathlib.Path(work_dir)
files = sorted(glob_path.glob("**/*.out"))
lambdas = []
for file in files:
for part in file.parts:
if "lambda" in part:
lambdas.append(float(part.split("_")[-1]))
# Find the temperature for each lambda window.
temperatures = []
for file, lambda_ in zip(files, lambdas):
found_temperature = False
with open(file) as f:
for line in f.readlines():
if not found_temperature:
match = _re.search(r"temp0=([\d.]+)", line)
if match is not None:
temperatures += [float(match.group(1))]
found_temperature = True
break
if not found_temperature:
raise ValueError(
f"The temperature was not detected in the AMBER output file: {file}."
)
if temperatures[0] != temperatures[-1]:
raise ValueError("The temperatures at the endstates don't match!")
return Relative._analyse_internal(
files, temperatures, lambdas, "AMBER", estimator, **kwargs
)
@staticmethod
def _analyse_gromacs(work_dir=None, estimator="MBAR", method="alchemlyb", **kwargs):
"""
Analyse GROMACS free energy data.
Parameters
----------
work_dir : str
The path to the working directory.
estimator : str
The estimator ('MBAR' or 'TI') used.
method : str
The method used to analyse the data. Options are "alchemlyb" or "native".
Returns
-------
pmf : [(float, :class:`Energy <BioSimSpace.Types.Energy>`, :class:`Energy <BioSimSpace.Types.Energy>`)]
The potential of mean force (PMF). The data is a list of tuples,
where each tuple contains the lambda value, the PMF, and the
standard error.
overlap or dHdl : numpy.matrix or alchemlyb.estimators.ti_.TI
For MBAR, this returns the overlap matrix for the overlap between
each lambda window. For TI, this returns None.
"""
if not isinstance(work_dir, str):
raise TypeError("'work_dir' must be of type 'str'.")
if not _os.path.isdir(work_dir):
raise ValueError("'work_dir' doesn't exist!")
if not isinstance(estimator, str):
raise TypeError("'estimator' must be of type 'str'.")
if not estimator.replace(" ", "").upper() in ["MBAR", "TI"]:
raise ValueError("'estimator' must be either 'MBAR' or 'TI'.")
if not isinstance(method, str):
raise TypeError("'method' must be of type 'str'.")
method = method.replace(" ", "").upper()
if method not in ["ALCHEMLYB", "NATIVE"]:
raise ValueError("'method' must be either 'alchemlyb' or 'native'.")
if method == "ALCHEMLYB":
# Find the output files and work out the lambda windows from the directory names.
glob_path = _pathlib.Path(work_dir)
files = sorted(glob_path.glob("**/[!bar]*.xvg"))
lambdas = []
for file in files:
for part in file.parts:
if "lambda" in part:
lambdas.append(float(part.split("_")[-1]))
# Find the temperature at each lambda window
temperatures = []
for file in files:
found_temperature = False
with open(file, "r") as f:
for line in f.readlines():
t = None
start = "T ="
end = "(K)"
if start and end in line:
t = int(((line.split(start)[1]).split(end)[0]).strip())
temperatures.append(float(t))
if t is not None:
found_temperature = True
break
if not found_temperature:
raise ValueError(
f"The temperature was not detected in the GROMACS output file, {file}"
)
if temperatures[0] != temperatures[-1]:
raise ValueError("The temperatures at the endstates don't match!")
return Relative._analyse_internal(
files, temperatures, lambdas, "GROMACS", estimator, **kwargs
)
# For the older gromacs versions and native use the gmx bar analysis.
elif method == "NATIVE":
if _gmx_exe is None:
raise _MissingSoftwareError(
"Cannot use native gmx bar analysis as GROMACS is not installed!"
)
_warnings.warn(
"using 'native' for GROMACS does not return an overlap/dHdl."
)
_warnings.warn("using 'native' for GROMACS uses BAR.")
# Create the command.
glob_path = _pathlib.Path(work_dir)
xvg_files = sorted(glob_path.glob("**/[!bar]*.xvg"))
xvg_files = [str(file.absolute()) for file in xvg_files]
command = "%s bar -f %s -o %s/bar.xvg" % (
_gmx_exe,
" ".join(xvg_files),
work_dir,
)
# Run the command.
proc = _subprocess.run(
_Utils.command_split(command),
shell=False,
stdout=_subprocess.PIPE,
stderr=_subprocess.PIPE,
)
if proc.returncode != 0:
raise _AnalysisError("native GROMACS free-energy analysis failed!")
# Initialise list to hold the data.
data = []
# Extract the data from the output files.
with open("%s/bar.xvg" % work_dir) as file:
# Read all of the lines into a list.
lines = []
for line in file:
# Ignore comments and xmgrace directives.
if line[0] != "#" and line[0] != "@":
lines.append(line.rstrip())
# Store the initial free energy reading.
data.append(
(
0.0,
0.0 * _Units.Energy.kcal_per_mol,
0.0 * _Units.Energy.kcal_per_mol,
)
)
# Zero the accumulated error.
total_error = 0
# Zero the accumulated free energy difference.
total_freenrg = 0
# Process the BAR data.
for x, line in enumerate(lines):
# Extract the data from the line.
records = line.split()
# Update the total free energy difference.
total_freenrg += float(records[1])
# Extract the error.
error = float(records[2])
# Update the accumulated error.
total_error = _math.sqrt(total_error * total_error + error * error)
# Append the data.
data.append(
(
(x + 1) / (len(lines)),
(total_freenrg * _Units.Energy.kt).kcal_per_mol(),
(total_error * _Units.Energy.kt).kcal_per_mol(),
)
)
return (data, None)
@staticmethod
def _analyse_somd(work_dir=None, estimator="MBAR", method="alchemlyb", **kwargs):
"""
Analyse SOMD free energy data.
Parameters
----------
work_dir : str
The path to the working directory.
estimator : str
The estimator ('MBAR' or 'TI') used.
method : str
The method used to analyse the data. Options are "alchemlyb" or "native".
Returns
-------
pmf : [(float, :class:`Energy <BioSimSpace.Types.Energy>`, :class:`Energy <BioSimSpace.Types.Energy>`)]
The potential of mean force (PMF). The data is a list of tuples,
where each tuple contains the lambda value, the PMF, and the
standard error.
overlap or dHdl : numpy.matrix or alchemlyb.estimators.ti_.TI
For MBAR, this returns the overlap matrix for the overlap between
each lambda window. For TI, this returns None.
"""
if not isinstance(work_dir, str):
raise TypeError("'work_dir' must be of type 'str'.")
if not _os.path.isdir(work_dir):
raise ValueError("'work_dir' doesn't exist!")
if not isinstance(estimator, str):
raise TypeError("'estimator' must be of type 'str'.")
estimator = estimator.replace(" ", "").upper()
if estimator not in ["MBAR", "TI"]:
raise ValueError("'estimator' must be either 'MBAR' or 'TI'.")
if not isinstance(method, str):
raise TypeError("'method' must be of type 'str'.")
method = method.replace(" ", "").upper()
if not method in ["ALCHEMLYB", "NATIVE"]:
raise ValueError("'method' must be either 'alchemlyb' or 'native'.")
if method == "ALCHEMLYB":
# Glob the data files and work out the lambda values.
glob_path = _pathlib.Path(work_dir)
files = sorted(glob_path.glob("**/simfile.dat"))
lambdas = []
for file in files:
for part in file.parts:
if "lambda" in part:
lambdas.append(float(part.split("_")[-1]))
temperatures = []
for file in files:
found_temperature = False
with open(file, "r") as f:
for line in f.readlines():
temp = None
start = "#Generating temperature is"
if start in line:
split_line = (line.split(start)[1]).strip().split(" ")
temp = split_line[0]
unit = split_line[-1]
if unit.upper() == "C":
temp = float(temp) + 273.15
else:
temp = float(temp)
temperatures.append(temp)
if temp is not None:
found_temperature = True
break
if not found_temperature:
raise ValueError(
f"The temperature was not detected in the SOMD output file: {file}"
)
if temperatures[0] != temperatures[-1]:
raise ValueError("The temperatures at the endstates don't match!")
return Relative._analyse_internal(
files, temperatures, lambdas, "SOMD", estimator, **kwargs
)
elif method == "NATIVE":
# Create the command.
command = (
"%s mbar -i %s/lambda_*/simfile.dat -o %s/mbar.txt --overlap --percent 5"
% (_analyse_freenrg, work_dir, work_dir)
)
# Run the first command.
proc = _subprocess.run(
_Utils.command_split(command),
shell=False,
stdout=_subprocess.PIPE,
stderr=_subprocess.PIPE,
)
if proc.returncode != 0:
raise _AnalysisError("Native SOMD free-energy analysis failed!")
# Re-run without subsampling if the subsampling has resulted in less than 50 samples.
with open("%s/mbar.txt" % work_dir) as file:
for line in file:
if (
"#WARNING SUBSAMPLING ENERGIES RESULTED IN LESS THAN 50 SAMPLES"
in line
):
_warnings.warn(
"Subsampling resulted in less than 50 samples, "
f"re-running without subsampling for '{work_dir}'"
)
command = (
"%s mbar -i %s/lambda_*/simfile.dat -o %s/mbar.txt --overlap"
% (_analyse_freenrg, work_dir, work_dir)
)
proc = _subprocess.run(
_Util.command_split(command),
shell=False,
stdout=_subprocess.PIPE,
stderr=_subprocess.PIPE,
)
if proc.returncode != 0:
raise _AnalysisError("SOMD free-energy analysis failed!")
break
# Initialise list to hold the data.
data = []
# Initialise list to hold the overlap matrix.
overlap = []
# Extract the data from the output files.
with open("%s/mbar.txt" % work_dir) as file:
# Process the MBAR data.
for line in file:
# Process the overlap matrix.
if "#Overlap matrix" in line:
# Get the next row.
row = next(file)
# Loop until we hit the next section.
while not row.startswith("#DG"):
# Extract the data for this row.
records = [float(x) for x in row.split()]
# Append to the overlap matrix.
overlap.append(records)
# Get the next line.
row = next(file)
# Process the PMF.
elif "PMF from MBAR" in line:
# Get the next row.
row = next(file)
# Loop until we hit the next section.
while not row.startswith("#TI"):
# Split the line.
records = row.split()
# Append the data.
data.append(
(
float(records[0]),
float(records[1]) * _Units.Energy.kcal_per_mol,
float(records[2]) * _Units.Energy.kcal_per_mol,
)
)
# Get the next line.
row = next(file)
return (data, _np.matrix(overlap))
@staticmethod
def _analyse_somd2(work_dir=None, estimator="MBAR", method="alchemlyb", **kwargs):
"""
Analyse SOMD2 free energy data.
Parameters
----------
work_dir : str
The path to the working directory.
estimator : str
The estimator ('MBAR' or 'TI') used.
method : str
The method used to analyse the data. Options are "alchemlyb" or "native".
Returns
-------
pmf : [(float, :class:`Energy <BioSimSpace.Types.Energy>`, :class:`Energy <BioSimSpace.Types.Energy>`)]
The potential of mean force (PMF). The data is a list of tuples,
where each tuple contains the lambda value, the PMF, and the
standard error.
overlap or dHdl : numpy.matrix or alchemlyb.estimators.ti_.TI
For MBAR, this returns the overlap matrix for the overlap between
each lambda window. For TI, this returns None.
"""
if not isinstance(work_dir, str):
raise TypeError("'work_dir' must be of type 'str'.")
if not _os.path.isdir(work_dir):
raise ValueError("'work_dir' doesn't exist!")
if not isinstance(estimator, str):
raise TypeError("'estimator' must be of type 'str'.")
estimator = estimator.replace(" ", "").upper()
if estimator not in ["MBAR", "TI"]:
raise ValueError("'estimator' must be either 'MBAR' or 'TI'.")
if not isinstance(method, str):
raise TypeError("'method' must be of type 'str'.")
method = method.replace(" ", "").upper()
if method != "ALCHEMLYB":
raise ValueError(
"Only 'alchemlyb' analysis 'method' is supported for SOMD2."
)
# Glob the data files.
glob_path = _pathlib.Path(work_dir)
files = sorted(glob_path.glob("**/*.parquet"))
# Loop over each file and try to extract the metadata to work out
# the lambda value and temperature for each window.
lambdas = []
temperatures = []
for file in files:
try:
metadata = _json.loads(
_pq.read_metadata(file).metadata["somd2".encode()]
)
lambdas.append(float(metadata["lambda"]))
temperatures.append(float(metadata["temperature"]))
except:
raise IOError(f"Unable to parse metadata from SOMD2 file: {file}")
# Sort the lists based on the lambda values.
temperatures = [x for _, x in sorted(zip(lambdas, temperatures))]
lambdas = sorted(lambdas)
# Check that the temperatures at the end states match.
if temperatures[0] != temperatures[-1]:
raise ValueError("The temperatures at the endstates don't match!")
return Relative._analyse_internal(
files, temperatures, lambdas, "SOMD2", estimator, **kwargs
)
def _checkOverlap(self, estimator="MBAR", threshold=0.03):
"""
Check the overlap of an FEP simulation leg.
Parameters
----------
estimator : str
The estimator used for the free-energy analysis. ("MBAR" or "TI")
threshold : float
The threshold value used to check the off-diagonals.
Returns
-------
is_okay : boolean
True if the overlap is okay, False if any off-diagonals are less
than the threshold value.
num_low : int
The number of off-diagonals that are less than the threshold value.
"""
# Calculate the overlap for this object.
_, overlap = self.analyse(estimator=estimator)
if overlap:
return Relative.checkOverlap(overlap, threshold=threshold)
else:
raise ValueError("Overlap matrix isn't supported for this estimator.")
def _difference(self, pmf_ref=None):
"""
Compute the relative free-energy difference between two perturbation
legs, or between the end states of a single leg.
Parameters
----------
pmf_ref : [(float, :class:`Energy <BioSimSpace.Types.Energy>`, :class:`Energy <BioSimSpace.Types.Energy>`)]
The reference potential of mean force (PMF). The data is a list
of tuples, where each tuple contains the lambda value, the PMF,
and the standard error.
Returns
-------
free_energy : (:class:`Energy <BioSimSpace.Types.Energy>`, :class:`Energy <BioSimSpace.Types.Energy>`)
The relative free-energy difference and its associated error.
"""
# Calculate the PMF for this object.
pmf, _ = self.analyse()
# Now call the staticmethod passing in both PMFs.
return Relative.difference(pmf, pmf_ref=pmf_ref)
def _initialise_runner(self, system):
"""
Internal helper function to initialise the process runner.
Parameters
----------
system : :class:`System <BioSimSpace._SireWrappers.System>`
The molecular system.
"""
# Initialise list to store the processe
processes = []
# Convert to an appropriate water topology.
if self._engine in ["AMBER", "SOMD"]:
system._set_water_topology("AMBER", property_map=self._property_map)
elif self._engine == "GROMACS":
system._set_water_topology("GROMACS", property_map=self._property_map)
# Setup all of the simulation processes for each leg.
# Get the lambda values from the protocol for the first leg.
lam_vals = self._protocol.getLambdaValues()
# Create a process for the first lambda value.
lam = lam_vals[0]
# Update the protocol lambda values.
self._protocol.setLambdaValues(lam=lam, lam_vals=lam_vals)
# Create and append the required processes for each leg.
# Nest the working directories inside self._work_dir.
# Name the first directory.
first_dir = "%s/lambda_%5.4f" % (self._work_dir, lam)
# SOMD.
if self._engine == "SOMD":
# Check for GPU support.
if "CUDA_VISIBLE_DEVICES" in _os.environ:
platform = "CUDA"
else:
platform = "CPU"
first_process = _Process.Somd(
system,
self._protocol,
platform=platform,
work_dir=first_dir,
property_map=self._property_map,
**self._kwargs,
)
if self._setup_only:
del first_process
else:
processes.append(first_process)
# GROMACS.
elif self._engine == "GROMACS":
first_process = _Process.Gromacs(
system,
self._protocol,
work_dir=first_dir,
property_map=self._property_map,
**self._kwargs,
)
if not self._setup_only:
processes.append(first_process)
# AMBER.
elif self._engine == "AMBER":
first_process = _Process.Amber(
system,
self._protocol,
work_dir=first_dir,
property_map=self._property_map,
**self._kwargs,
)
if self._setup_only:
del first_process
else:
processes.append(first_process)
# Loop over the rest of the lambda values.
for x, lam in enumerate(lam_vals[1:]):
# Name the directory.
new_dir = "%s/lambda_%5.4f" % (self._work_dir, lam)
# Use absolute path.
if not _os.path.isabs(new_dir):
new_dir = _os.path.abspath(new_dir)
# Delete any existing directories.
if _os.path.isdir(new_dir):
_shutil.rmtree(new_dir, ignore_errors=True)
# Copy the first directory to that of the current lambda value.
_shutil.copytree(first_dir, new_dir)
# Update the protocol lambda values.
self._protocol.setLambdaValues(lam=lam, lam_vals=lam_vals)
# Now update the lambda values in the config files.
# SOMD.
if self._engine == "SOMD":
new_config = []
with open(new_dir + "/somd.cfg", "r") as f:
for line in f:
if "lambda_val" in line:
new_config.append("lambda_val = %s\n" % lam)
else:
new_config.append(line)
with open(new_dir + "/somd.cfg", "w") as f:
for line in new_config:
f.write(line)
# Create a copy of the process and update the working
# directory.
if not self._setup_only:
process = _copy.copy(first_process)
process._system = first_process._system.copy()
process._protocol = self._protocol
process._work_dir = new_dir
process._std_out_file = _os.path.join(new_dir, "somd.out")
process._std_err_file = _os.path.join(new_dir, "somd.err")
process._rst_file = _os.path.join(new_dir, "somd.rst7")
process._top_file = _os.path.join(new_dir, "somd.prm7")
process._traj_file = _os.path.join(new_dir, "traj000000001.dcd")
process._restart_file = _os.path.join(new_dir, "latest.rst")
process._config_file = _os.path.join(new_dir, "somd.cfg")
process._pert_file = _os.path.join(new_dir, "somd.pert")
process._gradients_file = _os.path.join(new_dir, "gradients.dat")
process._input_files = [
process._config_file,
process._rst_file,
process._top_file,
process._pert_file,
]
processes.append(process)
# GROMACS.
elif self._engine == "GROMACS":
# Delete the existing binary run file.
_os.remove(new_dir + "/gromacs.tpr")
new_config = []
with open(new_dir + "/gromacs.mdp", "r") as f:
for line in f:
if "init-lambda-state" in line:
new_config.append("init-lambda-state = %d\n" % (x + 1))
else:
new_config.append(line)
with open(new_dir + "/gromacs.mdp", "w") as f:
for line in new_config:
f.write(line)
mdp = _os.path.join(new_dir, "gromacs.mdp")
gro = _os.path.join(new_dir, "gromacs.gro")
top = _os.path.join(new_dir, "gromacs.top")
tpr = _os.path.join(new_dir, "gromacs.tpr")
# Use grompp to generate the portable binary run input file.
_Process.Gromacs._generate_binary_run_file(
mdp,
gro,
top,
gro,
tpr,
first_process._exe,
**self._kwargs,
)
# Create a copy of the process and update the working
# directory.
if not self._setup_only:
process = _copy.copy(first_process)
process._system = first_process._system.copy()
process._protocol = self._protocol
process._work_dir = new_dir
process._std_out_file = _os.path.join(new_dir, "gromacs.out")
process._std_err_file = _os.path.join(new_dir, "gromacs.err")
process._gro_file = _os.path.join(new_dir, "gromacs.gro")
process._top_file = _os.path.join(new_dir, "gromacs.top")
process._ref_file = _os.path.join(new_dir, "gromacs_ref.gro")
process._traj_file = _os.path.join(new_dir, "gromacs.trr")
process._config_file = _os.path.join(new_dir, "gromacs.mdp")
process._tpr_file = _os.path.join(new_dir, "gromacs.tpr")
process._input_files = [
process._config_file,
process._gro_file,
process._top_file,
process._tpr_file,
]
processes.append(process)
# AMBER.
elif self._engine == "AMBER":
new_config = []
with open(new_dir + "/amber.cfg", "r") as f:
for line in f:
if "clambda" in line:
new_config.append(" clambda=%s,\n" % lam)
else:
new_config.append(line)
with open(new_dir + "/amber.cfg", "w") as f:
for line in new_config:
f.write(line)
# Create a copy of the process and update the working
# directory.
if not self._setup_only:
process = _copy.copy(first_process)
process._system = first_process._system.copy()
process._protocol = self._protocol
process._work_dir = new_dir
process._std_out_file = _os.path.join(new_dir, "amber.out")
process._std_err_file = _os.path.join(new_dir, "amber.err")
process._rst_file = _os.path.join(new_dir, "amber.rst7")
process._top_file = _os.path.join(new_dir, "amber.prm7")
process._ref_file = _os.path.join(new_dir, "amber_ref.rst7")
process._traj_file = _os.path.join(new_dir, "amber.nc")
process._config_file = _os.path.join(new_dir, "amber.cfg")
process._nrg_file = _os.path.join(new_dir, "amber.nrg")
process._input_files = [
process._config_file,
process._rst_file,
process._top_file,
]
processes.append(process)
if not self._setup_only:
# Initialise the process runner. All processes have already been nested
# inside the working directory so no need to re-nest.
self._runner = _Process.ProcessRunner(processes)
[docs]
def getData(name="data", file_link=False, work_dir=None):
"""
Return a link to a zip file containing the data files required for
post-simulation analysis.
Parameters
----------
name : str
The name of the zip file.
file_link : bool
Whether to return a FileLink when working in Jupyter.
work_dir : str
The working directory for the simulation.
Returns
-------
output : str, IPython.display.FileLink
A path, or file link, to an archive of the process input.
"""
return Relative.getData(name=name, file_link=file_link, work_dir=work_dir)