Files
2025-06-10 15:01:21 +02:00

748 lines
25 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Kindle Clippings to Markdown Converter
This script parses a "My Clippings.txt" file from Kindle and converts it
to structured markdown files, creating a separate file for each book.
Optionally uploads the results to a WebDAV server.
"""
import re
from collections import defaultdict
from pathlib import Path
import argparse
import requests
from requests.auth import HTTPBasicAuth
from xml.etree import ElementTree as ET
import os
import shutil
from config import get_webdav_config
def find_kindle_folder(search_root='/run/media'):
    """
    Search recursively for a mounted Kindle folder.

    A directory qualifies when it is named exactly 'Kindle' and contains a
    'documents' subdirectory. The search descends at most five levels below
    the root.

    Args:
        search_root (str): Directory under which removable media are mounted.
            Defaults to '/run/media'.

    Returns:
        str: Path to Kindle folder if found, None otherwise
    """
    media_path = Path(search_root)
    if not media_path.exists():
        print(f"Error: {search_root} directory not found")
        return None

    def search_recursive(path, max_depth=5, current_depth=0):
        """Recursively search for Kindle folder with depth limit."""
        if current_depth > max_depth:
            return None
        try:
            for item in path.iterdir():
                if not item.is_dir():
                    continue
                # A directory named 'Kindle' only counts if it holds a
                # 'documents' folder (basic validation).
                if item.name == 'Kindle' and (item / 'documents').is_dir():
                    print(f"Found Kindle folder: {item}")
                    return str(item)
                # Continue searching in subdirectories
                result = search_recursive(item, max_depth, current_depth + 1)
                if result:
                    return result
        except OSError:
            # Unreadable or vanished directories (PermissionError included)
            # are skipped so the rest of the tree can still be searched.
            pass
        return None

    print(f"Searching for Kindle folder in {search_root}...")
    kindle_path = search_recursive(media_path)
    if not kindle_path:
        print(f"Error: Kindle folder not found in {search_root}")
        return None
    return kindle_path
def extract_clippings_from_kindle(kindle_path, output_file='My Clippings.txt'):
    """
    Copy 'My Clippings.txt' out of the Kindle's documents folder.

    Args:
        kindle_path (str): Path to Kindle folder
        output_file (str): Destination path for the copied clippings file

    Returns:
        str: Path of the copied file on success, None when the documents
            folder or the clippings file is missing or the copy fails
    """
    docs_dir = Path(kindle_path) / 'documents'
    if not docs_dir.exists():
        print(f"Error: documents folder not found in {kindle_path}")
        return None

    source = docs_dir / 'My Clippings.txt'
    if not source.exists():
        print(f"Error: My Clippings.txt not found in {docs_dir}")
        return None

    try:
        destination = Path(output_file)
        # copy2 also carries over the file's metadata (timestamps).
        shutil.copy2(source, destination)
        print(f"Extracted: {source} -> {destination}")
        return str(destination)
    except Exception as e:
        print(f"Error extracting clippings file: {e}")
        return None
def auto_parse():
    """
    Locate a connected Kindle and copy its My Clippings.txt locally.

    Returns:
        str: Path to the extracted clippings file, or None when no Kindle
            folder was found or the extraction failed
    """
    print("Auto-parsing: Searching for Kindle device...")
    kindle_path = find_kindle_folder()
    if kindle_path:
        # Extraction reports its own errors and yields None on failure.
        return extract_clippings_from_kindle(kindle_path)
    return None
def clean_title(title):
    """
    Clean and normalize a book title.

    Removes byte-order marks (U+FEFF), carriage returns and newlines, then
    trims surrounding whitespace. A BOM is not whitespace, so str.strip()
    alone would leave it glued to the title and the same book would end up
    under two different keys.

    Args:
        title (str): Raw title line from the clippings file

    Returns:
        str: Normalized title
    """
    for unwanted in ('\ufeff', '\r', '\n'):
        title = title.replace(unwanted, '')
    return title.strip()
def extract_page_info(metadata_line):
    """
    Build a short location reference from a clipping's metadata line.

    The line is matched case-insensitively against the Italian keywords
    'pagina' (page) and, as a fallback, 'posizione' (position).

    Args:
        metadata_line (str): The line containing page and position info

    Returns:
        str: '(p. N)' or '(pos. N)', or an empty string if neither is found
    """
    lowered = metadata_line.lower()
    # Order matters: a page number wins over a position.
    lookups = (
        (r'pagina\s+(\d+)', 'p.'),
        (r'posizione\s+(\d+)', 'pos.'),
    )
    for pattern, label in lookups:
        found = re.search(pattern, lowered)
        if found:
            return f"({label} {found.group(1)})"
    return ""
def sanitize_filename(filename):
    """
    Sanitize a filename by replacing invalid characters while preserving spaces.

    Characters that are illegal on common filesystems are swapped for
    visually similar Unicode characters, so the title stays readable. The
    replacements are written as escapes because the look-alike glyphs are
    easily lost or confused when the file passes through other tools.

    Args:
        filename (str): Original filename

    Returns:
        str: Sanitized filename (at most 200 characters, never empty,
            no leading/trailing dots or spaces)
    """
    replacements = {
        '<': '\u27e8',   # Mathematical left angle bracket
        '>': '\u27e9',   # Mathematical right angle bracket
        ':': '\ua789',   # Modifier letter colon
        '"': '\u201d',   # Right double quotation mark
        '/': '\u2215',   # Division slash
        '\\': '\u2216',  # Set minus
        '|': '\u2758',   # Light vertical bar
        '?': '\uff1f',   # Full-width question mark
        '*': '\u2217',   # Asterisk operator
    }
    filename = filename.translate(str.maketrans(replacements))
    # Clean up multiple spaces but preserve single spaces
    filename = re.sub(r'\s+', ' ', filename.strip())
    # Remove leading/trailing dots and spaces that could cause issues
    filename = filename.strip('. ')
    # Limit length to avoid filesystem issues (keeping some margin for the
    # .md extension); the cut may expose a new trailing dot or space.
    if len(filename) > 200:
        filename = filename[:200].rstrip('. ')
    return filename or 'Untitled Book'
def parse_clippings_file(file_path):
    """
    Parse the My Clippings.txt file and extract book titles, citations, and page references.

    Entries are separated by a '==========' delimiter. Within an entry the
    first non-empty line is the book title, the second is the metadata line
    and everything after that is the citation text.

    Args:
        file_path (str): Path to the My Clippings.txt file

    Returns:
        dict: Dictionary with book titles as keys and lists of
            (citation, page_ref) tuples as values
    """
    try:
        # 'utf-8-sig' drops a leading byte-order mark; plain 'utf-8' would
        # leave it attached to the first book title.
        with open(file_path, 'r', encoding='utf-8-sig') as file:
            content = file.read()
    except UnicodeDecodeError:
        # Try with different encoding if UTF-8 fails
        with open(file_path, 'r', encoding='latin-1') as file:
            content = file.read()
    # A BOM is not whitespace, so strip() never removes it; drop any that
    # remain so identical titles always map to the same key.
    content = content.replace('\ufeff', '')

    # Group citations by book title
    books = defaultdict(list)
    for entry in content.split('=========='):
        lines = [line.strip() for line in entry.split('\n') if line.strip()]
        # Title + metadata + at least one content line are required.
        if len(lines) < 3:
            continue
        book_title = clean_title(lines[0])
        metadata_line = lines[1]
        page_ref = extract_page_info(metadata_line)
        # Join all content lines (in case citation spans multiple lines)
        citation = ' '.join(lines[2:]).strip()
        if not citation:
            continue
        # Skip bookmark entries ('segnalibro') with very short or no text
        if 'segnalibro' in metadata_line.lower() and len(citation) < 10:
            continue
        books[book_title].append((citation, page_ref))
    return dict(books)
def generate_markdown_files(books_dict, output_dir="kindle_books"):
    """
    Generate separate markdown files for each book with page references.

    Each citation is written as a blockquote, followed by its page
    reference when one is available. Books are processed in alphabetical
    order of their titles.

    Args:
        books_dict (dict): Dictionary with book titles and (citation, page_ref) tuples
        output_dir (str): Directory to save markdown files

    Returns:
        list: List of generated file paths
    """
    output_path = Path(output_dir)
    # parents=True so a nested output directory (e.g. 'notes/kindle') works.
    output_path.mkdir(parents=True, exist_ok=True)

    generated_files = []
    for book_title, citations_with_pages in sorted(books_dict.items()):
        if not citations_with_pages:  # Skip books with no citations
            continue
        file_path = output_path / f"{sanitize_filename(book_title)}.md"

        markdown_content = []
        for citation, page_ref in citations_with_pages:
            citation = citation.replace('\r', '').replace('\n', ' ').strip()
            if not citation:
                continue
            quote_line = f"> {citation} {page_ref}" if page_ref else f"> {citation}"
            # The trailing newline plus the join below leaves a blank line
            # between consecutive blockquotes.
            markdown_content.append(f"{quote_line}\n")

        with open(file_path, 'w', encoding='utf-8') as f:
            f.write('\n'.join(markdown_content))
        generated_files.append(str(file_path))
        print(f"Generated: {file_path}")

    total_citations = sum(len(citations) for citations in books_dict.values())
    print(f"\nTotal books processed: {len(generated_files)}")
    print(f"Total citations: {total_citations}")
    print(f"Files saved in: {output_path.absolute()}")
    return generated_files
def get_webdav_directory_contents(base_url, auth):
    """
    Get list of files in the WebDAV directory using PROPFIND request.

    Args:
        base_url (str): WebDAV base URL
        auth: HTTPBasicAuth object

    Returns:
        list: hrefs of the files (collections excluded) reported by the
            server, exactly as they appear in the response; an empty list
            if the listing fails
    """
    headers = {
        'Depth': '1',  # the directory itself plus its direct children
        'Content-Type': 'application/xml',
    }
    # Basic PROPFIND body to get file names
    propfind_body = (
        '<?xml version="1.0" encoding="utf-8" ?>\n'
        '<D:propfind xmlns:D="DAV:">\n'
        '<D:prop>\n'
        '<D:displayname/>\n'
        '<D:resourcetype/>\n'
        '</D:prop>\n'
        '</D:propfind>'
    )
    try:
        response = requests.request(
            'PROPFIND',
            base_url,
            data=propfind_body,
            headers=headers,
            auth=auth,
            timeout=30,
        )
        if response.status_code != 207:  # WebDAV Multi-Status
            print(f"Warning: Could not list directory contents - Status: {response.status_code}")
            return []
        # Parse the raw bytes so the XML parser applies the encoding declared
        # in the document instead of the charset guessed from HTTP headers.
        root = ET.fromstring(response.content)
    except (requests.exceptions.RequestException, ET.ParseError) as e:
        print(f"Error listing directory contents: {e}")
        return []

    namespaces = {'D': 'DAV:'}
    file_urls = []
    for response_elem in root.findall('.//D:response', namespaces):
        href_elem = response_elem.find('D:href', namespaces)
        resourcetype_elem = response_elem.find('.//D:resourcetype', namespaces)
        if href_elem is None or resourcetype_elem is None:
            continue
        href = href_elem.text
        # Skip empty hrefs and anything ending in '/' (the base directory itself)
        if not href or href.endswith('/'):
            continue
        # Directories carry a D:collection element inside D:resourcetype
        if resourcetype_elem.find('D:collection', namespaces) is None:
            file_urls.append(href)
    return file_urls
def delete_webdav_file(file_url, auth):
    """
    Issue a DELETE for one file on the WebDAV server.

    Args:
        file_url (str): Full URL of the file to delete
        auth: HTTPBasicAuth object

    Returns:
        bool: True when the server answers 200, 204 or 404 (already gone),
            False on any other status or on an error
    """
    accepted_statuses = (200, 204, 404)  # 404 means already deleted
    try:
        response = requests.delete(file_url, auth=auth, timeout=30)
        if response.status_code not in accepted_statuses:
            print(f"Failed to delete {file_url} - Status: {response.status_code}")
            return False
        return True
    except Exception as e:
        print(f"Error deleting {file_url}: {e}")
        return False
def clear_webdav_directory():
    """
    Delete every file listed in the configured WebDAV directory.

    Returns:
        bool: True if nothing failed (including the case where no files were
            listed), False if credentials are missing or any deletion failed
    """
    from urllib.parse import urlparse

    settings = get_webdav_config()
    base_url = settings['base_url']
    username = settings['username']
    password = settings['password']

    if not (username and password):
        print("Error: WebDAV username or password not configured")
        return False

    auth = HTTPBasicAuth(username, password)
    print("Clearing WebDAV directory...")

    remote_files = get_webdav_directory_contents(base_url, auth)
    if not remote_files:
        print("No files found in WebDAV directory (or could not list contents)")
        return True
    print(f"Found {len(remote_files)} files to delete")

    tally = {'deleted': 0, 'failed': 0}
    for href in remote_files:
        target = href
        if href.startswith('/'):
            # Server-relative href: prepend scheme and host of the base URL
            origin = urlparse(base_url)
            target = f"{origin.scheme}://{origin.netloc}{href}"
        # Last path segment, used only for the progress messages
        display_name = href.split('/')[-1]
        if delete_webdav_file(target, auth):
            print(f"✓ Deleted: {display_name}")
            tally['deleted'] += 1
        else:
            print(f"✗ Failed to delete: {display_name}")
            tally['failed'] += 1

    print("\nCleanup Summary:")
    print(f"✓ Files deleted: {tally['deleted']}")
    print(f"✗ Failed deletions: {tally['failed']}")
    return tally['failed'] == 0
def _absolute_webdav_url(base_url, href):
    """Turn a server-relative href into an absolute URL on base_url's host."""
    from urllib.parse import urlparse

    if href.startswith('/'):
        parsed_base = urlparse(base_url)
        return f"{parsed_base.scheme}://{parsed_base.netloc}{href}"
    return href


def _decode_remote_text(resp):
    """Decode a downloaded body as UTF-8, falling back to requests' own guess."""
    try:
        # For text/* responses without a charset, resp.text assumes
        # ISO-8859-1, which garbles the UTF-8 files this tool uploads.
        return resp.content.decode('utf-8')
    except UnicodeDecodeError:
        return resp.text


def _merge_markdown_lines(local_lines, remote_lines):
    """Merge two line lists, then collapse near-duplicates and unescape brackets."""
    from difflib import SequenceMatcher

    merged = []
    opcodes = SequenceMatcher(None, local_lines, remote_lines).get_opcodes()
    for tag, i1, i2, j1, j2 in opcodes:
        local_block = local_lines[i1:i2]
        remote_block = remote_lines[j1:j2]
        if tag in ('equal', 'delete'):
            # Lines only present locally are kept.
            merged.extend(local_block)
        elif tag == 'insert':
            merged.extend(remote_block)
        elif tag == 'replace':
            if len(local_block) == len(remote_block) == 1:
                # Single-line change: a minor remote edit (ratio >= 0.9)
                # wins, a major difference keeps the local line.
                ratio = SequenceMatcher(None, local_block[0], remote_block[0]).ratio()
                merged.append(remote_block[0] if ratio >= 0.9 else local_block[0])
            else:
                merged.extend(local_block)
                merged.extend(remote_block)

    # Collapse near-duplicate consecutive lines, keeping the later one.
    collapsed = []
    for line in merged:
        if collapsed and SequenceMatcher(None, collapsed[-1], line).ratio() >= 0.9:
            collapsed[-1] = line
        else:
            collapsed.append(line)

    # Remove any escape backslashes before square brackets
    return [re.sub(r'\\([\[\]])', r'\1', line) for line in collapsed]


def upload_files_to_webdav(file_paths):
    """
    Upload multiple files to WebDAV server using preconfigured settings.

    Before uploading, remote files missing locally are downloaded next to
    the local files, and files that exist on both sides are merged line by
    line. Remote and local files are matched by sanitized filename stem.

    Args:
        file_paths (list): List of local file paths to upload. The list is
            not modified; downloaded files show up in the returned lists.

    Returns:
        tuple: (successful_uploads, failed_uploads)
    """
    from urllib.parse import quote, unquote

    webdav_config = get_webdav_config()
    base_url = webdav_config['base_url']
    username = webdav_config['username']
    password = webdav_config['password']

    if not username or not password:
        print("Error: WebDAV username or password not configured")
        return [], file_paths
    auth = HTTPBasicAuth(username, password)

    # Work on a copy so downloaded files are not appended to the caller's list.
    file_paths = list(file_paths)

    print("Merging local and remote changes...")
    # Map sanitized stems to absolute remote URLs. hrefs are percent-encoded,
    # so decode them before comparing with local filenames.
    remote_map = {}
    for href in get_webdav_directory_contents(base_url, auth):
        stem = Path(unquote(Path(href).name)).stem
        remote_map[sanitize_filename(stem)] = _absolute_webdav_url(base_url, href)

    local_map = {Path(p).stem: p for p in file_paths}
    # Downloads go next to the first local file
    output_dir = Path(file_paths[0]).parent if file_paths else Path('.')

    # Download files present remotely but missing locally
    for stem, full_url in remote_map.items():
        if stem in local_map:
            continue
        try:
            resp = requests.get(full_url, auth=auth, timeout=30)
            if resp.status_code != 200:
                print(f"Warning: Failed to download remote file {full_url} - Status: {resp.status_code}")
                continue
            new_path = output_dir / f"{stem}.md"
            with open(new_path, 'wb') as f:
                f.write(resp.content)
            print(f"Downloaded remote-only file: {new_path.name}")
            file_paths.append(str(new_path))
            local_map[stem] = str(new_path)
        except Exception as e:
            # Best effort: one failing file must not abort the others.
            print(f"Warning: Could not download {full_url}: {e}")

    # Merge local and remote content for every matching stem. Files that were
    # just downloaded are in local_map by now, so they pass through the same
    # merge and post-processing.
    for stem, full_url in remote_map.items():
        if stem not in local_map:
            continue
        file_path = local_map[stem]
        try:
            resp = requests.get(full_url, auth=auth, timeout=30)
            if resp.status_code != 200:
                print(f"Warning: Failed to fetch remote file {full_url} for merging - Status: {resp.status_code}")
                continue
            # splitlines() keeps blank lines as empty entries for the merge
            remote_lines = _decode_remote_text(resp).splitlines()
            with open(file_path, 'r', encoding='utf-8') as f:
                local_lines = f.read().splitlines()
            merged_lines = _merge_markdown_lines(local_lines, remote_lines)
            with open(file_path, 'w', encoding='utf-8') as f:
                f.write('\n'.join(merged_lines))
            print(f"Merged file: {Path(file_path).name}")
        except Exception as e:
            # Best effort: one failing file must not abort the others.
            print(f"Warning: Could not merge {full_url}: {e}")

    successful_uploads = []
    failed_uploads = []
    upload_base = base_url.rstrip('/')
    for file_path in file_paths:
        filename = Path(file_path).name
        # Percent-encode the name: characters such as '#' would otherwise be
        # parsed as URL syntax and the PUT would hit the wrong path.
        webdav_file_url = f"{upload_base}/{quote(filename)}"
        try:
            with open(file_path, 'rb') as f:
                file_content = f.read()
            print(f"Uploading: {filename}")
            response = requests.put(
                webdav_file_url,
                data=file_content,
                # Files are written as UTF-8; declare it explicitly.
                headers={'Content-Type': 'text/markdown; charset=utf-8'},
                auth=auth,
                timeout=30,
            )
            if response.status_code in (200, 201, 204):
                print(f"✓ Successfully uploaded: {filename}")
                successful_uploads.append(file_path)
            else:
                print(f"✗ Upload failed for {filename} - Status: {response.status_code}")
                print(f" Response: {response.text}")
                failed_uploads.append(file_path)
        except requests.exceptions.RequestException as e:
            print(f"✗ Network error uploading {filename}: {e}")
            failed_uploads.append(file_path)
        except Exception as e:
            # Per-file boundary: record the failure and keep uploading.
            print(f"✗ Unexpected error uploading {filename}: {e}")
            failed_uploads.append(file_path)
    return successful_uploads, failed_uploads
def main():
    """
    Command-line entry point for the converter.

    Returns:
        int: 0 on success (including when no citations are found), 1 when
            the input is missing, processing fails or any upload fails
    """
    cli = argparse.ArgumentParser(
        description="Convert Kindle My Clippings.txt to separate Markdown files for each book"
    )
    cli.add_argument(
        'input_file',
        nargs='?',
        default='My Clippings.txt',
        help='Path to the My Clippings.txt file (default: My Clippings.txt)',
    )
    cli.add_argument(
        '-d', '--output-dir',
        default='kindle_books',
        help='Output directory for markdown files (default: kindle_books)',
    )
    boolean_flags = (
        ('--upload', 'Upload the markdown files to WebDAV server'),
        ('--clear', 'Clear WebDAV directory before upload (default: merge with existing files)'),
        ('--direct', 'Auto-parse: search for Kindle device in /run/media and extract My Clippings.txt automatically'),
    )
    for flag, description in boolean_flags:
        cli.add_argument(flag, action='store_true', help=description)
    args = cli.parse_args()

    # --direct replaces the input file with one copied from the device
    if args.direct:
        extracted = auto_parse()
        if not extracted:
            print("Auto-parse failed. Could not find or extract My Clippings.txt")
            return 1
        args.input_file = extracted

    if not Path(args.input_file).exists():
        print(f"Error: Input file '{args.input_file}' not found.")
        return 1

    try:
        print(f"Parsing {args.input_file}...")
        books = parse_clippings_file(args.input_file)

        print("Generating markdown files...")
        generated_files = generate_markdown_files(books, args.output_dir)
        if not generated_files:
            print("No books with citations found.")
            return 0

        if not args.upload:
            return 0

        # The remote directory is wiped only when --clear is given
        if args.clear:
            if not clear_webdav_directory():
                print("Warning: Some files could not be deleted from WebDAV directory.")
                print("Continuing with upload (files will be overwritten)...")
            print()  # Empty line for better readability

        print("Uploading to WebDAV server...")
        successful, failed = upload_files_to_webdav(generated_files)
        print("\nUpload Summary:")
        print(f"✓ Successful uploads: {len(successful)}")
        print(f"✗ Failed uploads: {len(failed)}")
        if failed:
            print("Warning: Some files failed to upload.")
            return 1
        return 0
    except Exception as e:
        print(f"Error processing file: {e}")
        return 1
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status. The
    # builtin-looking exit() is injected by the site module and is missing
    # when Python runs with -S, so raise SystemExit directly instead.
    raise SystemExit(main())