#!/usr/bin/env python3 """ Torrent Tracker Scrape Server Remplacement auto-hébergé de scrape.php / du Cloudflare Worker. Dépendances : aucune (stdlib Python 3.8+) Démarrage rapide : python3 scrape_server.py Avec systemd : voir torrent-scrape.service Usage : GET http://127.0.0.1:8765/?hash=<40_hex_chars> GET http://127.0.0.1:8765/?magnet= Réponse JSON : {"seeders": n, "leechers": n, "health": "...", "popularity": "...", "sources": n} """ import json import re import time import urllib.request import urllib.parse from http.server import HTTPServer, BaseHTTPRequestHandler from concurrent.futures import ThreadPoolExecutor, as_completed # --------------------------------------------------------------- # Configuration # --------------------------------------------------------------- HOST = '127.0.0.1' # Écouter uniquement en local (nginx fait le proxy) PORT = 8765 WORKERS = 10 # Requêtes parallèles vers les trackers TIMEOUT = 7 # Secondes par tracker CACHE_TTL = 300 # Durée du cache en secondes (5 min) TRACKERS = [ 'http://tracker.opentrackr.org:1337/scrape', 'http://open.tracker.cl:1337/scrape', 'http://tracker.openbittorrent.com:80/scrape', 'http://tracker.torrent.eu.org:451/scrape', 'http://tracker.tiny-vps.com:6969/scrape', 'http://tracker.files.fm:6969/scrape', 'http://tracker1.bt.moack.co.kr:80/scrape', 'http://tracker.leechersparadise.org:6969/scrape', 'http://open.stealth.si:80/scrape', 'http://tracker4.itzmx.com:2710/scrape', ] # --------------------------------------------------------------- # Cache en mémoire { hash_hex: (timestamp, data_dict) } # Renvoie les dernières données connues si les trackers sont muets. 
# --------------------------------------------------------------- _cache: dict = {} # --------------------------------------------------------------- # Décodeur bencoding (format réponse tracker) # --------------------------------------------------------------- def bdecode(data: bytearray, pos: list) -> object: c = data[pos[0]] # Entier : ie if c == ord('i'): pos[0] += 1 end = data.index(ord('e'), pos[0]) n = int(data[pos[0]:end]) pos[0] = end + 1 return n # Liste : le if c == ord('l'): pos[0] += 1 lst = [] while data[pos[0]] != ord('e'): lst.append(bdecode(data, pos)) pos[0] += 1 return lst # Dictionnaire : d...e if c == ord('d'): pos[0] += 1 d = {} while data[pos[0]] != ord('e'): key = bdecode(data, pos) val = bdecode(data, pos) # Clé binaire (ex. info hash 20 octets) → hex string if isinstance(key, (bytes, bytearray)): key = key.hex() d[str(key)] = val pos[0] += 1 return d # Chaîne : : if chr(c).isdigit(): colon = data.index(ord(':'), pos[0]) length = int(data[pos[0]:colon]) pos[0] = colon + 1 raw = data[pos[0]:pos[0] + length] pos[0] += length try: # Texte ASCII → str ; données binaires → bytes decoded = raw.decode('ascii') return decoded except (UnicodeDecodeError, ValueError): return bytes(raw) return None # --------------------------------------------------------------- # Scrape d'un tracker # --------------------------------------------------------------- def scrape_tracker(tracker_url: str, hash_hex: str) -> dict | None: hash_bytes = bytes.fromhex(hash_hex) encoded = urllib.parse.quote(hash_bytes, safe='') url = f"{tracker_url}?info_hash={encoded}" try: req = urllib.request.Request( url, headers={'User-Agent': 'TorrentIndicator/1.0'} ) with urllib.request.urlopen(req, timeout=TIMEOUT) as resp: raw = resp.read() except Exception: return None try: parsed = bdecode(bytearray(raw), [0]) except Exception: return None if not isinstance(parsed, dict) or 'files' not in parsed: return None files = parsed['files'] if not isinstance(files, dict): return None for 
file_data in files.values(): if isinstance(file_data, dict): return { 'seeders': int(file_data.get('complete', 0) or 0), 'leechers': int(file_data.get('incomplete', 0) or 0), } return None # --------------------------------------------------------------- # Parsing du magnet link # --------------------------------------------------------------- def extract_hash(magnet: str) -> str: # Hex 40 chars m = re.search(r'xt=urn:btih:([0-9a-fA-F]{40})', magnet, re.I) if m: return m.group(1).lower() # Base32 32 chars m = re.search(r'xt=urn:btih:([A-Z2-7]{32})', magnet, re.I) if m: return _base32_to_hex(m.group(1).upper()) return '' def _base32_to_hex(s: str) -> str: alphabet = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ234567' buf, bits, out = 0, 0, [] for c in s: val = alphabet.find(c) if val < 0: continue buf = (buf << 5) | val bits += 5 if bits >= 8: bits -= 8 out.append((buf >> bits) & 0xFF) return bytes(out).hex() # --------------------------------------------------------------- # Calculs santé / popularité # --------------------------------------------------------------- def compute_health(seeders: int, leechers: int) -> str: if seeders == 0: return 'dead' ratio = seeders / max(1, seeders + leechers) if ratio >= 0.5: return 'excellent' if ratio >= 0.2: return 'good' return 'poor' def compute_popularity(total: int) -> str: if total >= 1000: return 'viral' if total >= 100: return 'popular' if total >= 10: return 'moderate' return 'low' # --------------------------------------------------------------- # Serveur HTTP # --------------------------------------------------------------- class ScrapeHandler(BaseHTTPRequestHandler): def log_message(self, *args): pass # Désactiver les logs par défaut (gérer via systemd journal) def do_OPTIONS(self): self.send_response(204) self._add_cors() self.end_headers() def do_GET(self): parsed = urllib.parse.urlparse(self.path) params = urllib.parse.parse_qs(parsed.query) hash_hex = params.get('hash', [''])[0].strip().lower() magnet = params.get('magnet', 
[''])[0].strip() if not hash_hex and magnet: hash_hex = extract_hash(urllib.parse.unquote(magnet)) if not re.fullmatch(r'[0-9a-f]{40}', hash_hex): self._send_json({'error': 'Hash invalide. Fournissez ?hash= (40 hex) ou ?magnet=.'}, 400) return # Vérifier le cache avant d'interroger les trackers cached = _cache.get(hash_hex) if cached and (time.time() - cached[0]) < CACHE_TTL: self._send_json(cached[1]) return best_seeders = 0 best_leechers = 0 sources = 0 with ThreadPoolExecutor(max_workers=WORKERS) as executor: futures = { executor.submit(scrape_tracker, tracker, hash_hex): tracker for tracker in TRACKERS } for future in as_completed(futures): result = future.result() if result: if result['seeders'] > best_seeders: best_seeders = result['seeders'] if result['leechers'] > best_leechers: best_leechers = result['leechers'] sources += 1 if sources == 0 and cached: # Aucun tracker n'a répondu : renvoyer le cache même expiré # plutôt qu'une erreur visible stale = dict(cached[1]) stale['stale'] = True self._send_json(stale) return data = { 'seeders': best_seeders, 'leechers': best_leechers, 'health': compute_health(best_seeders, best_leechers), 'popularity': compute_popularity(best_seeders + best_leechers), 'sources': sources, } # Mettre en cache uniquement si au moins un tracker a répondu if sources > 0: _cache[hash_hex] = (time.time(), data) self._send_json(data) def _send_json(self, data: dict, status: int = 200): body = json.dumps(data).encode('utf-8') self.send_response(status) self.send_header('Content-Type', 'application/json; charset=utf-8') self.send_header('Content-Length', str(len(body))) self.send_header('Cache-Control', 'no-store') self._add_cors() self.end_headers() self.wfile.write(body) def _add_cors(self): self.send_header('Access-Control-Allow-Origin', '*') self.send_header('Access-Control-Allow-Methods', 'GET, OPTIONS') self.send_header('Access-Control-Allow-Headers', 'Content-Type') # --------------------------------------------------------------- # 
# ---------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------
if __name__ == '__main__':
    # Plain HTTPServer handles requests serially, so one slow scrape
    # (up to TIMEOUT seconds per tracker) would block every other client.
    # ThreadingHTTPServer (stdlib, Python 3.7+) serves each request on
    # its own thread. Imported locally so this block is self-contained.
    from http.server import ThreadingHTTPServer

    server = ThreadingHTTPServer((HOST, PORT), ScrapeHandler)
    print(f"Torrent scrape server → http://{HOST}:{PORT}")
    print("Arrêt : Ctrl+C")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\nServeur arrêté.")
    finally:
        server.server_close()  # release the listening socket on exit