|
import os
|
|
import re
|
|
import tempfile
|
|
from abc import ABC, abstractmethod
|
|
|
|
from openhands_aci.utils.diff import get_diff
|
|
|
|
from openhands.core.config import AppConfig
|
|
from openhands.core.logger import openhands_logger as logger
|
|
from openhands.events.action import (
|
|
FileEditAction,
|
|
FileReadAction,
|
|
FileWriteAction,
|
|
IPythonRunCellAction,
|
|
)
|
|
from openhands.events.event import FileEditSource
|
|
from openhands.events.observation import (
|
|
ErrorObservation,
|
|
FileEditObservation,
|
|
FileReadObservation,
|
|
FileWriteObservation,
|
|
Observation,
|
|
)
|
|
from openhands.linter import DefaultLinter
|
|
from openhands.llm.llm import LLM
|
|
from openhands.llm.metrics import Metrics
|
|
from openhands.utils.chunk_localizer import Chunk, get_top_k_chunk_matches
|
|
|
|
SYS_MSG = """Your job is to produce a new version of the file based on the old version and the
|
|
provided draft of the new version. The provided draft may be incomplete (it may skip lines) and/or incorrectly indented. You should try to apply the changes present in the draft to the old version, and output a new version of the file.
|
|
NOTE:
|
|
- The output file should be COMPLETE and CORRECTLY INDENTED. Do not omit any lines, and do not change any lines that are not part of the changes.
|
|
- You should output the new version of the file by wrapping the new version of the file content in a ``` block.
|
|
- If there's no explicit comment to remove the existing code, we should keep them and append the new code to the end of the file.
|
|
- If there's placeholder comments like `# no changes before` or `# no changes here`, we should replace these comments with the original code near the placeholder comments.
|
|
"""
|
|
|
|
USER_MSG = """
|
|
HERE IS THE OLD VERSION OF THE FILE:
|
|
```
|
|
{old_contents}
|
|
```
|
|
|
|
HERE IS THE DRAFT OF THE NEW VERSION OF THE FILE:
|
|
```
|
|
{draft_changes}
|
|
```
|
|
|
|
GIVE ME THE NEW VERSION OF THE FILE.
|
|
IMPORTANT:
|
|
- There should be NO placeholder comments like `# no changes before` or `# no changes here`. They should be replaced with the original code near the placeholder comments.
|
|
- The output file should be COMPLETE and CORRECTLY INDENTED. Do not omit any lines, and do not change any lines that are not part of the changes.
|
|
""".strip()
|
|
|
|
|
|
def _extract_code(string):
|
|
pattern = r'```(?:\w*\n)?(.*?)```'
|
|
matches = re.findall(pattern, string, re.DOTALL)
|
|
if not matches:
|
|
return None
|
|
return matches[0]
|
|
|
|
|
|
def get_new_file_contents(
    llm: LLM, old_contents: str, draft_changes: str, num_retries: int = 3
) -> str | None:
    """Ask ``llm`` to merge ``draft_changes`` into ``old_contents``.

    Each attempt sends ``SYS_MSG`` as the system message and ``USER_MSG``
    (filled with both texts) as the user message, then extracts the first
    fenced code block from the reply.

    Args:
        llm: Object whose ``completion(messages=...)`` returns a response
            indexable as ``resp['choices'][0]['message']['content']``.
        old_contents: Current text of the region being edited.
        draft_changes: The draft describing the desired new text.
        num_retries: Maximum number of completion requests to make.

    Returns:
        The extracted text of the first attempt whose reply contains a fenced
        block, or ``None`` if no attempt produced one (also when
        ``num_retries`` is not positive).
    """
    # The prompt text is identical for every attempt, so format it only once.
    user_prompt = USER_MSG.format(
        old_contents=old_contents, draft_changes=draft_changes
    )

    for _ in range(num_retries):
        # A fresh message list per attempt, in case the LLM wrapper mutates it.
        messages = [
            {'role': 'system', 'content': SYS_MSG},
            {'role': 'user', 'content': user_prompt},
        ]
        resp = llm.completion(messages=messages)
        reply = resp['choices'][0]['message']['content']
        # A reply without text content cannot contain a fenced block; count it
        # as a failed attempt instead of letting the regex raise TypeError.
        if not isinstance(reply, str):
            continue
        new_contents = _extract_code(reply)
        if new_contents is not None:
            return new_contents
    return None
|
|
|
|
|
|
class FileEditRuntimeInterface(ABC):
    """Primitive operations a runtime must provide for draft-based editing.

    Concrete runtimes implement ``read``, ``write`` and ``run_ipython`` and
    expose an ``AppConfig`` as ``config``.
    """

    # Application configuration; declared here, assigned by the concrete runtime.
    config: AppConfig

    @abstractmethod
    def read(self, action: FileReadAction) -> Observation:
        """Carry out a file-read action and return the resulting observation."""

    @abstractmethod
    def write(self, action: FileWriteAction) -> Observation:
        """Carry out a file-write action and return the resulting observation."""

    @abstractmethod
    def run_ipython(self, action: IPythonRunCellAction) -> Observation:
        """Run an IPython cell action and return the resulting observation."""
|
|
|
|
|
|
class FileEditRuntimeMixin(FileEditRuntimeInterface):
    """Add draft-based file editing on top of a runtime's read/write primitives.

    ``edit`` either forwards the action to IPython (when the action comes from
    ``FileEditSource.OH_ACI``) or applies the agent's draft itself: a missing
    file is created from the draft, ``start == -1`` appends the draft verbatim,
    and any other line range is rewritten by a dedicated "draft editor" LLM.
    """

    # Cap on the size of the line range sent to the draft-editor LLM per edit.
    MAX_LINES_TO_EDIT = 300

    def __init__(self, enable_llm_editor: bool, *args, **kwargs):
        """Initialise the mixin and, if requested, the draft-editor LLM.

        Args:
            enable_llm_editor: When false, no LLM is created and
                ``draft_editor_llm`` is left unset.
            *args: Forwarded to the next ``__init__`` in the MRO.
            **kwargs: Forwarded to the next ``__init__`` in the MRO.
        """
        super().__init__(*args, **kwargs)
        self.enable_llm_editor = enable_llm_editor

        if not self.enable_llm_editor:
            return

        draft_editor_config = self.config.get_llm_config('draft_editor')

        # Dedicated metrics object, labelled with a 'draft_editor:' prefix.
        llm_metrics = Metrics(model_name='draft_editor:' + draft_editor_config.model)
        if draft_editor_config.caching_prompt:
            logger.debug(
                'It is not recommended to cache draft editor LLM prompts as it may incur high costs for the same prompt. '
                'Automatically setting caching_prompt=false.'
            )
            # NOTE: this mutates the config object returned by get_llm_config.
            draft_editor_config.caching_prompt = False

        self.draft_editor_llm = LLM(draft_editor_config, metrics=llm_metrics)
        logger.debug(
            f'[Draft edit functionality] enabled with LLM: {self.draft_editor_llm}'
        )

    def _validate_range(
        self, start: int, end: int, total_lines: int
    ) -> Observation | None:
        """Check a 1-based, inclusive ``start``/``end`` line range.

        ``-1`` is a sentinel for either bound (append for ``start``, "until the
        end of the file" for ``end``) and is exempt from the bounds checks.

        Returns:
            An ``ErrorObservation`` describing the first violated constraint,
            or ``None`` when the range is acceptable.
        """
        start_is_invalid = (
            (start < 1 and start != -1)
            or start > total_lines
            or (start > end and end != -1 and start != -1)
        )
        if start_is_invalid:
            return ErrorObservation(
                f'Invalid range for editing: start={start}, end={end}, total lines={total_lines}. start must be >= 1 and <={total_lines} (total lines of the edited file), start <= end, or start == -1 (append to the end of the file).'
            )

        # The start/end ordering was already rejected above, so only the
        # bounds of ``end`` remain to be checked here.
        if (end < 1 and end != -1) or end > total_lines:
            return ErrorObservation(
                f'Invalid range for editing: start={start}, end={end}, total lines={total_lines}. end must be >= 1 and <= {total_lines} (total lines of the edited file), end >= start, or end == -1 (to edit till the end of the file).'
            )
        return None

    def _get_lint_error(
        self,
        suffix: str,
        old_content: str,
        new_content: str,
        filepath: str,
        diff: str,
    ) -> ErrorObservation | None:
        """Lint ``new_content`` against ``old_content`` and report the result.

        Both versions are written to temporary files carrying ``suffix`` and
        handed to ``DefaultLinter.lint_file_diff``.

        Args:
            suffix: File extension used for the temporary files.
            old_content: Text before the edit.
            new_content: Text after the edit.
            filepath: Path of the edited file, used in the report only.
            diff: Diff between the two versions, used in the report only.

        Returns:
            ``None`` when the linter reports nothing; otherwise an
            ``ErrorObservation`` containing the attempted diff and up to the
            first five lint errors.
        """
        linter = DefaultLinter()

        with (
            tempfile.NamedTemporaryFile(
                suffix=suffix, mode='w+', encoding='utf-8'
            ) as original_file_copy,
            tempfile.NamedTemporaryFile(
                suffix=suffix, mode='w+', encoding='utf-8'
            ) as updated_file_copy,
        ):
            # Flush so the linter, which opens the files by name, sees the text.
            original_file_copy.write(old_content)
            original_file_copy.flush()

            updated_file_copy.write(new_content)
            updated_file_copy.flush()

            updated_lint_error = linter.lint_file_diff(
                original_file_copy.name, updated_file_copy.name
            )
            if not updated_lint_error:
                return None

            # NOTE(review): the report is built inside the ``with`` block
            # because the lint results presumably still refer to the temporary
            # files when visualised -- confirm before moving this out.
            _obs = FileEditObservation(
                content=diff,
                path=filepath,
                prev_exist=True,
                old_content=old_content,
                new_content=new_content,
            )
            report = [
                f'\n[Linting failed for edited file {filepath}. {len(updated_lint_error)} lint errors found.]\n',
                '[begin attempted changes]\n',
                f'{_obs.visualize_diff(change_applied=False)}\n',
                '[end attempted changes]\n',
                '-' * 40 + '\n',
                '-' * 20 + 'First 5 lint errors' + '-' * 20 + '\n',
            ]
            for i, lint_error in enumerate(updated_lint_error[:5]):
                report.append(f'[begin lint error {i}]\n')
                report.append(lint_error.visualize().strip() + '\n')
                report.append(f'[end lint error {i}]\n')
                report.append('-' * 40 + '\n')
            return ErrorObservation(''.join(report))

    def _lint_if_enabled(
        self, path: str, old_content: str, new_content: str, diff: str
    ) -> ErrorObservation | None:
        """Run ``_get_lint_error`` when ``config.sandbox.enable_auto_lint`` is set."""
        if not self.config.sandbox.enable_auto_lint:
            return None
        suffix = os.path.splitext(path)[1]
        return self._get_lint_error(suffix, old_content, new_content, path, diff)

    def _create_file_from_edit(self, action: FileEditAction) -> Observation:
        """Create ``action.path`` from the stripped draft and report the edit."""
        # Report exactly what is written, so the observation matches the file.
        new_content = action.content.strip()
        obs = self.write(FileWriteAction(path=action.path, content=new_content))
        if isinstance(obs, ErrorObservation):
            return obs
        if not isinstance(obs, FileWriteObservation):
            raise ValueError(
                f'Expected FileWriteObservation, got {type(obs)}: {str(obs)}'
            )
        return FileEditObservation(
            content=get_diff('', new_content, action.path),
            path=action.path,
            prev_exist=False,
            old_content='',
            new_content=new_content,
        )

    def _range_too_long_error(
        self,
        action: FileEditAction,
        original_file_content: str,
        start_idx: int,
        end_idx: int,
    ) -> ErrorObservation:
        """Build the error for an over-long range, with the top-3 related snippets."""
        length_of_range = end_idx - start_idx
        parts = [
            f'[Edit error: The range of lines to edit is too long.]\n'
            f'[The maximum number of lines allowed to edit at once is {self.MAX_LINES_TO_EDIT}. '
            f'Got (L{start_idx + 1}-L{end_idx}) {length_of_range} lines.]\n'
        ]

        # Suggest narrower targets: chunks of the file most similar to the draft.
        topk_chunks: list[Chunk] = get_top_k_chunk_matches(
            text=original_file_content,
            query=action.content,
            k=3,
            max_chunk_size=20,
        )
        parts.append(
            'Here are some snippets that maybe relevant to the provided edit.\n'
        )
        for i, chunk in enumerate(topk_chunks, start=1):
            first_line, last_line = chunk.line_range[0], chunk.line_range[1]
            parts.append(
                f'[begin relevant snippet {i}. Line range: L{first_line}-L{last_line}. Similarity: {chunk.normalized_lcs}]\n'
            )
            parts.append(
                f'[Browse around it via `open_file("{action.path}", {(first_line + last_line) // 2})`]\n'
            )
            parts.append(chunk.visualize() + '\n')
            parts.append(f'[end relevant snippet {i}]\n')
            parts.append('-' * 40 + '\n')

        parts.append(
            'Consider using `open_file` to explore around the relevant snippets if needed.\n'
        )
        parts.append(
            f'**IMPORTANT**: Please REDUCE the range of edits to less than {self.MAX_LINES_TO_EDIT} lines by setting `start` and `end` in the edit action (e.g. `<file_edit path="{action.path}" start=[PUT LINE NUMBER HERE] end=[PUT LINE NUMBER HERE] />`). '
        )
        return ErrorObservation(''.join(parts))

    def edit(self, action: FileEditAction) -> Observation:
        """Apply a file-edit action and return the resulting observation.

        Behaviour by case:

        * ``FileEditSource.OH_ACI`` actions are run through ``run_ipython``.
        * If reading the file yields a "file not found" error, the file is
          created from the stripped draft.
        * ``start == -1`` appends the draft to the end of the file.
        * Otherwise lines ``start``..``end`` (1-based, inclusive; ``end == -1``
          means the last line) are rewritten by the draft-editor LLM.

        When ``config.sandbox.enable_auto_lint`` is set, an edit of an existing
        file that introduces lint errors is not written.

        Returns:
            A ``FileEditObservation`` on success, otherwise an
            ``ErrorObservation`` (invalid range, range too long, lint failure,
            failed write, unavailable or unsuccessful draft-editor LLM).

        Raises:
            ValueError: If ``read``/``write`` return an unexpected observation
                type.
        """
        if action.impl_source == FileEditSource.OH_ACI:
            return self.run_ipython(
                IPythonRunCellAction(
                    code=action.translated_ipython_code,
                    include_extra=False,
                )
            )

        obs = self.read(FileReadAction(path=action.path))
        if isinstance(obs, ErrorObservation) and 'file not found' in obs.content.lower():
            logger.debug(
                f'Agent attempted to edit a file that does not exist. Creating the file. Error msg: {obs.content}'
            )
            return self._create_file_from_edit(action)
        if not isinstance(obs, FileReadObservation):
            raise ValueError(
                f'Expected FileReadObservation, got {type(obs)}: {str(obs)}'
            )

        original_file_content = obs.content
        old_file_lines = original_file_content.split('\n')

        start = action.start
        end = action.end

        error = self._validate_range(start, end, len(old_file_lines))
        if error is not None:
            return error

        # --- Append: no LLM involved, the draft is added verbatim. ---
        if start == -1:
            updated_content = '\n'.join(old_file_lines + action.content.split('\n'))
            diff = get_diff(original_file_content, updated_content, action.path)

            error_obs = self._lint_if_enabled(
                action.path, original_file_content, updated_content, diff
            )
            if error_obs is not None:
                return error_obs

            obs = self.write(FileWriteAction(path=action.path, content=updated_content))
            # Do not report success when the write itself failed.
            if isinstance(obs, ErrorObservation):
                return obs
            return FileEditObservation(
                content=diff,
                path=action.path,
                prev_exist=True,
                old_content=original_file_content,
                new_content=updated_content,
            )

        # --- Ranged edit: convert to a 0-based, end-exclusive slice. ---
        start_idx = start - 1
        end_idx = end if end != -1 else len(old_file_lines)

        # NOTE(review): the ``+ 1`` tolerates one line beyond MAX_LINES_TO_EDIT,
        # presumably for the trailing empty element that splitting a
        # newline-terminated file produces -- confirm the intent.
        if end_idx - start_idx > self.MAX_LINES_TO_EDIT + 1:
            return self._range_too_long_error(
                action, original_file_content, start_idx, end_idx
            )

        # ``draft_editor_llm`` only exists when the editor was enabled in
        # ``__init__``; fail with an observation rather than an AttributeError.
        draft_editor_llm = getattr(self, 'draft_editor_llm', None)
        if draft_editor_llm is None:
            return ErrorObservation(
                'The draft editor LLM is not available (LLM-based editing is disabled), '
                'so an edit of a line range cannot be applied.'
            )

        content_to_edit = '\n'.join(old_file_lines[start_idx:end_idx])
        draft_editor_llm.reset()
        _edited_content = get_new_file_contents(
            draft_editor_llm, content_to_edit, action.content
        )
        if _edited_content is None:
            ret_err = ErrorObservation(
                'Failed to get new file contents. '
                'Please try to reduce the number of edits and try again.'
            )
            ret_err.llm_metrics = draft_editor_llm.metrics
            return ret_err

        # Splice the rewritten region back between the untouched lines.
        updated_lines = (
            old_file_lines[:start_idx]
            + _edited_content.split('\n')
            + old_file_lines[end_idx:]
        )
        updated_content = '\n'.join(updated_lines)
        diff = get_diff(original_file_content, updated_content, action.path)

        error_obs = self._lint_if_enabled(
            action.path, original_file_content, updated_content, diff
        )
        if error_obs is not None:
            error_obs.llm_metrics = draft_editor_llm.metrics
            return error_obs

        obs = self.write(FileWriteAction(path=action.path, content=updated_content))
        # Do not report success when the write itself failed.
        if isinstance(obs, ErrorObservation):
            obs.llm_metrics = draft_editor_llm.metrics
            return obs

        ret_obs = FileEditObservation(
            content=diff,
            path=action.path,
            prev_exist=True,
            old_content=original_file_content,
            new_content=updated_content,
        )
        ret_obs.llm_metrics = draft_editor_llm.metrics
        return ret_obs
|
|
|