|
import json
|
|
import logging
|
|
import multiprocessing as mp
|
|
import os
|
|
from typing import Callable
|
|
|
|
import pandas as pd
|
|
|
|
from openhands.controller.state.state import State
|
|
from openhands.core.logger import get_console_handler
|
|
from openhands.core.logger import openhands_logger as logger
|
|
from openhands.events.action import Action
|
|
from openhands.events.action.message import MessageAction
|
|
|
|
|
|
def codeact_user_response(
|
|
state: State,
|
|
encapsulate_solution: bool = False,
|
|
try_parse: Callable[[Action | None], str] | None = None,
|
|
) -> str:
|
|
encaps_str = (
|
|
(
|
|
'Please encapsulate your final answer (answer ONLY) within <solution> and </solution>.\n'
|
|
'For example: The answer to the question is <solution> 42 </solution>.\n'
|
|
)
|
|
if encapsulate_solution
|
|
else ''
|
|
)
|
|
msg = (
|
|
'Please continue working on the task on whatever approach you think is suitable.\n'
|
|
'If you think you have solved the task, please first send your answer to user through message and then finish the interaction.\n'
|
|
f'{encaps_str}'
|
|
'IMPORTANT: YOU SHOULD NEVER ASK FOR HUMAN HELP.\n'
|
|
)
|
|
|
|
if state.history:
|
|
|
|
if try_parse is not None:
|
|
last_action = next(
|
|
(
|
|
event
|
|
for event in reversed(state.history)
|
|
if isinstance(event, Action)
|
|
),
|
|
None,
|
|
)
|
|
ans = try_parse(last_action)
|
|
if ans is not None:
|
|
return '/exit'
|
|
|
|
|
|
user_msgs = [
|
|
event
|
|
for event in state.history
|
|
if isinstance(event, MessageAction) and event.source == 'user'
|
|
]
|
|
if len(user_msgs) >= 2:
|
|
|
|
return (
|
|
msg
|
|
+ 'If you want to give up, run: <execute_bash> exit </execute_bash>.\n'
|
|
)
|
|
return msg
|
|
|
|
|
|
def cleanup():
|
|
print('Cleaning up child processes...')
|
|
for process in mp.active_children():
|
|
print(f'Terminating child process: {process.name}')
|
|
process.terminate()
|
|
process.join()
|
|
|
|
|
|
def prepare_dataset(dataset: pd.DataFrame, output_file: str, eval_n_limit: int):
|
|
assert 'instance_id' in dataset.columns, (
|
|
"Expected 'instance_id' column in the dataset. You should define your own "
|
|
"unique identifier for each instance and use it as the 'instance_id' column."
|
|
)
|
|
id_column = 'instance_id'
|
|
logger.info(f'Writing evaluation output to {output_file}')
|
|
finished_ids = set()
|
|
if os.path.exists(output_file):
|
|
with open(output_file, 'r') as f:
|
|
for line in f:
|
|
data = json.loads(line)
|
|
finished_ids.add(data[id_column])
|
|
logger.warning(
|
|
f'Output file {output_file} already exists. Loaded '
|
|
f'{len(finished_ids)} finished instances.'
|
|
)
|
|
|
|
if eval_n_limit:
|
|
dataset = dataset.head(eval_n_limit)
|
|
logger.info(f'Limiting evaluation to first {eval_n_limit} instances.')
|
|
|
|
new_dataset = [
|
|
instance
|
|
for _, instance in dataset.iterrows()
|
|
if instance[id_column] not in finished_ids
|
|
]
|
|
logger.info(
|
|
f'Finished instances: {len(finished_ids)}, '
|
|
f'Remaining instances: {len(new_dataset)}'
|
|
)
|
|
|
|
return pd.DataFrame(new_dataset)
|
|
|
|
|
|
def reset_logger_for_multiprocessing(
|
|
logger: logging.Logger, instance_id: str, log_dir: str
|
|
):
|
|
"""Reset the logger for multiprocessing.
|
|
|
|
Save logs to a separate file for each process, instead of trying to write to the
|
|
same file/console from multiple processes.
|
|
"""
|
|
|
|
log_file = os.path.join(
|
|
log_dir,
|
|
f'instance_{instance_id}.log',
|
|
)
|
|
|
|
for handler in logger.handlers[:]:
|
|
logger.removeHandler(handler)
|
|
|
|
logger.addHandler(get_console_handler())
|
|
logger.info(
|
|
f'Starting resolver for instance {instance_id}.\n'
|
|
f'Hint: run "tail -f {log_file}" to see live logs in a separate shell'
|
|
)
|
|
|
|
for handler in logger.handlers[:]:
|
|
logger.removeHandler(handler)
|
|
os.makedirs(os.path.dirname(log_file), exist_ok=True)
|
|
file_handler = logging.FileHandler(log_file)
|
|
file_handler.setFormatter(
|
|
logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
|
|
)
|
|
logger.addHandler(file_handler)
|
|
|