Dieser Commit ist enthalten in:
Claude Project Manager
2025-07-07 22:11:38 +02:00
Commit ec92da8a64
73 geänderte Dateien mit 16367 neuen und 0 gelöschten Zeilen

14
src/gitea/__init__.py Normale Datei
Datei anzeigen

@ -0,0 +1,14 @@
from .gitea_client import GiteaClient, GiteaConfig
from .git_operations import GitOperationsManager
from .repository_manager import RepositoryManager
from .issue_pr_manager import IssueManager, PullRequestManager, IssuePRManager
__all__ = [
'GiteaClient',
'GiteaConfig',
'GitOperationsManager',
'RepositoryManager',
'IssueManager',
'PullRequestManager',
'IssuePRManager'
]

603
src/gitea/git_operations.py Normale Datei
Datei anzeigen

@ -0,0 +1,603 @@
import os
import subprocess
import logging
from typing import Optional, List, Tuple
from pathlib import Path
from dataclasses import dataclass
import tkinter as tk
from tkinter import filedialog
from datetime import datetime
from utils.logger import logger
@dataclass
class GitCredentials:
username: str
token: str
def get_remote_url(self, base_url: str, repo_path: str) -> str:
from urllib.parse import urlparse
parsed = urlparse(base_url)
url = f"https://{self.username}:{self.token}@{parsed.netloc}/{repo_path}.git"
logger.info(f"Generated remote URL: https://{self.username}:****@{parsed.netloc}/{repo_path}.git")
return url
class GitOperationsManager:
def __init__(self, base_url: str, token: str, username: str = "claude-project-manager"):
self.base_url = base_url
self.credentials = GitCredentials(username=username, token=token)
self.default_clone_dir = Path.home() / "GiteaRepos"
def select_directory(self, title: str = "Select Clone Directory") -> Optional[Path]:
root = tk.Tk()
root.withdraw()
root.attributes('-topmost', True)
directory = filedialog.askdirectory(
title=title,
initialdir=str(self.default_clone_dir),
mustexist=False
)
root.destroy()
if directory:
path = Path(directory)
path.mkdir(parents=True, exist_ok=True)
return path
return None
def _run_git_command(self, cmd: List[str], cwd: Optional[Path] = None) -> Tuple[bool, str, str]:
try:
# Log the command being executed
cmd_str = ' '.join(cmd)
logger.info(f"Executing git command: {cmd_str}")
if cwd:
logger.debug(f"Working directory: {cwd}")
# Hide console window on Windows
startupinfo = None
if os.name == 'nt': # Windows
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
startupinfo.wShowWindow = subprocess.SW_HIDE
result = subprocess.run(
cmd,
cwd=cwd,
capture_output=True,
text=True,
timeout=60,
startupinfo=startupinfo
)
success = result.returncode == 0
stdout = result.stdout.strip()
stderr = result.stderr.strip()
if success:
logger.debug(f"Git command successful: {cmd_str}")
if stdout:
logger.debug(f"Output: {stdout[:500]}...") # Log first 500 chars
else:
logger.error(f"Git command failed: {cmd_str}")
logger.error(f"Error: {stderr}")
return success, stdout, stderr
except subprocess.TimeoutExpired:
logger.error(f"Git command timed out: {' '.join(cmd)}")
return False, "", "Command timed out"
except Exception as e:
logger.error(f"Git command error: {e}")
return False, "", str(e)
def clone_repository(self, owner: str, repo: str, clone_dir: Optional[Path] = None) -> Tuple[bool, Path]:
if clone_dir is None:
# Use default clone directory if none specified
clone_dir = self.default_clone_dir
clone_dir.mkdir(parents=True, exist_ok=True)
repo_path = f"{owner}/{repo}"
remote_url = self.credentials.get_remote_url(self.base_url, repo_path)
local_path = clone_dir / repo
if local_path.exists():
logger.warning(f"Repository already exists at {local_path}")
return True, local_path # Return success if already exists
cmd = ["git", "clone", remote_url, str(local_path)]
success, stdout, stderr = self._run_git_command(cmd)
if success:
logger.info(f"Successfully cloned {repo} to {local_path}")
self._configure_git_user(local_path)
else:
logger.error(f"Clone failed: {stderr}")
return success, local_path
def _configure_git_user(self, repo_path: Path) -> None:
self._run_git_command(["git", "config", "user.name", "Claude Project Manager"], cwd=repo_path)
self._run_git_command(["git", "config", "user.email", "claude@intelsight.de"], cwd=repo_path)
def fetch(self, repo_path: Path, remote: str = "origin") -> Tuple[bool, str]:
cmd = ["git", "fetch", remote]
success, stdout, stderr = self._run_git_command(cmd, cwd=repo_path)
return success, stdout or stderr
def pull(self, repo_path: Path, remote: str = "origin", branch: Optional[str] = None) -> Tuple[bool, str]:
cmd = ["git", "pull", remote]
if branch:
cmd.append(branch)
success, stdout, stderr = self._run_git_command(cmd, cwd=repo_path)
return success, stdout or stderr
def add(self, repo_path: Path, files: List[str] = None) -> Tuple[bool, str]:
if files is None:
files = ["."]
cmd = ["git", "add"] + files
success, stdout, stderr = self._run_git_command(cmd, cwd=repo_path)
return success, stdout or stderr
def commit(self, repo_path: Path, message: str) -> Tuple[bool, str]:
cmd = ["git", "commit", "-m", message]
success, stdout, stderr = self._run_git_command(cmd, cwd=repo_path)
if not success and "nothing to commit" in stderr:
return True, "Nothing to commit, working tree clean"
return success, stdout or stderr
def split_large_file(self, file_path: Path, chunk_size_mb: int = 45) -> List[Path]:
"""Split a large file into smaller chunks"""
chunk_size = chunk_size_mb * 1024 * 1024 # Convert to bytes
chunks = []
try:
file_size = file_path.stat().st_size
with open(file_path, 'rb') as f:
chunk_num = 0
while True:
chunk_data = f.read(chunk_size)
if not chunk_data:
break
chunk_path = file_path.parent / f"{file_path.stem}.part{chunk_num:03d}{file_path.suffix}"
with open(chunk_path, 'wb') as chunk_file:
chunk_file.write(chunk_data)
chunks.append(chunk_path)
chunk_num += 1
logger.info(f"Split {file_path.name} into {len(chunks)} chunks")
return chunks
except Exception as e:
logger.error(f"Error splitting file: {e}")
return []
def create_join_script(self, original_file: Path, chunks: List[Path]) -> Path:
"""Create a script to rejoin file chunks"""
script_path = original_file.parent / f"join_{original_file.stem}.bat"
with open(script_path, 'w') as f:
f.write("@echo off\n")
f.write(f"echo Joining {original_file.name}...\n")
f.write(f"copy /b ")
# Add all chunks
for i, chunk in enumerate(chunks):
if i > 0:
f.write("+")
f.write(f'"{chunk.name}"')
f.write(f' "{original_file.name}"\n')
f.write("echo Done!\n")
f.write("pause\n")
# Also create a shell script for Unix
sh_script_path = original_file.parent / f"join_{original_file.stem}.sh"
with open(sh_script_path, 'w') as f:
f.write("#!/bin/bash\n")
f.write(f"echo 'Joining {original_file.name}...'\n")
f.write("cat ")
for chunk in chunks:
f.write(f'"{chunk.name}" ')
f.write(f'> "{original_file.name}"\n')
f.write("echo 'Done!'\n")
return script_path
def check_large_files(self, repo_path: Path, size_limit_mb: int = 100) -> List[Tuple[str, int]]:
"""Check for files larger than size_limit_mb in the repository"""
large_files = []
try:
# Get list of all tracked files
cmd = ["git", "ls-files", "-z"]
success, stdout, stderr = self._run_git_command(cmd, cwd=repo_path)
if success and stdout:
files = stdout.split('\0')
for file in files:
if file:
file_path = repo_path / file
if file_path.exists():
size = file_path.stat().st_size
if size > size_limit_mb * 1024 * 1024:
large_files.append((file, size))
except Exception as e:
logger.error(f"Error checking large files: {e}")
return large_files
def push(self, repo_path: Path, remote: str = "origin", branch: Optional[str] = None) -> Tuple[bool, str]:
# First try regular push
cmd = ["git", "push", remote]
if branch:
cmd.append(branch)
success, stdout, stderr = self._run_git_command(cmd, cwd=repo_path)
# If push failed due to no upstream, try with --set-upstream
if not success and ("has no upstream branch" in stderr or "The current branch" in stderr or "--set-upstream" in stderr):
# Get current branch
branch_cmd = ["git", "branch", "--show-current"]
branch_success, current_branch, _ = self._run_git_command(branch_cmd, cwd=repo_path)
if branch_success and current_branch.strip():
# Retry with --set-upstream
cmd = ["git", "push", "--set-upstream", remote, current_branch.strip()]
success, stdout, stderr = self._run_git_command(cmd, cwd=repo_path)
# Check for specific errors
if not success:
if "HTTP 413" in stderr or "Request Entity Too Large" in stderr:
# Extract file info from error if possible
import re
file_match = re.search(r'/([^/]+)/(\d+)$', stderr)
file_size = None
if file_match:
file_hash = file_match.group(1)
file_size = int(file_match.group(2))
# Check for large files
large_files = self.check_large_files(repo_path, 50) # Check for files > 50MB
if large_files:
error_msg = "Push fehlgeschlagen: Dateien zu groß für Gitea!\n\n"
error_msg += "Folgende Dateien überschreiten das Limit:\n"
for file, size in large_files[:5]: # Show first 5
size_mb = size / (1024 * 1024)
error_msg += f" - {file} ({size_mb:.1f} MB)\n"
if len(large_files) > 5:
error_msg += f" ... und {len(large_files) - 5} weitere\n"
if file_size:
error_msg += f"\nGitea versucht eine {file_size / (1024*1024):.1f} MB große Datei hochzuladen!\n"
error_msg += "\nLösungen:\n"
error_msg += "1. Große Dateien zur .gitignore hinzufügen\n"
error_msg += "2. Git LFS verwenden (empfohlen für große Dateien)\n"
error_msg += "3. Dateien komprimieren oder entfernen\n\n"
error_msg += "HINWEIS: Git LFS muss auf dem Server aktiviert sein!"
return False, error_msg
else:
return False, f"Push fehlgeschlagen (HTTP 413): Repository oder einzelne Commits zu groß"
elif "Locking support detected" in stderr and "HTTP 413" not in stderr:
# Just LFS warning, not an error
return True, "Push erfolgreich (mit LFS-Warnung)"
# Check if push was successful even with warnings
if success or "Everything up-to-date" in (stdout + stderr):
return True, "Push erfolgreich"
return success, stdout or stderr
def status(self, repo_path: Path) -> Tuple[bool, str]:
cmd = ["git", "status", "--porcelain"]
success, stdout, stderr = self._run_git_command(cmd, cwd=repo_path)
return success, stdout or stderr
def diff(self, repo_path: Path, staged: bool = False) -> Tuple[bool, str]:
cmd = ["git", "diff"]
if staged:
cmd.append("--staged")
success, stdout, stderr = self._run_git_command(cmd, cwd=repo_path)
return success, stdout or stderr
def branch(self, repo_path: Path, list_all: bool = False) -> Tuple[bool, str]:
cmd = ["git", "branch"]
if list_all:
cmd.append("-a")
success, stdout, stderr = self._run_git_command(cmd, cwd=repo_path)
return success, stdout or stderr
def checkout(self, repo_path: Path, branch: str, create: bool = False) -> Tuple[bool, str]:
cmd = ["git", "checkout"]
if create:
cmd.append("-b")
cmd.append(branch)
success, stdout, stderr = self._run_git_command(cmd, cwd=repo_path)
return success, stdout or stderr
def merge(self, repo_path: Path, branch: str) -> Tuple[bool, str]:
cmd = ["git", "merge", branch]
success, stdout, stderr = self._run_git_command(cmd, cwd=repo_path)
return success, stdout or stderr
def log(self, repo_path: Path, n: int = 10, oneline: bool = True) -> Tuple[bool, str]:
cmd = ["git", "log", f"-{n}"]
if oneline:
cmd.append("--oneline")
success, stdout, stderr = self._run_git_command(cmd, cwd=repo_path)
return success, stdout or stderr
def remote_add(self, repo_path: Path, name: str, url: str) -> Tuple[bool, str]:
cmd = ["git", "remote", "add", name, url]
success, stdout, stderr = self._run_git_command(cmd, cwd=repo_path)
return success, stdout or stderr
def remote_remove(self, repo_path: Path, name: str) -> Tuple[bool, str]:
cmd = ["git", "remote", "remove", name]
success, stdout, stderr = self._run_git_command(cmd, cwd=repo_path)
return success, stdout or stderr
def remote_list(self, repo_path: Path) -> Tuple[bool, str]:
cmd = ["git", "remote", "-v"]
success, stdout, stderr = self._run_git_command(cmd, cwd=repo_path)
return success, stdout or stderr
def stash(self, repo_path: Path, message: Optional[str] = None) -> Tuple[bool, str]:
cmd = ["git", "stash", "push"]
if message:
cmd.extend(["-m", message])
success, stdout, stderr = self._run_git_command(cmd, cwd=repo_path)
return success, stdout or stderr
def stash_pop(self, repo_path: Path) -> Tuple[bool, str]:
cmd = ["git", "stash", "pop"]
success, stdout, stderr = self._run_git_command(cmd, cwd=repo_path)
return success, stdout or stderr
def reset(self, repo_path: Path, hard: bool = False, commit: Optional[str] = None) -> Tuple[bool, str]:
cmd = ["git", "reset"]
if hard:
cmd.append("--hard")
if commit:
cmd.append(commit)
success, stdout, stderr = self._run_git_command(cmd, cwd=repo_path)
return success, stdout or stderr
def init_repository(self, repo_path: Path) -> Tuple[bool, str]:
"""Initialize a new git repository"""
cmd = ["git", "init"]
success, stdout, stderr = self._run_git_command(cmd, cwd=repo_path)
if success:
self._configure_git_user(repo_path)
# Check if .gitattributes exists with LFS entries
gitattributes = repo_path / ".gitattributes"
if gitattributes.exists():
with open(gitattributes, 'r') as f:
content = f.read()
if 'filter=lfs' in content:
logger.info("Git LFS entries found in .gitattributes")
return success, stdout or stderr
def disable_lfs_for_push(self, repo_path: Path) -> Tuple[bool, str]:
"""Temporarily disable LFS for push"""
# Backup .gitattributes if it exists
gitattributes = repo_path / ".gitattributes"
backup_path = repo_path / ".gitattributes.backup"
if gitattributes.exists():
import shutil
shutil.copy2(gitattributes, backup_path)
# Remove LFS entries from .gitattributes
with open(gitattributes, 'r') as f:
lines = f.readlines()
with open(gitattributes, 'w') as f:
for line in lines:
if 'filter=lfs' not in line:
f.write(line)
return True, "LFS temporarily disabled"
return True, "No LFS configuration found"
def restore_lfs_config(self, repo_path: Path) -> None:
"""Restore LFS configuration"""
backup_path = repo_path / ".gitattributes.backup"
gitattributes = repo_path / ".gitattributes"
if backup_path.exists():
import shutil
shutil.move(backup_path, gitattributes)
def add_remote_to_existing_repo(self, repo_path: Path, owner: str, repo_name: str,
remote_name: str = "origin") -> Tuple[bool, str]:
"""Add Gitea remote to an existing local repository"""
repo_path_str = f"{owner}/{repo_name}"
remote_url = self.credentials.get_remote_url(self.base_url, repo_path_str)
# First check if remote already exists
success, remotes, _ = self._run_git_command(["git", "remote"], cwd=repo_path)
if success and remote_name in remotes.split('\n'):
# Remove existing remote
self.remote_remove(repo_path, remote_name)
# Add new remote
return self.remote_add(repo_path, remote_name, remote_url)
def push_existing_repo_to_gitea(self, repo_path: Path, owner: str, repo_name: str,
branch: str = "main", force: bool = False) -> Tuple[bool, str]:
"""Push an existing local repository to Gitea"""
logger.info(f"=== Starting push_existing_repo_to_gitea ===")
logger.info(f"Repo path: {repo_path}")
logger.info(f"Owner: {owner}")
logger.info(f"Repo name: {repo_name}")
logger.info(f"Target branch: {branch}")
# Create debug info file
debug_file = repo_path / "gitea_push_debug.txt"
debug_info = []
debug_info.append(f"Push Debug Info - {datetime.now()}")
debug_info.append(f"Repository: {repo_name}")
debug_info.append(f"Owner: {owner}")
debug_info.append(f"Path: {repo_path}")
# First check current branch
success, current_branch_output, _ = self._run_git_command(
["git", "branch", "--show-current"], cwd=repo_path
)
current_branch = current_branch_output.strip() if success else ""
debug_info.append(f"Current branch: {current_branch}")
# If no current branch (empty repo), use the specified branch
if not current_branch:
current_branch = branch
# Add the remote
success, msg = self.add_remote_to_existing_repo(repo_path, owner, repo_name)
if not success:
error_msg = f"Failed to add remote: {msg}"
debug_info.append(f"ERROR: {error_msg}")
with open(debug_file, 'w', encoding='utf-8') as f:
f.write('\n'.join(debug_info))
return False, error_msg
# Log the remote URL for debugging
success, remotes, _ = self._run_git_command(["git", "remote", "-v"], cwd=repo_path)
if success:
logger.info(f"Git remotes after adding: {remotes}")
debug_info.append(f"Git remotes:\n{remotes}")
# Check git status before push
success, status, _ = self._run_git_command(["git", "status", "--porcelain"], cwd=repo_path)
debug_info.append(f"Git status before push:\n{status if status else 'Clean'}")
# Push the current branch with upstream setting
cmd = ["git", "push", "--set-upstream", "origin", f"{current_branch}:{branch}", "-v"]
if force:
cmd.insert(2, "--force")
logger.info(f"Executing push command: {' '.join(cmd)}")
debug_info.append(f"Push command: {' '.join(cmd)}")
success, stdout, stderr = self._run_git_command(cmd, cwd=repo_path)
logger.info(f"Push command result - Success: {success}")
logger.info(f"Push stdout: {stdout}")
logger.info(f"Push stderr: {stderr}")
debug_info.append(f"Push result: {'Success' if success else 'Failed'}")
debug_info.append(f"Push stdout:\n{stdout}")
debug_info.append(f"Push stderr:\n{stderr}")
# Save debug info
with open(debug_file, 'w', encoding='utf-8') as f:
f.write('\n'.join(debug_info))
# Check if push was successful even with warnings
if success or "Locking support detected" in stderr:
# Also check if objects were written
if "objects" in stderr or "objects" in stdout or success:
# Verify the push by checking the remote
verify_cmd = ["git", "ls-remote", "origin", branch]
verify_success, verify_out, _ = self._run_git_command(verify_cmd, cwd=repo_path)
if verify_success and verify_out.strip():
logger.info(f"Push verified - remote has branch {branch}")
return True, f"Successfully pushed {current_branch} to {branch}\n\nDebug-Datei: {debug_file}"
else:
logger.warning(f"Push reported success but could not verify remote branch")
return True, f"Push completed (verification pending)\n\nDebug-Datei: {debug_file}"
# Check for common push success indicators
if any(indicator in (stdout + stderr).lower() for indicator in ["everything up-to-date", "branch", "->", "create mode", "writing objects"]):
return True, f"Successfully pushed {current_branch} to {branch}\n\nDebug-Datei: {debug_file}"
return False, f"{stderr or stdout}\n\nDebug-Datei: {debug_file}"
def setup_lfs(self, repo_path: Path) -> Tuple[bool, str]:
"""Setup Git LFS for the repository"""
# First check if git-lfs is installed
cmd = ["git", "lfs", "version"]
success, stdout, stderr = self._run_git_command(cmd)
if not success:
return False, "Git LFS ist nicht installiert. Bitte installieren Sie es von: https://git-lfs.github.com/"
# Install LFS hooks in the repository
cmd = ["git", "lfs", "install"]
success, stdout, stderr = self._run_git_command(cmd, cwd=repo_path)
if not success:
return False, f"Git LFS Installation fehlgeschlagen: {stderr}"
# Configure LFS for the remote
cmd = ["git", "config", "lfs.https://gitea-undso.intelsight.de/.locksverify", "true"]
success, stdout, stderr = self._run_git_command(cmd, cwd=repo_path)
return True, "Git LFS wurde erfolgreich eingerichtet"
def track_with_lfs(self, repo_path: Path, patterns: List[str]) -> Tuple[bool, str]:
"""Track file patterns with Git LFS"""
results = []
for pattern in patterns:
cmd = ["git", "lfs", "track", pattern]
success, stdout, stderr = self._run_git_command(cmd, cwd=repo_path)
if success:
results.append(f"{pattern}")
else:
results.append(f"{pattern}: {stderr}")
# Add .gitattributes to git
cmd = ["git", "add", ".gitattributes"]
self._run_git_command(cmd, cwd=repo_path)
return True, "LFS Tracking konfiguriert:\n" + "\n".join(results)
def migrate_to_lfs(self, repo_path: Path, file_paths: List[str]) -> Tuple[bool, str]:
"""Migrate existing large files to Git LFS"""
# First setup LFS
success, msg = self.setup_lfs(repo_path)
if not success:
return False, msg
# Track the files
patterns = []
for file_path in file_paths:
# Create pattern from file path
if "/" in file_path:
# Track specific file
patterns.append(file_path)
else:
# Track by extension
ext = Path(file_path).suffix
if ext:
patterns.append(f"*{ext}")
if patterns:
success, msg = self.track_with_lfs(repo_path, patterns)
if not success:
return False, msg
# Remove files from git cache and re-add them
for file_path in file_paths:
# Remove from cache
cmd = ["git", "rm", "--cached", file_path]
self._run_git_command(cmd, cwd=repo_path)
# Re-add (will now use LFS)
cmd = ["git", "add", file_path]
self._run_git_command(cmd, cwd=repo_path)
return True, "Dateien wurden zu Git LFS migriert. Bitte committen Sie die Änderungen."

207
src/gitea/gitea_client.py Normale Datei
Datei anzeigen

@ -0,0 +1,207 @@
import requests
import json
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from datetime import datetime
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class GiteaConfig:
base_url: str = "https://gitea-undso.intelsight.de"
api_token: str = "3b4a6ba1ade3f34640f3c85d2333b4a3a0627471"
api_version: str = "v1"
username: str = "StuXn3t"
@property
def api_url(self) -> str:
return f"{self.base_url}/api/{self.api_version}"
class GiteaClient:
def __init__(self, config: Optional[GiteaConfig] = None):
self.config = config or GiteaConfig()
self.session = requests.Session()
self.session.headers.update({
"Authorization": f"token {self.config.api_token}",
"Content-Type": "application/json"
})
def _request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
url = f"{self.config.api_url}/{endpoint}"
try:
response = self.session.request(method, url, **kwargs)
response.raise_for_status()
return response.json() if response.content else {}
except requests.exceptions.RequestException as e:
logger.error(f"API request failed: {e}")
raise
def get_user_info(self) -> Dict[str, Any]:
return self._request("GET", "user")
def list_user_organizations(self) -> List[Dict[str, Any]]:
"""List all organizations the user is a member of"""
return self._request("GET", "user/orgs")
def get_user_teams_in_org(self, org_name: str) -> List[Dict[str, Any]]:
"""Get user's teams in a specific organization"""
try:
return self._request("GET", f"user/teams", params={"org": org_name})
except Exception as e:
logger.error(f"Failed to get teams for org {org_name}: {e}")
return []
def list_repositories(self, page: int = 1, limit: int = 50) -> List[Dict[str, Any]]:
params = {"page": page, "limit": limit}
return self._request("GET", "user/repos", params=params)
def create_repository(self, name: str, description: str = "", private: bool = False,
auto_init: bool = True, gitignores: str = "", license: str = "") -> Dict[str, Any]:
data = {
"name": name,
"description": description,
"private": private,
"auto_init": auto_init,
"gitignores": gitignores,
"license": license,
"default_branch": "main" # Use main instead of master
}
return self._request("POST", "user/repos", json=data)
def delete_repository(self, owner: str, repo: str) -> None:
self._request("DELETE", f"repos/{owner}/{repo}")
def get_repository(self, owner: str, repo: str) -> Dict[str, Any]:
return self._request("GET", f"repos/{owner}/{repo}")
def fork_repository(self, owner: str, repo: str, organization: Optional[str] = None) -> Dict[str, Any]:
data = {"organization": organization} if organization else {}
return self._request("POST", f"repos/{owner}/{repo}/forks", json=data)
def list_issues(self, owner: str, repo: str, state: str = "open",
labels: Optional[List[str]] = None, page: int = 1, limit: int = 50) -> List[Dict[str, Any]]:
params = {
"state": state,
"page": page,
"limit": limit
}
if labels:
params["labels"] = ",".join(labels)
return self._request("GET", f"repos/{owner}/{repo}/issues", params=params)
def create_issue(self, owner: str, repo: str, title: str, body: str = "",
assignees: Optional[List[str]] = None, labels: Optional[List[int]] = None) -> Dict[str, Any]:
data = {
"title": title,
"body": body,
"assignees": assignees or [],
"labels": labels or []
}
return self._request("POST", f"repos/{owner}/{repo}/issues", json=data)
def update_issue(self, owner: str, repo: str, index: int, **kwargs) -> Dict[str, Any]:
return self._request("PATCH", f"repos/{owner}/{repo}/issues/{index}", json=kwargs)
def close_issue(self, owner: str, repo: str, index: int) -> Dict[str, Any]:
return self.update_issue(owner, repo, index, state="closed")
def list_pull_requests(self, owner: str, repo: str, state: str = "open",
page: int = 1, limit: int = 50) -> List[Dict[str, Any]]:
params = {
"state": state,
"page": page,
"limit": limit
}
return self._request("GET", f"repos/{owner}/{repo}/pulls", params=params)
def create_pull_request(self, owner: str, repo: str, title: str, head: str, base: str,
body: str = "", assignees: Optional[List[str]] = None) -> Dict[str, Any]:
data = {
"title": title,
"head": head,
"base": base,
"body": body,
"assignees": assignees or []
}
return self._request("POST", f"repos/{owner}/{repo}/pulls", json=data)
def merge_pull_request(self, owner: str, repo: str, index: int,
merge_style: str = "merge") -> Dict[str, Any]:
data = {"Do": merge_style}
return self._request("POST", f"repos/{owner}/{repo}/pulls/{index}/merge", json=data)
def list_branches(self, owner: str, repo: str) -> List[Dict[str, Any]]:
return self._request("GET", f"repos/{owner}/{repo}/branches")
def create_branch(self, owner: str, repo: str, branch_name: str,
old_branch_name: str = "main") -> Dict[str, Any]:
data = {
"new_branch_name": branch_name,
"old_branch_name": old_branch_name
}
return self._request("POST", f"repos/{owner}/{repo}/branches", json=data)
def delete_branch(self, owner: str, repo: str, branch_name: str) -> None:
self._request("DELETE", f"repos/{owner}/{repo}/branches/{branch_name}")
def list_releases(self, owner: str, repo: str, page: int = 1, limit: int = 50) -> List[Dict[str, Any]]:
params = {"page": page, "limit": limit}
return self._request("GET", f"repos/{owner}/{repo}/releases", params=params)
def create_release(self, owner: str, repo: str, tag_name: str, name: str,
body: str = "", target: str = "main", draft: bool = False,
prerelease: bool = False) -> Dict[str, Any]:
data = {
"tag_name": tag_name,
"name": name,
"body": body,
"target_commitish": target,
"draft": draft,
"prerelease": prerelease
}
return self._request("POST", f"repos/{owner}/{repo}/releases", json=data)
def list_webhooks(self, owner: str, repo: str) -> List[Dict[str, Any]]:
return self._request("GET", f"repos/{owner}/{repo}/hooks")
def create_webhook(self, owner: str, repo: str, url: str, events: List[str],
active: bool = True) -> Dict[str, Any]:
data = {
"type": "gitea",
"config": {
"url": url,
"content_type": "json"
},
"events": events,
"active": active
}
return self._request("POST", f"repos/{owner}/{repo}/hooks", json=data)
def get_repository_contents(self, owner: str, repo: str, filepath: str = "",
ref: str = "main") -> Dict[str, Any]:
params = {"ref": ref}
return self._request("GET", f"repos/{owner}/{repo}/contents/{filepath}", params=params)
def create_or_update_file(self, owner: str, repo: str, filepath: str, content: str,
message: str, branch: str = "main", sha: Optional[str] = None) -> Dict[str, Any]:
import base64
data = {
"content": base64.b64encode(content.encode()).decode(),
"message": message,
"branch": branch
}
if sha:
data["sha"] = sha
return self._request("PUT", f"repos/{owner}/{repo}/contents/{filepath}", json=data)
def delete_file(self, owner: str, repo: str, filepath: str, message: str,
sha: str, branch: str = "main") -> Dict[str, Any]:
data = {
"message": message,
"sha": sha,
"branch": branch
}
return self._request("DELETE", f"repos/{owner}/{repo}/contents/{filepath}", json=data)

509
src/gitea/gitea_ui.py Normale Datei
Datei anzeigen

@ -0,0 +1,509 @@
import tkinter as tk
from tkinter import ttk, messagebox, scrolledtext
import threading
from pathlib import Path
from typing import Optional, Dict, Any, List
from .repository_manager import RepositoryManager
from .issue_pr_manager import IssuePRManager
class GiteaIntegrationUI:
def __init__(self, parent_frame: tk.Frame):
self.parent = parent_frame
self.repo_manager = RepositoryManager()
self.issue_pr_manager = IssuePRManager(self.repo_manager.client)
self.selected_repo = None
self.selected_repo_path = None
self.setup_ui()
self.refresh_repositories()
def setup_ui(self):
main_paned = ttk.PanedWindow(self.parent, orient=tk.HORIZONTAL)
main_paned.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
left_frame = ttk.Frame(main_paned)
self.setup_left_panel(left_frame)
main_paned.add(left_frame, weight=1)
right_frame = ttk.Frame(main_paned)
self.setup_right_panel(right_frame)
main_paned.add(right_frame, weight=2)
def setup_left_panel(self, parent):
ttk.Label(parent, text="Repositories", font=("Arial", 12, "bold")).pack(pady=5)
button_frame = ttk.Frame(parent)
button_frame.pack(fill=tk.X, padx=5)
ttk.Button(button_frame, text="🔄 Refresh", command=self.refresh_repositories).pack(side=tk.LEFT, padx=2)
ttk.Button(button_frame, text=" New", command=self.create_repository_dialog).pack(side=tk.LEFT, padx=2)
ttk.Button(button_frame, text="🗑️ Delete", command=self.delete_repository).pack(side=tk.LEFT, padx=2)
list_frame = ttk.Frame(parent)
list_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
scrollbar = ttk.Scrollbar(list_frame)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
self.repo_listbox = tk.Listbox(list_frame, yscrollcommand=scrollbar.set)
self.repo_listbox.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
self.repo_listbox.bind('<<ListboxSelect>>', self.on_repo_select)
scrollbar.config(command=self.repo_listbox.yview)
def setup_right_panel(self, parent):
self.notebook = ttk.Notebook(parent)
self.notebook.pack(fill=tk.BOTH, expand=True)
self.git_tab = ttk.Frame(self.notebook)
self.setup_git_operations_tab(self.git_tab)
self.notebook.add(self.git_tab, text="Git Operations")
self.issues_tab = ttk.Frame(self.notebook)
self.setup_issues_tab(self.issues_tab)
self.notebook.add(self.issues_tab, text="Issues")
self.pr_tab = ttk.Frame(self.notebook)
self.setup_pr_tab(self.pr_tab)
self.notebook.add(self.pr_tab, text="Pull Requests")
self.info_tab = ttk.Frame(self.notebook)
self.setup_info_tab(self.info_tab)
self.notebook.add(self.info_tab, text="Repository Info")
def setup_git_operations_tab(self, parent):
button_frame = ttk.LabelFrame(parent, text="Repository Actions", padding=10)
button_frame.pack(fill=tk.X, padx=5, pady=5)
ttk.Button(button_frame, text="📥 Clone", command=self.clone_repository).grid(row=0, column=0, padx=5, pady=2)
ttk.Button(button_frame, text="🔄 Fetch", command=self.fetch_repository).grid(row=0, column=1, padx=5, pady=2)
ttk.Button(button_frame, text="⬇️ Pull", command=self.pull_repository).grid(row=0, column=2, padx=5, pady=2)
ttk.Button(button_frame, text="📊 Status", command=self.show_status).grid(row=0, column=3, padx=5, pady=2)
ttk.Button(button_frame, text=" Add All", command=self.add_all_files).grid(row=1, column=0, padx=5, pady=2)
ttk.Button(button_frame, text="💾 Commit", command=self.commit_dialog).grid(row=1, column=1, padx=5, pady=2)
ttk.Button(button_frame, text="⬆️ Push", command=self.push_repository).grid(row=1, column=2, padx=5, pady=2)
ttk.Button(button_frame, text="🌿 Branches", command=self.show_branches).grid(row=1, column=3, padx=5, pady=2)
output_frame = ttk.LabelFrame(parent, text="Output", padding=10)
output_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
self.git_output = scrolledtext.ScrolledText(output_frame, height=20, wrap=tk.WORD)
self.git_output.pack(fill=tk.BOTH, expand=True)
def setup_issues_tab(self, parent):
button_frame = ttk.Frame(parent)
button_frame.pack(fill=tk.X, padx=5, pady=5)
ttk.Button(button_frame, text="🔄 Refresh", command=self.refresh_issues).pack(side=tk.LEFT, padx=2)
ttk.Button(button_frame, text=" New Issue", command=self.create_issue_dialog).pack(side=tk.LEFT, padx=2)
ttk.Button(button_frame, text="✏️ Edit", command=self.edit_issue).pack(side=tk.LEFT, padx=2)
ttk.Button(button_frame, text="✅ Close", command=self.close_issue).pack(side=tk.LEFT, padx=2)
columns = ('ID', 'Title', 'State', 'Author', 'Created')
self.issues_tree = ttk.Treeview(parent, columns=columns, show='tree headings')
for col in columns:
self.issues_tree.heading(col, text=col)
self.issues_tree.column(col, width=100)
self.issues_tree.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
scrollbar = ttk.Scrollbar(parent, orient=tk.VERTICAL, command=self.issues_tree.yview)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
self.issues_tree.configure(yscrollcommand=scrollbar.set)
def setup_pr_tab(self, parent):
button_frame = ttk.Frame(parent)
button_frame.pack(fill=tk.X, padx=5, pady=5)
ttk.Button(button_frame, text="🔄 Refresh", command=self.refresh_prs).pack(side=tk.LEFT, padx=2)
ttk.Button(button_frame, text=" New PR", command=self.create_pr_dialog).pack(side=tk.LEFT, padx=2)
ttk.Button(button_frame, text="🔀 Merge", command=self.merge_pr).pack(side=tk.LEFT, padx=2)
ttk.Button(button_frame, text="❌ Close", command=self.close_pr).pack(side=tk.LEFT, padx=2)
columns = ('ID', 'Title', 'State', 'Author', 'Head', 'Base')
self.pr_tree = ttk.Treeview(parent, columns=columns, show='tree headings')
for col in columns:
self.pr_tree.heading(col, text=col)
self.pr_tree.column(col, width=100)
self.pr_tree.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
scrollbar = ttk.Scrollbar(parent, orient=tk.VERTICAL, command=self.pr_tree.yview)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
self.pr_tree.configure(yscrollcommand=scrollbar.set)
def setup_info_tab(self, parent):
self.info_text = scrolledtext.ScrolledText(parent, height=20, wrap=tk.WORD)
self.info_text.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
def refresh_repositories(self):
def fetch():
try:
repos = self.repo_manager.list_all_repositories()
self.parent.after(0, lambda: self.update_repo_list(repos))
except Exception as e:
self.parent.after(0, lambda: messagebox.showerror("Error", f"Failed to fetch repositories: {e}"))
threading.Thread(target=fetch, daemon=True).start()
def update_repo_list(self, repos):
self.repo_listbox.delete(0, tk.END)
for repo in repos:
display_name = f"{repo['name']} {'🔒' if repo['private'] else '🌐'}"
self.repo_listbox.insert(tk.END, display_name)
self.repositories = repos
def on_repo_select(self, event):
selection = self.repo_listbox.curselection()
if selection:
index = selection[0]
self.selected_repo = self.repositories[index]
self.update_repo_info()
self.refresh_issues()
self.refresh_prs()
def update_repo_info(self):
if not self.selected_repo:
return
info = f"Repository: {self.selected_repo['name']}\n"
info += f"Description: {self.selected_repo.get('description', 'No description')}\n"
info += f"Private: {'Yes' if self.selected_repo['private'] else 'No'}\n"
info += f"Default Branch: {self.selected_repo.get('default_branch', 'main')}\n"
info += f"Clone URL: {self.selected_repo['clone_url']}\n"
info += f"Created: {self.selected_repo['created_at']}\n"
info += f"Updated: {self.selected_repo['updated_at']}\n"
self.info_text.delete(1.0, tk.END)
self.info_text.insert(1.0, info)
def clone_repository(self):
if not self.selected_repo:
messagebox.showwarning("Warning", "Please select a repository")
return
def clone():
try:
success, path = self.repo_manager.clone_repository(self.selected_repo['name'])
if success:
self.selected_repo_path = path
self.parent.after(0, lambda: self.git_output.insert(tk.END, f"Repository cloned to: {path}\n"))
else:
self.parent.after(0, lambda: self.git_output.insert(tk.END, f"Failed to clone repository\n"))
except Exception as e:
self.parent.after(0, lambda: messagebox.showerror("Error", f"Clone failed: {e}"))
threading.Thread(target=clone, daemon=True).start()
def fetch_repository(self):
if not self.selected_repo_path:
messagebox.showwarning("Warning", "Please clone the repository first")
return
success, result = self.repo_manager.git_ops.fetch(self.selected_repo_path)
self.git_output.insert(tk.END, f"Fetch: {result}\n")
def pull_repository(self):
if not self.selected_repo_path:
messagebox.showwarning("Warning", "Please clone the repository first")
return
success, result = self.repo_manager.git_ops.pull(self.selected_repo_path)
self.git_output.insert(tk.END, f"Pull: {result}\n")
def push_repository(self):
if not self.selected_repo_path:
messagebox.showwarning("Warning", "Please clone the repository first")
return
success, result = self.repo_manager.git_ops.push(self.selected_repo_path)
self.git_output.insert(tk.END, f"Push: {result}\n")
def show_status(self):
if not self.selected_repo_path:
messagebox.showwarning("Warning", "Please clone the repository first")
return
status = self.repo_manager.get_repository_status(self.selected_repo_path)
self.git_output.insert(tk.END, f"\n--- Repository Status ---\n")
self.git_output.insert(tk.END, f"Current Branch: {status['current_branch']}\n")
self.git_output.insert(tk.END, f"Has Changes: {status['has_changes']}\n")
self.git_output.insert(tk.END, f"Status:\n{status['status']}\n")
self.git_output.insert(tk.END, f"Remotes:\n{status['remotes']}\n")
def add_all_files(self):
if not self.selected_repo_path:
messagebox.showwarning("Warning", "Please clone the repository first")
return
success, result = self.repo_manager.git_ops.add(self.selected_repo_path)
self.git_output.insert(tk.END, f"Add all: {result}\n")
def commit_dialog(self):
if not self.selected_repo_path:
messagebox.showwarning("Warning", "Please clone the repository first")
return
dialog = tk.Toplevel(self.parent)
dialog.title("Commit Changes")
dialog.geometry("400x200")
ttk.Label(dialog, text="Commit Message:").pack(pady=5)
message_text = tk.Text(dialog, height=5, width=50)
message_text.pack(padx=10, pady=5)
def do_commit():
message = message_text.get(1.0, tk.END).strip()
if not message:
messagebox.showwarning("Warning", "Please enter a commit message")
return
success, result = self.repo_manager.git_ops.commit(self.selected_repo_path, message)
self.git_output.insert(tk.END, f"Commit: {result}\n")
dialog.destroy()
ttk.Button(dialog, text="Commit", command=do_commit).pack(pady=10)
def show_branches(self):
if not self.selected_repo_path:
messagebox.showwarning("Warning", "Please clone the repository first")
return
success, branches = self.repo_manager.git_ops.branch(self.selected_repo_path, list_all=True)
self.git_output.insert(tk.END, f"\n--- Branches ---\n{branches}\n")
def create_repository_dialog(self):
dialog = tk.Toplevel(self.parent)
dialog.title("Create New Repository")
dialog.geometry("400x300")
ttk.Label(dialog, text="Repository Name:").grid(row=0, column=0, sticky=tk.W, padx=10, pady=5)
name_entry = ttk.Entry(dialog, width=30)
name_entry.grid(row=0, column=1, padx=10, pady=5)
ttk.Label(dialog, text="Description:").grid(row=1, column=0, sticky=tk.W, padx=10, pady=5)
desc_text = tk.Text(dialog, height=3, width=30)
desc_text.grid(row=1, column=1, padx=10, pady=5)
private_var = tk.BooleanVar()
ttk.Checkbutton(dialog, text="Private Repository", variable=private_var).grid(row=2, column=1, sticky=tk.W, padx=10, pady=5)
auto_init_var = tk.BooleanVar(value=True)
ttk.Checkbutton(dialog, text="Initialize with README", variable=auto_init_var).grid(row=3, column=1, sticky=tk.W, padx=10, pady=5)
def create():
name = name_entry.get().strip()
desc = desc_text.get(1.0, tk.END).strip()
if not name:
messagebox.showwarning("Warning", "Please enter a repository name")
return
try:
self.repo_manager.create_repository(
name, desc, private_var.get(), auto_init_var.get()
)
messagebox.showinfo("Success", f"Repository '{name}' created successfully")
dialog.destroy()
self.refresh_repositories()
except Exception as e:
messagebox.showerror("Error", f"Failed to create repository: {e}")
ttk.Button(dialog, text="Create", command=create).grid(row=4, column=1, pady=20)
def delete_repository(self):
if not self.selected_repo:
messagebox.showwarning("Warning", "Please select a repository")
return
if messagebox.askyesno("Confirm", f"Are you sure you want to delete '{self.selected_repo['name']}'?"):
if self.repo_manager.delete_repository(self.selected_repo['name']):
messagebox.showinfo("Success", "Repository deleted successfully")
self.refresh_repositories()
def refresh_issues(self):
if not self.selected_repo:
return
self.issues_tree.delete(*self.issues_tree.get_children())
try:
manager = self.issue_pr_manager.get_issue_manager(self.selected_repo['name'])
issues = manager.list_issues()
for issue in issues:
self.issues_tree.insert('', tk.END, values=(
issue['number'],
issue['title'],
issue['state'],
issue['user']['username'],
issue['created_at'][:10]
))
except Exception as e:
print(f"Error fetching issues: {e}")
def refresh_prs(self):
if not self.selected_repo:
return
self.pr_tree.delete(*self.pr_tree.get_children())
try:
manager = self.issue_pr_manager.get_pr_manager(self.selected_repo['name'])
prs = manager.list_pull_requests()
for pr in prs:
self.pr_tree.insert('', tk.END, values=(
pr['number'],
pr['title'],
pr['state'],
pr['user']['username'],
pr['head']['ref'],
pr['base']['ref']
))
except Exception as e:
print(f"Error fetching PRs: {e}")
def create_issue_dialog(self):
if not self.selected_repo:
messagebox.showwarning("Warning", "Please select a repository")
return
dialog = tk.Toplevel(self.parent)
dialog.title("Create New Issue")
dialog.geometry("500x400")
ttk.Label(dialog, text="Title:").grid(row=0, column=0, sticky=tk.W, padx=10, pady=5)
title_entry = ttk.Entry(dialog, width=50)
title_entry.grid(row=0, column=1, padx=10, pady=5)
ttk.Label(dialog, text="Description:").grid(row=1, column=0, sticky=tk.NW, padx=10, pady=5)
desc_text = tk.Text(dialog, height=10, width=50)
desc_text.grid(row=1, column=1, padx=10, pady=5)
def create():
title = title_entry.get().strip()
body = desc_text.get(1.0, tk.END).strip()
if not title:
messagebox.showwarning("Warning", "Please enter a title")
return
try:
manager = self.issue_pr_manager.get_issue_manager(self.selected_repo['name'])
manager.create_issue(title, body)
messagebox.showinfo("Success", "Issue created successfully")
dialog.destroy()
self.refresh_issues()
except Exception as e:
messagebox.showerror("Error", f"Failed to create issue: {e}")
ttk.Button(dialog, text="Create", command=create).grid(row=2, column=1, pady=20)
def edit_issue(self):
messagebox.showinfo("Info", "Edit issue functionality coming soon!")
def close_issue(self):
selection = self.issues_tree.selection()
if not selection:
messagebox.showwarning("Warning", "Please select an issue")
return
item = self.issues_tree.item(selection[0])
issue_id = item['values'][0]
if messagebox.askyesno("Confirm", f"Close issue #{issue_id}?"):
try:
manager = self.issue_pr_manager.get_issue_manager(self.selected_repo['name'])
manager.close_issue(issue_id)
messagebox.showinfo("Success", "Issue closed successfully")
self.refresh_issues()
except Exception as e:
messagebox.showerror("Error", f"Failed to close issue: {e}")
def create_pr_dialog(self):
if not self.selected_repo:
messagebox.showwarning("Warning", "Please select a repository")
return
dialog = tk.Toplevel(self.parent)
dialog.title("Create Pull Request")
dialog.geometry("500x500")
ttk.Label(dialog, text="Title:").grid(row=0, column=0, sticky=tk.W, padx=10, pady=5)
title_entry = ttk.Entry(dialog, width=50)
title_entry.grid(row=0, column=1, padx=10, pady=5)
ttk.Label(dialog, text="Head Branch:").grid(row=1, column=0, sticky=tk.W, padx=10, pady=5)
head_entry = ttk.Entry(dialog, width=50)
head_entry.grid(row=1, column=1, padx=10, pady=5)
ttk.Label(dialog, text="Base Branch:").grid(row=2, column=0, sticky=tk.W, padx=10, pady=5)
base_entry = ttk.Entry(dialog, width=50, )
base_entry.insert(0, "main")
base_entry.grid(row=2, column=1, padx=10, pady=5)
ttk.Label(dialog, text="Description:").grid(row=3, column=0, sticky=tk.NW, padx=10, pady=5)
desc_text = tk.Text(dialog, height=10, width=50)
desc_text.grid(row=3, column=1, padx=10, pady=5)
def create():
title = title_entry.get().strip()
head = head_entry.get().strip()
base = base_entry.get().strip()
body = desc_text.get(1.0, tk.END).strip()
if not all([title, head, base]):
messagebox.showwarning("Warning", "Please fill all required fields")
return
try:
manager = self.issue_pr_manager.get_pr_manager(self.selected_repo['name'])
manager.create_pull_request(title, head, base, body)
messagebox.showinfo("Success", "Pull request created successfully")
dialog.destroy()
self.refresh_prs()
except Exception as e:
messagebox.showerror("Error", f"Failed to create PR: {e}")
ttk.Button(dialog, text="Create", command=create).grid(row=4, column=1, pady=20)
def merge_pr(self):
selection = self.pr_tree.selection()
if not selection:
messagebox.showwarning("Warning", "Please select a pull request")
return
item = self.pr_tree.item(selection[0])
pr_id = item['values'][0]
if messagebox.askyesno("Confirm", f"Merge pull request #{pr_id}?"):
try:
manager = self.issue_pr_manager.get_pr_manager(self.selected_repo['name'])
manager.merge_pull_request(pr_id)
messagebox.showinfo("Success", "Pull request merged successfully")
self.refresh_prs()
except Exception as e:
messagebox.showerror("Error", f"Failed to merge PR: {e}")
def close_pr(self):
selection = self.pr_tree.selection()
if not selection:
messagebox.showwarning("Warning", "Please select a pull request")
return
item = self.pr_tree.item(selection[0])
pr_id = item['values'][0]
if messagebox.askyesno("Confirm", f"Close pull request #{pr_id}?"):
try:
manager = self.issue_pr_manager.get_pr_manager(self.selected_repo['name'])
manager.close_pull_request(pr_id)
messagebox.showinfo("Success", "Pull request closed successfully")
self.refresh_prs()
except Exception as e:
messagebox.showerror("Error", f"Failed to close PR: {e}")

984
src/gitea/gitea_ui_ctk.py Normale Datei
Datei anzeigen

@ -0,0 +1,984 @@
import customtkinter as ctk
from tkinter import messagebox, filedialog
import threading
from pathlib import Path
from typing import Optional, Dict, Any, List
from .repository_manager import RepositoryManager
from .issue_pr_manager import IssuePRManager
class GiteaIntegrationUI:
def __init__(self, parent_window):
self.window = parent_window
self.repo_manager = RepositoryManager()
self.issue_pr_manager = IssuePRManager(self.repo_manager.client)
self.selected_repo = None
self.selected_repo_path = None
# Import styles from main app
from gui.styles import COLORS, FONTS
self.COLORS = COLORS
self.FONTS = FONTS
self.setup_ui()
self.refresh_repositories()
def setup_ui(self):
# Main container
main_frame = ctk.CTkFrame(self.window, fg_color=self.COLORS['bg_primary'])
main_frame.pack(fill="both", expand=True, padx=10, pady=10)
# Left panel - Repository list
left_frame = ctk.CTkFrame(main_frame, width=300, fg_color=self.COLORS['bg_secondary'])
left_frame.pack(side="left", fill="y", padx=(0, 10))
left_frame.pack_propagate(False)
# Repository list header
header_frame = ctk.CTkFrame(left_frame, fg_color="transparent", height=50)
header_frame.pack(fill="x", padx=10, pady=(10, 0))
header_frame.pack_propagate(False)
ctk.CTkLabel(
header_frame,
text="Repositories",
font=self.FONTS['title'],
text_color=self.COLORS['text_primary']
).pack(side="left", pady=10)
# Buttons
button_frame = ctk.CTkFrame(left_frame, fg_color="transparent")
button_frame.pack(fill="x", padx=10, pady=5)
ctk.CTkButton(
button_frame,
text="🔄 Refresh",
command=self.refresh_repositories,
width=100,
height=30,
fg_color=self.COLORS['accent_primary'],
hover_color=self.COLORS['accent_hover']
).pack(side="left", padx=2)
ctk.CTkButton(
button_frame,
text=" New Repo",
command=self.create_repository_dialog,
width=100,
height=30,
fg_color=self.COLORS['accent_success'],
hover_color=self.COLORS['accent_hover']
).pack(side="left", padx=2)
ctk.CTkButton(
button_frame,
text="🗑️ Delete",
command=self.delete_repository,
width=100,
height=30,
fg_color=self.COLORS['accent_error'],
hover_color=self.COLORS['accent_hover']
).pack(side="left", padx=2)
ctk.CTkButton(
button_frame,
text="📤 Push Local",
command=self.push_local_repo_dialog,
width=100,
height=30,
fg_color=self.COLORS['accent_warning'],
hover_color=self.COLORS['accent_hover']
).pack(side="left", padx=2)
# Repository list
self.repo_list_frame = ctk.CTkScrollableFrame(
left_frame,
fg_color=self.COLORS['bg_primary']
)
self.repo_list_frame.pack(fill="both", expand=True, padx=10, pady=(0, 10))
# Right panel - Tab view
right_frame = ctk.CTkFrame(main_frame, fg_color=self.COLORS['bg_secondary'])
right_frame.pack(side="right", fill="both", expand=True)
self.tabview = ctk.CTkTabview(right_frame, fg_color=self.COLORS['bg_secondary'])
self.tabview.pack(fill="both", expand=True, padx=10, pady=10)
# Create tabs
self.tabview.add("Git Operations")
self.tabview.add("Issues")
self.tabview.add("Pull Requests")
self.tabview.add("Repository Info")
self.setup_git_operations_tab()
self.setup_issues_tab()
self.setup_pr_tab()
self.setup_info_tab()
def setup_git_operations_tab(self):
tab = self.tabview.tab("Git Operations")
# Operations buttons
operations_frame = ctk.CTkFrame(tab, fg_color=self.COLORS['bg_tile'])
operations_frame.pack(fill="x", padx=10, pady=10)
# Row 1
row1 = ctk.CTkFrame(operations_frame, fg_color="transparent")
row1.pack(fill="x", padx=10, pady=5)
ctk.CTkButton(
row1, text="📥 Clone", command=self.clone_repository,
fg_color=self.COLORS['accent_primary'], width=100
).pack(side="left", padx=5)
ctk.CTkButton(
row1, text="🔄 Fetch", command=self.fetch_repository,
fg_color=self.COLORS['accent_primary'], width=100
).pack(side="left", padx=5)
ctk.CTkButton(
row1, text="⬇️ Pull", command=self.pull_repository,
fg_color=self.COLORS['accent_primary'], width=100
).pack(side="left", padx=5)
ctk.CTkButton(
row1, text="📊 Status", command=self.show_status,
fg_color=self.COLORS['accent_secondary'], width=100
).pack(side="left", padx=5)
# Row 2
row2 = ctk.CTkFrame(operations_frame, fg_color="transparent")
row2.pack(fill="x", padx=10, pady=5)
ctk.CTkButton(
row2, text=" Add All", command=self.add_all_files,
fg_color=self.COLORS['accent_success'], width=100
).pack(side="left", padx=5)
ctk.CTkButton(
row2, text="💾 Commit", command=self.commit_dialog,
fg_color=self.COLORS['accent_success'], width=100
).pack(side="left", padx=5)
ctk.CTkButton(
row2, text="⬆️ Push", command=self.push_repository,
fg_color=self.COLORS['accent_success'], width=100
).pack(side="left", padx=5)
ctk.CTkButton(
row2, text="🌿 Branches", command=self.show_branches,
fg_color=self.COLORS['accent_secondary'], width=100
).pack(side="left", padx=5)
# Output area
output_label = ctk.CTkLabel(
tab, text="Output:",
font=self.FONTS['subtitle'],
text_color=self.COLORS['text_primary']
)
output_label.pack(anchor="w", padx=10, pady=(10, 5))
self.git_output = ctk.CTkTextbox(
tab,
fg_color=self.COLORS['bg_primary'],
text_color=self.COLORS['text_primary'],
font=self.FONTS['code']
)
self.git_output.pack(fill="both", expand=True, padx=10, pady=(0, 10))
def setup_issues_tab(self):
tab = self.tabview.tab("Issues")
# Issue buttons
button_frame = ctk.CTkFrame(tab, fg_color="transparent")
button_frame.pack(fill="x", padx=10, pady=10)
ctk.CTkButton(
button_frame, text="🔄 Refresh", command=self.refresh_issues,
fg_color=self.COLORS['accent_primary'], width=100
).pack(side="left", padx=5)
ctk.CTkButton(
button_frame, text=" New Issue", command=self.create_issue_dialog,
fg_color=self.COLORS['accent_success'], width=100
).pack(side="left", padx=5)
ctk.CTkButton(
button_frame, text="✅ Close", command=self.close_issue,
fg_color=self.COLORS['accent_error'], width=100
).pack(side="left", padx=5)
# Issues list
self.issues_frame = ctk.CTkScrollableFrame(
tab,
fg_color=self.COLORS['bg_primary']
)
self.issues_frame.pack(fill="both", expand=True, padx=10, pady=(0, 10))
self.issue_widgets = []
def setup_pr_tab(self):
tab = self.tabview.tab("Pull Requests")
# PR buttons
button_frame = ctk.CTkFrame(tab, fg_color="transparent")
button_frame.pack(fill="x", padx=10, pady=10)
ctk.CTkButton(
button_frame, text="🔄 Refresh", command=self.refresh_prs,
fg_color=self.COLORS['accent_primary'], width=100
).pack(side="left", padx=5)
ctk.CTkButton(
button_frame, text=" New PR", command=self.create_pr_dialog,
fg_color=self.COLORS['accent_success'], width=100
).pack(side="left", padx=5)
ctk.CTkButton(
button_frame, text="🔀 Merge", command=self.merge_pr,
fg_color=self.COLORS['accent_success'], width=100
).pack(side="left", padx=5)
ctk.CTkButton(
button_frame, text="❌ Close", command=self.close_pr,
fg_color=self.COLORS['accent_error'], width=100
).pack(side="left", padx=5)
# PR list
self.pr_frame = ctk.CTkScrollableFrame(
tab,
fg_color=self.COLORS['bg_primary']
)
self.pr_frame.pack(fill="both", expand=True, padx=10, pady=(0, 10))
self.pr_widgets = []
def setup_info_tab(self):
tab = self.tabview.tab("Repository Info")
self.info_text = ctk.CTkTextbox(
tab,
fg_color=self.COLORS['bg_primary'],
text_color=self.COLORS['text_primary'],
font=self.FONTS['body']
)
self.info_text.pack(fill="both", expand=True, padx=10, pady=10)
def refresh_repositories(self):
def fetch():
try:
repos = self.repo_manager.list_all_repositories()
self.window.after(0, lambda: self.update_repo_list(repos))
except Exception as e:
self.window.after(0, lambda: self.show_error(f"Failed to fetch repositories: {e}"))
threading.Thread(target=fetch, daemon=True).start()
def update_repo_list(self, repos):
# Clear existing
for widget in self.repo_list_frame.winfo_children():
widget.destroy()
self.repo_buttons = {}
self.repositories = repos
for repo in repos:
btn = ctk.CTkButton(
self.repo_list_frame,
text=f"{'🔒' if repo['private'] else '🌐'} {repo['name']}",
command=lambda r=repo: self.select_repository(r),
fg_color=self.COLORS['bg_tile'],
hover_color=self.COLORS['bg_tile_hover'],
text_color=self.COLORS['text_primary'],
anchor="w"
)
btn.pack(fill="x", pady=2)
self.repo_buttons[repo['id']] = btn
def select_repository(self, repo):
self.selected_repo = repo
# Update button colors
for repo_id, btn in self.repo_buttons.items():
if repo_id == repo['id']:
btn.configure(fg_color=self.COLORS['accent_primary'])
else:
btn.configure(fg_color=self.COLORS['bg_tile'])
self.update_repo_info()
self.refresh_issues()
self.refresh_prs()
def update_repo_info(self):
if not self.selected_repo:
return
info = f"Repository: {self.selected_repo['name']}\n"
info += f"Description: {self.selected_repo.get('description', 'No description')}\n"
info += f"Private: {'Yes' if self.selected_repo['private'] else 'No'}\n"
info += f"Default Branch: {self.selected_repo.get('default_branch', 'main')}\n"
info += f"Clone URL: {self.selected_repo['clone_url']}\n"
info += f"Created: {self.selected_repo['created_at']}\n"
info += f"Updated: {self.selected_repo['updated_at']}\n"
self.info_text.delete("0.0", "end")
self.info_text.insert("0.0", info)
def clone_repository(self):
if not self.selected_repo:
self.show_warning("Please select a repository")
return
def clone():
try:
success, path = self.repo_manager.clone_repository(self.selected_repo['name'])
if success:
self.selected_repo_path = path
self.window.after(0, lambda: self.append_output(f"Repository cloned to: {path}\n"))
else:
self.window.after(0, lambda: self.append_output(f"Failed to clone repository\n"))
except Exception as e:
self.window.after(0, lambda: self.show_error(f"Clone failed: {e}"))
threading.Thread(target=clone, daemon=True).start()
def fetch_repository(self):
if not self.selected_repo_path:
self.show_warning("Please clone the repository first")
return
success, result = self.repo_manager.git_ops.fetch(self.selected_repo_path)
self.append_output(f"Fetch: {result}\n")
def pull_repository(self):
if not self.selected_repo_path:
self.show_warning("Please clone the repository first")
return
success, result = self.repo_manager.git_ops.pull(self.selected_repo_path)
self.append_output(f"Pull: {result}\n")
def push_repository(self):
if not self.selected_repo_path:
self.show_warning("Please clone the repository first")
return
success, result = self.repo_manager.git_ops.push(self.selected_repo_path)
self.append_output(f"Push: {result}\n")
def show_status(self):
if not self.selected_repo_path:
self.show_warning("Please clone the repository first")
return
status = self.repo_manager.get_repository_status(self.selected_repo_path)
self.append_output(f"\n--- Repository Status ---\n")
self.append_output(f"Current Branch: {status['current_branch']}\n")
self.append_output(f"Has Changes: {status['has_changes']}\n")
self.append_output(f"Status:\n{status['status']}\n")
self.append_output(f"Remotes:\n{status['remotes']}\n")
def add_all_files(self):
if not self.selected_repo_path:
self.show_warning("Please clone the repository first")
return
success, result = self.repo_manager.git_ops.add(self.selected_repo_path)
self.append_output(f"Add all: {result}\n")
def commit_dialog(self):
if not self.selected_repo_path:
self.show_warning("Please clone the repository first")
return
dialog = ctk.CTkToplevel(self.window)
dialog.title("Commit Changes")
dialog.geometry("500x300")
dialog.configure(fg_color=self.COLORS['bg_primary'])
# Center dialog
dialog.transient(self.window)
dialog.update_idletasks()
x = (dialog.winfo_screenwidth() - 500) // 2
y = (dialog.winfo_screenheight() - 300) // 2
dialog.geometry(f"500x300+{x}+{y}")
ctk.CTkLabel(
dialog,
text="Commit Message:",
font=self.FONTS['subtitle'],
text_color=self.COLORS['text_primary']
).pack(pady=(20, 10))
message_text = ctk.CTkTextbox(
dialog,
height=150,
fg_color=self.COLORS['bg_secondary'],
text_color=self.COLORS['text_primary']
)
message_text.pack(padx=20, pady=10, fill="both", expand=True)
def do_commit():
message = message_text.get("0.0", "end").strip()
if not message:
self.show_warning("Please enter a commit message")
return
success, result = self.repo_manager.git_ops.commit(self.selected_repo_path, message)
self.append_output(f"Commit: {result}\n")
dialog.destroy()
ctk.CTkButton(
dialog,
text="Commit",
command=do_commit,
fg_color=self.COLORS['accent_success']
).pack(pady=20)
def show_branches(self):
if not self.selected_repo_path:
self.show_warning("Please clone the repository first")
return
success, branches = self.repo_manager.git_ops.branch(self.selected_repo_path, list_all=True)
self.append_output(f"\n--- Branches ---\n{branches}\n")
def create_repository_dialog(self):
dialog = ctk.CTkToplevel(self.window)
dialog.title("Create New Repository")
dialog.geometry("500x400")
dialog.configure(fg_color=self.COLORS['bg_primary'])
# Center dialog
dialog.transient(self.window)
dialog.update_idletasks()
x = (dialog.winfo_screenwidth() - 500) // 2
y = (dialog.winfo_screenheight() - 400) // 2
dialog.geometry(f"500x400+{x}+{y}")
# Name
ctk.CTkLabel(
dialog, text="Repository Name:",
font=self.FONTS['subtitle'],
text_color=self.COLORS['text_primary']
).pack(pady=(20, 5))
name_entry = ctk.CTkEntry(
dialog, width=400,
fg_color=self.COLORS['bg_secondary'],
text_color=self.COLORS['text_primary']
)
name_entry.pack(pady=5)
# Description
ctk.CTkLabel(
dialog, text="Description:",
font=self.FONTS['subtitle'],
text_color=self.COLORS['text_primary']
).pack(pady=(20, 5))
desc_text = ctk.CTkTextbox(
dialog, height=100, width=400,
fg_color=self.COLORS['bg_secondary'],
text_color=self.COLORS['text_primary']
)
desc_text.pack(pady=5)
# Options
private_var = ctk.BooleanVar()
ctk.CTkCheckBox(
dialog, text="Private Repository",
variable=private_var,
text_color=self.COLORS['text_primary']
).pack(pady=10)
auto_init_var = ctk.BooleanVar(value=True)
ctk.CTkCheckBox(
dialog, text="Initialize with README",
variable=auto_init_var,
text_color=self.COLORS['text_primary']
).pack(pady=5)
def create():
name = name_entry.get().strip()
desc = desc_text.get("0.0", "end").strip()
if not name:
self.show_warning("Please enter a repository name")
return
try:
self.repo_manager.create_repository(
name, desc, private_var.get(), auto_init_var.get()
)
self.show_info(f"Repository '{name}' created successfully")
dialog.destroy()
self.refresh_repositories()
except Exception as e:
self.show_error(f"Failed to create repository: {e}")
ctk.CTkButton(
dialog, text="Create", command=create,
fg_color=self.COLORS['accent_success']
).pack(pady=20)
def delete_repository(self):
if not self.selected_repo:
self.show_warning("Please select a repository")
return
if messagebox.askyesno("Confirm", f"Are you sure you want to delete '{self.selected_repo['name']}'?"):
if self.repo_manager.delete_repository(self.selected_repo['name']):
self.show_info("Repository deleted successfully")
self.refresh_repositories()
def refresh_issues(self):
if not self.selected_repo:
return
# Clear existing
for widget in self.issue_widgets:
widget.destroy()
self.issue_widgets.clear()
try:
manager = self.issue_pr_manager.get_issue_manager(self.selected_repo['name'])
issues = manager.list_issues()
for issue in issues:
frame = ctk.CTkFrame(self.issues_frame, fg_color=self.COLORS['bg_tile'])
frame.pack(fill="x", pady=5)
# Issue info
info_frame = ctk.CTkFrame(frame, fg_color="transparent")
info_frame.pack(fill="x", padx=10, pady=10)
ctk.CTkLabel(
info_frame,
text=f"#{issue['number']}: {issue['title']}",
font=self.FONTS['subtitle'],
text_color=self.COLORS['text_primary']
).pack(anchor="w")
ctk.CTkLabel(
info_frame,
text=f"State: {issue['state']} | Author: {issue['user']['username']} | Created: {issue['created_at'][:10]}",
font=self.FONTS['small'],
text_color=self.COLORS['text_secondary']
).pack(anchor="w")
self.issue_widgets.append(frame)
except Exception as e:
print(f"Error fetching issues: {e}")
def refresh_prs(self):
if not self.selected_repo:
return
# Clear existing
for widget in self.pr_widgets:
widget.destroy()
self.pr_widgets.clear()
try:
manager = self.issue_pr_manager.get_pr_manager(self.selected_repo['name'])
prs = manager.list_pull_requests()
for pr in prs:
frame = ctk.CTkFrame(self.pr_frame, fg_color=self.COLORS['bg_tile'])
frame.pack(fill="x", pady=5)
# PR info
info_frame = ctk.CTkFrame(frame, fg_color="transparent")
info_frame.pack(fill="x", padx=10, pady=10)
ctk.CTkLabel(
info_frame,
text=f"#{pr['number']}: {pr['title']}",
font=self.FONTS['subtitle'],
text_color=self.COLORS['text_primary']
).pack(anchor="w")
ctk.CTkLabel(
info_frame,
text=f"State: {pr['state']} | {pr['head']['ref']}{pr['base']['ref']} | Author: {pr['user']['username']}",
font=self.FONTS['small'],
text_color=self.COLORS['text_secondary']
).pack(anchor="w")
self.pr_widgets.append(frame)
except Exception as e:
print(f"Error fetching PRs: {e}")
def create_issue_dialog(self):
if not self.selected_repo:
self.show_warning("Please select a repository")
return
dialog = ctk.CTkToplevel(self.window)
dialog.title("Create New Issue")
dialog.geometry("600x500")
dialog.configure(fg_color=self.COLORS['bg_primary'])
# Center dialog
dialog.transient(self.window)
dialog.update_idletasks()
x = (dialog.winfo_screenwidth() - 600) // 2
y = (dialog.winfo_screenheight() - 500) // 2
dialog.geometry(f"600x500+{x}+{y}")
# Title
ctk.CTkLabel(
dialog, text="Title:",
font=self.FONTS['subtitle'],
text_color=self.COLORS['text_primary']
).pack(pady=(20, 5))
title_entry = ctk.CTkEntry(
dialog, width=500,
fg_color=self.COLORS['bg_secondary'],
text_color=self.COLORS['text_primary']
)
title_entry.pack(pady=5)
# Description
ctk.CTkLabel(
dialog, text="Description:",
font=self.FONTS['subtitle'],
text_color=self.COLORS['text_primary']
).pack(pady=(20, 5))
desc_text = ctk.CTkTextbox(
dialog, height=250, width=500,
fg_color=self.COLORS['bg_secondary'],
text_color=self.COLORS['text_primary']
)
desc_text.pack(pady=5)
def create():
title = title_entry.get().strip()
body = desc_text.get("0.0", "end").strip()
if not title:
self.show_warning("Please enter a title")
return
try:
manager = self.issue_pr_manager.get_issue_manager(self.selected_repo['name'])
manager.create_issue(title, body)
self.show_info("Issue created successfully")
dialog.destroy()
self.refresh_issues()
except Exception as e:
self.show_error(f"Failed to create issue: {e}")
ctk.CTkButton(
dialog, text="Create", command=create,
fg_color=self.COLORS['accent_success']
).pack(pady=20)
def close_issue(self):
# TODO: Need to implement issue selection first
self.show_info("Please select an issue from the list to close")
def create_pr_dialog(self):
if not self.selected_repo:
self.show_warning("Please select a repository")
return
dialog = ctk.CTkToplevel(self.window)
dialog.title("Create Pull Request")
dialog.geometry("600x600")
dialog.configure(fg_color=self.COLORS['bg_primary'])
# Center dialog
dialog.transient(self.window)
dialog.update_idletasks()
x = (dialog.winfo_screenwidth() - 600) // 2
y = (dialog.winfo_screenheight() - 600) // 2
dialog.geometry(f"600x600+{x}+{y}")
# Title
ctk.CTkLabel(
dialog, text="Title:",
font=self.FONTS['subtitle'],
text_color=self.COLORS['text_primary']
).pack(pady=(20, 5))
title_entry = ctk.CTkEntry(
dialog, width=500,
fg_color=self.COLORS['bg_secondary'],
text_color=self.COLORS['text_primary']
)
title_entry.pack(pady=5)
# Branches
ctk.CTkLabel(
dialog, text="Head Branch:",
font=self.FONTS['subtitle'],
text_color=self.COLORS['text_primary']
).pack(pady=(20, 5))
head_entry = ctk.CTkEntry(
dialog, width=500,
fg_color=self.COLORS['bg_secondary'],
text_color=self.COLORS['text_primary']
)
head_entry.pack(pady=5)
ctk.CTkLabel(
dialog, text="Base Branch:",
font=self.FONTS['subtitle'],
text_color=self.COLORS['text_primary']
).pack(pady=(20, 5))
base_entry = ctk.CTkEntry(
dialog, width=500,
fg_color=self.COLORS['bg_secondary'],
text_color=self.COLORS['text_primary']
)
base_entry.insert(0, "main")
base_entry.pack(pady=5)
# Description
ctk.CTkLabel(
dialog, text="Description:",
font=self.FONTS['subtitle'],
text_color=self.COLORS['text_primary']
).pack(pady=(20, 5))
desc_text = ctk.CTkTextbox(
dialog, height=200, width=500,
fg_color=self.COLORS['bg_secondary'],
text_color=self.COLORS['text_primary']
)
desc_text.pack(pady=5)
def create():
title = title_entry.get().strip()
head = head_entry.get().strip()
base = base_entry.get().strip()
body = desc_text.get("0.0", "end").strip()
if not all([title, head, base]):
self.show_warning("Please fill all required fields")
return
try:
manager = self.issue_pr_manager.get_pr_manager(self.selected_repo['name'])
manager.create_pull_request(title, head, base, body)
self.show_info("Pull request created successfully")
dialog.destroy()
self.refresh_prs()
except Exception as e:
self.show_error(f"Failed to create PR: {e}")
ctk.CTkButton(
dialog, text="Create", command=create,
fg_color=self.COLORS['accent_success']
).pack(pady=20)
def merge_pr(self):
# TODO: Need to implement PR selection first
self.show_info("Please select a pull request from the list to merge")
def close_pr(self):
# TODO: Need to implement PR selection first
self.show_info("Please select a pull request from the list to close")
# Helper methods
def append_output(self, text):
self.git_output.insert("end", text)
self.git_output.see("end")
def show_error(self, message):
messagebox.showerror("Error", message)
def show_warning(self, message):
messagebox.showwarning("Warning", message)
def show_info(self, message):
messagebox.showinfo("Info", message)
def push_local_repo_dialog(self, project_name=None, project_path=None):
"""Dialog to push an existing local repository to Gitea"""
dialog = ctk.CTkToplevel(self.window)
dialog.title("Push Local Repository to Gitea")
dialog.geometry("600x600")
dialog.configure(fg_color=self.COLORS['bg_primary'])
# Center dialog
dialog.transient(self.window)
dialog.update_idletasks()
x = (dialog.winfo_screenwidth() - 600) // 2
y = (dialog.winfo_screenheight() - 600) // 2
dialog.geometry(f"600x600+{x}+{y}")
# Add description label
desc_label = ctk.CTkLabel(
dialog,
text="Push an existing local Git repository to your Gitea server",
font=self.FONTS['body'],
text_color=self.COLORS['text_secondary']
)
desc_label.pack(pady=(10, 5))
# Local repository path
ctk.CTkLabel(
dialog, text="Local Repository Path:",
font=self.FONTS['subtitle'],
text_color=self.COLORS['text_primary']
).pack(pady=(20, 5))
path_frame = ctk.CTkFrame(dialog, fg_color="transparent")
path_frame.pack(pady=5, padx=20, fill="x")
path_var = ctk.StringVar()
path_entry = ctk.CTkEntry(
path_frame, width=400,
textvariable=path_var,
fg_color=self.COLORS['bg_secondary'],
text_color=self.COLORS['text_primary']
)
path_entry.pack(side="left", padx=(0, 10))
def browse_folder():
from tkinter import filedialog
folder = filedialog.askdirectory()
if folder:
path_var.set(folder)
# Update repo name field with folder name if empty
if not name_entry.get().strip():
import os
folder_name = os.path.basename(folder)
name_entry.delete(0, 'end')
name_entry.insert(0, folder_name)
# Set initial values if provided
if project_path:
path_var.set(project_path)
ctk.CTkButton(
path_frame, text="Browse", command=browse_folder,
fg_color=self.COLORS['accent_primary'], width=100
).pack(side="left")
# Repository name
ctk.CTkLabel(
dialog, text="Repository Name:",
font=self.FONTS['subtitle'],
text_color=self.COLORS['text_primary']
).pack(pady=(20, 5))
name_entry = ctk.CTkEntry(
dialog, width=500,
fg_color=self.COLORS['bg_secondary'],
text_color=self.COLORS['text_primary']
)
name_entry.pack(pady=5, padx=20)
# Set initial name if provided
if project_name:
name_entry.insert(0, project_name)
# Description
ctk.CTkLabel(
dialog, text="Description:",
font=self.FONTS['subtitle'],
text_color=self.COLORS['text_primary']
).pack(pady=(20, 5))
desc_text = ctk.CTkTextbox(
dialog, height=100, width=500,
fg_color=self.COLORS['bg_secondary'],
text_color=self.COLORS['text_primary']
)
desc_text.pack(pady=5, padx=20)
# Options
private_var = ctk.BooleanVar()
ctk.CTkCheckBox(
dialog, text="Private Repository",
variable=private_var,
text_color=self.COLORS['text_primary']
).pack(pady=10)
# Branch name
ctk.CTkLabel(
dialog, text="Branch to push (default: main):",
font=self.FONTS['subtitle'],
text_color=self.COLORS['text_primary']
).pack(pady=(10, 5))
branch_entry = ctk.CTkEntry(
dialog, width=200,
placeholder_text="main",
fg_color=self.COLORS['bg_secondary'],
text_color=self.COLORS['text_primary']
)
branch_entry.pack(pady=5)
# Status label for feedback
status_label = ctk.CTkLabel(
dialog, text="",
font=self.FONTS['body'],
text_color=self.COLORS['text_secondary']
)
status_label.pack(pady=10)
def push_repo():
local_path = path_var.get().strip()
repo_name = name_entry.get().strip()
desc = desc_text.get("0.0", "end").strip()
branch = branch_entry.get().strip() or "main"
if not local_path:
self.show_warning("Please select a local repository path")
return
if not repo_name:
self.show_warning("Please enter a repository name")
return
# Check if it's a valid git repository
from pathlib import Path
repo_path = Path(local_path)
if not (repo_path / ".git").exists():
self.show_warning("Selected path is not a git repository")
return
def do_push():
try:
# Update status
self.window.after(0, lambda: status_label.configure(
text="Creating repository on Gitea...",
text_color=self.COLORS['accent_primary']
))
success, message = self.repo_manager.push_local_repo_to_gitea(
repo_path, repo_name, desc, private_var.get(), branch
)
if success:
self.window.after(0, lambda: self.show_info(message))
self.window.after(0, dialog.destroy)
self.window.after(0, self.refresh_repositories)
else:
self.window.after(0, lambda: status_label.configure(
text=f"Error: {message}",
text_color=self.COLORS['accent_error']
))
except Exception as e:
self.window.after(0, lambda: status_label.configure(
text=f"Error: {str(e)}",
text_color=self.COLORS['accent_error']
))
# Run in thread to avoid blocking UI
import threading
threading.Thread(target=do_push, daemon=True).start()
ctk.CTkButton(
dialog, text="Push to Gitea", command=push_repo,
fg_color=self.COLORS['accent_success']
).pack(pady=20)

299
src/gitea/issue_pr_manager.py Normale Datei
Datei anzeigen

@ -0,0 +1,299 @@
import logging
from typing import List, Dict, Any, Optional
from datetime import datetime
from .gitea_client import GiteaClient
logger = logging.getLogger(__name__)
class IssueManager:
def __init__(self, gitea_client: GiteaClient, owner: str, repo: str):
self.client = gitea_client
self.owner = owner
self.repo = repo
def list_issues(self, state: str = "open", labels: Optional[List[str]] = None) -> List[Dict[str, Any]]:
all_issues = []
page = 1
while True:
issues = self.client.list_issues(
self.owner, self.repo, state=state, labels=labels, page=page
)
if not issues:
break
all_issues.extend(issues)
page += 1
return all_issues
def create_issue(self, title: str, body: str = "",
assignees: Optional[List[str]] = None,
labels: Optional[List[int]] = None) -> Optional[Dict[str, Any]]:
try:
issue = self.client.create_issue(
self.owner, self.repo, title, body, assignees, labels
)
logger.info(f"Issue '{title}' created successfully")
return issue
except Exception as e:
logger.error(f"Failed to create issue: {e}")
return None
def update_issue(self, index: int, **kwargs) -> Optional[Dict[str, Any]]:
try:
issue = self.client.update_issue(self.owner, self.repo, index, **kwargs)
logger.info(f"Issue #{index} updated successfully")
return issue
except Exception as e:
logger.error(f"Failed to update issue #{index}: {e}")
return None
def close_issue(self, index: int) -> bool:
try:
self.client.close_issue(self.owner, self.repo, index)
logger.info(f"Issue #{index} closed successfully")
return True
except Exception as e:
logger.error(f"Failed to close issue #{index}: {e}")
return False
def add_comment(self, index: int, comment: str) -> Optional[Dict[str, Any]]:
try:
data = {"body": comment}
comment_data = self.client._request(
"POST",
f"repos/{self.owner}/{self.repo}/issues/{index}/comments",
json=data
)
logger.info(f"Comment added to issue #{index}")
return comment_data
except Exception as e:
logger.error(f"Failed to add comment to issue #{index}: {e}")
return None
def get_issue_comments(self, index: int) -> List[Dict[str, Any]]:
try:
return self.client._request(
"GET",
f"repos/{self.owner}/{self.repo}/issues/{index}/comments"
)
except Exception as e:
logger.error(f"Failed to get comments for issue #{index}: {e}")
return []
def search_issues(self, query: str) -> List[Dict[str, Any]]:
all_issues = self.list_issues(state="all")
query_lower = query.lower()
return [
issue for issue in all_issues
if query_lower in issue["title"].lower() or
query_lower in issue.get("body", "").lower()
]
class PullRequestManager:
def __init__(self, gitea_client: GiteaClient, owner: str, repo: str):
self.client = gitea_client
self.owner = owner
self.repo = repo
def list_pull_requests(self, state: str = "open") -> List[Dict[str, Any]]:
all_prs = []
page = 1
while True:
prs = self.client.list_pull_requests(
self.owner, self.repo, state=state, page=page
)
if not prs:
break
all_prs.extend(prs)
page += 1
return all_prs
def create_pull_request(self, title: str, head: str, base: str,
body: str = "", assignees: Optional[List[str]] = None) -> Optional[Dict[str, Any]]:
try:
pr = self.client.create_pull_request(
self.owner, self.repo, title, head, base, body, assignees
)
logger.info(f"Pull request '{title}' created successfully")
return pr
except Exception as e:
logger.error(f"Failed to create pull request: {e}")
return None
def update_pull_request(self, index: int, **kwargs) -> Optional[Dict[str, Any]]:
try:
pr = self.client._request(
"PATCH",
f"repos/{self.owner}/{self.repo}/pulls/{index}",
json=kwargs
)
logger.info(f"Pull request #{index} updated successfully")
return pr
except Exception as e:
logger.error(f"Failed to update pull request #{index}: {e}")
return None
def merge_pull_request(self, index: int, merge_style: str = "merge") -> bool:
try:
self.client.merge_pull_request(self.owner, self.repo, index, merge_style)
logger.info(f"Pull request #{index} merged successfully")
return True
except Exception as e:
logger.error(f"Failed to merge pull request #{index}: {e}")
return False
def close_pull_request(self, index: int) -> bool:
return self.update_pull_request(index, state="closed") is not None
def get_pull_request_diff(self, index: int) -> Optional[str]:
try:
response = self.client.session.get(
f"{self.client.config.api_url}/repos/{self.owner}/{self.repo}/pulls/{index}.diff",
headers={"Accept": "text/plain"}
)
response.raise_for_status()
return response.text
except Exception as e:
logger.error(f"Failed to get diff for PR #{index}: {e}")
return None
def get_pull_request_commits(self, index: int) -> List[Dict[str, Any]]:
try:
return self.client._request(
"GET",
f"repos/{self.owner}/{self.repo}/pulls/{index}/commits"
)
except Exception as e:
logger.error(f"Failed to get commits for PR #{index}: {e}")
return []
def add_comment(self, index: int, comment: str) -> Optional[Dict[str, Any]]:
try:
data = {"body": comment}
comment_data = self.client._request(
"POST",
f"repos/{self.owner}/{self.repo}/pulls/{index}/reviews",
json=data
)
logger.info(f"Comment added to pull request #{index}")
return comment_data
except Exception as e:
logger.error(f"Failed to add comment to PR #{index}: {e}")
return None
def get_pull_request_reviews(self, index: int) -> List[Dict[str, Any]]:
try:
return self.client._request(
"GET",
f"repos/{self.owner}/{self.repo}/pulls/{index}/reviews"
)
except Exception as e:
logger.error(f"Failed to get reviews for PR #{index}: {e}")
return []
def approve_pull_request(self, index: int, comment: str = "") -> Optional[Dict[str, Any]]:
try:
data = {
"body": comment,
"event": "APPROVE"
}
review = self.client._request(
"POST",
f"repos/{self.owner}/{self.repo}/pulls/{index}/reviews",
json=data
)
logger.info(f"Pull request #{index} approved")
return review
except Exception as e:
logger.error(f"Failed to approve PR #{index}: {e}")
return None
def request_changes(self, index: int, comment: str) -> Optional[Dict[str, Any]]:
try:
data = {
"body": comment,
"event": "REQUEST_CHANGES"
}
review = self.client._request(
"POST",
f"repos/{self.owner}/{self.repo}/pulls/{index}/reviews",
json=data
)
logger.info(f"Changes requested for PR #{index}")
return review
except Exception as e:
logger.error(f"Failed to request changes for PR #{index}: {e}")
return None
class IssuePRManager:
def __init__(self, gitea_client: Optional[GiteaClient] = None):
self.client = gitea_client or GiteaClient()
self._current_user = None
self._issue_managers = {}
self._pr_managers = {}
@property
def current_user(self) -> Dict[str, Any]:
if self._current_user is None:
self._current_user = self.client.get_user_info()
return self._current_user
def get_issue_manager(self, repo_name: str) -> IssueManager:
if repo_name not in self._issue_managers:
owner = self.current_user["username"]
self._issue_managers[repo_name] = IssueManager(self.client, owner, repo_name)
return self._issue_managers[repo_name]
def get_pr_manager(self, repo_name: str) -> PullRequestManager:
if repo_name not in self._pr_managers:
owner = self.current_user["username"]
self._pr_managers[repo_name] = PullRequestManager(self.client, owner, repo_name)
return self._pr_managers[repo_name]
def list_all_issues(self, state: str = "open") -> Dict[str, List[Dict[str, Any]]]:
repos = self.client.list_repositories()
all_issues = {}
for repo in repos:
repo_name = repo["name"]
manager = self.get_issue_manager(repo_name)
issues = manager.list_issues(state=state)
if issues:
all_issues[repo_name] = issues
return all_issues
def list_all_pull_requests(self, state: str = "open") -> Dict[str, List[Dict[str, Any]]]:
repos = self.client.list_repositories()
all_prs = {}
for repo in repos:
repo_name = repo["name"]
manager = self.get_pr_manager(repo_name)
prs = manager.list_pull_requests(state=state)
if prs:
all_prs[repo_name] = prs
return all_prs
def get_activity_summary(self) -> Dict[str, Any]:
open_issues = self.list_all_issues("open")
open_prs = self.list_all_pull_requests("open")
total_issues = sum(len(issues) for issues in open_issues.values())
total_prs = sum(len(prs) for prs in open_prs.values())
return {
"open_issues": total_issues,
"open_pull_requests": total_prs,
"repositories_with_issues": len(open_issues),
"repositories_with_prs": len(open_prs),
"details": {
"issues_by_repo": {repo: len(issues) for repo, issues in open_issues.items()},
"prs_by_repo": {repo: len(prs) for repo, prs in open_prs.items()}
}
}

Datei anzeigen

@ -0,0 +1,274 @@
import logging
from typing import List, Dict, Any, Optional, Tuple
from pathlib import Path
from .gitea_client import GiteaClient, GiteaConfig
from .git_operations import GitOperationsManager
logger = logging.getLogger(__name__)
class RepositoryManager:
def __init__(self, gitea_client: Optional[GiteaClient] = None):
self.client = gitea_client or GiteaClient()
self._current_user = None
try:
# Get the actual username from Gitea
user_info = self.client.get_user_info()
actual_username = user_info.get('username', self.client.config.username)
self._current_user = user_info
except Exception as e:
logger.warning(f"Failed to get user info from Gitea: {e}")
# Fall back to config username
actual_username = self.client.config.username
self.git_ops = GitOperationsManager(
base_url=self.client.config.base_url,
token=self.client.config.api_token,
username=actual_username
)
@property
def current_user(self) -> Dict[str, Any]:
if self._current_user is None:
self._current_user = self.client.get_user_info()
return self._current_user
def list_all_repositories(self) -> List[Dict[str, Any]]:
all_repos = []
page = 1
while True:
repos = self.client.list_repositories(page=page)
if not repos:
break
all_repos.extend(repos)
page += 1
return all_repos
def list_organization_repositories(self, org_name: str, page: int = None, per_page: int = None) -> List[Dict[str, Any]]:
"""List all repositories for an organization"""
if page is not None and per_page is not None:
# Single page request
try:
repos = self.client._request("GET", f"orgs/{org_name}/repos", params={"page": page, "limit": per_page})
return repos if repos else []
except Exception as e:
logger.error(f"Failed to list org repositories: {e}")
return []
else:
# Get all pages
all_repos = []
current_page = 1
while True:
try:
repos = self.client._request("GET", f"orgs/{org_name}/repos", params={"page": current_page, "limit": 50})
if not repos:
break
all_repos.extend(repos)
current_page += 1
except Exception as e:
logger.error(f"Failed to list org repositories: {e}")
break
return all_repos
def create_repository(self, name: str, description: str = "", private: bool = False,
auto_init: bool = True, gitignore: str = "", license: str = "",
organization: str = None) -> Dict[str, Any]:
try:
# Try to create in organization first
if organization:
try:
data = {
"name": name,
"description": description,
"private": private,
"auto_init": auto_init,
"gitignores": gitignore,
"license": license,
"default_branch": "main" # Use main instead of master
}
repo = self.client._request("POST", f"orgs/{organization}/repos", json=data)
logger.info(f"Repository '{name}' created successfully in organization '{organization}'")
return repo
except Exception as org_error:
logger.error(f"Failed to create in organization {organization}: {org_error}")
# Don't fall back - raise the error so user knows what happened
raise Exception(f"Konnte Repository nicht in Organisation '{organization}' erstellen: {str(org_error)}")
# Create as user repository
repo = self.client.create_repository(
name=name,
description=description,
private=private,
auto_init=auto_init,
gitignores=gitignore,
license=license
)
logger.info(f"Repository '{name}' created successfully as user repository")
return repo
except Exception as e:
logger.error(f"Failed to create repository '{name}': {e}")
raise
def delete_repository(self, repo_name: str) -> bool:
try:
owner = self.current_user["username"]
self.client.delete_repository(owner, repo_name)
logger.info(f"Repository '{repo_name}' deleted successfully")
return True
except Exception as e:
logger.error(f"Failed to delete repository '{repo_name}': {e}")
return False
def clone_repository(self, repo_name: str, clone_dir: Optional[Path] = None) -> Tuple[bool, Path]:
owner = self.current_user["username"]
return self.git_ops.clone_repository(owner, repo_name, clone_dir)
def get_repository_info(self, repo_name: str) -> Optional[Dict[str, Any]]:
try:
owner = self.current_user["username"]
return self.client.get_repository(owner, repo_name)
except Exception as e:
logger.error(f"Failed to get repository info for '{repo_name}': {e}")
return None
def fork_repository(self, owner: str, repo_name: str) -> Optional[Dict[str, Any]]:
try:
fork = self.client.fork_repository(owner, repo_name)
logger.info(f"Repository '{owner}/{repo_name}' forked successfully")
return fork
except Exception as e:
logger.error(f"Failed to fork repository '{owner}/{repo_name}': {e}")
return None
def search_repositories(self, query: str) -> List[Dict[str, Any]]:
all_repos = self.list_all_repositories()
query_lower = query.lower()
return [
repo for repo in all_repos
if query_lower in repo["name"].lower() or
query_lower in repo.get("description", "").lower()
]
def sync_repository(self, repo_path: Path) -> Tuple[bool, str]:
success, fetch_result = self.git_ops.fetch(repo_path)
if not success:
return False, f"Fetch failed: {fetch_result}"
success, pull_result = self.git_ops.pull(repo_path)
if not success:
return False, f"Pull failed: {pull_result}"
return True, "Repository synchronized successfully"
def commit_and_push(self, repo_path: Path, message: str,
files: Optional[List[str]] = None) -> Tuple[bool, str]:
success, add_result = self.git_ops.add(repo_path, files)
if not success:
return False, f"Add failed: {add_result}"
success, commit_result = self.git_ops.commit(repo_path, message)
if not success:
return False, f"Commit failed: {commit_result}"
success, push_result = self.git_ops.push(repo_path)
if not success:
return False, f"Push failed: {push_result}"
return True, "Changes committed and pushed successfully"
def get_repository_status(self, repo_path: Path) -> Dict[str, Any]:
success, status = self.git_ops.status(repo_path)
success_branch, branches = self.git_ops.branch(repo_path)
success_remote, remotes = self.git_ops.remote_list(repo_path)
current_branch = None
if success_branch:
for line in branches.split('\n'):
if line.startswith('*'):
current_branch = line[2:].strip()
break
return {
"has_changes": bool(status.strip()) if success else None,
"status": status if success else "Unable to get status",
"current_branch": current_branch,
"remotes": remotes if success_remote else "Unable to get remotes"
}
def create_branch(self, repo_path: Path, branch_name: str) -> Tuple[bool, str]:
return self.git_ops.checkout(repo_path, branch_name, create=True)
def switch_branch(self, repo_path: Path, branch_name: str) -> Tuple[bool, str]:
return self.git_ops.checkout(repo_path, branch_name)
def list_branches(self, repo_name: str) -> List[Dict[str, Any]]:
try:
owner = self.current_user["username"]
return self.client.list_branches(owner, repo_name)
except Exception as e:
logger.error(f"Failed to list branches for '{repo_name}': {e}")
return []
def create_remote_branch(self, repo_name: str, branch_name: str,
base_branch: str = "main") -> Optional[Dict[str, Any]]:
try:
owner = self.current_user["username"]
return self.client.create_branch(owner, repo_name, branch_name, base_branch)
except Exception as e:
logger.error(f"Failed to create branch '{branch_name}' in '{repo_name}': {e}")
return None
def delete_remote_branch(self, repo_name: str, branch_name: str) -> bool:
try:
owner = self.current_user["username"]
self.client.delete_branch(owner, repo_name, branch_name)
logger.info(f"Branch '{branch_name}' deleted from '{repo_name}'")
return True
except Exception as e:
logger.error(f"Failed to delete branch '{branch_name}' from '{repo_name}': {e}")
return False
def push_local_repo_to_gitea(self, local_repo_path: Path, repo_name: str,
description: str = "", private: bool = False,
branch: str = "main", organization: str = None) -> Tuple[bool, str]:
"""Create a new repo on Gitea and push an existing local repository to it"""
try:
# First create the repository on Gitea
repo = self.create_repository(
name=repo_name,
description=description,
private=private,
auto_init=False, # Important: don't initialize since we're pushing existing code
organization=organization
)
# Determine the correct owner
if organization:
owner = organization
elif 'owner' in repo and repo['owner']:
owner = repo['owner']['username'] if 'username' in repo['owner'] else repo['owner']['login']
else:
owner = self.current_user["username"]
logger.info(f"Repository created, owner determined as: {owner}")
# Then push the local repository
success, result = self.git_ops.push_existing_repo_to_gitea(
local_repo_path, owner, repo_name, branch
)
if success:
logger.info(f"Successfully pushed local repository to '{repo_name}'")
return True, f"Repository '{repo_name}' created and pushed successfully"
else:
logger.error(f"Failed to push to '{repo_name}': {result}")
return False, f"Repository created but push failed: {result}"
except Exception as e:
logger.error(f"Failed to push local repository: {e}")
return False, str(e)