File size: 2,090 Bytes
246d201 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
from openhands.core.logger import openhands_logger as logger
from openhands.events.action.action import Action
from openhands.events.event import Event, EventSource
class ReplayManager:
"""ReplayManager manages the lifecycle of a replay session of a given trajectory.
Replay manager keeps track of a list of events, replays actions, and ignore
messages and observations. It could lead to unexpected or even errorneous
results if any action is non-deterministic, or if the initial state before
the replay session is different from the initial state of the trajectory.
"""
def __init__(self, replay_events: list[Event] | None):
if replay_events:
logger.info(f'Replay logs loaded, events length = {len(replay_events)}')
self.replay_events = replay_events
self.replay_mode = bool(replay_events)
self.replay_index = 0
def _replayable(self) -> bool:
return (
self.replay_events is not None
and self.replay_index < len(self.replay_events)
and isinstance(self.replay_events[self.replay_index], Action)
and self.replay_events[self.replay_index].source != EventSource.USER
)
def should_replay(self) -> bool:
"""
Whether the controller is in trajectory replay mode, and the replay
hasn't finished. Note: after the replay is finished, the user and
the agent could continue to message/act.
This method also moves "replay_index" to the next action, if applicable.
"""
if not self.replay_mode:
return False
assert self.replay_events is not None
while self.replay_index < len(self.replay_events) and not self._replayable():
self.replay_index += 1
return self._replayable()
def step(self) -> Action:
assert self.replay_events is not None
event = self.replay_events[self.replay_index]
assert isinstance(event, Action)
self.replay_index += 1
return event
|