Source code for mofbuilder.md.setup

from openmm.app import (
    GromacsGroFile,
    GromacsTopFile,
    PDBFile,
    PDBReporter,
    NoCutoff,
    Simulation,
    PME,
    HBonds,
    StateDataReporter,
)
from openmm import LangevinIntegrator, MonteCarloBarostat
from openmm.unit import (
    atmosphere,
    picosecond,
    picoseconds,
    kelvin,
    femtoseconds,
    nanometer,
)
import mpi4py.MPI as MPI
import sys
from typing import Optional, List, Any, Union
from veloxchem.outputstream import OutputStream
from veloxchem.veloxchemlib import mpi_master


[docs] class OpenmmSetup: """Set up and execute GROMACS/OPENMM simulations for molecular dynamics. Handles system initialization, energy minimization, equilibration in NVT/NPT ensembles, and trajectory/log file writing, with parallel output support via MPI. Attributes: comm (Any): MPI communicator. rank (int): MPI process rank. nodes (int): Number of MPI processes. ostream (OutputStream): Output stream for printing/logging. system_pbc (bool): Whether to use periodic boundary conditions. gro_file (str): Path to GROMACS .gro coordinates file. top_file (str): Path to GROMACS .top topology file. temperature_K (float): Simulation temperature in Kelvin. timestep_fs (float): Integration timestep in femtoseconds. gro: Loaded GROMACS structure file (GromacsGroFile). top: Loaded GROMACS topology file (GromacsTopFile). positions: Atomic positions. barostat_added (bool): Tracks if barostat has been added for NPT. simulation: The OpenMM Simulation object (initialized on demand). temperature (openmm.unit.Quantity): Temperature (set during pipeline). timestep (openmm.unit.Quantity): Timestep (set during pipeline). system: OpenMM System object (set during pipeline). """ def __init__( self, gro_file: str, top_file: str, temperature_K: float = 300, timestep_fs: float = 1, comm: Optional[Any] = None, ostream: Optional[OutputStream] = None, ): """Initializes the OpenmmSetup simulation context. Args: gro_file (str): Path to input .gro coordinate file. top_file (str): Path to input .top topology file. temperature_K (float, optional): Simulation temperature in Kelvin. Defaults to 300. timestep_fs (float, optional): Integration timestep (femtoseconds). Defaults to 1. comm (Any, optional): MPI communicator. Defaults to MPI.COMM_WORLD. ostream (OutputStream, optional): Output stream for logging. """ self.comm = comm or MPI.COMM_WORLD self.rank = self.comm.Get_rank() self.nodes = self.comm.Get_size() self.ostream = ostream or OutputStream( sys.stdout if self.rank == mpi_master() else None ) self.system_pbc = True # assume PBC for MD self.gro_file = gro_file self.top_file = top_file self.temperature_K = temperature_K self.timestep_fs = timestep_fs self.simulation = None # type: Optional[Simulation] # Load GROMACS files self.gro = GromacsGroFile(gro_file) self.top = GromacsTopFile( top_file, periodicBoxVectors=self.gro.getPeriodicBoxVectors(), includeDir="." ) self.positions = self.gro.positions self.barostat_added = False def _steps_from_ps(self, ps_time: float) -> int: """Convert a time in picoseconds to number of simulation steps. Args: ps_time (float): Duration (picoseconds). Returns: int: Number of steps for current timestep. """ return int(ps_time / self.timestep.value_in_unit(picoseconds))
[docs] def run_em(self, output_prefix: str = "em", whole_traj_file: Optional[str] = None) -> None: """Performs energy minimization using OpenMM and writes output. Writes a minimized structure (PDB) to disk and logs potential/temperature. Optionally appends coordinates to a trajectory PDB. Args: output_prefix (str, optional): Prefix for output and log files. whole_traj_file (Optional[str], optional): If provided, append structure to this PDB. """ print("Starting energy minimization...") if self.simulation is None: integrator = LangevinIntegrator( self.temperature, 1 / picosecond, self.timestep ) self.simulation = Simulation(self.top.topology, self.system, integrator) self.simulation.context.setPositions(self.positions) self.simulation.minimizeEnergy() self.positions = self.simulation.context.getState( getPositions=True ).getPositions() # Add log self.simulation.reporters.append( StateDataReporter( f"{output_prefix}.log", 1, step=True, potentialEnergy=True, temperature=True ) ) # Save minimized structure em_file = f"{output_prefix}_minimized.pdb" PDBFile.writeFile( self.simulation.topology, self.positions, open(em_file, "w") ) print("Energy minimization done.") # Append to whole trajectory if requested if whole_traj_file: PDBFile.writeFile( self.simulation.topology, self.positions, open(whole_traj_file, "a") )
def _run_md( self, mode: str = "nvt", time_ps: float = 100, output_prefix: str = "traj", record_interval_ps: float = 1, whole_traj_file: Optional[str] = None, ) -> None: """Runs an MD segment in either NVT or NPT ensemble. Args: mode (str, optional): 'nvt' or 'npt'. time_ps (float, optional): Duration to run (picoseconds). output_prefix (str, optional): Output file prefix. record_interval_ps (float, optional): Trajectory/log frame interval (picoseconds). whole_traj_file (Optional[str], optional): If given, appends frames to this PDB. """ nsteps = self._steps_from_ps(time_ps) steps_per_frame = self._steps_from_ps(record_interval_ps) if mode.lower() == "npt" and not self.barostat_added: barostat = MonteCarloBarostat(1 * atmosphere, self.temperature) self.system.addForce(barostat) self.barostat_added = True if self.simulation is None: integrator = LangevinIntegrator( self.temperature, 1 / picosecond, self.timestep ) self.simulation = Simulation(self.top.topology, self.system, integrator) self.simulation.context.setPositions(self.positions) # Set reporters if whole_traj_file: self.simulation.reporters.append( PDBReporter(whole_traj_file, steps_per_frame) ) else: self.simulation.reporters.append( PDBReporter(f"{output_prefix}.pdb", steps_per_frame) ) self.simulation.reporters.append( StateDataReporter( f"{output_prefix}.log", steps_per_frame, step=True, potentialEnergy=True, temperature=True, density=(mode.lower() == "npt"), ) ) print(f"Running {mode.upper()} for {time_ps} ps ...") self.simulation.step(nsteps) self.positions = self.simulation.context.getState( getPositions=True ).getPositions() print(f"{mode.upper()} done.")
[docs] def run_nvt( self, nvt_time: float = 100, output_prefix: str = "nvt", record_interval: float = 1, whole_traj_file: Optional[str] = None, ) -> None: """Run an NVT (constant volume) MD segment. Args: nvt_time (float, optional): Simulation time (ps). output_prefix (str, optional): Output file prefix. record_interval (float, optional): Frame/log interval (ps). whole_traj_file (Optional[str], optional): Append all frames to this PDB if provided. """ self._run_md( mode="nvt", time_ps=nvt_time, output_prefix=output_prefix, record_interval_ps=record_interval, whole_traj_file=whole_traj_file, )
[docs] def run_npt( self, npt_time: float = 100, output_prefix: str = "npt", record_interval: float = 1, whole_traj_file: Optional[str] = None, ) -> None: """Run an NPT (constant pressure) MD segment. Args: npt_time (float, optional): Simulation time (ps). output_prefix (str, optional): Output file prefix. record_interval (float, optional): Frame/log interval (ps). whole_traj_file (Optional[str], optional): Append all frames to this PDB if provided. """ self._run_md( mode="npt", time_ps=npt_time, output_prefix=output_prefix, record_interval_ps=record_interval, whole_traj_file=whole_traj_file, )
[docs] def run_pipeline( self, steps: Optional[List[str]] = None, nvt_time: float = 100, npt_time: float = 100, output_prefix: str = "mdsim", record_interval: float = 1, whole_traj: bool = False, ) -> None: """Executes an MD workflow: minimization, NVT, and/or NPT on demand. The method prepares the system, then runs the requested steps in order. If whole_traj is enabled, all coordinates are appended to a single trajectory PDB. Args: steps (Optional[List[str]], optional): List of stages to run (e.g., ['em', 'nvt', 'npt']). nvt_time (float, optional): Time for NVT segment (ps). npt_time (float, optional): Time for NPT segment (ps). output_prefix (str, optional): Prefix for all outputs/logs. record_interval (float, optional): Frame/log interval (ps). whole_traj (bool, optional): If True, write all frames to a single PDB. Note: The system and OpenMM integrator are initialized in this function. """ if steps is None: steps = ["em", "nvt", "npt"] self.temperature = self.temperature_K * kelvin self.timestep = self.timestep_fs * femtoseconds whole_traj_file = f"{output_prefix}_whole.pdb" if whole_traj else None self.system = self.top.createSystem( nonbondedMethod=PME if self.system_pbc else NoCutoff, nonbondedCutoff=1 * nanometer, constraints=HBonds, rigidWater=True, ) for step in steps: if step.lower() == "em": self.run_em(output_prefix=f"{output_prefix}_em", whole_traj_file=whole_traj_file) elif step.lower() == "nvt": self.run_nvt( nvt_time=nvt_time, output_prefix=f"{output_prefix}_nvt", record_interval=record_interval, whole_traj_file=whole_traj_file, ) elif step.lower() == "npt": self.run_npt( npt_time=npt_time, output_prefix=f"{output_prefix}_npt", record_interval=record_interval, whole_traj_file=whole_traj_file, ) else: print(f"Unknown step: {step}")