todo-to-issue-action/GitHubClient.py

412 lines
18 KiB
Python

import os
import requests
import json
import re
from urllib.parse import quote
from Client import Client
class GitHubClient(Client):
"""Basic client for getting the last diff and managing issues."""
existing_issues = []
milestones = []
def __init__(self):
self.github_url = os.getenv('INPUT_GITHUB_URL')
if not self.github_url:
raise EnvironmentError
self.base_url = f'{self.github_url}/'
self.repos_url = f'{self.base_url}repos/'
self.repo = os.getenv('INPUT_REPO')
self.target_repo_name = os.getenv('INPUT_TARGET_REPO')
self.owner = os.getenv('INPUT_OWNER')
# Construct the full target repository path
if self.target_repo_name and self.owner:
self.target_repo = f'{self.owner}/{self.target_repo_name}'
else:
self.target_repo = self.repo # Default to current repo if not specified
# Log the repository configuration for debugging
if self.target_repo == self.repo:
print(f'Creating issues in current repository: {self.repo}')
else:
print(f'Creating issues in target repository: {self.target_repo} (source: {self.repo})')
self.before = os.getenv('INPUT_BEFORE')
self.sha = os.getenv('INPUT_SHA')
self.commits = json.loads(os.getenv('INPUT_COMMITS')) or []
self.__init_diff_url__()
self.token = os.getenv('INPUT_TOKEN')
self.issues_url = f'{self.repos_url}{self.target_repo}/issues'
self.milestones_url = f'{self.repos_url}{self.target_repo}/milestones'
self.issue_headers = {
'Content-Type': 'application/json',
'Authorization': f'token {self.token}',
'X-GitHub-Api-Version': '2022-11-28',
'User-Agent': 'TODOToIssue/5.1.9'
}
self.graphql_headers = {
'Authorization': f'Bearer {os.getenv("INPUT_PROJECTS_SECRET", "")}',
'Accept': 'application/vnd.github.v4+json',
'User-Agent': 'TODOToIssue/5.1.9'
}
auto_p = os.getenv('INPUT_AUTO_P', 'true') == 'true'
self.line_break = '\n\n' if auto_p else '\n'
self.auto_assign = os.getenv('INPUT_AUTO_ASSIGN', 'false') == 'true'
self.actor = os.getenv('INPUT_ACTOR')
self.line_base_url = os.getenv('INPUT_GITHUB_SERVER_URL')
if not self.line_base_url.endswith('/'):
self.line_base_url += '/'
self.project = os.getenv('INPUT_PROJECT', None)
# Retrieve the existing repo issues now so we can easily check them later.
self._get_existing_issues()
# Populate milestones so we can perform a lookup if one is specified.
self._get_milestones()
def __init_diff_url__(self):
manual_commit_ref = os.getenv('MANUAL_COMMIT_REF')
manual_base_ref = os.getenv('MANUAL_BASE_REF')
if manual_commit_ref:
self.sha = manual_commit_ref
if manual_commit_ref and manual_base_ref:
print(f'Manually comparing {manual_base_ref}...{manual_commit_ref}')
self.diff_url = f'{self.repos_url}{self.repo}/compare/{manual_base_ref}...{manual_commit_ref}'
elif manual_commit_ref:
print(f'Manual checking {manual_commit_ref}')
self.diff_url = f'{self.repos_url}{self.repo}/commits/{manual_commit_ref}'
else:
self.diff_url = os.getenv('INPUT_DIFF_URL')
def get_last_diff(self):
"""Get the last diff."""
if self.diff_url:
# Diff url was directly passed in config, likely due to this being a PR.
diff_url = self.diff_url
pr_url_pattern = r'/pull/(\d+)\.diff$'
pr_search = re.search(pr_url_pattern, diff_url)
if pr_search:
pr_number = pr_search.group(1)
diff_url = f'{self.repos_url}{self.repo}/pulls/{pr_number}'
elif self.before != '0000000000000000000000000000000000000000':
# There is a valid before SHA to compare with, or this is a release being created.
diff_url = f'{self.repos_url}{self.repo}/compare/{self.before}...{self.sha}'
elif len(self.commits) == 1:
# There is only one commit.
diff_url = f'{self.repos_url}{self.repo}/commits/{self.sha}'
elif len(self.commits) > 1:
# There are several commits: compare with the oldest one.
oldest = sorted(self.commits, key=self._get_timestamp)[0]['id']
diff_url = f'{self.repos_url}{self.repo}/compare/{oldest}...{self.sha}'
else:
return None
diff_headers = {
'Accept': 'application/vnd.github.v3.diff',
'Authorization': f'token {self.token}',
'X-GitHub-Api-Version': '2022-11-28',
'User-Agent': 'TODOToIssue/5.1.9'
}
diff_request = requests.get(url=diff_url, headers=diff_headers)
if diff_request.status_code == 200:
return diff_request.text
error_response = [f'Could not retrieve diff',
f'URL: {diff_url}',
f'Status code: {diff_request.status_code}']
if 'application/json' in diff_request.headers['content-type']:
error_response.append(f"Server response: {json.loads(diff_request.text)['message']}")
error_response.append('Operation will abort')
if '/compare/' in diff_url:
# The before SHA may no longer be valid due to a force push, fall back to /commits/ endpoint.
diff_url = f'{self.repos_url}{self.repo}/commits/{self.sha}'
print(f'Falling back to {diff_url}')
diff_request = requests.get(url=diff_url, headers=diff_headers)
if diff_request.status_code == 200:
return diff_request.text
error_response.append('Fallback URL also failed')
raise Exception('\n'.join(error_response))
# noinspection PyMethodMayBeStatic
def _get_timestamp(self, commit):
"""Get a commit timestamp."""
return commit.get('timestamp')
def _get_milestones(self, page=1):
"""Get all the milestones."""
params = {
'per_page': 100,
'page': page,
'state': 'open'
}
milestones_request = requests.get(self.milestones_url, headers=self.issue_headers, params=params)
if milestones_request.status_code == 200:
self.milestones.extend(milestones_request.json())
links = milestones_request.links
if 'next' in links:
self._get_milestones(page + 1)
def _get_milestone(self, title):
"""Get the milestone number for the one with this title (creating one if it doesn't exist)."""
for m in self.milestones:
if m['title'] == title:
return m['number']
else:
return self._create_milestone(title)
def _create_milestone(self, title):
"""Create a new milestone with this title."""
milestone_data = {
'title': title
}
milestone_request = requests.post(self.milestones_url, headers=self.issue_headers, json=milestone_data)
return milestone_request.json()['number'] if milestone_request.status_code == 201 else None
def _get_existing_issues(self, page=1):
"""Populate the existing issues list."""
params = {
'per_page': 100,
'page': page,
'state': 'open'
}
list_issues_request = requests.get(self.issues_url, headers=self.issue_headers, params=params)
if list_issues_request.status_code == 200:
self.existing_issues.extend(list_issues_request.json())
links = list_issues_request.links
if 'next' in links:
self._get_existing_issues(page + 1)
def _get_project_id(self, project):
"""Get the project ID."""
project_type, owner, project_name = project.split('/')
if project_type == 'user':
query = """
query($owner: String!) {
user(login: $owner) {
projectsV2(first: 10) {
nodes {
id
title
}
}
}
}
"""
elif project_type == 'organization':
query = """
query($owner: String!) {
organization(login: $owner) {
projectsV2(first: 10) {
nodes {
id
title
}
}
}
}
"""
else:
print("Invalid project type")
return None
variables = {
'owner': owner,
}
project_request = requests.post('https://api.github.com/graphql',
json={'query': query, 'variables': variables},
headers=self.graphql_headers)
if project_request.status_code == 200:
projects = (project_request.json().get('data', {}).get(project_type, {}).get('projectsV2', {})
.get('nodes', []))
for project in projects:
if project['title'] == project_name:
return project['id']
return None
def _get_issue_global_id(self, owner, repo, issue_number):
"""Get the global ID for a given issue."""
query = """
query($owner: String!, $repo: String!, $issue_number: Int!) {
repository(owner: $owner, name: $repo) {
issue(number: $issue_number) {
id
}
}
}
"""
variables = {
'owner': owner,
'repo': repo,
'issue_number': issue_number
}
project_request = requests.post('https://api.github.com/graphql',
json={'query': query, 'variables': variables},
headers=self.graphql_headers)
if project_request.status_code == 200:
return project_request.json()['data']['repository']['issue']['id']
return None
def _add_issue_to_project(self, issue_id, project_id):
"""Attempt to add this issue to a project."""
mutation = """
mutation($projectId: ID!, $contentId: ID!) {
addProjectV2ItemById(input: {projectId: $projectId, contentId: $contentId}) {
item {
id
}
}
}
"""
variables = {
"projectId": project_id,
"contentId": issue_id
}
project_request = requests.post('https://api.github.com/graphql',
json={'query': mutation, 'variables': variables},
headers=self.graphql_headers)
return project_request.status_code
def _comment_issue(self, issue_number, comment):
"""Post a comment on an issue."""
issue_comment_url = f'{self.repos_url}{self.target_repo}/issues/{issue_number}/comments'
body = {'body': comment}
update_issue_request = requests.post(issue_comment_url, headers=self.issue_headers, json=body)
return update_issue_request.status_code
def create_issue(self, issue):
"""Create a dict containing the issue details and send it to GitHub."""
formatted_issue_body = self.line_break.join(issue.body)
line_num_anchor = f'#L{issue.start_line}'
if issue.num_lines > 1:
line_num_anchor += f'-L{issue.start_line + issue.num_lines - 1}'
url_to_line = f'{self.line_base_url}{self.repo}/blob/{self.sha}/{quote(issue.file_name)}{line_num_anchor}'
snippet = '```' + issue.markdown_language + '\n' + issue.hunk + '\n' + '```'
issue_template = os.getenv('INPUT_ISSUE_TEMPLATE', None)
if issue_template:
issue_contents = (issue_template.replace('{{ title }}', issue.title)
.replace('{{ body }}', formatted_issue_body)
.replace('{{ url }}', url_to_line)
.replace('{{ snippet }}', snippet)
)
elif len(issue.body) != 0:
issue_contents = formatted_issue_body + '\n\n' + url_to_line + '\n\n' + snippet
else:
issue_contents = url_to_line + '\n\n' + snippet
endpoint = self.issues_url
if issue.issue_url:
# Issue already exists, update existing rather than create new.
endpoint += f'/{issue.issue_number}'
title = issue.title
if issue.ref:
if issue.ref.startswith('@'):
# Ref = assignee.
issue.assignees.append(issue.ref.lstrip('@'))
elif issue.ref.startswith('!'):
# Ref = label.
issue.labels.append(issue.ref.lstrip('!'))
elif issue.ref.startswith('#'):
# Ref = issue number (indicating this is a comment on that issue).
issue_number = issue.ref.lstrip('#')
if issue_number.isdigit():
# Create the comment now.
return self._comment_issue(issue_number, f'{issue.title}\n\n{issue_contents}'), None
else:
# Just prepend the ref to the title.
title = f'[{issue.ref}] {issue.title}'
title = title + '...' if len(title) > 80 else title
new_issue_body = {'title': title, 'body': issue_contents, 'labels': issue.labels}
# We need to check if any assignees/milestone specified exist, otherwise issue creation will fail.
valid_assignees = []
if len(issue.assignees) == 0 and self.auto_assign:
valid_assignees.append(self.actor)
for assignee in issue.assignees:
assignee_url = f'{self.repos_url}{self.target_repo}/assignees/{assignee}'
assignee_request = requests.get(url=assignee_url, headers=self.issue_headers)
if assignee_request.status_code == 204:
valid_assignees.append(assignee)
else:
print(f'Assignee {assignee} does not exist! Dropping this assignee!')
new_issue_body['assignees'] = valid_assignees
if issue.milestone:
milestone_number = self._get_milestone(issue.milestone)
if milestone_number:
new_issue_body['milestone'] = milestone_number
else:
print(f'Milestone {issue.milestone} could not be set. Dropping this milestone!')
if issue.issue_url:
# Update existing issue.
issue_request = requests.patch(url=endpoint, headers=self.issue_headers, json=new_issue_body)
else:
# Create new issue.
issue_request = requests.post(url=endpoint, headers=self.issue_headers, json=new_issue_body)
request_status = issue_request.status_code
issue_number = issue_request.json()['number'] if request_status in [200, 201] else None
# Check if issue should be added to a project now it exists.
if issue_number and self.project:
project_id = self._get_project_id(self.project)
if project_id:
owner, repo = self.target_repo.split('/')
issue_id = self._get_issue_global_id(owner, repo, issue_number)
if issue_id:
self._add_issue_to_project(issue_id, project_id)
return request_status, issue_number
def close_issue(self, issue):
"""Check to see if this issue can be found on GitHub and if so close it."""
issue_number = None
if issue.issue_number:
# If URL insertion is enabled.
issue_number = issue.issue_number
else:
# Try simple matching.
matched = 0
for existing_issue in self.existing_issues:
if existing_issue['title'] == issue.title:
matched += 1
# If there are multiple issues with similar titles, don't try and close any.
if matched > 1:
print(f'Skipping issue (multiple matches)')
break
issue_number = existing_issue['number']
if issue_number:
update_issue_url = f'{self.issues_url}/{issue_number}'
body = {'state': 'closed'}
requests.patch(update_issue_url, headers=self.issue_headers, json=body)
request_status = self._comment_issue(issue_number, f'Closed in {self.sha}.')
# Update the description if this is a PR.
if os.getenv('GITHUB_EVENT_NAME') == 'pull_request':
pr_number = os.getenv('PR_NUMBER')
if pr_number:
request_status = self._update_pr_body(pr_number, body)
return request_status
return None
def _update_pr_body(self, pr_number, issue_number):
"""Add a close message for an issue to a PR."""
pr_url = f'{self.repos_url}{self.target_repo}/pulls/{pr_number}'
pr_request = requests.get(pr_url, headers=self.issue_headers)
if pr_request.status_code == 200:
pr_body = pr_request.json()['body']
close_message = f'Closes #{issue_number}'
if close_message not in pr_body:
updated_pr_body = f'{pr_body}\n\n{close_message}' if pr_body.strip() else close_message
body = {'body': updated_pr_body}
pr_update_request = requests.patch(pr_url, headers=self.issue_headers, json=body)
return pr_update_request.status_code
return pr_request.status_code
def get_issue_url(self, new_issue_number):
"""Get the URL for the issue."""
return f'{self.line_base_url}{self.target_repo}/issues/{new_issue_number}'