ar08's picture
Upload 1040 files
246d201 verified
import os
import re
import tempfile
from abc import ABC, abstractmethod
from openhands_aci.utils.diff import get_diff
from openhands.core.config import AppConfig
from openhands.core.logger import openhands_logger as logger
from openhands.events.action import (
FileEditAction,
FileReadAction,
FileWriteAction,
IPythonRunCellAction,
)
from openhands.events.event import FileEditSource
from openhands.events.observation import (
ErrorObservation,
FileEditObservation,
FileReadObservation,
FileWriteObservation,
Observation,
)
from openhands.linter import DefaultLinter
from openhands.llm.llm import LLM
from openhands.llm.metrics import Metrics
from openhands.utils.chunk_localizer import Chunk, get_top_k_chunk_matches
# System prompt sent (role 'system') to the draft-editor LLM by
# `get_new_file_contents`. It instructs the model to merge a possibly
# incomplete draft into the old content and to return the complete result
# wrapped in a ``` block, which `_extract_code` later pulls out.
SYS_MSG = """Your job is to produce a new version of the file based on the old version and the
provided draft of the new version. The provided draft may be incomplete (it may skip lines) and/or incorrectly indented. You should try to apply the changes present in the draft to the old version, and output a new version of the file.
NOTE:
- The output file should be COMPLETE and CORRECTLY INDENTED. Do not omit any lines, and do not change any lines that are not part of the changes.
- You should output the new version of the file by wrapping the new version of the file content in a ``` block.
- If there's no explicit comment to remove the existing code, we should keep them and append the new code to the end of the file.
- If there's placeholder comments like `# no changes before` or `# no changes here`, we should replace these comments with the original code near the placeholder comments.
"""
# User prompt template sent (role 'user') alongside SYS_MSG. It is filled in
# with `str.format` using two fields: `old_contents` (the text being edited)
# and `draft_changes` (the draft edit). The trailing `.strip()` removes the
# leading/trailing whitespace of the template itself before formatting.
USER_MSG = """
HERE IS THE OLD VERSION OF THE FILE:
```
{old_contents}
```
HERE IS THE DRAFT OF THE NEW VERSION OF THE FILE:
```
{draft_changes}
```
GIVE ME THE NEW VERSION OF THE FILE.
IMPORTANT:
- There should be NO placeholder comments like `# no changes before` or `# no changes here`. They should be replaced with the original code near the placeholder comments.
- The output file should be COMPLETE and CORRECTLY INDENTED. Do not omit any lines, and do not change any lines that are not part of the changes.
""".strip()
def _extract_code(string):
    """Return the body of the first ``` fenced block in `string`.

    An optional language tag directly after the opening fence (word
    characters followed by a newline) is skipped and not included in the
    result. Returns None when `string` contains no complete fenced block.
    """
    # DOTALL lets the lazy body group span multiple lines.
    first_fence = re.search(r'```(?:\w*\n)?(.*?)```', string, re.DOTALL)
    if first_fence is None:
        return None
    return first_fence.group(1)
def get_new_file_contents(
    llm: LLM, old_contents: str, draft_changes: str, num_retries: int = 3
) -> str | None:
    """Ask `llm` to merge `draft_changes` into `old_contents`.

    The model is queried up to `num_retries` times with the SYS_MSG /
    USER_MSG prompts. The first reply that contains a ``` fenced block wins
    and the content of that block is returned.

    Returns:
        The extracted new content, or None if no attempt produced a fenced
        block (or if `num_retries` is not positive).
    """
    user_prompt = USER_MSG.format(
        old_contents=old_contents, draft_changes=draft_changes
    )
    for _attempt in range(num_retries):
        # A fresh message list is built for every attempt.
        conversation = [
            {'role': 'system', 'content': SYS_MSG},
            {'role': 'user', 'content': user_prompt},
        ]
        reply = llm.completion(messages=conversation)
        extracted = _extract_code(reply['choices'][0]['message']['content'])
        if extracted is not None:
            return extracted
    return None
class FileEditRuntimeInterface(ABC):
    """Abstract contract that a runtime must satisfy to support file editing.

    Implementers expose an application config plus three primitives (read,
    write, run an IPython cell), each taking an action and returning an
    observation.
    """

    # Application configuration made available to the editing logic.
    config: AppConfig

    @abstractmethod
    def read(self, action: FileReadAction) -> Observation:
        """Handle a file-read action and return the resulting observation."""

    @abstractmethod
    def write(self, action: FileWriteAction) -> Observation:
        """Handle a file-write action and return the resulting observation."""

    @abstractmethod
    def run_ipython(self, action: IPythonRunCellAction) -> Observation:
        """Handle an IPython cell action and return the resulting observation."""
class FileEditRuntimeMixin(FileEditRuntimeInterface):
    """Mixin implementing `edit` on top of the read/write/run_ipython primitives.

    Ranged edits are delegated to a separate "draft editor" LLM that merges
    the agent's draft into the selected lines. That LLM is only created when
    `enable_llm_editor` is true, so ranged edits require it to be enabled.
    """

    # Most LLMs have output token limit of 4k tokens.
    # This restricts the number of lines we can edit to avoid exceeding the token limit.
    MAX_LINES_TO_EDIT = 300

    def __init__(self, enable_llm_editor: bool, *args, **kwargs):
        """Initialise the mixin and, if enabled, the draft editor LLM.

        Args:
            enable_llm_editor: When false, no draft editor LLM is created.
            *args, **kwargs: Forwarded unchanged to the next class in the MRO.
        """
        super().__init__(*args, **kwargs)
        self.enable_llm_editor = enable_llm_editor

        if not self.enable_llm_editor:
            return

        draft_editor_config = self.config.get_llm_config('draft_editor')

        # manually set the model name for the draft editor LLM to distinguish token costs
        llm_metrics = Metrics(model_name='draft_editor:' + draft_editor_config.model)
        if draft_editor_config.caching_prompt:
            logger.debug(
                'It is not recommended to cache draft editor LLM prompts as it may incur high costs for the same prompt. '
                'Automatically setting caching_prompt=false.'
            )
            draft_editor_config.caching_prompt = False

        self.draft_editor_llm = LLM(draft_editor_config, metrics=llm_metrics)
        logger.debug(
            f'[Draft edit functionality] enabled with LLM: {self.draft_editor_llm}'
        )

    def _validate_range(
        self, start: int, end: int, total_lines: int
    ) -> Observation | None:
        """Check that `start`/`end` describe a usable line range.

        Both bounds are 1-indexed and inclusive. `start == -1` means "append
        to the end of the file" and `end == -1` means "until the end of the
        file"; both sentinels bypass the ordering checks.

        Returns:
            An ErrorObservation describing the problem, or None if the range
            is acceptable.
        """
        # start and end are 1-indexed and inclusive
        if (
            (start < 1 and start != -1)
            or start > total_lines
            or (start > end and end != -1 and start != -1)
        ):
            return ErrorObservation(
                f'Invalid range for editing: start={start}, end={end}, total lines={total_lines}. start must be >= 1 and <={total_lines} (total lines of the edited file), start <= end, or start == -1 (append to the end of the file).'
            )
        if (
            (end < 1 and end != -1)
            or end > total_lines
            or (end < start and start != -1 and end != -1)
        ):
            return ErrorObservation(
                f'Invalid range for editing: start={start}, end={end}, total lines={total_lines}. end must be >= 1 and <= {total_lines} (total lines of the edited file), end >= start, or end == -1 (to edit till the end of the file).'
            )
        return None

    def _get_lint_error(
        self,
        suffix: str,
        old_content: str,
        new_content: str,
        filepath: str,
        diff: str,
    ) -> ErrorObservation | None:
        """Lint the change from `old_content` to `new_content`.

        Both versions are written to temporary files carrying `suffix` (the
        edited file's extension) and handed to the linter's diff mode.

        Args:
            suffix: File extension used for the temporary copies.
            old_content: Content before the edit.
            new_content: Content after the edit.
            filepath: Path of the edited file, used only for reporting.
            diff: Diff text embedded in the reported observation.

        Returns:
            An ErrorObservation showing the attempted changes and up to the
            first five lint errors, or None when the linter reports nothing.
        """
        linter = DefaultLinter()
        # Copy the original and updated content to temporary files (with the
        # same ext) so the linter can compare them.
        with (
            tempfile.NamedTemporaryFile(
                suffix=suffix, mode='w+', encoding='utf-8'
            ) as original_file_copy,
            tempfile.NamedTemporaryFile(
                suffix=suffix, mode='w+', encoding='utf-8'
            ) as updated_file_copy,
        ):
            original_file_copy.write(old_content)
            original_file_copy.flush()

            updated_file_copy.write(new_content)
            updated_file_copy.flush()

            updated_lint_error = linter.lint_file_diff(
                original_file_copy.name, updated_file_copy.name
            )

        if len(updated_lint_error) == 0:
            return None

        # Observation is built only to render the rejected diff.
        _obs = FileEditObservation(
            content=diff,
            path=filepath,
            prev_exist=True,
            old_content=old_content,
            new_content=new_content,
        )
        error_message = (
            (
                f'\n[Linting failed for edited file {filepath}. {len(updated_lint_error)} lint errors found.]\n'
                '[begin attempted changes]\n'
                f'{_obs.visualize_diff(change_applied=False)}\n'
                '[end attempted changes]\n'
            )
            + '-' * 40
            + '\n'
        )
        error_message += '-' * 20 + 'First 5 lint errors' + '-' * 20 + '\n'
        for i, lint_error in enumerate(updated_lint_error[:5]):
            error_message += f'[begin lint error {i}]\n'
            error_message += lint_error.visualize().strip() + '\n'
            error_message += f'[end lint error {i}]\n'
            error_message += '-' * 40 + '\n'
        return ErrorObservation(error_message)

    def edit(self, action: FileEditAction) -> Observation:
        """Apply a file edit action and report the outcome.

        Behaviour, in order:
          * OH_ACI-sourced actions are forwarded as an IPython cell.
          * If the file does not exist, it is created with the stripped
            draft content.
          * `start == -1` appends the draft to the end of the file.
          * Otherwise the selected line range is rewritten by the draft
            editor LLM and spliced back between the untouched lines.

        When auto-lint is enabled in the sandbox config, an edit that
        introduces lint errors is rejected without being written.

        Returns:
            A FileEditObservation on success, or an ErrorObservation for an
            invalid/too-long range, lint failure, LLM failure or write failure.

        Raises:
            ValueError: If read/write return an unexpected observation type.
        """
        if action.impl_source == FileEditSource.OH_ACI:
            # Translate to ipython command to file_editor
            return self.run_ipython(
                IPythonRunCellAction(
                    code=action.translated_ipython_code,
                    include_extra=False,
                )
            )

        obs = self.read(FileReadAction(path=action.path))
        if (
            isinstance(obs, ErrorObservation)
            and 'File not found'.lower() in obs.content.lower()
        ):
            logger.debug(
                f'Agent attempted to edit a file that does not exist. Creating the file. Error msg: {obs.content}'
            )
            # directly write the new content; keep a single stripped copy so
            # the reported diff/new_content match what is actually written
            created_content = action.content.strip()
            obs = self.write(
                FileWriteAction(path=action.path, content=created_content)
            )
            if isinstance(obs, ErrorObservation):
                return obs
            if not isinstance(obs, FileWriteObservation):
                raise ValueError(
                    f'Expected FileWriteObservation, got {type(obs)}: {str(obs)}'
                )
            return FileEditObservation(
                content=get_diff('', created_content, action.path),
                path=action.path,
                prev_exist=False,
                old_content='',
                new_content=created_content,
            )
        if not isinstance(obs, FileReadObservation):
            raise ValueError(
                f'Expected FileReadObservation, got {type(obs)}: {str(obs)}'
            )

        original_file_content = obs.content
        old_file_lines = original_file_content.split('\n')
        # NOTE: start and end are 1-indexed
        start = action.start
        end = action.end
        # validate the range
        error = self._validate_range(start, end, len(old_file_lines))
        if error is not None:
            return error

        # append to the end of the file
        if start == -1:
            updated_content = '\n'.join(old_file_lines + action.content.split('\n'))
            diff = get_diff(original_file_content, updated_content, action.path)
            # Lint the updated content
            if self.config.sandbox.enable_auto_lint:
                suffix = os.path.splitext(action.path)[1]
                error_obs = self._get_lint_error(
                    suffix,
                    original_file_content,
                    updated_content,
                    action.path,
                    diff,
                )
                if error_obs is not None:
                    return error_obs
            obs = self.write(FileWriteAction(path=action.path, content=updated_content))
            if isinstance(obs, ErrorObservation):
                # Surface the write failure instead of reporting success.
                return obs
            return FileEditObservation(
                content=diff,
                path=action.path,
                prev_exist=True,
                old_content=original_file_content,
                new_content=updated_content,
            )

        # Get the 0-indexed start and end
        start_idx = start - 1
        if end != -1:
            # remove 1 to make it 0-indexed
            # then add 1 since the `end` is inclusive
            end_idx = end - 1 + 1
        else:
            # end == -1 means the user wants to edit till the end of the file
            end_idx = len(old_file_lines)

        # Get the range of lines to edit - reject if too long
        length_of_range = end_idx - start_idx
        if length_of_range > self.MAX_LINES_TO_EDIT + 1:
            error_msg = (
                f'[Edit error: The range of lines to edit is too long.]\n'
                f'[The maximum number of lines allowed to edit at once is {self.MAX_LINES_TO_EDIT}. '
                f'Got (L{start_idx + 1}-L{end_idx}) {length_of_range} lines.]\n'  # [start_idx, end_idx), so no need to + 1
            )
            # search for relevant ranges to hint the agent
            topk_chunks: list[Chunk] = get_top_k_chunk_matches(
                text=original_file_content,
                query=action.content,  # edit draft as query
                k=3,
                max_chunk_size=20,  # lines
            )
            error_msg += (
                'Here are some snippets that maybe relevant to the provided edit.\n'
            )
            for i, chunk in enumerate(topk_chunks):
                error_msg += f'[begin relevant snippet {i+1}. Line range: L{chunk.line_range[0]}-L{chunk.line_range[1]}. Similarity: {chunk.normalized_lcs}]\n'
                error_msg += f'[Browse around it via `open_file("{action.path}", {(chunk.line_range[0] + chunk.line_range[1]) // 2})`]\n'
                error_msg += chunk.visualize() + '\n'
                error_msg += f'[end relevant snippet {i+1}]\n'
                error_msg += '-' * 40 + '\n'
            error_msg += 'Consider using `open_file` to explore around the relevant snippets if needed.\n'
            error_msg += f'**IMPORTANT**: Please REDUCE the range of edits to less than {self.MAX_LINES_TO_EDIT} lines by setting `start` and `end` in the edit action (e.g. `<file_edit path="{action.path}" start=[PUT LINE NUMBER HERE] end=[PUT LINE NUMBER HERE] />`). '
            return ErrorObservation(error_msg)

        content_to_edit = '\n'.join(old_file_lines[start_idx:end_idx])
        # Reset the draft editor LLM before each edit.
        self.draft_editor_llm.reset()
        _edited_content = get_new_file_contents(
            self.draft_editor_llm, content_to_edit, action.content
        )
        if _edited_content is None:
            ret_err = ErrorObservation(
                'Failed to get new file contents. '
                'Please try to reduce the number of edits and try again.'
            )
            ret_err.llm_metrics = self.draft_editor_llm.metrics
            return ret_err

        # piece the updated content with the unchanged content
        updated_lines = (
            old_file_lines[:start_idx]
            + _edited_content.split('\n')
            + old_file_lines[end_idx:]
        )
        updated_content = '\n'.join(updated_lines)
        diff = get_diff(original_file_content, updated_content, action.path)

        # Lint the updated content
        if self.config.sandbox.enable_auto_lint:
            suffix = os.path.splitext(action.path)[1]
            error_obs = self._get_lint_error(
                suffix, original_file_content, updated_content, action.path, diff
            )
            if error_obs is not None:
                error_obs.llm_metrics = self.draft_editor_llm.metrics
                return error_obs

        obs = self.write(FileWriteAction(path=action.path, content=updated_content))
        if isinstance(obs, ErrorObservation):
            # Surface the write failure instead of reporting success; the
            # draft editor metrics are attached as on the other error paths.
            obs.llm_metrics = self.draft_editor_llm.metrics
            return obs
        ret_obs = FileEditObservation(
            content=diff,
            path=action.path,
            prev_exist=True,
            old_content=original_file_content,
            new_content=updated_content,
        )
        ret_obs.llm_metrics = self.draft_editor_llm.metrics
        return ret_obs