#!/usr/bin/env python3 """ Seed Tracker - Match torrent files against qBittorrent session. Compares {id}.torrent files against qBittorrent's BT_backup folder to determine which torrents are actively seeding, then updates a NocoDB database with the seeding user's name. Used for tracking who is seeding shared backup torrents. Matching is done by: 1. Info hash (exact match) 2. Name + size fallback (for re-created torrents with different hashes) Requirements: - Python 3.10+ - No external dependencies (uses only stdlib) Usage: python seed_tracker.py --id-folder ./torrents --bt-backup /path/to/BT_backup \\ --nocodb-url https://noco.example.com --table-id tblXXX --api-token xc-xxx \\ --id-field cXXX --seeding-field cYYY To find NocoDB IDs: - Table ID: Click ... next to table name → Copy Table ID - Field IDs: Click field header dropdown → Copy Field ID """ import argparse import csv import hashlib import json import sys import time import urllib.request import urllib.error from pathlib import Path # --- Pure Python Bencode Decoder --- def decode_bencode(data: bytes): """Decode bencoded data. Returns (decoded_value, remaining_bytes).""" def decode_next(data: bytes, index: int): if index >= len(data): raise ValueError("Unexpected end of data") char = chr(data[index]) # Integer: ie if char == 'i': end = data.index(b'e', index) return int(data[index + 1:end]), end + 1 # List: le elif char == 'l': result = [] index += 1 while chr(data[index]) != 'e': item, index = decode_next(data, index) result.append(item) return result, index + 1 # Dictionary: d...e elif char == 'd': result = {} index += 1 while chr(data[index]) != 'e': key, index = decode_next(data, index) if isinstance(key, bytes): key = key.decode('utf-8', errors='replace') value, index = decode_next(data, index) result[key] = value return result, index + 1 # String: : elif char.isdigit(): colon = data.index(b':', index) length = int(data[index:colon]) start = colon + 1 return data[start:start + length], start + length else: raise ValueError(f"Invalid bencode at position {index}: {char}") result, _ = decode_next(data, 0) return result def encode_bencode(data) -> bytes: """Encode data to bencode format.""" if isinstance(data, int): return f"i{data}e".encode() elif isinstance(data, bytes): return f"{len(data)}:".encode() + data elif isinstance(data, str): encoded = data.encode('utf-8') return f"{len(encoded)}:".encode() + encoded elif isinstance(data, list): return b'l' + b''.join(encode_bencode(item) for item in data) + b'e' elif isinstance(data, dict): result = b'd' for key in sorted(data.keys()): key_bytes = key.encode('utf-8') if isinstance(key, str) else key result += encode_bencode(key_bytes) result += encode_bencode(data[key]) result += b'e' return result else: raise TypeError(f"Cannot encode type: {type(data)}") # --- Core Functions --- def get_torrent_info(torrent_path: Path) -> tuple[str | None, str | None, int | None]: """Extract info_hash, name, and total size from a .torrent file.""" try: with open(torrent_path, 'rb') as f: data = f.read() decoded = decode_bencode(data) if 'info' not in decoded: return None, None, None info = decoded['info'] # Get name name = info.get('name') if isinstance(name, bytes): name = name.decode('utf-8', errors='replace') # Get total size if 'length' in info: # Single file torrent total_size = info['length'] elif 'files' in info: # Multi-file torrent total_size = sum(f['length'] for f in info['files']) else: total_size = None # Re-encode the info dict and hash it info_bencoded = encode_bencode(info) info_hash = hashlib.sha1(info_bencoded).hexdigest().upper() return info_hash, name, total_size except Exception as e: print(f" Warning: Could not read {torrent_path.name}: {e}", file=sys.stderr) return None, None, None def get_bt_backup_data(bt_backup_path: Path) -> tuple[set[str], dict[tuple[str, int], str]]: """Get info hashes and name+size lookup from qBittorrent's BT_backup folder. Returns: - Set of info hashes (for fast hash matching) - Dict of (name, size) -> hash (for fallback matching) """ hashes = set() name_size_lookup = {} if not bt_backup_path.exists(): print(f"Warning: BT_backup path does not exist: {bt_backup_path}", file=sys.stderr) return hashes, name_size_lookup # First pass: collect all hashes from filenames (fast) for file in bt_backup_path.iterdir(): if file.suffix in ('.torrent', '.fastresume'): hashes.add(file.stem.upper()) # Second pass: parse .torrent files for name+size fallback for file in bt_backup_path.iterdir(): if file.suffix == '.torrent': try: with open(file, 'rb') as f: data = f.read() decoded = decode_bencode(data) if 'info' not in decoded: continue info = decoded['info'] # Get name name = info.get('name') if isinstance(name, bytes): name = name.decode('utf-8', errors='replace') # Get total size if 'length' in info: total_size = info['length'] elif 'files' in info: total_size = sum(f['length'] for f in info['files']) else: continue if name and total_size: name_size_lookup[(name, total_size)] = file.stem.upper() except Exception: continue return hashes, name_size_lookup # --- NocoDB API Functions --- class NocoDBClient: """Simple NocoDB API client.""" def __init__(self, base_url: str, table_id: str, api_token: str, id_field: str, seeding_field: str, debug: bool = False): self.base_url = base_url.rstrip('/') self.table_id = table_id self.api_token = api_token self.id_field = id_field self.seeding_field = seeding_field self.debug = debug self.endpoint = f"{self.base_url}/api/v2/tables/{table_id}/records" def _request(self, method: str, data: dict | None = None, params: dict | None = None) -> dict: """Make an API request.""" url = self.endpoint if params: query = "&".join(f"{k}={urllib.request.quote(str(v))}" for k, v in params.items()) url = f"{url}?{query}" headers = { "xc-token": self.api_token, "Content-Type": "application/json", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", "Accept": "application/json", } body = json.dumps(data).encode('utf-8') if data else None req = urllib.request.Request(url, data=body, headers=headers, method=method) if self.debug: print(f" DEBUG: {method} {url}", file=sys.stderr) if body: print(f" DEBUG: Body: {body.decode()}", file=sys.stderr) try: with urllib.request.urlopen(req, timeout=30) as response: result = json.loads(response.read().decode('utf-8')) if self.debug: print(f" DEBUG: Response: {json.dumps(result)[:500]}", file=sys.stderr) return result except urllib.error.HTTPError as e: error_body = e.read().decode('utf-8') if e.fp else "" raise Exception(f"HTTP {e.code}: {error_body}") def get_record(self, record_id: int | str) -> dict | None: """Get a single record by ID.""" try: params = {"where": f"({self.id_field},eq,{record_id})", "limit": "1"} result = self._request("GET", params=params) records = result.get("list", []) return records[0] if records else None except Exception as e: if self.debug: print(f" DEBUG: get_record error: {e}", file=sys.stderr) return None def update_record(self, record_id: int | str, value: str) -> bool: """Update the seeding_users field on a record.""" try: data = {self.id_field: int(record_id), self.seeding_field: value} self._request("PATCH", data=data) return True except Exception as e: print(f" API error: {e}", file=sys.stderr) return False def parse_multiselect(value) -> set[str]: """Parse NocoDB multi-select field into a set of values.""" if value is None: return set() if isinstance(value, list): return set(value) if isinstance(value, str): if not value.strip(): return set() return set(v.strip() for v in value.split(',')) return set() def format_multiselect(values: set[str]) -> str: """Format a set of values as NocoDB multi-select string.""" if not values: return "" return ",".join(sorted(values)) def main(): parser = argparse.ArgumentParser( description='Match torrent files against qBittorrent session and update NocoDB.', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # API mode - update NocoDB directly: %(prog)s --id-folder ./torrents --bt-backup ~/.local/share/qBittorrent/BT_backup \\ --nocodb-url https://noco.example.com --table-id tblXXXXX --api-token xc-xxxx \\ --id-field cXXXXX --seeding-field cYYYYY # CSV mode - just output a file: %(prog)s --id-folder ./torrents --bt-backup /path/to/BT_backup --csv-only To find field IDs in NocoDB: click field header dropdown → Copy Field ID (starts with "c") """ ) parser.add_argument('--id-folder', required=True, type=Path, help='Path to folder containing {id}.torrent files') parser.add_argument('--bt-backup', required=True, type=Path, help="Path to qBittorrent's BT_backup folder") # NocoDB API options parser.add_argument('--nocodb-url', type=str, default=None, help='NocoDB base URL (e.g., http://localhost:8080)') parser.add_argument('--table-id', type=str, default=None, help='NocoDB table ID (starts with "tbl")') parser.add_argument('--api-token', type=str, default=None, help='NocoDB API token (xc-token)') parser.add_argument('--id-field', type=str, default=None, help='Field ID for the Id column (starts with "c")') parser.add_argument('--seeding-field', type=str, default=None, help='Field ID for the seeding_users column (starts with "c")') # CSV fallback parser.add_argument('--csv-only', action='store_true', help='Skip API, just output CSV') parser.add_argument('--output', type=Path, default=Path('seeding_update.csv'), help='Output CSV path (default: seeding_update.csv)') parser.add_argument('--debug', action='store_true', help='Print debug info for API calls') args = parser.parse_args() # Validate paths if not args.id_folder.exists(): print(f"Error: ID folder does not exist: {args.id_folder}", file=sys.stderr) sys.exit(1) if not args.bt_backup.exists(): print(f"Error: BT_backup folder does not exist: {args.bt_backup}", file=sys.stderr) sys.exit(1) # Determine mode use_api = not args.csv_only # Show banner print("=" * 50) print("Seed Tracker") print("=" * 50) print(f"Mode: {'NocoDB API' if use_api else 'CSV output'}") if use_api: if not all([args.nocodb_url, args.table_id, args.api_token, args.id_field, args.seeding_field]): print("Error: API mode requires --nocodb-url, --table-id, --api-token, --id-field, and --seeding-field", file=sys.stderr) print(" Use --csv-only to skip API and just output CSV", file=sys.stderr) sys.exit(1) noco = NocoDBClient( args.nocodb_url, args.table_id, args.api_token, args.id_field, args.seeding_field, args.debug ) # Test connection print(f"\nTesting NocoDB connection...") try: test_result = noco._request("GET", params={"limit": "1"}) records = test_result.get("list", []) if records: print(f" ✓ Connected!") else: print(" ✓ Connected (table empty)") except Exception as e: print(f" ✗ Connection failed: {e}", file=sys.stderr) sys.exit(1) else: noco = None # Prompt for username username = input("\nEnter your username: ").strip() if not username: print("Error: Username cannot be empty.", file=sys.stderr) sys.exit(1) print(f"\nRunning as: {username}") # Get hashes from qBittorrent session print(f"\nScanning BT_backup: {args.bt_backup}") session_hashes, name_size_lookup = get_bt_backup_data(args.bt_backup) print(f"Found {len(session_hashes)} torrents in qBittorrent session") print(f"Built name+size index for {len(name_size_lookup)} torrents") # Scan id-folder and match print(f"\nScanning ID folder: {args.id_folder}") print() matches = [] stats = {'total': 0, 'hash_match': 0, 'name_size_match': 0, 'not_found': 0, 'hash_error': 0} for torrent_file in sorted(args.id_folder.glob('*.torrent')): stats['total'] += 1 # Extract ID from filename torrent_id = torrent_file.stem # Get info hash, name, and size info_hash, name, size = get_torrent_info(torrent_file) display_name = name or "(unknown)" if info_hash is None: stats['hash_error'] += 1 print(f" ✗ {torrent_id}: {display_name} [read error]") continue # Check if in session - try hash first, then name+size if info_hash in session_hashes: stats['hash_match'] += 1 matches.append(torrent_id) print(f" ✓ {torrent_id}: {display_name}") elif name and size and (name, size) in name_size_lookup: stats['name_size_match'] += 1 matches.append(torrent_id) print(f" ≈ {torrent_id}: {display_name} [name+size match]") else: stats['not_found'] += 1 print(f" - {torrent_id}: {display_name}") # Update NocoDB or write CSV if use_api: print(f"\nUpdating {len(matches)} records in NocoDB...") api_stats = {'updated': 0, 'already_seeding': 0, 'failed': 0, 'not_found': 0} for i, torrent_id in enumerate(matches, 1): # Get current record record = noco.get_record(torrent_id) if record is None: print(f" ? {torrent_id}: record not found in NocoDB") api_stats['not_found'] += 1 continue # Parse current seeders current_seeders = parse_multiselect(record.get(noco.seeding_field)) if username in current_seeders: print(f" = {torrent_id}: already listed") api_stats['already_seeding'] += 1 continue # Add username and update current_seeders.add(username) new_value = format_multiselect(current_seeders) if noco.update_record(torrent_id, new_value): print(f" + {torrent_id}: added {username}") api_stats['updated'] += 1 else: print(f" ! {torrent_id}: update failed") api_stats['failed'] += 1 # Rate limit: NocoDB allows 5 req/sec, we'll do ~3/sec to be safe if i % 3 == 0: time.sleep(1) # Summary print("\n" + "=" * 50) print("Summary") print("=" * 50) print(f" Total .torrent files scanned: {stats['total']}") print(f" Matched by hash: {stats['hash_match']}") print(f" Matched by name+size: {stats['name_size_match']}") print(f" Total matched: {stats['hash_match'] + stats['name_size_match']}") print(f" Not in session: {stats['not_found']}") print(f" Hash extraction errors: {stats['hash_error']}") print() print(f" API: Records updated: {api_stats['updated']}") print(f" API: Already seeding: {api_stats['already_seeding']}") print(f" API: Not found in NocoDB: {api_stats['not_found']}") print(f" API: Update failed: {api_stats['failed']}") print(f"\nDone!") else: # CSV-only mode print(f"\nWriting {len(matches)} matches to: {args.output}") with open(args.output, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) writer.writerow(['Id', 'seeding_users']) for torrent_id in matches: writer.writerow([torrent_id, username]) # Summary print("\n" + "=" * 50) print("Summary") print("=" * 50) print(f" Total .torrent files scanned: {stats['total']}") print(f" Matched by hash: {stats['hash_match']}") print(f" Matched by name+size: {stats['name_size_match']}") print(f" Total matched: {stats['hash_match'] + stats['name_size_match']}") print(f" Not in session: {stats['not_found']}") print(f" Hash extraction errors: {stats['hash_error']}") print(f"\nDone! Import {args.output} into NocoDB.") if __name__ == '__main__': main()