import subprocess
import google.generativeai as genai
import os
import argparse
import sys
import datetime
import re
import logging

# --- Configuration ---

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

# Attempt to get API key from environment variable
API_KEY = os.getenv("GEMINI_API_KEY")
if not API_KEY:
    logging.error("GEMINI_API_KEY environment variable not set.")
    logging.error(
        "Please obtain an API key from Google AI Studio (https://aistudio.google.com/app/apikey)"
    )
    logging.error("and set it as an environment variable:")
    logging.error("  export GEMINI_API_KEY='YOUR_API_KEY'    (Linux/macOS)")
    logging.error("  set GEMINI_API_KEY=YOUR_API_KEY         (Windows CMD)")
    logging.error("  $env:GEMINI_API_KEY='YOUR_API_KEY'      (Windows PowerShell)")
    sys.exit(1)

# Configure the Gemini AI Client
try:
    genai.configure(api_key=API_KEY)
    # Use a model suitable for complex reasoning like code analysis.
    # Adjust model name if needed (e.g., 'gemini-1.5-flash-latest').
    MODEL_NAME = os.getenv("GEMINI_MODEL")
    if not MODEL_NAME:
        logging.error("GEMINI_MODEL environment variable not set.")
        logging.error(
            "Please set the desired Gemini model name (e.g., 'gemini-1.5-flash-latest')."
        )
        logging.error("  export GEMINI_MODEL='gemini-1.5-flash-latest'    (Linux/macOS)")
        logging.error("  set GEMINI_MODEL=gemini-1.5-flash-latest         (Windows CMD)")
        logging.error(
            "  $env:GEMINI_MODEL='gemini-1.5-flash-latest'      (Windows PowerShell)"
        )
        sys.exit(1)
    model = genai.GenerativeModel(MODEL_NAME)
    logging.info(f"Using Gemini model: {MODEL_NAME}")
except Exception as e:
    logging.error(f"Error configuring Gemini AI: {e}")
    sys.exit(1)


# --- Git Helper Functions ---


def run_git_command(command_list):
    """
    Runs a Git command as a list of arguments and returns its stdout.
    Handles errors and returns None on failure.
    """
    full_command = []
    try:
        # Prepend 'git' to the command list
        full_command = ["git"] + command_list
        logging.debug(f"Running command: {' '.join(full_command)}")
        result = subprocess.run(
            full_command,
            check=True,
            capture_output=True,
            text=True,
            encoding="utf-8",  # Be explicit about encoding
            errors="replace",  # Handle potential decoding errors
        )
        logging.debug(
            f"Command successful. Output:\n{result.stdout[:200]}..."
        )  # Log snippet
        return result.stdout.strip()
    except subprocess.CalledProcessError as e:
        logging.error(f"Error executing Git command: {' '.join(full_command)}")
        # Log stderr, replacing potential problematic characters
        stderr_safe = e.stderr.strip().encode("utf-8", "replace").decode("utf-8")
        logging.error(f"Stderr: {stderr_safe}")
        return None  # Indicate failure
    except FileNotFoundError:
        logging.error(
            "Error: 'git' command not found. Is Git installed and in your PATH?"
        )
        sys.exit(1)  # Critical error, exit
    except Exception as e:
        logging.error(f"An unexpected error occurred running git: {e}")
        return None


def check_git_repository():
    """Checks whether the current directory is inside a Git working tree."""
    # Use git rev-parse --is-inside-work-tree for a more reliable check
    output = run_git_command(["rev-parse", "--is-inside-work-tree"])
    return output == "true"


def get_current_branch():
    """Gets the current active Git branch name."""
    return run_git_command(["rev-parse", "--abbrev-ref", "HEAD"])


def create_backup_branch(branch_name):
    """Creates a timestamped backup branch from the given branch name."""
    timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    backup_branch_name = f"{branch_name}-backup-{timestamp}"
    logging.info(
        f"Attempting to create backup branch: {backup_branch_name} from {branch_name}"
    )
    # Use list format for run_git_command
    output = run_git_command(["branch", backup_branch_name, branch_name])
    # run_git_command returns stdout on success (which is empty for git branch)
    # or None on failure. Check for None.
    if output is not None:
        logging.info(f"Successfully created backup branch: {backup_branch_name}")
        return backup_branch_name
    else:
        logging.error("Failed to create backup branch.")
        return None


def get_commit_range(upstream_ref, current_branch):
    """
    Determines the commit range (merge_base..current_branch).
    Returns the range string and the merge base hash.
    """
    logging.info(
        f"Finding merge base between '{upstream_ref}' and '{current_branch}'..."
    )
    merge_base = run_git_command(["merge-base", upstream_ref, current_branch])
    if not merge_base:
        logging.error(
            f"Could not find merge base between '{upstream_ref}' and '{current_branch}'."
        )
        logging.error(
            f"Ensure '{upstream_ref}' is a valid reference (branch, commit, tag)"
        )
        logging.error("and that it has been fetched (e.g., 'git fetch origin').")
        return None, None  # Indicate failure
    logging.info(f"Found merge base: {merge_base}")
    commit_range = f"{merge_base}..{current_branch}"
    return commit_range, merge_base


def get_commits_in_range(commit_range):
    """Gets a list of commit hashes and subjects in the specified range (oldest first)."""
    # --pretty=format sets a specific format: %h=short hash, %s=subject
    # --reverse shows oldest first, which is how rebase lists them
    log_output = run_git_command(
        ["log", "--pretty=format:%h %s", "--reverse", commit_range]
    )
    if log_output is not None:
        commits = log_output.splitlines()
        logging.info(f"Found {len(commits)} commits in range {commit_range}.")
        return commits
    return []  # Return empty list on failure or no commits


def get_changed_files_in_range(commit_range):
    """
    Gets a list of files changed in the specified range and generates
    a simple directory structure string representation.
""" # --name-only shows only filenames diff_output = run_git_command(["diff", "--name-only", commit_range]) if diff_output is not None: files = diff_output.splitlines() logging.info(f"Found {len(files)} changed files in range {commit_range}.") # Basic tree structure representation tree = {} for file_path in files: # Normalize path separators for consistency parts = file_path.replace("\\", "/").split("/") node = tree for i, part in enumerate(parts): if not part: continue # Skip empty parts (e.g., leading '/') if i == len(parts) - 1: # It's a file node[part] = "file" else: # It's a directory if part not in node: node[part] = {} # Ensure we don't try to treat a file as a directory if isinstance(node[part], dict): node = node[part] else: # Handle conflict (e.g., file 'a' and dir 'a/b') - less likely with git paths logging.warning( f"Path conflict building file tree for: {file_path}" ) break # Stop processing this path # Simple string representation for the prompt def format_tree(d, indent=0): lines = [] # Sort items for consistent output for key, value in sorted(d.items()): prefix = " " * indent if isinstance(value, dict): lines.append(f"{prefix}šŸ“ {key}/") lines.extend(format_tree(value, indent + 1)) else: lines.append(f"{prefix}šŸ“„ {key}") return lines tree_str = "\n".join(format_tree(tree)) return tree_str, files # Return structure string and raw list return "", [] # Return empty on failure or no changes def get_diff_in_range(commit_range): """Gets the combined diffstat and patch for the specified range.""" # Use --patch-with-stat for context (diff + stats) diff_output = run_git_command(["diff", "--patch-with-stat", commit_range]) if diff_output is not None: logging.info( f"Generated diff for range {commit_range} (length: {len(diff_output)} chars)." ) else: logging.warning(f"Could not generate diff for range {commit_range}.") return ( diff_output if diff_output is not None else "" ) # Return empty string on failure def get_file_content_at_commit(commit_hash, file_path): """Gets the content of a specific file at a specific commit hash.""" logging.info(f"Fetching content of '{file_path}' at commit {commit_hash[:7]}...") # Use 'git show' which handles paths correctly content = run_git_command(["show", f"{commit_hash}:{file_path}"]) if content is None: logging.warning( f"Could not retrieve content for {file_path} at {commit_hash[:7]}." ) return None return content # --- AI Interaction --- def generate_squash_suggestion_prompt( commit_range, merge_base, commits, file_structure, diff ): """ Creates a prompt asking the AI specifically to identify potential squash and fixup candidates within the commit range. """ commit_list_str = ( "\n".join([f"- {c}" for c in commits]) if commits else "No commits in range." ) # The merge base hash isn't strictly needed for *suggestions* but good context prompt = f""" You are an expert Git assistant. Your task is to analyze the provided Git commit history and identify commits within the range `{commit_range}` that could be logically combined using `squash` or `fixup` during an interactive rebase (`git rebase -i {merge_base}`). **Goal:** Suggest combinations that group related changes together, merge small fixes into their parent commits, or consolidate work-in-progress commits to make the history more understandable and atomic. **Git Commit Message Conventions (for context):** * Subject: Imperative, < 50 chars, capitalized, no period. Use types like `feat:`, `fix:`, `refactor:`, etc. * Body: Explain 'what' and 'why', wrap at 72 chars. **Provided Context:** 1. 
1.  **Commit Range:** `{commit_range}`
2.  **Merge Base Hash:** `{merge_base}`
3.  **Commits in Range (Oldest First - Short Hash & Subject):**
```
{commit_list_str}
```
4.  **Changed Files Structure in Range:**
```
{file_structure if file_structure else "No files changed or unable to list."}
```
5.  **Combined Diff for the Range (`git diff --patch-with-stat {commit_range}`):**
```diff
{diff if diff else "No differences found or unable to get diff."}
```

**Instructions:**

1.  Analyze the commits, their messages, the changed files, and the diff.
2.  Identify pairs or sequences of commits from the list above that are strong candidates for being combined using `squash` (combine changes and messages) or `fixup` (combine changes, discard message).
3.  For each suggestion, clearly state:
    *   Which commit(s) should be squashed/fixed up *into* which preceding commit.
    *   Whether `squash` or `fixup` is more appropriate.
    *   A brief explanation of *why* this combination makes sense (e.g., "Commit B is a minor fix for commit A", "Commits C, D, E are parts of the same feature implementation").
4.  **Focus ONLY on squash/fixup suggestions.** Do *not* suggest `reword`, `edit`, `drop`, or provide a full rebase plan/command sequence.
5.  Format your response as a list of suggestions.

**Example Output Format:**

```text
Based on the analysis, here are potential candidates for squashing or fixing up:

*   **Suggestion 1:**
    *   Action: `fixup` commit `hash2 fix typo` into `hash1 feat: Add initial framework`.
    *   Reason: Commit `hash2` appears to be a small correction directly related to the initial framework added in `hash1`. Its message can likely be discarded.

*   **Suggestion 2:**
    *   Action: `squash` commit `hash4 Add tests` into `hash3 feat: Implement user login`.
    *   Reason: Commit `hash4` adds tests specifically for the feature implemented in `hash3`. Combining them keeps the feature and its tests together. Their messages should be combined during the rebase.

*   **Suggestion 3:**
    *   Action: `squash` commits `hash6 WIP part 2` and `hash7 WIP part 3` into `hash5 feat: Start implementing feature X`.
    *   Reason: Commits `hash6` and `hash7` seem like incremental work-in-progress steps for the feature started in `hash5`. Squashing them creates a single, complete commit for the feature.
```

6.  **File Content Request:** If you absolutely need the content of specific files *at specific commits* to confidently determine if they should be squashed/fixed up, ask for them clearly ONCE. List the files using this exact format at the end of your response:
    `REQUEST_FILES: [commit_hash1:path/to/file1.py, commit_hash2:another/path/file2.js]`
    Use the short commit hashes provided in the commit list. Do *not* ask for files unless essential for *this specific task* of identifying squash/fixup candidates.

Now, analyze the provided context and generate *only* the squash/fixup suggestions and their reasoning.
"""
    return prompt


# --- request_files_from_user function remains the same ---
def request_files_from_user(requested_files_str, commits_in_range):
    """
    Parses AI request string "REQUEST_FILES: [hash:path, ...]", verifies hashes,
    asks user permission, fetches file contents, and returns formatted context.
""" file_requests = [] try: # Extract the part within brackets using regex content_match = re.search( r"REQUEST_FILES:\s*\[(.*)\]", requested_files_str, re.IGNORECASE | re.DOTALL ) if not content_match: logging.warning("Could not parse file request format from AI response.") return None, None # Indicate parsing failure items_str = content_match.group(1).strip() if not items_str: logging.info("AI requested files but the list was empty.") return None, None # Empty request # Split items, handling potential spaces around commas items = [item.strip() for item in items_str.split(",") if item.strip()] # Map short hashes from the original list to verify AI request commit_hash_map = { c.split()[0]: c.split()[0] for c in commits_in_range } # short_hash: short_hash for item in items: if ":" not in item: logging.warning( f"Invalid format in requested file item (missing ':'): {item}" ) continue commit_hash, file_path = item.split(":", 1) commit_hash = commit_hash.strip() file_path = file_path.strip() # Verify the short hash exists in our original list if commit_hash not in commit_hash_map: logging.warning( f"AI requested file for unknown/out-of-range commit hash '{commit_hash}'. Skipping." ) continue file_requests.append({"hash": commit_hash, "path": file_path}) except Exception as e: logging.error(f"Error parsing requested files string: {e}") return None, None # Indicate parsing error if not file_requests: logging.info("No valid file requests found after parsing AI response.") return None, None # No valid requests print("\n----------------------------------------") print("ā“ AI Request for File Content ā“") print("----------------------------------------") print("The AI needs the content of the following files at specific commits") print("to provide more accurate squash/fixup suggestions:") files_to_fetch = [] for i, req in enumerate(file_requests): print(f" {i + 1}. File: '{req['path']}' at commit {req['hash']}") files_to_fetch.append(req) # Add to list if valid if not files_to_fetch: print("\nNo valid files to fetch based on the request.") return None, None # No files remain after validation print("----------------------------------------") while True: try: answer = ( input("Allow fetching these file contents? (yes/no): ").lower().strip() ) except EOFError: # Handle case where input stream is closed (e.g., piping) logging.warning("Input stream closed. 
            logging.warning("Input stream closed. Assuming 'no'.")
            answer = "no"

        if answer == "yes":
            logging.info("User approved fetching file content.")
            fetched_content_list = []
            for req in files_to_fetch:
                content = get_file_content_at_commit(req["hash"], req["path"])
                if content is not None:
                    # Format for the AI prompt
                    fetched_content_list.append(
                        f"--- Content of '{req['path']}' at commit {req['hash']} ---\n"
                        f"```\n{content}\n```\n"
                        f"--- End Content for {req['path']} at {req['hash']} ---"
                    )
                else:
                    # Inform AI that content couldn't be fetched
                    fetched_content_list.append(
                        f"--- Could not fetch content of '{req['path']}' at commit {req['hash']} ---"
                    )
            # Return the combined content and the original request string for context
            return "\n\n".join(fetched_content_list), requested_files_str
        elif answer == "no":
            logging.info("User denied fetching file content.")
            # Return None for content, but still return the request string
            return None, requested_files_str
        else:
            print("Please answer 'yes' or 'no'.")


# --- Main Execution ---


def main():
    """Main function to orchestrate Git analysis and AI interaction."""
    parser = argparse.ArgumentParser(
        description="Uses Gemini AI to suggest potential Git squash/fixup candidates.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument(
        "upstream_ref",
        nargs="?",
        # Default to a common upstream name; the user MUST ensure it exists
        default="upstream/main",
        help="The upstream reference point or commit hash to compare against "
        "(e.g., 'origin/main', 'upstream/develop', specific_commit_hash). "
        "Ensure this reference exists and is fetched.",
    )
    parser.add_argument(
        "-v", "--verbose", action="store_true", help="Enable verbose debug logging."
    )
    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
        logging.debug("Verbose logging enabled.")

    if not check_git_repository():
        logging.error("This script must be run from within a Git repository.")
        sys.exit(1)

    current_branch = get_current_branch()
    if not current_branch:
        logging.error("Could not determine the current Git branch.")
        sys.exit(1)
    logging.info(f"Current branch: {current_branch}")

    upstream_ref = args.upstream_ref
    logging.info(f"Comparing against reference: {upstream_ref}")

    # --- Safety: Create Backup Branch ---
    backup_branch = create_backup_branch(current_branch)
    if not backup_branch:
        # Ask user if they want to continue without a backup
        try:
            confirm = input(
                "āš ļø Failed to create backup branch. Continue without backup? (yes/no): "
            ).lower()
        except EOFError:
            logging.warning("Input stream closed. Aborting.")
            confirm = "no"
        if confirm != "yes":
            logging.info("Aborting.")
            sys.exit(1)
        else:
            logging.warning("Proceeding without a backup branch. Be careful!")
    else:
        print("-" * 40)
        print(f"āœ… Backup branch created: {backup_branch}")
        print(
            "   If anything goes wrong during manual rebase later, you can restore using:"
        )
        print(f"     git checkout {current_branch}")
        print(f"     git reset --hard {backup_branch}")
        print("-" * 40)

    # --- Gather Git Context ---
    print("\nGathering Git context...")
    commit_range, merge_base = get_commit_range(upstream_ref, current_branch)
    if not commit_range:
        # Error handled in get_commit_range
        sys.exit(1)

    logging.info(f"Analyzing commit range: {commit_range} (Merge Base: {merge_base})")

    commits = get_commits_in_range(commit_range)
    if not commits:
        logging.info(
            f"No commits found between '{merge_base}' and '{current_branch}'. Nothing to suggest."
        )
        sys.exit(0)

    file_structure, changed_files_list = get_changed_files_in_range(commit_range)
    diff = get_diff_in_range(commit_range)

    if not diff and not changed_files_list:
        logging.warning(
            f"No file changes or diff found between '{merge_base}' and '{current_branch}',"
        )
        logging.warning("even though commits exist. AI suggestions might be limited.")
        print("Commits found:")
        for c in commits:
            print(f"- {c}")
        try:
            confirm_proceed = input(
                "Proceed with AI analysis despite no diff? (yes/no): "
            ).lower()
        except EOFError:
            confirm_proceed = "no"
        if confirm_proceed != "yes":
            logging.info("Aborting analysis.")
            sys.exit(0)

    # --- Interact with AI ---
    print("\nGenerating prompt for AI squash/fixup suggestions...")
    # *** Use the new prompt function ***
    initial_prompt = generate_squash_suggestion_prompt(
        commit_range, merge_base, commits, file_structure, diff
    )

    logging.debug("\n--- Initial AI Prompt Snippet ---")
    logging.debug(initial_prompt[:1000] + "...")  # Log beginning of prompt
    logging.debug("--- End Prompt Snippet ---\n")

    print(f"Sending request to Gemini AI ({MODEL_NAME})... This may take a moment.")

    response = None  # Initialized so the error handler below never hits an unbound name
    try:
        # Start a chat session for potential follow-ups (file requests)
        convo = model.start_chat(history=[])
        response = convo.send_message(initial_prompt)
        ai_response_text = response.text

        # Loop to handle potential file requests (still relevant for squash decisions)
        while "REQUEST_FILES:" in ai_response_text.upper():
            logging.info("AI requested additional file content.")
            additional_context, original_request = request_files_from_user(
                ai_response_text, commits
            )

            if additional_context:
                logging.info("Sending fetched file content back to AI...")
                # Construct follow-up prompt for squash suggestions
                follow_up_prompt = f"""
Okay, here is the content of the files you requested:

{additional_context}

Please use this new information to refine your **squash/fixup suggestions** based on the original request and context. Provide the final list of suggestions now. Remember to *only* suggest squash/fixup actions and explain why. Do not provide a full rebase plan. Do not ask for more files.
"""
                logging.debug("\n--- Follow-up AI Prompt Snippet ---")
                logging.debug(follow_up_prompt[:500] + "...")
                logging.debug("--- End Follow-up Snippet ---\n")
                response = convo.send_message(follow_up_prompt)
                ai_response_text = response.text
            else:
                logging.info(
                    "Proceeding without providing files as requested by AI or user."
                )
                # Tell the AI to proceed without the files it asked for
                no_files_prompt = f"""
I cannot provide the content for the files you requested ({original_request}).
Please proceed with generating the **squash/fixup suggestions** based *only* on the initial context (commit list, file structure, diff) I provided earlier. Make your best suggestions without the file content. Provide the final list of suggestions now. Remember to *only* suggest squash/fixup actions.
""" logging.debug("\n--- No-Files AI Prompt ---") logging.debug(no_files_prompt) logging.debug("--- End No-Files Prompt ---\n") response = convo.send_message(no_files_prompt) ai_response_text = response.text # Break the loop as we've instructed AI to proceed without files break print("\nšŸ’” --- AI Squash/Fixup Suggestions --- šŸ’”") # Basic cleanup: remove potential markdown code block fences if AI adds them unnecessarily suggestion = ai_response_text.strip() suggestion = re.sub(r"^```(?:bash|text|)\n", "", suggestion, flags=re.MULTILINE) suggestion = re.sub(r"\n```$", "", suggestion, flags=re.MULTILINE) print(suggestion) print("šŸ’” --- End AI Suggestions --- šŸ’”") print("\n" + "=" * 60) print("šŸ“ NEXT STEPS šŸ“") print("=" * 60) print("1. REVIEW the suggestions above carefully.") print("2. These are *only suggestions* for potential squashes/fixups.") print(" No changes have been made to your Git history.") print("3. If you want to apply these (or other) changes, you can:") print(f" a. Manually run `git rebase -i {merge_base}`.") print(" b. Edit the 'pick' lines in the editor based on these suggestions") print(" (changing 'pick' to 'squash' or 'fixup' as appropriate).") print(" c. Save the editor and follow Git's instructions.") # Optional: Could add a suggestion to run the original script version # print(" d. Alternatively, run a version of this script that asks the AI") # print(" for a full rebase plan.") if backup_branch: print(f"4. Remember your backup branch is: {backup_branch}") print( f" If needed, restore with: git checkout {current_branch} && git reset --hard {backup_branch}" ) else: print( "4. WARNING: No backup branch was created. Proceed with extra caution if rebasing." ) print("=" * 60) except Exception as e: logging.error(f"\nAn error occurred during AI interaction: {e}") # Attempt to print feedback if available in the response object try: if response and hasattr(response, "prompt_feedback"): logging.error(f"AI Prompt Feedback: {response.prompt_feedback}") if response and hasattr(response, "candidates"): # Log candidate details, potentially including finish reason for candidate in response.candidates: logging.error( f"AI Candidate Finish Reason: {candidate.finish_reason}" ) # Safety details if available if hasattr(candidate, "safety_ratings"): logging.error(f"AI Safety Ratings: {candidate.safety_ratings}") except Exception as feedback_e: logging.error( f"Could not retrieve detailed feedback from AI response: {feedback_e}" ) if __name__ == "__main__": main()