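# NOTE: the next three lines are web-page residue captured with the file
# (uploader, commit message, commit hash); they are not part of the module.
# ar08's picture
# Upload 1040 files
# 246d201 verified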
from openhands.controller.state.state import State
from openhands.core.logger import openhands_logger as logger
from openhands.events.action.action import Action
from openhands.events.action.commands import IPythonRunCellAction
from openhands.events.action.empty import NullAction
from openhands.events.action.message import MessageAction
from openhands.events.event import Event, EventSource
from openhands.events.observation import (
CmdOutputObservation,
IPythonRunCellObservation,
)
from openhands.events.observation.empty import NullObservation
from openhands.events.observation.error import ErrorObservation
from openhands.events.observation.observation import Observation
class StuckDetector:
    """Detect whether an agent keeps repeating itself instead of progressing.

    The detector inspects ``state.history`` and recognises four patterns:

    1. the last four actions are equal and the last four observations are
       equal;
    2. the last three actions are equal and the last three observations are
       all errors (``ErrorObservation``, or Jupyter output ending in the same
       ``SyntaxError`` line);
    3. the last three agent messages are equal and no observation occurred
       between the first and the last of them (a monologue);
    4. two (action, observation) pairs alternate over the last six steps.
    """

    # Jupyter syntax-error messages that are treated as a repeated failure
    # when they show up in three consecutive IPythonRunCellObservations.
    SYNTAX_ERROR_MESSAGES = [
        'SyntaxError: unterminated string literal (detected at line',
        'SyntaxError: invalid syntax. Perhaps you forgot a comma?',
        'SyntaxError: incomplete input',
    ]

    def __init__(self, state: 'State'):
        """Store the state whose ``history`` will be inspected."""
        self.state = state

    def is_stuck(self, headless_mode: bool = True) -> bool:
        """Check whether the agent is stuck in a loop.

        Args:
            headless_mode: Matches AgentController's headless_mode.
                If True: consider all history (automated/testing).
                If False: consider only history after the last user message
                (interactive).

        Returns:
            True if the agent is stuck in a loop, False otherwise.
        """
        history = self.state.history
        if headless_mode:
            # In headless mode, look at all history.
            history_to_check = history
        else:
            # In interactive mode, only look at history after the last user
            # message; with no user message the whole history is used.
            last_user_msg_idx = -1
            for offset, event in enumerate(reversed(history)):
                if (
                    isinstance(event, MessageAction)
                    and event.source == EventSource.USER
                ):
                    last_user_msg_idx = len(history) - offset - 1
                    break
            history_to_check = history[last_user_msg_idx + 1 :]

        # Drop user messages and null events. In non-headless mode the user
        # message filter is a no-op because the slice above already starts
        # after the last user message.
        filtered_history = [
            event
            for event in history_to_check
            if not (
                (isinstance(event, MessageAction) and event.source == EventSource.USER)
                or isinstance(event, (NullAction, NullObservation))
            )
        ]

        # It takes 3 events minimum to detect a loop.
        if len(filtered_history) < 3:
            return False

        # The first scenarios look at the last 3 or 4 repeated steps.
        last_actions, last_observations = self._collect_recent(filtered_history, 4)

        # scenario 1: same action, same observation
        if self._is_stuck_repeating_action_observation(last_actions, last_observations):
            return True

        # scenario 2: same action, errors
        if self._is_stuck_repeating_action_error(last_actions, last_observations):
            return True

        # scenario 3: monologue
        if self._is_stuck_monologue(filtered_history):
            return True

        # scenario 4: action, observation pattern on the last six steps
        if len(filtered_history) < 6:
            return False
        return self._is_stuck_action_observation_pattern(filtered_history)

    @staticmethod
    def _collect_recent(filtered_history, limit):
        """Return the newest ``limit`` actions and observations, newest first.

        The history is walked from its end; events that are neither an
        ``Action`` nor an ``Observation`` are skipped. Either list may be
        shorter than ``limit`` when the history does not hold enough events.
        """
        recent_actions: list[Event] = []
        recent_observations: list[Event] = []
        for event in reversed(filtered_history):
            if isinstance(event, Action) and len(recent_actions) < limit:
                recent_actions.append(event)
            elif isinstance(event, Observation) and len(recent_observations) < limit:
                recent_observations.append(event)
            if len(recent_actions) == limit and len(recent_observations) == limit:
                break
        return recent_actions, recent_observations

    def _is_stuck_repeating_action_observation(self, last_actions, last_observations):
        """Scenario 1: four equal actions paired with four equal observations.

        Both lists are expected newest first; anything other than exactly
        four entries in each list is never reported as a loop.
        """
        if len(last_actions) != 4 or len(last_observations) != 4:
            return False

        actions_equal = all(
            self._eq_no_pid(last_actions[0], action) for action in last_actions
        )
        observations_equal = all(
            self._eq_no_pid(last_observations[0], observation)
            for observation in last_observations
        )
        if actions_equal and observations_equal:
            logger.warning('Action, Observation loop detected')
            return True
        return False

    def _is_stuck_repeating_action_error(self, last_actions, last_observations):
        """Scenario 2: the same action failed three times in a row.

        Only the three newest entries of each list are inspected, so three
        actions and three observations are enough to decide. The failure is
        either three ``ErrorObservation`` instances, or three
        ``IPythonRunCellObservation`` instances reporting the same syntax
        error from ``SYNTAX_ERROR_MESSAGES``.
        """
        # Three of each is all this check ever reads; requiring more would
        # delay detection of a loop that is already visible.
        if len(last_actions) < 3 or len(last_observations) < 3:
            return False

        recent_actions = last_actions[:3]
        recent_observations = last_observations[:3]

        # Are the last three actions the "same"?
        if not all(
            self._eq_no_pid(recent_actions[0], action) for action in recent_actions
        ):
            return False

        # ... and the last three observations all errors?
        if all(isinstance(obs, ErrorObservation) for obs in recent_observations):
            logger.warning('Action, ErrorObservation loop detected')
            return True

        # ... or all Jupyter cell outputs ending in the same SyntaxError?
        if all(
            isinstance(obs, IPythonRunCellObservation) for obs in recent_observations
        ):
            for error_message in self.SYNTAX_ERROR_MESSAGES:
                if error_message.startswith(
                    'SyntaxError: unterminated string literal (detected at line'
                ):
                    repeated = self._check_for_consistent_line_error(
                        recent_observations, error_message
                    )
                elif error_message in (
                    'SyntaxError: invalid syntax. Perhaps you forgot a comma?',
                    'SyntaxError: incomplete input',
                ):
                    repeated = self._check_for_consistent_invalid_syntax(
                        recent_observations, error_message
                    )
                else:
                    repeated = False
                if repeated:
                    logger.warning('Action, IPythonRunCellObservation loop detected')
                    return True
        return False

    def _check_for_consistent_invalid_syntax(self, observations, error_message):
        """Return True if three observations report the same syntax error.

        Every observation's stripped ``content`` must have at least six lines
        and start with ``Cell In[1], line``. An observation counts as valid
        when its last two lines are the Jupyter working-directory and
        interpreter footers and its third-to-last line contains
        ``error_message``. The result is True only when all first lines are
        identical, exactly three observations are valid, and their error
        lines are identical.
        """
        first_lines = []
        error_lines = []
        for obs in observations:
            lines = obs.content.strip().split('\n')
            if len(lines) < 6:  # a real syntax error has at least 6 lines
                return False

            first_line = lines[0].strip()
            if not first_line.startswith('Cell In[1], line'):
                return False
            first_lines.append(first_line)

            if (
                lines[-1].startswith('[Jupyter Python interpreter:')
                and lines[-2].startswith('[Jupyter current working directory:')
                and error_message in lines[-3]
            ):
                error_lines.append(lines[-3])

        return (
            len(set(first_lines)) == 1
            and len(error_lines) == 3
            and len(set(error_lines)) == 1
        )

    def _check_for_consistent_line_error(self, observations, error_message):
        """Return True if three observations end in the identical error line.

        Every observation's stripped ``content`` must have at least three
        lines and end with the Jupyter working-directory and interpreter
        footers; otherwise the result is False. The third-to-last line must
        contain ``error_message`` in three observations and be identical
        across them.
        """
        error_lines = []
        for obs in observations:
            lines = obs.content.strip().split('\n')
            if len(lines) < 3:
                return False

            error_line, cwd_line, interpreter_line = lines[-3:]
            # The last two lines must be the footers appended to cell output.
            if not (
                cwd_line.startswith('[Jupyter current working directory:')
                and interpreter_line.startswith('[Jupyter Python interpreter:')
            ):
                return False

            if error_message in error_line:
                error_lines.append(error_line)

        return len(error_lines) == 3 and len(set(error_lines)) == 1

    def _is_stuck_monologue(self, filtered_history):
        """Scenario 3: the agent repeats the same message to itself.

        Returns True when the last three agent ``MessageAction`` events are
        equal and no ``Observation`` lies between the first and the last of
        them. An observation in between means it is not yet treated as a
        loop, since the agent may still recover.
        """
        agent_messages = [
            (index, event)
            for index, event in enumerate(filtered_history)
            if isinstance(event, MessageAction) and event.source == EventSource.AGENT
        ]
        if len(agent_messages) < 3:
            return False

        recent = agent_messages[-3:]
        first_index, first_message = recent[0]
        last_index = recent[-1][0]
        if not all(first_message == message for _, message in recent):
            return False

        if any(
            isinstance(event, Observation)
            for event in filtered_history[first_index + 1 : last_index]
        ):
            return False

        logger.warning('Repeated MessageAction with source=AGENT detected')
        return True

    def _is_stuck_action_observation_pattern(self, filtered_history):
        """Scenario 4: two steps alternate over the last six steps.

        The pattern looks like
        (action_1, obs_1), (action_2, obs_2), (action_1, obs_1), ...
        Six actions and six observations are required; with fewer the result
        is False.
        """
        actions, observations = self._collect_recent(filtered_history, 6)
        if len(actions) != 6 or len(observations) != 6:
            return False

        actions_equal = (
            # action_0 == action_2 == action_4
            self._eq_no_pid(actions[0], actions[2])
            and self._eq_no_pid(actions[0], actions[4])
            # action_1 == action_3 == action_5
            and self._eq_no_pid(actions[1], actions[3])
            and self._eq_no_pid(actions[1], actions[5])
        )
        observations_equal = (
            # obs_0 == obs_2 == obs_4
            self._eq_no_pid(observations[0], observations[2])
            and self._eq_no_pid(observations[0], observations[4])
            # obs_1 == obs_3 == obs_5
            and self._eq_no_pid(observations[1], observations[3])
            and self._eq_no_pid(observations[1], observations[5])
        )
        if actions_equal and observations_equal:
            logger.warning('Action, Observation pattern detected')
            return True
        return False

    def _eq_no_pid(self, obj1, obj2):
        """Compare two events for loop detection.

        * Two ``IPythonRunCellAction`` objects whose code both contain
          ``edit_file_by_replace(`` are equal when the code has at least
          three lines and the first three lines match.
        * Two ``CmdOutputObservation`` objects are equal when their
          ``command`` and ``exit_code`` match (the command id is ignored).
        * Everything else falls back to ``==``.
        """
        if isinstance(obj1, IPythonRunCellAction) and isinstance(
            obj2, IPythonRunCellAction
        ):
            if (
                'edit_file_by_replace(' in obj1.code
                and 'edit_file_by_replace(' in obj2.code
            ):
                # Require at least 3 lines to avoid matching simple one-liners.
                first_code_lines = obj1.code.split('\n')
                second_code_lines = obj2.code.split('\n')
                return (
                    len(first_code_lines) > 2
                    and first_code_lines[:3] == second_code_lines[:3]
                )
            return obj1 == obj2

        if isinstance(obj1, CmdOutputObservation) and isinstance(
            obj2, CmdOutputObservation
        ):
            return obj1.command == obj2.command and obj1.exit_code == obj2.exit_code

        return obj1 == obj2