Rewrite getChangelog.py in GraphQL

This commit is contained in:
Keunes 2024-06-04 22:54:30 +02:00
parent d455323ca8
commit 56e8a3b899
1 changed files with 560 additions and 80 deletions

View File

@ -1,90 +1,570 @@
#!/usr/bin/env python3
import requests
try:
from graphqlclient import GraphQLClient
import json
import time
import re
from datetime import datetime
import csv
from collections import OrderedDict
import sys
import os
import getpass
import threading
import urllib.error
import textwrap
except ModuleNotFoundError as e:
print(f"The '{e.name}' module is not installed. Please install it using 'pip install {e.name}' and try again.")
exit()
REPO = "AntennaPod/AntennaPod"
# Define variables
owner = os.getenv('OWNER', "AntennaPod") # The owner (organisation or user) of the repository
repo = os.getenv('REPO', "AntennaPod") # The repository name
token = os.getenv('GITHUB_API_TOKEN') # The GitHub API token [evnironment variable, otherwise user input]
base_ref = os.getenv('BASE') # The base reference (release code or branch); point of reference [user input]
head_ref = os.getenv('HEAD') # The head reference (release code or branch); environment containing the changes [user input]
max_associatedPRs = 5 # The maximum number of pull requests that the script will fetch per commit
client = GraphQLClient('https://api.github.com/graphql')
filename = None
def set_filename(base_ref, head_ref):
global filename
filename = f'{base_ref} - {head_ref} changelog.csv'
print("Hello, welcome to the AntennaPod PR list generator!")
# Function: Handle exceptions
def handle_exception(error_message: str, data: dict, error_introduction="Error"):
print(f"\n{error_introduction}: {str(error_message)}")
if data:
print("JSON data:")
print(json.dumps(data, indent=2))
# Function: Display processing animation
animation_state = {"text": "Loading", "stop": False}
def display_processing_animation(state):
print(f"{state['text']}...", end="")
characters = ["", "", "", "", "", "", "", ""]
while not state['stop']:
for char in characters:
print(f"\r{char} {state['text']}... ", end="", flush=True)
time.sleep(0.5)
print("\r ", flush=True)
# Function: Get list of PRs for a given commit
# Note: this might be unnecessary in case commits can only have one PR
def get_associated_prs(commitid):
"""
Called in case the number of PRs linked to a commit exceeds the maximum number of PRs that the script will fetch per commit.
"""
global animation_text
animation_text = "Get PRs linked to commit"
query = '''
query ($cursor: String, $owner: String!, $repo: String!, $commitid: GitObjectID!) {
repository(name: $repo, owner: $owner) {
object(oid: $commitid) {
... on Commit {
associatedPullRequests(first: 100, after: $cursor) {
pageInfo {
endCursor
hasNextPage
}
nodes {
number
}
}
}
}
}
}
'''
# Variable definition for GraphQL Explorer:
# {"commitid": "863d4c3b611df83389d82958114bfd2d1204e457", "cursor": null, "owner": "AntennaPod", "repo": "AntennaPod"}
has_next_pr_page = True
cursor = None
pr_numbers = [] # Create PR list
while has_next_pr_page:
variables = {"cursor": cursor, "owner": owner, "repo": repo, "commitid": commitid }
result = client.execute(query, variables)
data = json.loads(result)
pr_numbers.extend([pr['number'] for pr in data['data']['repository']['commit']['associatedPullRequests']['nodes']])
page_info = data['data']['repository']['commit']['associatedPullRequests']['pageInfo']
has_next_pr_page = page_info['hasNextPage']
cursor = page_info['endCursor']
time.sleep(1) # To prevent hitting rate limits
return pr_numbers
try: # Catch KeyboardInterrupt
# Define animation thread to avoid errors
animation_thread = None
# Define token
print(f"Hello, welcome to the {owner} PR list generator!")
time.sleep(0.5)
print("First, please enter your GitHub API token.")
print("If you don't have one yet, create it at https://github.com/settings/tokens")
def get_token():
TOKEN = ""
while not TOKEN:
TOKEN = input('Token: ').strip()
return TOKEN
global token
data = None
while True:
if not token:
token = getpass.getpass(prompt='Token: ')
try:
client.inject_token('Bearer ' + token)
query = '''
query {
viewer {
login
}
}
'''
response = client.execute(query) # This prints the response in case of error, despite any try/except blocks
data = json.loads(response)
if 'login' in data['data']['viewer']:
print(f"Grand, thank you @{data['data']['viewer']['login']}!")
print("Do you want learn how to save this token for future use? [y/N]")
save = input()
if save.lower() == "y":
print(textwrap.dedent(f"""
To save this token for future use, you need to set it as an environment variable on your system. Here's how you can do it:
TOKEN = get_token()
print("Grand, thank you! (" + TOKEN + " is noted)")
If you're using bash or zsh, you can add the following line to your shell profile file (.bashrc, .bash_profile or .zshrc):
export GITHUB_API_TOKEN='{token}'
If you're using Fish shell, you can add the following line to your config.fish file:
set -x GITHUB_API_TOKEN '{token}'
After adding this line, you'll need to restart your terminal or run 'source ~/.bashrc' (or the appropriate file for your shell) for the changes to take effect.
On Windows, you can set environment variables through the System Properties. Here's how:
1. Right-click on Computer on the desktop or in the Start menu.
2. Choose Properties.
3. Click on Advanced system settings.
4. Click on Environment Variables.
5. Click on New under the User or System variables sections.
6. Enter 'GITHUB_API_TOKEN' as the variable name and '{token}' as the variable value.
7. Click OK in all windows.
Please note that setting environment variables this way will make them available in all future terminal sessions and scripts. Use with caution.
"""))
else:
print("Ok, moving on.")
break
except urllib.error.HTTPError as error_message:
if error_message.code == 401:
handle_exception(Exception("Invalid GitHub API token, please try again."), data)
token = ""
else:
handle_exception(Exception("Issue executing GraphQL query"), data)
token = ""
except Exception as error_message:
handle_exception(error_message, data)
token = ""
return token
if not token:
token = get_token()
# Assuming authentication is successful, we will no longer check for/catch authentication errors.
# Define base_ref
if not base_ref:
print()
print("Now, what do you want to compare?")
print("Now, what should be our point of reference?")
print("Please enter a release code or branch")
print("[default: latest GitHub release]")
BASE = input('Base: ')
if BASE == "":
response = requests.get("https://api.github.com/repos/" + REPO + "/releases/latest", headers={'Authorization': 'token ' + TOKEN})
while response.status_code == 401:
print("Error: Invalid GitHub API token.")
TOKEN = get_token()
response = requests.get("https://api.github.com/repos/" + REPO + "/releases/latest", headers={'Authorization': 'token ' + TOKEN})
release = response.json()
BASE = release["tag_name"]
print("Okido, latest release (" + BASE + ") it is!")
else:
print("Noted")
print("[default: latest (in other words: previous) GitHub release]")
base_ref = input('Base: ')
print()
print("Then, what should be our endpoint?")
if base_ref == "latest" or not base_ref:
query = '''
query ($owner: String!, $repo: String!) {
repository(owner: $owner, name: $repo) {
latestRelease {
tagName
}
}
}
'''
while True:
try:
variables = {"owner": owner, "repo": repo}
response = client.execute(query, variables)
data = json.loads(response)
base_ref = data['data']['repository']['latestRelease']['tagName']
print("\nOkido, let's get the latest release (" + base_ref + ")!")
break
except Exception as e:
handle_exception(e, data)
print("Does your token have enough permissions?")
sys.exit(1)
else:
query = '''
query ($owner: String!, $repo: String!, $ref: String!) {
repository(owner: $owner, name: $repo) {
ref(qualifiedName:"$ref") {
name
}
}
}
'''
while True:
try:
variables = {"owner": owner, "repo": repo, "ref": base_ref}
response = client.execute(query, variables)
data = json.loads(response)
if data['data']['repository']['ref'] == None:
print("\nError: Invalid release code or branch.")
print("Please try again.")
base_ref = input('Base: ')
continue
base_ref = data['data']['repository']['ref']['name']
print(f"\nNoted, {base_ref} it is.")
break
except Exception as e:
handle_exception(e, data)
print("Does your token have enough permissions?")
sys.exit(1)
# Define head_ref
if not head_ref:
print("\nThen, from which environment would you like to see the changes (the head)?")
print("Please enter a release code or branch")
print("[default: 'master']")
HEAD = input('Head: ')
if HEAD == "":
print("Righty, master it is!")
HEAD="master"
head_ref = input('Head: ')
if head_ref == "master" or not head_ref:
print("\nRighty, master it is!")
if not head_ref:
head_ref = "master"
else:
print("Roger that.")
query = '''
query ($owner: String!, $repo: String!, $ref: String!) {
repository(owner: $owner, name: $repo) {
ref(qualifiedName: $ref) {
id
name
}
}
}
'''
while True:
try:
variables = {"owner": owner, "repo": repo, "ref": head_ref}
response = client.execute(query, variables)
data = json.loads(response)
if data['data']['repository']['ref'] == None:
print("\nError: Invalid release code or branch.")
print("Please try again.")
head_ref = input('Head: ')
continue
head_ref = data['data']['repository']['ref']['name']
print(f"\nNoted, {head_ref} it is.")
break
except Exception as e:
handle_exception(e, data)
print("Does your token have enough permissions?")
sys.exit(1)
def print_seen():
print(" [already seen] " + pr_details["title"] + " (#" + str(pr_details["number"]) + ")")
# Start the animation in a separate thread
animation_thread = threading.Thread(target=display_processing_animation, args=(animation_state,))
animation_thread.start()
print()
prsSeen = set()
filename = BASE + " - " + HEAD + " changelog.csv"
outputFile = open(filename, 'w')
outputFile.write("Type,Merge date,URL,Title,Author,Type,Functionality group\n")
commits = requests.get("https://api.github.com/repos/" + REPO + "/compare/" + BASE + "..." + HEAD, headers={'Authorization': 'token ' + TOKEN}).json()
numCommits = len(commits["commits"])
for i in range(numCommits):
sha = commits["commits"][i]["sha"]
commit = commits["commits"][i]
print("Commit "+ str(i+1) + " of " + str(numCommits))
if "Merge pull request #" in commit["commit"]["message"] or "Merge branch" in commit["commit"]["message"]:
print(" [is merge commit]")
continue
pr_match = re.search(r'\(#(\d{4})\)', commit["commit"]["message"])
if pr_match:
pr_number = pr_match.group(1)
if pr_number in prsSeen:
print_seen()
continue
pr_details = requests.get("https://api.github.com/repos/" + REPO + "/pulls/" + pr_number, headers={'Authorization': 'token ' + TOKEN}).json()
outputFile.write("PR," + pr_details["merged_at"] + "," + pr_details["html_url"] + ",\"" + pr_details["title"] + "\"," + pr_details["user"]["login"] + "\n")
print(" " + pr_details["title"] + " (#" + str(pr_details["number"]) + ")")
prsSeen.add(pr_number)
continue
time.sleep(1.5) # Avoid rate limit
prs = requests.get("https://api.github.com/search/issues?q=repo:" + REPO + "+type:pr+is:merged+" + sha, headers={'Authorization': 'token ' + TOKEN}).json()
if len(prs["items"]) == 0:
outputFile.write("Commit," + commit["commit"]["committer"]["date"] + "," + commit["html_url"] + ",\"" + commit["commit"]["message"].splitlines()[0] + "\"," + commit["committer"]["login"] + "\n")
print(" [orphan] " + commit["commit"]["message"].splitlines()[0])
continue
pr_details = prs["items"][0]
if pr_details["number"] in prsSeen:
print_seen()
continue
outputFile.write("PR," + pr_details["pull_request"]["merged_at"] + "," + pr_details["html_url"] + ",\"" + pr_details["title"] + "\"," + pr_details["user"]["login"] + "\n")
print(" " + pr_details["title"] + " (#" + str(pr_details["number"]) + ")")
prsSeen.add(pr_details["number"])
outputFile.close()
# Set filename
set_filename(base_ref, head_ref)
# Get list of commits & associated PRs, comparing base & head
animation_state['text'] = "Get list of commits & PRs"
query = '''
query($cursor: String, $owner: String!, $repo: String!, $baseRef: String!, $headRef: String!, $maxPRs: Int!) {
repository(name: $repo, owner: $owner) {
ref(qualifiedName: $baseRef) {
compare(headRef: $headRef) {
commits(first: 100, after: $cursor) {
pageInfo {
endCursor
hasNextPage
}
nodes {
oid
associatedPullRequests(first: $maxPRs) {
totalCount
nodes {
number
}
}
}
}
}
}
}
}
'''
# Variable definition for GraphQL Explorer:
# {"cursor": null, "owner": "AntennaPod", "repo": "AntennaPod", "baseRef": "master", "headRef": "develop", "maxPRs": 5}
has_next_page = True
cursor = None
commits = [] # Create commit list
while has_next_page:
variables = {"cursor": cursor, "owner": owner, "repo": repo, "baseRef": base_ref, "headRef": head_ref, "maxPRs": max_associatedPRs}
result = client.execute(query, variables)
data = json.loads(result)
commit_data = data['data']['repository']['ref']['compare']['commits']['nodes']
for commit in commit_data:
if commit['associatedPullRequests']['totalCount'] > max_associatedPRs: # Request a list of PRs for the commit, if there are many.
commitid = commit['oid']
pr_numbers = get_associated_prs(commitid)
else:
pr_numbers = [pr['number'] for pr in commit['associatedPullRequests']['nodes']]
commits.append({ # Store commit information in list
'sha': commit['oid'],
'pr_count': commit['associatedPullRequests']['totalCount'],
'pr_numbers': pr_numbers,
})
page_info = data['data']['repository']['ref']['compare']['commits']['pageInfo']
has_next_page = page_info['hasNextPage']
cursor = page_info['endCursor']
time.sleep(1) # To prevent hitting rate limits
# Create set of unique PRs from the list of commit dictionaries
animation_state['text'] = "Identify unique PRs"
unique_pr_numbers = set()
for commit in commits:
for pr_number in commit['pr_numbers']:
unique_pr_numbers.add(pr_number)
# Create a list of dictionaries with PR metadata for in CSV file
animation_state['text'] = "Get PR metadata"
## Combined GraphQL call: get relevant PRs and the issues they close
query = f'''
query ($owner: String!, $repo: String!) {{
repository (name: $repo, owner: $owner) {{
'''
for n, pr_number in enumerate(unique_pr_numbers):
query += f'''
pr{n}:pullRequest (number: {pr_number}) {{
url
title
author {{
login
}}
mergedAt
labels (first: 10) {{
totalCount
nodes {{
id
name
}}
}}
closingIssuesReferences(first: 10) {{
totalCount
nodes {{
number
title
labels(first: 10) {{
totalCount
nodes {{
id
name
}}
}}
}}
}}
}}
'''
query += f'''
}}
}}
'''
# Variable definition for GraphQL Explorer:
# {"prnumber": 7053, "owner": "AntennaPod", "repo": "AntennaPod"}
## Submit call
variables = {"owner": owner, "repo": repo}
result = client.execute(query, variables)
data = json.loads(result)
## Parse response and save PR data
animation_state['text'] = "Parse PR metadata"
prs_for_csv = []
for n, pr_number in enumerate(unique_pr_numbers):
prdata = data['data']['repository'][f'pr{n}']
# Create string with related issues
maximum_hit = False
related_issue_numbers = [relatedIssue['number'] for relatedIssue in prdata['closingIssuesReferences']['nodes']]
related_issues_string = ', '.join(map(str, related_issue_numbers))
if prdata['closingIssuesReferences']['totalCount'] > 10:
related_issues_string += " and more"
# Create string with labels of the PR and its associated issues
unique_labels = set()
maximum_hit = False
if prdata['labels']['totalCount'] > 10:
maximum_hit = True
for label in prdata['labels']['nodes']:
unique_labels.add(label['name'])
for relatedIssue in prdata['closingIssuesReferences']['nodes']:
if relatedIssue['labels']['totalCount'] > 10:
maximum_hit = True
for label in relatedIssue['labels']['nodes']:
unique_labels.add(label['name'])
unique_labels_list = list(unique_labels)
unique_labels_string = ', '.join(unique_labels_list)
if maximum_hit:
unique_labels_string += " and more (probably)"
# Create string with issue & PR number(s) that need review replies
numbers = []
maximum_hit = False
if any(label['id'] == 'LA_kwDOAFAGHc8AAAABoGK6aw' for label in prdata['labels']['nodes']):
numbers.append(pr_number)
if prdata['closingIssuesReferences']['totalCount'] > 10 or prdata['labels']['totalCount'] > 10:
maximum_hit = True
for relatedIssue in prdata['closingIssuesReferences']['nodes']:
if any(label['id'] == 'LA_kwDOAFAGHc8AAAABoGK6aw' for label in relatedIssue['labels']['nodes']):
numbers.append(relatedIssue['number'])
if relatedIssue['labels']['totalCount'] > 10:
maximum_hit = True
numbers_str = ', '.join(map(str, numbers))
if maximum_hit:
numbers_str += " and more, possibly"
if numbers_str:
needs_review_reply_string = f"Yes ({numbers_str})"
else:
needs_review_reply_string = "No"
# Store pr information in list
prs_for_csv.append({
'mergedAt': prdata['mergedAt'],
'URL': prdata['url'],
'title': prdata['title'],
'author': prdata['author']['login'],
'relatedIssues': related_issues_string,
'labels': unique_labels_string,
'needsReviewReplies': needs_review_reply_string,
})
# Create a list of dictionaries with commits for in CSV file
# /// NOTE maybe it's better to move this up to before the PR list generation (but after the list of unique PRs has been created) as it clears some of the memory used
animation_state['text'] = "Clean up commit list"
## Filter list with commit dictionaries so only ones without any associated PRs are left
commits = [commit for commit in commits if commit['pr_count'] == 0]
## Expand list with commit dictionaries to contain commit metadata
animation_state['text'] = "Get commit metadata"
### Loop through commits to construct GraphQL query
query = f'''
query ($owner: String!, $repo: String!) {{
repository (name: $repo, owner: $owner) {{
'''
for n, commit in enumerate(commits):
query += f'''
commit{n}:object(oid: "{commit['sha']}") {{
... on Commit {{
message
committedDate
url
authors(first:3) {{
totalCount
nodes {{
user {{
login
}}
}}
}}
}}
}}
'''
query += f'''
}}
}}
'''
# Variable definition for GraphQL Explorer:
# {"sha": "863d4c3b611df83389d82958114bfd2d1204e457", "owner": "AntennaPod", "repo": "AntennaPod"}
## Submit call
variables = {"owner": owner, "repo": repo}
result = client.execute(query, variables)
data = json.loads(result)
## Parse response and expand commit data
animation_state['text'] = "Parse commit metadata"
for n, commit in enumerate(commits):
commit_data = data['data']['repository'][f'commit{n}']
# Create string with authors
authors = [author['user']['login'] for author in commit_data['authors']['nodes']]
authors_string = ', '.join(authors)
commit['committedDate'] = commit_data['committedDate']
commit['URL'] = commit_data['url']
commit['title'] = commit_data['message']
commit['author'] = authors_string
# Combine commit & PR lists & sort by mergedAt (for PRs)/ committedDate (for commits)
animation_state['text'] = "Combine and sort PR and commit lists"
commits = [{**commit, 'datetime': datetime.strptime(commit['committedDate'], '%Y-%m-%dT%H:%M:%SZ'), 'entitytype': 'commit'} for commit in commits]
prs_for_csv = [{**pr, 'datetime': datetime.strptime(pr['mergedAt'], '%Y-%m-%dT%H:%M:%SZ'), 'entitytype': 'pr'} for pr in prs_for_csv]
for commit in commits:
del commit['committedDate']
for pr in prs_for_csv:
del pr['mergedAt']
combined = commits + prs_for_csv
combined.sort(key=lambda x: x['datetime'])
for row in combined:
row.pop('sha', None)
row.pop('pr_count', None)
row.pop('pr_numbers', None)
row['empty1'] = ''
row['empty2'] = ''
# Define your fieldnames and their human-readable counterparts
animation_state['text'] = "Save changelog as CSV"
fields = [
('entitytype', 'Entity'),
('datetime', 'Merge/Committed date'),
('URL', 'URL'),
('title', 'Title'),
('author', 'Author(s)'),
('empty1', 'Type'),
('empty2', 'Functionality group'),
('relatedIssues', 'Related issue(s)'),
('labels', 'Related label(s)'),
('needsReviewReplies', 'Needs review replies?'),
]
# Create an OrderedDict from the fields
fieldnames = OrderedDict(fields)
header = dict(zip(fieldnames.keys(), fieldnames.values()))
with open(f'{filename}', 'w', newline='') as outputFile:
# Use the OrderedDict as the fieldnames argument
writer = csv.DictWriter(outputFile, fieldnames=fieldnames)
writer.writerow(header)
writer.writerows(combined) # Writes all the dictionaries in the list to the CSV
# Stop the animation
animation_state['stop'] = True
animation_thread.join()
print("✅ The changelog has been saved as a CSV file.")
print(f"📂 The file is named '{filename}'.")
except KeyboardInterrupt:
animation_state['text'] = "Ending"
if animation_thread:
animation_state['stop'] = True
try:
animation_thread.join()
except KeyboardInterrupt:
animation_state['text'] = "Still wrapping up"
if filename and os.path.exists(filename):
os.remove(filename)
print(" The requested changelog file was deleted.")
print("\n⚠️ The script was interrupted by the user.")