"""
Conversion helpers for one-way TPS simulations
These classes are used to make it very easy to convert from existing one-way
TPS simulations. They require the user to do the following:
1. Create an OPS network representing their TPS simulation.
2. Create a summary file, which consists of one line per MC step, where each
line is the space-separated ``file_name shooting_point direction accepted``,
where ``file_name`` is the name of the file for the partial trajectory,
``shooting_point`` is the index of the shooting point from (from zero) in
the *previous* trajectory, ``direction`` tells whether this was a forward
or backward shot, and ``accepted`` tells whether the path was accepted.
3. If the input trajectories are full trajectories instead of only the
partial one-way trajectories for each move, then you need to provide an
addition element on each line (after ``accepted``) for the
``shooting_point_in_trial``, which is the index of the shooting point in the
trial trajectory.
Valid values for forward ``direction`` include ``+1``, ``1``, ``FW``,
``forward``, with backward given by ``-1``, ``BW``, or ``backward``.
Valid values for a trajectory that is ``accepted`` are ``T``, ``True``,
``1``, ``Y``, and ``Yes``. If the trajectory is not accepted, use ``F``,
``False``, ``0``, ``N``, or ``No``.
In general, we suggest the ``FW``/``BW`` pair for ``direction``, and
``True``/``False`` for ``accepted``. These tend to be most readable.
"""
import os
import openpathsampling as paths
import ops_piggybacker as oink
from openpathsampling.engines.openmm.tools import ops_load_trajectory
from openpathsampling.tools import refresh_output
from collections import namedtuple
_tps_converter_option_list = ('trim retrim_shooting auto_reverse '
+ 'includes_shooting_point full_trajectory')
[docs]class TPSConverterOptions(namedtuple("TPSConverterOptions",
_tps_converter_option_list)):
"""
Parameters
----------
trim : bool
whether to trim the file trajectories to minimum acceptable
length (default True)
retrim_shooting : bool
whether the shooting point index given is based on an untrimmed
trajectory, and therefore needs to be shifted (default False).
auto_reverse : bool
whether to reverse backward trajectories (if the file version is
forward, instead of backward, default False)
includes_shooting_point : bool
whether the one-way trial trajectory includes the shooting
point, and therefore must have it trimmed off (default True)
full_trajectory : bool
whether the input trajectories are the full trajectories, instead of
the partial one-way trajectories (default False). Note that if you
use full_trajectory=True, you should also use auto_reverse=False.
"""
__slots__ = ()
def __new__(cls, trim=True, retrim_shooting=False, auto_reverse=False,
includes_shooting_point=True, full_trajectory=False):
return super(TPSConverterOptions, cls).__new__(
cls, trim, retrim_shooting, auto_reverse,
includes_shooting_point, full_trajectory
)
[docs]class OneWayTPSConverter(oink.ShootingPseudoSimulator):
"""
Single-ensemble network shooting pseudo-simulator from external
trajectories.
This object handles a wide variety of external simulators. The idea is
that the user must create a "simulation summary" file, which contains
the information we need to perform the pseudo-simulation, where the
trajectories are loaded via mdtraj.
"""
def __init__(self, storage, initial_file, mover, network, options=None,
options_rejected=None):
# TODO: mke the initial file into an initial trajectory
if options is None:
options = TPSConverterOptions()
if options_rejected is None:
options_rejected = options
self.options = options
self.options_rejected = options_rejected
self.initial_file = initial_file # needed for restore
traj = self.load_trajectory(initial_file)
# assume we're TPS here
ensemble = network.sampling_ensembles[0]
initial_trajectories = ensemble.split(traj)
if len(initial_trajectories) == 0: # pragma: no cover
raise RuntimeError("Initial trajectory in " + str(initial_file)
+ " has no subtrajectory satisfying the "
+ "TPS ensemble.")
elif len(initial_trajectories) > 1: # pragma: no cover
raise RuntimeWarning("More than one potential initial "
+ "subtrajectory. We use the first.")
initial_trajectory = initial_trajectories[0]
initial_conditions = paths.SampleSet([
paths.Sample(replica=0,
trajectory=initial_trajectory,
ensemble=ensemble)
])
self.extra_bw_frames = traj.index(initial_trajectory[0])
final_frame_index = traj.index(initial_trajectory[-1])
self.extra_fw_frames = len(traj) - final_frame_index - 1
# extra -1 bc frame index counts from 0; len counts from 1
self.summary_root_dir = None
self.report_progress = None
super(OneWayTPSConverter, self).__init__(
storage=storage,
initial_conditions=initial_conditions,
mover=mover,
network=network
)
# initial_states = self.network.initial_states
# final_states = self.network.final_states
# TODO: prefer the above, but the below work until fix for network
# storage
initial_states = [self.network.sampling_transitions[0].stateA]
final_states = [self.network.sampling_transitions[0].stateB]
all_states = paths.join_volumes(initial_states + final_states)
self.fw_ensemble = paths.SequentialEnsemble([
paths.AllOutXEnsemble(all_states),
paths.AllInXEnsemble(all_states) & paths.LengthEnsemble(1)
])
self.bw_ensemble = paths.SequentialEnsemble([
paths.AllInXEnsemble(all_states) & paths.LengthEnsemble(1),
paths.AllOutXEnsemble(all_states)
])
self.full_ensemble = paths.SequentialEnsemble([
paths.AllInXEnsemble(all_states) & paths.LengthEnsemble(1),
paths.AllOutXEnsemble(all_states),
paths.AllInXEnsemble(all_states) & paths.LengthEnsemble(1)
])
self.all_states = all_states
def load_trajectory(self, file_name):
raise NotImplementedError(
"Can't instantiate abstract OneWayTPSConverter: Use a subclass"
)
@staticmethod
def _get_direction(val):
"""Identifies the direction based on val"""
is_forward = ['+1', '1', 'FW', 'F', 'FORWARD']
is_backward = ['-1', 'BW', 'B', 'BACKWARD']
if val.upper() in is_forward:
return +1
elif val.upper() in is_backward:
return -1
else: # pragma: no cover
raise ValueError("Unrecognized direction: " + str(val))
@staticmethod
def _get_accepted(val):
is_true = ['1', 'T', 'TRUE', 'Y', 'YES', 'ACC']
is_false = ['0', 'F', 'FALSE', 'N', 'NO', 'REJ']
if val.upper() in is_true:
return True
elif val.upper() in is_false:
return False
else: # pragma: no cover
raise ValueError("Unknown truth value for acceptance: " +
str(val))
[docs] def parse_summary_line(self, line):
"""Parse a line from the summary file.
To control the parsing, set the OneWayTPSConverter.options (see
:class:`.TPSConverterOptions`).
Parameters
----------
line : str
the input line
Returns
-------
replica : 0
always zero for now
trial_trajectory : openpathsampling.Trajectory
one-way trial segments
shooting_point_index : int
index of the shooting point based on the previous trajectory
(None if no previous trajectory)
accepted : bool
whether the trial was accepted
direction : 1 or -1
positive if forward shooting, negative if backward
"""
#print line
splitted = line.split()
assert 4 <= len(splitted) <= 5, \
"Incorrect number of fields in input: " + line
replica = 0
file_name = splitted[0]
full_file_name = os.path.join(self.summary_root_dir, file_name)
trajectory = self.load_trajectory(full_file_name)
shooting_index = int(splitted[1])
direction = self._get_direction(splitted[2])
accepted = self._get_accepted(splitted[3])
if accepted:
options = self.options
else:
options = self.options_rejected
# if reversed, make sure time is in the right direction
if options.auto_reverse and direction < 0:
trajectory = trajectory.reversed
old_extra_bw_frames = self.extra_bw_frames
old_extra_fw_frames = self.extra_fw_frames
# if reversed, make sure time is in the right direction
# ensure the trajectory doesn't have extra frames
if options.trim and not options.full_trajectory:
len_pre_trim = len(trajectory)
if direction > 0:
# in the forward direction, we do an extra check which can
# make this much faster (otherwise, this is quadratic until
# OPS supports some of the ideas of path ensemble theory).
# The BW part doesn't need this bc it will already be linear
# in this case.
# Check whether amy states are in that traj
if any(self.all_states(s) for s in trajectory):
segments = self.fw_ensemble.split(trajectory)
else:
segments = []
# If following not true, simulation failed. Perhaps max
# length or simulation crash? Should be fine as rejected
if len(segments) > 0 or accepted:
trajectory = segments[0]
if accepted:
self.extra_fw_frames = len_pre_trim - len(trajectory)
elif direction < 0:
segments = self.bw_ensemble.split(trajectory)
if len(segments) > 0 or accepted:
trajectory = segments[-1]
if accepted:
self.extra_bw_frames = len_pre_trim - len(trajectory)
if options.trim and options.full_trajectory:
# all segments that could be generated by TPS are identified by
# the full_ensemble.split, and the one that has the shooting
# point in it is the one that we actually identify
segments = self.full_ensemble.split(trajectory)
shooting_index_in_trial = int(splitted[4])
shooting_frame = trajectory[shooting_index_in_trial]
shooting_segments = [seg for seg in segments
if shooting_frame in seg]
if len(shooting_segments) > 1: # pragma: no cover
raise RuntimeError("Your shooting point appears more"
+ " than once!")
new_traj = shooting_segments[0]
new_extra_bw_frames = trajectory.index(new_traj[0])
final_frame_index = trajectory.index(new_traj[-1])
new_extra_fw_frames = len(trajectory) - final_frame_index - 1
trajectory = new_traj
if accepted:
self.extra_bw_frames = new_extra_bw_frames
self.extra_fw_frames = new_extra_fw_frames
if options.retrim_shooting:
if shooting_index >= 0:
shooting_index -= old_extra_bw_frames
else:
shooting_index += old_extra_fw_frames
# if this is a full trajectory, cut it down to one-way segments
if options.full_trajectory:
shooting_index_in_trial = int(splitted[4])
if options.retrim_shooting:
if shooting_index_in_trial >= 0:
shooting_index_in_trial -= new_extra_bw_frames
else:
shooting_index_in_trial += new_extra_fw_frames
shoot_pt = 0 if options.includes_shooting_point else 1
if direction > 0:
trajectory = trajectory[shooting_index_in_trial+shoot_pt:]
elif direction < 0:
trajectory = trajectory[0:shooting_index_in_trial+1-shoot_pt]
# remove shooting point from trial, if necessary
if options.includes_shooting_point:
if direction > 0:
trajectory = trajectory[1:]
else:
trajectory = trajectory[:-1]
return (replica, trajectory, shooting_index, accepted, direction)
[docs] def run(self, summary_file_name, n_trajs_per_block=None):
# this will basically create the move_info_list for part of the
# summary_file, and then call super's RUN
summary = open(summary_file_name, 'r')
if self.summary_root_dir is None:
self.summary_root_dir = os.path.dirname(summary_file_name)
lines = [l for l in summary]
n_steps = len(lines)
if n_trajs_per_block is None:
n_trajs_per_block = n_steps
line_num = 0
while line_num < n_steps:
if self.report_progress is not None:
refresh_output("Working on MC step " + str(line_num) + "\n",
output_stream=self.report_progress)
end = min(line_num + n_trajs_per_block, n_steps)
block = lines[line_num:end]
moves = [self.parse_summary_line(l) for l in block]
super(OneWayTPSConverter, self).run(moves)
line_num += n_trajs_per_block
[docs]class GromacsOneWayTPSConverter(OneWayTPSConverter):
def __init__(self, storage, network, initial_file, topology_file,
options=None, options_rejected=None):
self.topology_file = topology_file
mover = oink.ShootingStub(ensemble=network.sampling_ensembles[0],
selector=paths.UniformSelector(),
pre_joined=False)
super(GromacsOneWayTPSConverter, self).__init__(
storage=storage, network=network, initial_file=initial_file,
mover=mover, options=options, options_rejected=options_rejected
)
[docs] def load_trajectory(self, file_name):
"""Creates an OPS trajectory from the given file"""
return ops_load_trajectory(file_name, top=self.topology_file)