#!/usr/bin/env python3
"""
Kindle Clippings to Markdown Converter

This script parses a "My Clippings.txt" file from Kindle and converts it to
structured markdown files, creating a separate file for each book.
Optionally uploads the results to a WebDAV server.
"""

import argparse
import os
import re
import shutil
import sys
from collections import defaultdict
from difflib import SequenceMatcher
from pathlib import Path
from urllib.parse import quote, unquote, urlparse
from xml.etree import ElementTree as ET

# WebDAV upload is an optional feature: keep the converter usable when the
# third-party `requests` package or the local `config` module is missing.
# The WebDAV entry points check these names and report a clean error instead.
_WEBDAV_IMPORT_ERROR = None
try:
    import requests
    from requests.auth import HTTPBasicAuth
except ImportError as exc:
    requests = None
    HTTPBasicAuth = None
    _WEBDAV_IMPORT_ERROR = exc

try:
    from config import get_webdav_config
except ImportError as exc:
    get_webdav_config = None
    _WEBDAV_IMPORT_ERROR = _WEBDAV_IMPORT_ERROR or exc


# Separator line Kindle writes between two clippings.
ENTRY_DELIMITER = '=========='

# Metadata keywords (matched against the lower-cased metadata line).
# Italian keywords are the original ones; the English ones are additive.
_PAGE_RE = re.compile(r'(?:pagina|page)\s+(\d+)')
_POSITION_RE = re.compile(r'(?:posizione|location)\s+(\d+)')
_BOOKMARK_MARKERS = ('segnalibro', 'bookmark')

# Characters that are invalid in file names on common filesystems, mapped to
# visually similar Unicode characters. Written as escapes so the table cannot
# be silently degraded to ASCII look-alikes by an editor or a copy/paste.
_FILENAME_TRANSLATION = str.maketrans({
    '<': '\u27e8',   # Mathematical left angle bracket
    '>': '\u27e9',   # Mathematical right angle bracket
    ':': '\ua789',   # Modifier letter colon
    '"': '\u201d',   # Right double quotation mark
    '/': '\u2215',   # Division slash
    '\\': '\u2216',  # Set minus
    '|': '\u2758',   # Light vertical bar
    '?': '\uff1f',   # Full-width question mark
    '*': '\u2217',   # Asterisk operator
})

# Minimal PROPFIND request body: ask only for what the listing code reads.
_PROPFIND_BODY = (
    '<?xml version="1.0" encoding="utf-8"?>\n'
    '<D:propfind xmlns:D="DAV:">\n'
    '  <D:prop>\n'
    '    <D:displayname/>\n'
    '    <D:resourcetype/>\n'
    '  </D:prop>\n'
    '</D:propfind>\n'
)

_DAV_NAMESPACES = {'D': 'DAV:'}

# Matches a backslash that escapes a square bracket (e.g. "\[" or "\]").
_ESCAPED_BRACKET_RE = re.compile(r'\\([\[\]])')

# Two lines at least this similar are considered the same line when merging.
_SIMILARITY_THRESHOLD = 0.9


def _search_kindle_dir(path, max_depth, current_depth=0):
    """Recursively search *path* for a 'Kindle' directory, with a depth limit.

    A directory qualifies when it is named 'Kindle' and contains a
    'documents' sub-directory. Returns the path as a string, or None.
    """
    if current_depth > max_depth:
        return None

    try:
        for item in path.iterdir():
            if not item.is_dir():
                continue

            # Basic validation: a real Kindle mount has a documents folder.
            if item.name == 'Kindle' and (item / 'documents').is_dir():
                print(f"Found Kindle folder: {item}")
                return str(item)

            # Continue searching in subdirectories
            result = _search_kindle_dir(item, max_depth, current_depth + 1)
            if result:
                return result
    except OSError:
        # Unreadable or vanished directory (includes PermissionError):
        # skip it and let the caller continue with its siblings.
        pass

    return None


def find_kindle_folder(search_root='/run/media', max_depth=5):
    """
    Search for a Kindle folder recursively below a mount root.

    Args:
        search_root (str): Directory to search (default: /run/media)
        max_depth (int): Maximum directory depth to descend into

    Returns:
        str: Path to Kindle folder if found, None otherwise
    """
    media_path = Path(search_root)

    if not media_path.exists():
        print(f"Error: {media_path} directory not found")
        return None

    print(f"Searching for Kindle folder in {media_path}...")
    kindle_path = _search_kindle_dir(media_path, max_depth)

    if not kindle_path:
        print(f"Error: Kindle folder not found in {media_path}")
        return None

    return kindle_path


def extract_clippings_from_kindle(kindle_path, output_file='My Clippings.txt'):
    """
    Extract My Clippings.txt from Kindle/documents folder.

    Args:
        kindle_path (str): Path to Kindle folder
        output_file (str): Output filename for the extracted clippings

    Returns:
        str: Path to extracted file if successful, None otherwise
    """
    documents_path = Path(kindle_path) / 'documents'
    clippings_source = documents_path / 'My Clippings.txt'

    if not documents_path.exists():
        print(f"Error: documents folder not found in {kindle_path}")
        return None

    if not clippings_source.exists():
        print(f"Error: My Clippings.txt not found in {documents_path}")
        return None

    output_path = Path(output_file)
    try:
        # Copy the file (with metadata) to the requested location
        shutil.copy2(clippings_source, output_path)
    except OSError as e:
        # shutil.Error and SameFileError are OSError subclasses too.
        print(f"Error extracting clippings file: {e}")
        return None

    print(f"Extracted: {clippings_source} -> {output_path}")
    return str(output_path)


def auto_parse():
    """
    Automatically find and extract My Clippings.txt from connected Kindle device.

    Returns:
        str: Path to extracted clippings file if successful, None otherwise
    """
    print("Auto-parsing: Searching for Kindle device...")

    kindle_path = find_kindle_folder()
    if not kindle_path:
        return None

    return extract_clippings_from_kindle(kindle_path)


def clean_title(title):
    """Clean and normalize book title.

    Removes line breaks and the byte-order mark (U+FEFF) that Kindle writes
    in front of titles; the BOM is not whitespace, so strip() alone would
    leave it in place and make the same book appear under two titles.
    """
    return title.replace('\ufeff', '').replace('\r', '').replace('\n', '').strip()


def extract_page_info(metadata_line):
    """
    Extract page information from the metadata line.

    Args:
        metadata_line (str): The line containing page and position info

    Returns:
        str: Formatted page reference ("(p. N)", or "(pos. N)" as fallback)
             or empty string if neither is found
    """
    metadata_line = metadata_line.lower()

    page_match = _PAGE_RE.search(metadata_line)
    if page_match:
        return f"(p. {page_match.group(1)})"

    # If no page found, look for position as fallback
    position_match = _POSITION_RE.search(metadata_line)
    if position_match:
        return f"(pos. {position_match.group(1)})"

    return ""


def sanitize_filename(filename):
    """
    Sanitize filename by removing/replacing invalid characters while preserving spaces.

    Args:
        filename (str): Original filename

    Returns:
        str: Sanitized filename safe for filesystem with preserved spaces;
             'Untitled Book' when nothing usable remains
    """
    # Replace invalid filesystem characters with safe alternatives
    filename = filename.translate(_FILENAME_TRANSLATION)

    # Clean up multiple spaces but preserve single spaces
    filename = re.sub(r'\s+', ' ', filename.strip())

    # Remove leading/trailing dots and spaces that could cause issues
    filename = filename.strip('. ')

    # Limit length to avoid filesystem issues (keeping some margin for the
    # .md extension); the cut may expose a new trailing dot or space.
    if len(filename) > 200:
        filename = filename[:200].strip('. ')

    return filename or 'Untitled Book'


def _read_clippings_text(file_path):
    """Return the text of the clippings file, tolerating BOM and non-UTF-8 files."""
    try:
        # utf-8-sig drops the leading BOM Kindle writes at the start of the file.
        with open(file_path, 'r', encoding='utf-8-sig') as file:
            return file.read()
    except UnicodeDecodeError:
        # latin-1 can decode any byte sequence, so this fallback cannot fail
        # on encoding grounds.
        with open(file_path, 'r', encoding='latin-1') as file:
            return file.read()


def parse_clippings_file(file_path):
    """
    Parse the My Clippings.txt file and extract book titles, citations, and page references.

    Each entry is expected as: title line, metadata line, then one or more
    text lines. Entries without text (e.g. bookmarks) are skipped.

    Args:
        file_path (str): Path to the My Clippings.txt file

    Returns:
        dict: Dictionary with book titles as keys and lists of
              (citation, page_ref) tuples as values
    """
    content = _read_clippings_text(file_path)

    # Dictionary to group citations by book title
    books = defaultdict(list)

    for entry in content.split(ENTRY_DELIMITER):
        # Non-empty lines only; a BOM may also precede later entries, and it
        # is not whitespace, so remove it explicitly before stripping.
        lines = []
        for raw_line in entry.split('\n'):
            line = raw_line.replace('\ufeff', '').strip()
            if line:
                lines.append(line)

        # Need title + metadata + at least one line of text
        if len(lines) < 3:
            continue

        metadata_line = lines[1]

        # Join all content lines (in case citation spans multiple lines)
        citation = ' '.join(lines[2:]).strip()
        if not citation:
            continue

        # Skip bookmark entries with very short or no text
        lowered_metadata = metadata_line.lower()
        is_bookmark = any(marker in lowered_metadata for marker in _BOOKMARK_MARKERS)
        if is_bookmark and len(citation) < 10:
            continue

        books[clean_title(lines[0])].append(
            (citation, extract_page_info(metadata_line))
        )

    return dict(books)


def generate_markdown_files(books_dict, output_dir="kindle_books"):
    """
    Generate separate markdown files for each book with page references.

    Each citation is written as a blockquote, followed by its page reference
    when one is available; citations are separated by a blank line.

    Args:
        books_dict (dict): Dictionary with book titles and (citation, page_ref) tuples
        output_dir (str): Directory to save markdown files

    Returns:
        list: List of generated file paths
    """
    # Create output directory (and any missing parents) if it doesn't exist
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    generated_files = []

    # Process books alphabetically by title
    for book_title in sorted(books_dict):
        citations_with_pages = books_dict[book_title]
        if not citations_with_pages:
            # Skip books with no citations
            continue

        file_path = output_path / f"{sanitize_filename(book_title)}.md"

        markdown_content = []
        for citation, page_ref in citations_with_pages:
            # Keep every citation on a single blockquote line
            citation = citation.replace('\r', '').replace('\n', ' ').strip()
            if not citation:
                continue
            if page_ref:
                markdown_content.append(f"> {citation} {page_ref}\n")
            else:
                markdown_content.append(f"> {citation}\n")

        # Each item already ends with a newline, so joining with '\n' leaves
        # one blank line between consecutive blockquotes.
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write('\n'.join(markdown_content))

        generated_files.append(str(file_path))
        print(f"Generated: {file_path}")

    total_citations = sum(len(citations) for citations in books_dict.values())
    print(f"\nTotal books processed: {len(generated_files)}")
    print(f"Total citations: {total_citations}")
    print(f"Files saved in: {output_path.absolute()}")

    return generated_files


def _load_webdav_settings():
    """Return (base_url, auth) for the configured WebDAV server, or None.

    Prints the reason and returns None when WebDAV support could not be
    imported or when the username/password are not configured.
    """
    if requests is None or HTTPBasicAuth is None or get_webdav_config is None:
        print(f"Error: WebDAV support is unavailable: {_WEBDAV_IMPORT_ERROR}")
        return None

    webdav_config = get_webdav_config()
    base_url = webdav_config['base_url']
    username = webdav_config['username']
    password = webdav_config['password']

    if not username or not password:
        print("Error: WebDAV username or password not configured")
        return None

    return base_url, HTTPBasicAuth(username, password)


def _absolute_url(base_url, href):
    """Turn a server-relative href ('/dav/x.md') into an absolute URL.

    Hrefs that do not start with '/' are returned unchanged.
    """
    if href.startswith('/'):
        parsed_base = urlparse(base_url)
        return f"{parsed_base.scheme}://{parsed_base.netloc}{href}"
    return href


def _parse_propfind_file_hrefs(xml_payload):
    """Return the hrefs of non-collection resources in a PROPFIND multistatus reply."""
    root = ET.fromstring(xml_payload)
    file_hrefs = []

    for response_elem in root.findall('.//D:response', _DAV_NAMESPACES):
        href_elem = response_elem.find('D:href', _DAV_NAMESPACES)
        resourcetype_elem = response_elem.find('.//D:resourcetype', _DAV_NAMESPACES)
        if href_elem is None or resourcetype_elem is None:
            continue

        href = href_elem.text
        if not href:
            continue

        # Skip directories: they carry a D:collection element and/or an href
        # ending with '/' (this also excludes the listed directory itself).
        if resourcetype_elem.find('D:collection', _DAV_NAMESPACES) is not None:
            continue
        if href.endswith('/'):
            continue

        file_hrefs.append(href)

    return file_hrefs


def get_webdav_directory_contents(base_url, auth):
    """
    Get list of files in the WebDAV directory using PROPFIND request.

    Best-effort: any failure is reported on stdout and yields an empty list.

    Args:
        base_url (str): WebDAV base URL
        auth: HTTPBasicAuth object

    Returns:
        list: List of file URLs (hrefs as reported by the server) found in
              the directory
    """
    try:
        # Depth 1 = the directory itself plus its immediate children
        headers = {
            'Depth': '1',
            'Content-Type': 'application/xml'
        }

        response = requests.request(
            'PROPFIND',
            base_url,
            data=_PROPFIND_BODY.encode('utf-8'),
            headers=headers,
            auth=auth,
            timeout=30
        )

        if response.status_code != 207:  # WebDAV Multi-Status
            print(f"Warning: Could not list directory contents - Status: {response.status_code}")
            return []

        # Parse the raw bytes so the XML parser honours the declared encoding
        # instead of relying on the HTTP layer's charset guess.
        return _parse_propfind_file_hrefs(response.content)

    except Exception as e:
        # Deliberately broad: listing is optional input for the callers.
        print(f"Error listing directory contents: {e}")
        return []


def delete_webdav_file(file_url, auth):
    """
    Delete a single file from WebDAV server.

    Args:
        file_url (str): Full URL of the file to delete
        auth: HTTPBasicAuth object

    Returns:
        bool: True if successful, False otherwise
    """
    try:
        response = requests.delete(file_url, auth=auth, timeout=30)
    except Exception as e:
        # Deliberately broad: one failed deletion must not abort the cleanup.
        print(f"Error deleting {file_url}: {e}")
        return False

    if response.status_code in (200, 204, 404):  # 404 means already deleted
        return True

    print(f"Failed to delete {file_url} - Status: {response.status_code}")
    return False


def clear_webdav_directory():
    """
    Clear all files from the WebDAV directory.

    Returns:
        bool: True if successful (or nothing to delete), False otherwise
    """
    settings = _load_webdav_settings()
    if settings is None:
        return False
    base_url, auth = settings

    print("Clearing WebDAV directory...")

    # Get list of existing files
    file_urls = get_webdav_directory_contents(base_url, auth)

    if not file_urls:
        print("No files found in WebDAV directory (or could not list contents)")
        return True

    print(f"Found {len(file_urls)} files to delete")

    deleted_count = 0
    failed_count = 0

    for file_url in file_urls:
        full_url = _absolute_url(base_url, file_url)

        # Last path segment, for display only
        filename = file_url.split('/')[-1]

        if delete_webdav_file(full_url, auth):
            print(f"✓ Deleted: {filename}")
            deleted_count += 1
        else:
            print(f"✗ Failed to delete: {filename}")
            failed_count += 1

    print("\nCleanup Summary:")
    print(f"✓ Files deleted: {deleted_count}")
    print(f"✗ Failed deletions: {failed_count}")

    return failed_count == 0


def _merge_lines(local_lines, remote_lines):
    """Merge two versions of a file, given as lists of lines.

    Lines present on either side are kept. For a one-line-to-one-line
    replacement, a near-identical remote line wins; otherwise the local line
    is kept. Afterwards consecutive near-duplicate lines are collapsed (the
    later one wins) and backslashes escaping square brackets are removed.
    """
    merged = []
    matcher = SequenceMatcher(None, local_lines, remote_lines)

    for tag, i1, i2, j1, j2 in matcher.get_opcodes():
        local_block = local_lines[i1:i2]
        remote_block = remote_lines[j1:j2]

        if tag in ('equal', 'delete'):
            merged.extend(local_block)
        elif tag == 'insert':
            merged.extend(remote_block)
        elif len(local_block) == len(remote_block) == 1:
            # Single-line replace: compare at character level.
            ratio = SequenceMatcher(None, local_block[0], remote_block[0]).ratio()
            # If remote change is minor, keep remote; if major, keep local
            if ratio >= _SIMILARITY_THRESHOLD:
                merged.append(remote_block[0])
            else:
                merged.append(local_block[0])
        else:
            # Multi-line replace: keep both sides.
            merged.extend(local_block)
            merged.extend(remote_block)

    # Post-process: collapse near-duplicate neighbours. Two empty lines have
    # ratio 1.0, so runs of blank lines collapse to a single blank line.
    collapsed = []
    for line in merged:
        if collapsed and SequenceMatcher(None, collapsed[-1], line).ratio() >= _SIMILARITY_THRESHOLD:
            collapsed[-1] = line
        else:
            collapsed.append(line)

    # Remove any escape backslashes before square brackets
    return [_ESCAPED_BRACKET_RE.sub(r'\1', line) for line in collapsed]


def _sync_with_remote(base_url, auth, file_paths):
    """Download remote-only files and merge remote content into local files.

    Remote and local files are matched by sanitized file stem. *file_paths*
    is extended in place with the paths of newly downloaded files. All
    failures are reported as warnings and skipped.
    """
    print("Merging local and remote changes...")

    # Build mapping of sanitized stems to absolute remote URLs
    remote_map = {}
    for href in get_webdav_directory_contents(base_url, auth):
        # Decode percent-encoding for correct filename comparison
        raw_name = unquote(href.rsplit('/', 1)[-1])
        remote_map[sanitize_filename(Path(raw_name).stem)] = _absolute_url(base_url, href)

    local_map = {Path(p).stem: p for p in file_paths}

    # Downloads go next to the generated files
    output_dir = Path(file_paths[0]).parent if file_paths else Path('.')

    for stem, full_url in remote_map.items():
        remote_only = stem not in local_map
        action = 'download' if remote_only else 'merge'
        try:
            # One request per remote file serves both download and merge.
            resp = requests.get(full_url, auth=auth, timeout=30)
            if resp.status_code != 200:
                if remote_only:
                    print(f"Warning: Failed to download remote file {full_url} - Status: {resp.status_code}")
                else:
                    print(f"Warning: Failed to fetch remote file {full_url} for merging - Status: {resp.status_code}")
                continue

            if remote_only:
                new_path = output_dir / f"{stem}.md"
                new_path.write_bytes(resp.content)
                print(f"Downloaded remote-only file: {new_path.name}")
                file_paths.append(str(new_path))
                local_map[stem] = str(new_path)
                action = 'merge'

            file_path = local_map[stem]

            # Files are uploaded as UTF-8; decode explicitly because the HTTP
            # layer falls back to ISO-8859-1 for text/* without a charset,
            # which would corrupt non-ASCII citations during the merge.
            remote_lines = resp.content.decode('utf-8').splitlines()
            with open(file_path, 'r', encoding='utf-8') as f:
                local_lines = f.read().splitlines()

            merged_lines = _merge_lines(local_lines, remote_lines)

            with open(file_path, 'w', encoding='utf-8') as f:
                f.write('\n'.join(merged_lines))
            print(f"Merged file: {Path(file_path).name}")
        except Exception as e:
            # Deliberately broad: a problem with one file must not stop the
            # synchronisation of the others.
            print(f"Warning: Could not {action} {full_url}: {e}")


def upload_files_to_webdav(file_paths):
    """
    Upload multiple files to WebDAV server using preconfigured settings.

    Before uploading, files that exist only on the server are downloaded
    next to the local files, and files that exist on both sides are merged;
    downloaded files are uploaded too and appear in the returned lists.
    The list passed in by the caller is not modified.

    Args:
        file_paths (list): List of local file paths to upload

    Returns:
        tuple: (successful_uploads, failed_uploads)
    """
    # Work on a copy so downloaded files are not appended to the caller's list
    file_paths = list(file_paths)

    settings = _load_webdav_settings()
    if settings is None:
        return [], file_paths
    base_url, auth = settings

    _sync_with_remote(base_url, auth, file_paths)

    successful_uploads = []
    failed_uploads = []

    # text/markdown requires a charset parameter (RFC 7763)
    headers = {'Content-Type': 'text/markdown; charset=utf-8'}
    upload_base = base_url.rstrip('/')

    for file_path in file_paths:
        filename = Path(file_path).name
        try:
            # Percent-encode the name: characters such as '#', '?' or '%'
            # would otherwise change the meaning of the URL.
            webdav_file_url = f"{upload_base}/{quote(filename)}"

            with open(file_path, 'rb') as f:
                file_content = f.read()

            print(f"Uploading: {filename}")

            response = requests.put(
                webdav_file_url,
                data=file_content,
                headers=headers,
                auth=auth,
                timeout=30
            )

            if response.status_code in (200, 201, 204):
                print(f"✓ Successfully uploaded: {filename}")
                successful_uploads.append(file_path)
            else:
                print(f"✗ Upload failed for {filename} - Status: {response.status_code}")
                print(f"  Response: {response.text}")
                failed_uploads.append(file_path)

        except requests.exceptions.RequestException as e:
            print(f"✗ Network error uploading {filename}: {e}")
            failed_uploads.append(file_path)
        except Exception as e:
            # Per-file boundary (e.g. unreadable local file): record and go on.
            print(f"✗ Unexpected error uploading {filename}: {e}")
            failed_uploads.append(file_path)

    return successful_uploads, failed_uploads


def main(argv=None):
    """Main function to run the converter.

    Args:
        argv (list): Command-line arguments without the program name;
                     defaults to sys.argv[1:] when None

    Returns:
        int: Process exit code (0 on success, 1 on failure)
    """
    parser = argparse.ArgumentParser(
        description="Convert Kindle My Clippings.txt to separate Markdown files for each book"
    )
    parser.add_argument(
        'input_file',
        nargs='?',
        default='My Clippings.txt',
        help='Path to the My Clippings.txt file (default: My Clippings.txt)'
    )
    parser.add_argument(
        '-d', '--output-dir',
        default='kindle_books',
        help='Output directory for markdown files (default: kindle_books)'
    )
    parser.add_argument(
        '--upload',
        action='store_true',
        help='Upload the markdown files to WebDAV server'
    )
    parser.add_argument(
        '--clear',
        action='store_true',
        help='Clear WebDAV directory before upload (default: merge with existing files)'
    )
    parser.add_argument(
        '--direct',
        action='store_true',
        help='Auto-parse: search for Kindle device in /run/media and extract My Clippings.txt automatically'
    )

    args = parser.parse_args(argv)

    # Handle --direct flag for auto-parsing
    if args.direct:
        clippings_file = auto_parse()
        if not clippings_file:
            print("Auto-parse failed. Could not find or extract My Clippings.txt")
            return 1
        args.input_file = clippings_file

    # Check if input file exists
    if not Path(args.input_file).exists():
        print(f"Error: Input file '{args.input_file}' not found.")
        return 1

    try:
        print(f"Parsing {args.input_file}...")
        books = parse_clippings_file(args.input_file)

        print("Generating markdown files...")
        generated_files = generate_markdown_files(books, args.output_dir)

        if not generated_files:
            print("No books with citations found.")
            return 0

        if args.upload:
            # Clear WebDAV directory first only if explicitly requested
            if args.clear:
                if not clear_webdav_directory():
                    print("Warning: Some files could not be deleted from WebDAV directory.")
                    print("Continuing with upload (files will be overwritten)...")
                print()  # Empty line for better readability

            print("Uploading to WebDAV server...")
            successful, failed = upload_files_to_webdav(generated_files)

            print("\nUpload Summary:")
            print(f"✓ Successful uploads: {len(successful)}")
            print(f"✗ Failed uploads: {len(failed)}")

            if failed:
                print("Warning: Some files failed to upload.")
                return 1

        return 0

    except Exception as e:
        # Top-level boundary: report and convert to a non-zero exit code.
        print(f"Error processing file: {e}")
        return 1


if __name__ == "__main__":
    sys.exit(main())