|
import json
|
|
import os
|
|
import re
|
|
from abc import ABC, abstractmethod
|
|
from typing import Any, ClassVar
|
|
|
|
import jinja2
|
|
import requests
|
|
|
|
from openhands.core.config import LLMConfig
|
|
from openhands.core.logger import openhands_logger as logger
|
|
from openhands.events.event import Event
|
|
from openhands.llm.llm import LLM
|
|
from openhands.resolver.github_issue import GithubIssue, ReviewThread
|
|
|
|
|
|
class IssueHandlerInterface(ABC):
    """Abstract contract implemented by the issue and pull-request handlers.

    A handler has three responsibilities: fetch items from GitHub and convert
    them to ``GithubIssue`` objects, turn one such object into an instruction
    for the agent, and judge afterwards whether the agent's work resolved it.
    """

    # Short tag naming the kind of item handled; concrete subclasses assign it.
    issue_type: ClassVar[str]
    # LLM client available to implementations.
    llm: LLM

    @abstractmethod
    def get_converted_issues(
        self,
        issue_numbers: list[int] | None = None,
        comment_id: int | None = None,
    ) -> list[GithubIssue]:
        """Download issues from GitHub.

        Args:
            issue_numbers: Numbers of the items to download.
            comment_id: ID of a single comment to restrict to, if any.

        Returns:
            The downloaded items as ``GithubIssue`` objects.
        """

    @abstractmethod
    def get_instruction(
        self,
        issue: GithubIssue,
        prompt_template: str,
        repo_instruction: str | None = None,
    ) -> tuple[str, list[str]]:
        """Generate instruction and image urls for the agent.

        Args:
            issue: The item to build the instruction for.
            prompt_template: Template text the instruction is rendered from.
            repo_instruction: Optional repository-specific instruction.

        Returns:
            A pair of the instruction text and a list of image URLs.
        """

    @abstractmethod
    def guess_success(
        self,
        issue: GithubIssue,
        history: list[Event],
        git_patch: str | None = None,
    ) -> tuple[bool, list[bool] | None, str]:
        """Guess if the issue has been resolved based on the agent's output and git patch.

        Args:
            issue: The item that was worked on.
            history: Events produced by the agent.
            git_patch: Optional git patch showing the changes made.

        Returns:
            A triple of the overall verdict, an optional list of per-item
            verdicts, and an explanation string.
        """
|
|
|
|
|
|
class IssueHandler(IssueHandlerInterface):
    """Handler for plain GitHub issues.

    Downloads open issues through the GitHub REST API, renders the agent
    instruction from a Jinja template, and asks the LLM whether the agent's
    last message (plus an optional git patch) resolves the issue.
    """

    issue_type: ClassVar[str] = 'issue'
    # Text used in place of the git patch in the success-check prompt when no
    # patch is supplied.
    default_git_patch: ClassVar[str] = 'No changes made yet'

    def __init__(self, owner: str, repo: str, token: str, llm_config: LLMConfig):
        """Store the repository coordinates and create the LLM client.

        Args:
            owner: Repository owner, inserted into the API URLs.
            repo: Repository name, inserted into the API URLs.
            token: GitHub token sent in the ``Authorization`` header.
            llm_config: Configuration passed to ``LLM``.
        """
        # Format string with two positional slots: owner and repo.
        self.download_url = 'https://api.github.com/repos/{}/{}/issues'
        self.owner = owner
        self.repo = repo
        self.token = token
        self.llm = LLM(llm_config)

    def _download_issues_from_github(self) -> list[Any]:
        """Fetch every open item from ``self.download_url``, following pagination.

        Pages of 100 are requested until an empty page is returned.

        Returns:
            The concatenated JSON objects of all pages.

        Raises:
            requests.HTTPError: If a request returns an error status.
            ValueError: If a non-empty page is not a list of dictionaries.
        """
        url = self.download_url.format(self.owner, self.repo)
        headers = {
            'Authorization': f'token {self.token}',
            'Accept': 'application/vnd.github.v3+json',
        }
        # The page counter is kept as a plain int so it can be incremented
        # without narrowing the ``int | str`` value type of ``params``.
        page = 1
        params: dict[str, int | str] = {'state': 'open', 'per_page': 100, 'page': page}
        all_issues: list[Any] = []

        while True:
            response = requests.get(url, headers=headers, params=params)
            response.raise_for_status()
            issues = response.json()

            # An empty page marks the end of the listing.
            if not issues:
                break

            if not isinstance(issues, list) or not all(
                isinstance(issue, dict) for issue in issues
            ):
                raise ValueError('Expected list of dictionaries from Github API.')

            all_issues.extend(issues)
            page += 1
            params['page'] = page

        return all_issues

    def _extract_image_urls(self, issue_body: str) -> list[str]:
        """Return the http(s) URLs of Markdown images (``![alt](url)``) in the text."""
        image_pattern = r'!\[.*?\]\((https?://[^\s)]+)\)'
        return re.findall(image_pattern, issue_body)

    def _extract_issue_references(self, body: str) -> list[int]:
        """Return the numbers of ``#<digits>`` references found in ``body``.

        Fenced code blocks, inline code spans and URLs containing ``#<digits>``
        are removed first so that their contents are not reported.
        """
        # Drop fenced code blocks (``` ... ```), which may span lines.
        body = re.sub(r'```.*?```', '', body, flags=re.DOTALL)

        # Drop inline code spans (` ... `).
        body = re.sub(r'`[^`]*`', '', body)

        # Drop URLs that carry a ``#<digits>`` fragment.
        body = re.sub(r'https?://[^\s)]*#\d+[^\s)]*', '', body)

        # A reference is ``#<digits>`` at the start of the text or after
        # whitespace, an opening bracket, or any character that is neither a
        # word character nor ``#``; it must be followed by whitespace, one of
        # ``,.])}`` or the end of the text.
        pattern = r'(?:^|[\s\[({]|[^\w#])#(\d+)(?=[\s,.\])}]|$)'
        return [int(match) for match in re.findall(pattern, body)]

    def _get_issue_comments(
        self, issue_number: int, comment_id: int | None = None
    ) -> list[str] | None:
        """Retrieve comments for a specific issue from Github.

        Args:
            issue_number: The ID of the issue to get comments for
            comment_id: The ID of a single comment, if provided, otherwise all comments

        Returns:
            With ``comment_id`` given: a one-element list holding the body of
            the matching comment, or ``None`` if no page contains it.
            Otherwise: the bodies of all comments, or ``None`` if there are none.

        Raises:
            requests.HTTPError: If a request returns an error status.
        """
        url = f'https://api.github.com/repos/{self.owner}/{self.repo}/issues/{issue_number}/comments'
        headers = {
            'Authorization': f'token {self.token}',
            'Accept': 'application/vnd.github.v3+json',
        }
        params = {'per_page': 100, 'page': 1}
        all_comments: list[str] = []

        while True:
            response = requests.get(url, headers=headers, params=params)
            response.raise_for_status()
            comments = response.json()

            if not comments:
                break

            # ``is not None`` (rather than truthiness) matches the check used
            # by PRHandler._get_pr_comments.
            if comment_id is not None:
                matching_comment = next(
                    (
                        comment['body']
                        for comment in comments
                        if comment['id'] == comment_id
                    ),
                    None,
                )
                if matching_comment:
                    return [matching_comment]
            else:
                all_comments.extend([comment['body'] for comment in comments])

            params['page'] += 1

        return all_comments if all_comments else None

    def get_converted_issues(
        self, issue_numbers: list[int] | None = None, comment_id: int | None = None
    ) -> list[GithubIssue]:
        """Download issues from Github.

        Args:
            issue_numbers: The numbers of the issues to download
            comment_id: The ID of a single comment, if provided, otherwise all comments

        Returns:
            List of Github issues.

        Raises:
            ValueError: If ``issue_numbers`` is empty or ``None``, or if a
                single issue was requested and it is not among the downloaded
                open issues.
        """
        if not issue_numbers:
            raise ValueError('Unspecified issue number')

        all_issues = self._download_issues_from_github()
        logger.info(f'Limiting resolving to issues {issue_numbers}.')
        # ``.get`` keeps an entry without a 'number' key from raising KeyError
        # here; such an entry simply does not match. Entries carrying a
        # 'pull_request' key are excluded.
        all_issues = [
            issue
            for issue in all_issues
            if issue.get('number') in issue_numbers and 'pull_request' not in issue
        ]

        if len(issue_numbers) == 1 and not all_issues:
            raise ValueError(f'Issue {issue_numbers[0]} not found')

        converted_issues = []
        for issue in all_issues:
            # Number and title are mandatory; skip entries lacking either.
            if any(issue.get(key) is None for key in ('number', 'title')):
                logger.warning(
                    f'Skipping issue {issue} as it is missing number or title.'
                )
                continue

            # Normalise a missing body to an empty string.
            if issue.get('body') is None:
                issue['body'] = ''

            thread_comments = self._get_issue_comments(
                issue['number'], comment_id=comment_id
            )

            issue_details = GithubIssue(
                owner=self.owner,
                repo=self.repo,
                number=issue['number'],
                title=issue['title'],
                body=issue['body'],
                thread_comments=thread_comments,
                review_comments=None,
            )

            converted_issues.append(issue_details)

        return converted_issues

    def get_instruction(
        self,
        issue: GithubIssue,
        prompt_template: str,
        repo_instruction: str | None = None,
    ) -> tuple[str, list[str]]:
        """Generate instruction for the agent.

        Args:
            issue: The issue to generate instruction for
            prompt_template: The prompt template to use
            repo_instruction: The repository instruction if it exists

        Returns:
            The rendered instruction and the image URLs found in the issue
            body and in its thread comments.
        """
        # Append the thread comments, when present, to the issue text.
        thread_context = ''
        if issue.thread_comments:
            thread_context = '\n\nIssue Thread Comments:\n' + '\n---\n'.join(
                issue.thread_comments
            )

        images = []
        images.extend(self._extract_image_urls(issue.body))
        images.extend(self._extract_image_urls(thread_context))

        template = jinja2.Template(prompt_template)
        return (
            template.render(
                body=issue.title + '\n\n' + issue.body + thread_context,
                repo_instruction=repo_instruction,
            ),
            images,
        )

    def guess_success(
        self, issue: GithubIssue, history: list[Event], git_patch: str | None = None
    ) -> tuple[bool, None | list[bool], str]:
        """Guess if the issue is fixed based on the history and the issue description.

        Args:
            issue: The issue to check
            history: The agent's history
            git_patch: Optional git patch showing the changes made

        Returns:
            ``(success, None, explanation)``. When the LLM answer does not match
            the expected ``--- success`` / ``--- explanation`` layout, success is
            ``False`` and the explanation carries the raw answer.
        """
        last_message = history[-1].message

        # Build the context shown to the LLM: body plus thread comments.
        issue_context = issue.body
        if issue.thread_comments:
            issue_context += '\n\nIssue Thread Comments:\n' + '\n---\n'.join(
                issue.thread_comments
            )

        # Read the template as UTF-8 so decoding does not depend on the locale.
        with open(
            os.path.join(
                os.path.dirname(__file__),
                'prompts/guess_success/issue-success-check.jinja',
            ),
            'r',
            encoding='utf-8',
        ) as f:
            template = jinja2.Template(f.read())
        prompt = template.render(
            issue_context=issue_context,
            last_message=last_message,
            git_patch=git_patch or self.default_git_patch,
        )

        response = self.llm.completion(messages=[{'role': 'user', 'content': prompt}])

        answer = response.choices[0].message.content.strip()
        pattern = r'--- success\n*(true|false)\n*--- explanation*\n((?:.|\n)*)'
        match = re.search(pattern, answer)
        if match:
            return match.group(1).lower() == 'true', None, match.group(2)

        return False, None, f'Failed to decode answer from LLM response: {answer}'
|
|
|
|
|
|
class PRHandler(IssueHandler):
    """Handler for GitHub pull requests.

    Compared with the base handler it lists pull requests instead of issues,
    collects review feedback through the GraphQL API, pulls in the bodies of
    issues referenced by that feedback, and judges success per piece of
    feedback.
    """

    issue_type: ClassVar[str] = 'pr'

    def __init__(self, owner: str, repo: str, token: str, llm_config: LLMConfig):
        """Initialise like the base handler, but list pull requests."""
        super().__init__(owner, repo, token, llm_config)
        self.download_url = 'https://api.github.com/repos/{}/{}/pulls'

    def __download_pr_metadata(
        self, pull_number: int, comment_id: int | None = None
    ) -> tuple[list[str], list[int], list[str], list[ReviewThread], list[str]]:
        """Run a GraphQL query against the GitHub API for information.

        Retrieves information about:
            1. unresolved review comments
            2. referenced issues the pull request would close

        Args:
            pull_number: The number of the pull request to query.
            comment_id: Optional ID of a specific comment to focus on. When
                given, only the review with that ID and only the unresolved
                threads containing a comment with that ID are returned.

        Returns:
            A 5-tuple of:
                - bodies of the issues the pull request closes,
                - numbers of those issues,
                - bodies of the selected reviews,
                - one ``ReviewThread`` per selected unresolved thread,
                - the IDs of those threads, in the same order.

        Raises:
            requests.HTTPError: If the request returns an error status.
        """
        query = """
                query($owner: String!, $repo: String!, $pr: Int!) {
                    repository(owner: $owner, name: $repo) {
                        pullRequest(number: $pr) {
                            closingIssuesReferences(first: 10) {
                                edges {
                                    node {
                                        body
                                        number
                                    }
                                }
                            }
                            url
                            reviews(first: 100) {
                                nodes {
                                    body
                                    state
                                    fullDatabaseId
                                }
                            }
                            reviewThreads(first: 100) {
                                edges{
                                    node{
                                        id
                                        isResolved
                                        comments(first: 100) {
                                            totalCount
                                            nodes {
                                                body
                                                path
                                                fullDatabaseId
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            """

        variables = {'owner': self.owner, 'repo': self.repo, 'pr': pull_number}

        url = 'https://api.github.com/graphql'
        headers = {
            'Authorization': f'Bearer {self.token}',
            'Content-Type': 'application/json',
        }

        response = requests.post(
            url, json={'query': query, 'variables': variables}, headers=headers
        )
        response.raise_for_status()
        response_json = response.json()

        # Surface errors reported in the response body instead of dropping them.
        if response_json.get('errors'):
            logger.warning(
                f'GraphQL query for PR {pull_number} returned errors: '
                f'{response_json["errors"]}'
            )

        # A key may be present with an explicit null value; ``.get(key, {})``
        # would hand back None in that case, so fall back with ``or`` instead.
        data = response_json.get('data') or {}
        repository = data.get('repository') or {}
        pr_data = repository.get('pullRequest') or {}

        # Issues the pull request would close.
        closing_edges = (pr_data.get('closingIssuesReferences') or {}).get(
            'edges'
        ) or []
        closing_issues_bodies = [edge['node']['body'] for edge in closing_edges]
        closing_issue_numbers = [edge['node']['number'] for edge in closing_edges]

        # Review bodies, optionally narrowed to the requested review.
        reviews = (pr_data.get('reviews') or {}).get('nodes') or []
        if comment_id is not None:
            reviews = [
                review
                for review in reviews
                if int(review['fullDatabaseId']) == comment_id
            ]
        review_bodies = [review['body'] for review in reviews]

        # Unresolved review threads.
        review_threads: list[ReviewThread] = []
        thread_ids: list[str] = []
        thread_edges = (pr_data.get('reviewThreads') or {}).get('edges') or []
        for edge in thread_edges:
            node = edge.get('node') or {}
            # A thread without an 'isResolved' key is treated as resolved.
            if node.get('isResolved', True):
                continue

            thread_id = node.get('id')
            thread_comments = (node.get('comments') or {}).get('nodes') or []
            contains_target_comment = False
            message = ''
            files: list[str] = []
            last_index = len(thread_comments) - 1
            for index, thread_comment in enumerate(thread_comments):
                if (
                    comment_id is not None
                    and int(thread_comment['fullDatabaseId']) == comment_id
                ):
                    contains_target_comment = True

                if index == last_index:
                    # The final comment is labelled, and set off by a
                    # separator when earlier comments precede it.
                    if len(thread_comments) > 1:
                        message += '---\n'
                    message += 'latest feedback:\n' + thread_comment['body'] + '\n'
                else:
                    message += thread_comment['body'] + '\n'

                # Collect each referenced file path once, in first-seen order.
                path = thread_comment.get('path')
                if path and path not in files:
                    files.append(path)

            # Keep the thread unless a specific comment was requested and this
            # thread does not contain it.
            if comment_id is None or contains_target_comment:
                review_threads.append(ReviewThread(comment=message, files=files))
                thread_ids.append(thread_id)

        return (
            closing_issues_bodies,
            closing_issue_numbers,
            review_bodies,
            review_threads,
            thread_ids,
        )

    def _get_pr_comments(
        self, pr_number: int, comment_id: int | None = None
    ) -> list[str] | None:
        """Download comments for a specific pull request from Github.

        Args:
            pr_number: Number of the pull request.
            comment_id: ID of a single comment to return, if given.

        Returns:
            With ``comment_id`` given: a one-element list holding the body of
            the matching comment, or ``None`` if no page contains it.
            Otherwise: the bodies of all comments, or ``None`` if there are none.

        Raises:
            requests.HTTPError: If a request returns an error status.
        """
        url = f'https://api.github.com/repos/{self.owner}/{self.repo}/issues/{pr_number}/comments'
        headers = {
            'Authorization': f'token {self.token}',
            'Accept': 'application/vnd.github.v3+json',
        }
        params = {'per_page': 100, 'page': 1}
        all_comments: list[str] = []

        while True:
            response = requests.get(url, headers=headers, params=params)
            response.raise_for_status()
            comments = response.json()

            # An empty page marks the end of the listing.
            if not comments:
                break

            if comment_id is not None:
                matching_comment = next(
                    (
                        comment['body']
                        for comment in comments
                        if comment['id'] == comment_id
                    ),
                    None,
                )
                if matching_comment:
                    return [matching_comment]
            else:
                all_comments.extend([comment['body'] for comment in comments])

            params['page'] += 1

        return all_comments if all_comments else None

    def __get_context_from_external_issues_references(
        self,
        closing_issues: list[str],
        closing_issue_numbers: list[int],
        issue_body: str,
        review_comments: list[str],
        review_threads: list[ReviewThread],
        thread_comments: list[str] | None,
    ) -> list[str]:
        """Add the bodies of issues referenced in the PR text and feedback.

        ``#<number>`` references are collected from the PR body, the review
        comments, the review threads and the thread comments. Every referenced
        issue that is not already in ``closing_issue_numbers`` is fetched, and
        its body, when non-empty, is appended to ``closing_issues``.

        Args:
            closing_issues: Bodies of the closing issues; extended in place.
            closing_issue_numbers: Numbers already covered by ``closing_issues``.
            issue_body: Body of the pull request.
            review_comments: Bodies of the reviews.
            review_threads: Unresolved review threads.
            thread_comments: Comments of the PR conversation, if any.

        Returns:
            The same ``closing_issues`` list that was passed in.
        """
        new_issue_references: list[int] = []

        if issue_body:
            new_issue_references.extend(self._extract_issue_references(issue_body))

        if review_comments:
            for comment in review_comments:
                new_issue_references.extend(self._extract_issue_references(comment))

        if review_threads:
            for review_thread in review_threads:
                new_issue_references.extend(
                    self._extract_issue_references(review_thread.comment)
                )

        if thread_comments:
            for thread_comment in thread_comments:
                new_issue_references.extend(
                    self._extract_issue_references(thread_comment)
                )

        non_duplicate_references = set(new_issue_references)
        unique_issue_references = non_duplicate_references.difference(
            closing_issue_numbers
        )

        # The headers are the same for every lookup, so build them once.
        headers = {
            'Authorization': f'Bearer {self.token}',
            'Accept': 'application/vnd.github.v3+json',
        }
        for issue_number in unique_issue_references:
            url = f'https://api.github.com/repos/{self.owner}/{self.repo}/issues/{issue_number}'
            try:
                response = requests.get(url, headers=headers)
                response.raise_for_status()
                issue_data = response.json()
                referenced_body = issue_data.get('body', '')
                if referenced_body:
                    closing_issues.append(referenced_body)
            except requests.exceptions.RequestException as e:
                # Best effort: a reference that cannot be fetched is logged
                # and skipped.
                logger.warning(f'Failed to fetch issue {issue_number}: {str(e)}')

        return closing_issues

    def get_converted_issues(
        self, issue_numbers: list[int] | None = None, comment_id: int | None = None
    ) -> list[GithubIssue]:
        """Download the requested pull requests together with their feedback.

        Args:
            issue_numbers: Numbers of the pull requests to download.
            comment_id: ID of a single comment to restrict the feedback to.

        Returns:
            One ``GithubIssue`` per matching pull request that has a number
            and a title.

        Raises:
            ValueError: If ``issue_numbers`` is empty or ``None``.
        """
        if not issue_numbers:
            raise ValueError('Unspecified issue numbers')

        all_issues = self._download_issues_from_github()
        logger.info(f'Limiting resolving to issues {issue_numbers}.')
        # ``.get`` keeps an entry without a 'number' key from raising KeyError
        # here; such an entry simply does not match.
        all_issues = [
            issue for issue in all_issues if issue.get('number') in issue_numbers
        ]

        converted_issues = []
        for issue in all_issues:
            # Number and title are mandatory; the body may be missing.
            if any(issue.get(key) is None for key in ('number', 'title')):
                logger.warning(f'Skipping #{issue} as it is missing number or title.')
                continue

            # Normalise a missing body to an empty string.
            body = issue.get('body') if issue.get('body') is not None else ''
            (
                closing_issues,
                closing_issues_numbers,
                review_comments,
                review_threads,
                thread_ids,
            ) = self.__download_pr_metadata(issue['number'], comment_id=comment_id)
            head_branch = issue['head']['ref']

            # Comments of the PR conversation.
            thread_comments = self._get_pr_comments(
                issue['number'], comment_id=comment_id
            )

            closing_issues = self.__get_context_from_external_issues_references(
                closing_issues,
                closing_issues_numbers,
                body,
                review_comments,
                review_threads,
                thread_comments,
            )

            issue_details = GithubIssue(
                owner=self.owner,
                repo=self.repo,
                number=issue['number'],
                title=issue['title'],
                body=body,
                closing_issues=closing_issues,
                review_comments=review_comments,
                review_threads=review_threads,
                thread_ids=thread_ids,
                head_branch=head_branch,
                thread_comments=thread_comments,
            )

            converted_issues.append(issue_details)

        return converted_issues

    def get_instruction(
        self,
        issue: GithubIssue,
        prompt_template: str,
        repo_instruction: str | None = None,
    ) -> tuple[str, list[str]]:
        """Generate instruction for the agent.

        Args:
            issue: The pull request to generate the instruction for.
            prompt_template: The prompt template to use.
            repo_instruction: The repository instruction if it exists.

        Returns:
            The rendered instruction and the image URLs found in the closing
            issues, review comments, review threads and thread comments.
        """
        template = jinja2.Template(prompt_template)
        images: list[str] = []

        # Each feedback source is serialised to JSON for the template and
        # scanned for image URLs; a missing source is passed as None.
        issues_str = None
        if issue.closing_issues:
            issues_str = json.dumps(issue.closing_issues, indent=4)
            images.extend(self._extract_image_urls(issues_str))

        review_comments_str = None
        if issue.review_comments:
            review_comments_str = json.dumps(issue.review_comments, indent=4)
            images.extend(self._extract_image_urls(review_comments_str))

        review_thread_str = None
        review_thread_file_str = None
        if issue.review_threads:
            review_threads = [
                review_thread.comment for review_thread in issue.review_threads
            ]
            review_thread_files = []
            for review_thread in issue.review_threads:
                review_thread_files.extend(review_thread.files)
            review_thread_str = json.dumps(review_threads, indent=4)
            review_thread_file_str = json.dumps(review_thread_files, indent=4)
            images.extend(self._extract_image_urls(review_thread_str))

        thread_context = ''
        if issue.thread_comments:
            thread_context = '\n---\n'.join(issue.thread_comments)
            images.extend(self._extract_image_urls(thread_context))

        instruction = template.render(
            issues=issues_str,
            review_comments=review_comments_str,
            review_threads=review_thread_str,
            files=review_thread_file_str,
            thread_context=thread_context,
            repo_instruction=repo_instruction,
        )
        return instruction, images

    def _render_guess_success_prompt(self, relative_path: str, **context: Any) -> str:
        """Render a prompt template stored next to this module.

        Args:
            relative_path: Template path relative to this module's directory.
            **context: Variables passed to the template.

        Returns:
            The rendered prompt text.
        """
        template_path = os.path.join(os.path.dirname(__file__), relative_path)
        # Read the template as UTF-8 so decoding does not depend on the locale.
        with open(template_path, 'r', encoding='utf-8') as f:
            template = jinja2.Template(f.read())
        return template.render(**context)

    def _check_feedback_with_llm(self, prompt: str) -> tuple[bool, str]:
        """Helper function to check feedback with LLM and parse response.

        Args:
            prompt: The prompt sent to the LLM as a single user message.

        Returns:
            ``(success, explanation)``. When the answer does not match the
            expected ``--- success`` / ``--- explanation`` layout, success is
            ``False`` and the explanation carries the raw answer.
        """
        response = self.llm.completion(messages=[{'role': 'user', 'content': prompt}])

        answer = response.choices[0].message.content.strip()
        pattern = r'--- success\n*(true|false)\n*--- explanation*\n((?:.|\n)*)'
        match = re.search(pattern, answer)
        if match:
            return match.group(1).lower() == 'true', match.group(2).strip()
        return False, f'Failed to decode answer from LLM response: {answer}'

    def _check_review_thread(
        self,
        review_thread: ReviewThread,
        issues_context: str,
        last_message: str,
        git_patch: str | None = None,
    ) -> tuple[bool, str]:
        """Check if a review thread's feedback has been addressed.

        Args:
            review_thread: The thread whose comment and files are shown to the LLM.
            issues_context: Context text describing the related issues.
            last_message: The agent's last message.
            git_patch: Optional git patch; ``default_git_patch`` is used when
                it is missing or empty.

        Returns:
            ``(success, explanation)`` as parsed from the LLM answer.
        """
        files_context = json.dumps(review_thread.files, indent=4)
        prompt = self._render_guess_success_prompt(
            'prompts/guess_success/pr-feedback-check.jinja',
            issue_context=issues_context,
            feedback=review_thread.comment,
            files_context=files_context,
            last_message=last_message,
            git_patch=git_patch or self.default_git_patch,
        )
        return self._check_feedback_with_llm(prompt)

    def _check_thread_comments(
        self,
        thread_comments: list[str],
        issues_context: str,
        last_message: str,
        git_patch: str | None = None,
    ) -> tuple[bool, str]:
        """Check if thread comments feedback has been addressed.

        Args:
            thread_comments: Comments of the PR conversation.
            issues_context: Context text describing the related issues.
            last_message: The agent's last message.
            git_patch: Optional git patch; ``default_git_patch`` is used when
                it is missing or empty.

        Returns:
            ``(success, explanation)`` as parsed from the LLM answer.
        """
        thread_context = '\n---\n'.join(thread_comments)
        prompt = self._render_guess_success_prompt(
            'prompts/guess_success/pr-thread-check.jinja',
            issue_context=issues_context,
            thread_context=thread_context,
            last_message=last_message,
            git_patch=git_patch or self.default_git_patch,
        )
        return self._check_feedback_with_llm(prompt)

    def _check_review_comments(
        self,
        review_comments: list[str],
        issues_context: str,
        last_message: str,
        git_patch: str | None = None,
    ) -> tuple[bool, str]:
        """Check if review comments feedback has been addressed.

        Args:
            review_comments: Bodies of the reviews.
            issues_context: Context text describing the related issues.
            last_message: The agent's last message.
            git_patch: Optional git patch; ``default_git_patch`` is used when
                it is missing or empty.

        Returns:
            ``(success, explanation)`` as parsed from the LLM answer.
        """
        review_context = '\n---\n'.join(review_comments)
        prompt = self._render_guess_success_prompt(
            'prompts/guess_success/pr-review-check.jinja',
            issue_context=issues_context,
            review_context=review_context,
            last_message=last_message,
            git_patch=git_patch or self.default_git_patch,
        )
        return self._check_feedback_with_llm(prompt)

    def guess_success(
        self, issue: GithubIssue, history: list[Event], git_patch: str | None = None
    ) -> tuple[bool, None | list[bool], str]:
        """Guess if the issue is fixed based on the history, issue description and git patch.

        Exactly one kind of feedback is evaluated, in this order of
        precedence: review threads (one check per thread), then thread
        comments, then review comments (one check each).

        Args:
            issue: The pull request to check.
            history: The agent's history; its last event supplies the message.
            git_patch: Optional git patch showing the changes made.

        Returns:
            ``(all_succeeded, per_check_results, explanations_json)`` when
            feedback was evaluated, where the last element is the JSON-encoded
            list of explanations; ``(False, None, reason)`` when the pull
            request carries no feedback.
        """
        last_message = history[-1].message

        issues_context = json.dumps(issue.closing_issues, indent=4)
        success_list: list[bool] = []
        explanation_list: list[str] = []
        # Each branch below already guarantees its own feedback is present, so
        # only the shared inputs need checking.
        has_inputs = bool(issues_context and last_message)

        if issue.review_threads:
            # File-specific review threads: judge each one separately.
            for review_thread in issue.review_threads:
                if has_inputs:
                    success, explanation = self._check_review_thread(
                        review_thread, issues_context, last_message, git_patch
                    )
                else:
                    success, explanation = False, 'Missing context or message'
                success_list.append(success)
                explanation_list.append(explanation)
        elif issue.thread_comments:
            # Only conversation comments: judge them together.
            if has_inputs:
                success, explanation = self._check_thread_comments(
                    issue.thread_comments, issues_context, last_message, git_patch
                )
            else:
                success, explanation = (
                    False,
                    'Missing thread comments, context or message',
                )
            success_list.append(success)
            explanation_list.append(explanation)
        elif issue.review_comments:
            # Only review bodies: judge them together.
            if has_inputs:
                success, explanation = self._check_review_comments(
                    issue.review_comments, issues_context, last_message, git_patch
                )
            else:
                success, explanation = (
                    False,
                    'Missing review comments, context or message',
                )
            success_list.append(success)
            explanation_list.append(explanation)
        else:
            # No feedback of any kind.
            return False, None, 'No feedback was found to process'

        # Overall success requires every individual check to succeed.
        if not success_list:
            return False, None, 'No feedback was processed'
        return all(success_list), success_list, json.dumps(explanation_list)
|
|
|