"""Batched Google-Maps company crawler worker.

Reads an uploaded address CSV, turns every row into a "Firmen <address>" search
query, sends the queries in batches to two gmaps-scraper instances (alternating
per batch), filters the scraped results back to the input addresses and writes
the merged CSVs to RESULT_FOLDER. Progress is checkpointed after every batch so
an interrupted job can resume where it stopped.
"""
import json
import os
import random
import re
import subprocess
import time
import traceback
import unicodedata
from io import StringIO

import pandas as pd
import requests

from app.models import db, Job

print("🆕 MODERN webcrawler LOADED! – BATCHED + PROXY + RESUME + ETA + 2x SCRAPER")

UPLOAD_FOLDER = '/app/uploads'
RESULT_FOLDER = '/app/results'

# Two scraper instances, used alternately (selected by batch index).
SCRAPER_URLS = [
    "http://gmaps-scraper-1:8080",
    "http://gmaps-scraper-2:8080",
]

# Columns kept from the scraper's CSV output (in this order, where present).
OUTPUT_COLS = ['title', 'category', 'address', 'open_hours', 'website', 'phone', 'link']

# NOTE(review): the default below embeds proxy credentials in source control.
# Prefer setting SCRAPER_PROXY_URL in the environment, then remove (and rotate)
# the literal. The default is kept only so existing deployments keep working.
PROXY_URL = os.environ.get(
    "SCRAPER_PROXY_URL",
    "http://bitlleuv-rotate:s5hzse6hz74b@p.webshare.io:80",
)
API_PROXIES = {"http": PROXY_URL, "https": PROXY_URL}

# ──────────────────────────────────────────────
# Tuning
# ──────────────────────────────────────────────
BATCH_SIZE = 30        # keywords per scraper job
BATCH_DELAY_MIN = 3    # seconds of pause between batches (min)
BATCH_DELAY_MAX = 6    # seconds of pause between batches (max)
MAX_TIME = 60          # seconds the scraper gets per batch
POLL_MAX = 90          # max. poll requests per batch attempt
POLL_DELAY_MIN = 2     # seconds between polls (min)
POLL_DELAY_MAX = 5     # seconds between polls (max)
STUCK_THRESHOLD = 8    # consecutive 'pending' polls before auto-restart
MAX_RETRIES = 2        # total attempts per batch (first try included)

# Input columns that make up a search query, in query order.
_QUERY_COLS = ('PLZ', 'Stadt', 'Straße', 'Hausnummer', 'Zusatz')

# Substrings in a scraper response that indicate blocking / rate limiting.
_BLOCK_KEYWORDS = ('captcha', 'blocked', 'rate limit', 'too many')
# "429" only as a standalone token, so ids/UUIDs that contain "429" don't match.
_HTTP_429_RE = re.compile(r'\b429\b')
# German postal code (PLZ): a standalone run of exactly five digits.
_PLZ_RE = re.compile(r'\b\d{5}\b')


# ──────────────────────────────────────────────
# Helpers
# ──────────────────────────────────────────────
def is_blocked(data, status_code=None):
    """Return True if a scraper response looks like a block or rate limit.

    Args:
        data: Response body (any object; it is converted with ``str``).
        status_code: Optional HTTP status code; 429 always counts as blocked.
    """
    text = str(data).lower()
    blocked = (
        status_code == 429
        or any(kw in text for kw in _BLOCK_KEYWORDS)
        or _HTTP_429_RE.search(text) is not None
    )
    if blocked:
        print(f"🚫 BLOCKED: {str(data)[:100]}")
    return blocked


def fix_encoding(text):
    """Undo UTF-8-decoded-as-Latin-1 mojibake (e.g. 'MÃ¼ller' -> 'Müller').

    Non-strings, and strings that do not survive the Latin-1 -> UTF-8 round
    trip, are returned unchanged.
    """
    if not isinstance(text, str):
        return text
    try:
        return text.encode('latin-1').decode('utf-8')
    except (UnicodeEncodeError, UnicodeDecodeError):
        return text


def clean_query(q):
    """Strip control characters and collapse whitespace so the query is URL-safe."""
    q = ''.join(c for c in q if unicodedata.category(c) != 'Cc')
    q = ' '.join(q.split())
    return q.strip()


def build_input_addresses(df):
    """Return the set of lower-cased "street nr extra plz city" strings of *df*."""
    addresses = set()
    for _, row in df.iterrows():
        plz = str(row.get('PLZ', '')).strip()
        stadt = str(row.get('Stadt', '')).strip()
        str_ = str(row.get('Straße', '')).strip()
        nr = str(row.get('Hausnummer', '')).strip()
        zusatz = str(row.get('Zusatz', '')).strip()
        full = f"{str_} {nr} {zusatz} {plz} {stadt}".lower().strip()
        addresses.add(' '.join(full.split()))
    return addresses


def normalize_address(addr):
    """Lower-case, encoding-fix and whitespace-collapse an address; '' for non-strings."""
    if not isinstance(addr, str):
        return ''
    addr = fix_encoding(addr)
    return ' '.join(addr.lower().strip().split())


def _address_keys(input_addresses):
    """Reduce input address strings to (PLZ, first 4 chars of street) match keys.

    Addresses without a 5-digit PLZ or whose first token is <= 3 chars are skipped.
    """
    keys = set()
    for inp_addr in input_addresses:
        plz_match = _PLZ_RE.search(inp_addr)
        tokens = inp_addr.split()
        street = tokens[0] if tokens else ''
        if plz_match and len(street) > 3:
            keys.add((plz_match.group(), street[:4].lower()))
    return keys


def _matches_address_keys(norm, keys):
    """True if the normalized result address contains both parts of some key."""
    return any(plz in norm and prefix in norm for plz, prefix in keys)


def address_in_input(result_addr, input_addresses):
    """True if *result_addr* shares a PLZ and street prefix with any input address."""
    return _matches_address_keys(normalize_address(result_addr), _address_keys(input_addresses))


def format_eta(seconds):
    """Format a duration in seconds as '42s', '7min' or '2h 05min'."""
    if seconds < 60:
        return f"{int(seconds)}s"
    h, rem = divmod(int(seconds), 3600)
    m = rem // 60
    return f"{h}h {m:02d}min" if h > 0 else f"{m}min"


# ──────────────────────────────────────────────
# Scraper restart on inactivity
# ──────────────────────────────────────────────
def restart_scraper(scraper_url):
    """Restart the Docker container behind *scraper_url* via ``docker restart``.

    The container name is the URL host (http://gmaps-scraper-1:8080 ->
    gmaps-scraper-1). Needs the docker CLI and daemon access from this process
    (presumably a mounted socket - verify in deployment). Returns True only if
    the command exited with status 0; then waits 15 s for the scraper to boot.
    """
    try:
        container = scraper_url.split("//")[1].split(":")[0]
        print(f"🔄 Starte {container} neu...")
        result = subprocess.run(["docker", "restart", container], timeout=30, capture_output=True)
        if result.returncode != 0:
            # Previously a failing command was still reported as success.
            err = result.stderr.decode('utf-8', errors='replace').strip()
            print(f"⚠️ Scraper-Neustart fehlgeschlagen: {err[:200]}")
            return False
        print(f"✅ {container} neu gestartet – warte 15s...")
        time.sleep(15)
        return True
    except Exception as e:
        print(f"⚠️ Scraper-Neustart fehlgeschlagen: {e}")
        return False


# ──────────────────────────────────────────────
# Resume: progress-file helpers
# ──────────────────────────────────────────────
def get_progress_path(job_id):
    return os.path.join(RESULT_FOLDER, f"progress_{job_id}.json")


def get_partial_path(job_id, suffix):
    return os.path.join(RESULT_FOLDER, f"partial_{job_id}_{suffix}.csv")


def load_progress(job_id):
    """Return saved progress {'last_completed_batch', 'total_batches'} or None.

    An unreadable or malformed file is treated as "no progress" instead of
    failing the job.
    """
    path = get_progress_path(job_id)
    if not os.path.exists(path):
        return None
    try:
        with open(path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        last = int(data['last_completed_batch'])
        total = int(data['total_batches'])
    except (OSError, ValueError, KeyError, TypeError) as e:
        print(f"⚠️ Progress-Datei unlesbar, starte neu: {e}")
        return None
    # last is a 0-based batch index; the next batch is last + 1, shown 1-based.
    print(f"🔁 RESUME: ab Batch {last + 2}/{total}")
    return {"last_completed_batch": last, "total_batches": total}


def save_progress(job_id, last_completed_batch, total_batches):
    """Persist progress after a batch; written atomically via a temp file."""
    path = get_progress_path(job_id)
    tmp_path = path + '.tmp'
    with open(tmp_path, 'w', encoding='utf-8') as f:
        json.dump({"last_completed_batch": last_completed_batch,
                   "total_batches": total_batches}, f)
    # os.replace is atomic, so a crash never leaves a half-written progress file.
    os.replace(tmp_path, path)


def append_partial(job_id, df_filtered, df_raw):
    """Append one batch's results to the partial CSVs (header only on creation)."""
    for suffix, df in [('filtered', df_filtered), ('raw', df_raw)]:
        if df is None:
            continue
        path = get_partial_path(job_id, suffix)
        header = not os.path.exists(path)
        df.to_csv(path, mode='a', index=False, header=header, encoding='utf-8-sig', sep=';')


def load_partial(job_id):
    """Load existing partial CSVs; returns ([filtered_df], [raw_df]) lists (possibly empty)."""
    results_filtered, results_raw = [], []
    for suffix, lst in [('filtered', results_filtered), ('raw', results_raw)]:
        path = get_partial_path(job_id, suffix)
        if os.path.exists(path):
            try:
                # dtype=str keeps e.g. leading zeros of phone numbers intact.
                df = pd.read_csv(path, sep=';', encoding='utf-8-sig', dtype=str)
                lst.append(df)
                print(f"📂 Partial {suffix}: {len(df)} Zeilen geladen")
            except Exception as e:
                print(f"⚠️ Partial {suffix} Ladefehler: {e}")
    return results_filtered, results_raw


def cleanup_progress(job_id):
    """Delete the progress file (and its temp file) plus both partial CSVs, if present."""
    progress_path = get_progress_path(job_id)
    for path in [
        progress_path,
        progress_path + '.tmp',
        get_partial_path(job_id, 'filtered'),
        get_partial_path(job_id, 'raw'),
    ]:
        if os.path.exists(path):
            os.remove(path)


# ──────────────────────────────────────────────
# CSV post-processing
# ──────────────────────────────────────────────
def process_result_csv(raw_bytes, input_df, apply_filter=True, input_addresses=None):
    """Parse a scraper CSV download into a cleaned DataFrame, or None on error.

    Keeps OUTPUT_COLS, fixes mojibake, optionally keeps only rows whose address
    matches an input address, drops duplicate (title, address) pairs and rows
    with an empty title.

    Args:
        raw_bytes: CSV bytes as returned by the scraper's download endpoint.
        input_df: Input DataFrame; used to build the address filter when
            *input_addresses* is not given.
        apply_filter: Whether to apply the input-address filter.
        input_addresses: Optional precomputed ``build_input_addresses(input_df)``
            (avoids rebuilding it for every batch).
    """
    try:
        # utf-8-sig drops a leading BOM that would otherwise corrupt the first header.
        content = raw_bytes.decode('utf-8-sig', errors='replace')
        # dtype=str: phone numbers etc. must not be parsed as ints (leading zeros).
        df_out = pd.read_csv(StringIO(content), dtype=str)
        print(f"📄 Raw result: {df_out.shape}")

        available = [c for c in OUTPUT_COLS if c in df_out.columns]
        df_out = df_out[available].copy()  # copy: avoid SettingWithCopy on assignment
        for col in df_out.columns:
            df_out[col] = df_out[col].apply(fix_encoding)

        if apply_filter:
            if input_addresses is None:
                input_addresses = build_input_addresses(input_df)
            keys = _address_keys(input_addresses)
            before = len(df_out)
            mask = df_out['address'].apply(
                lambda a: _matches_address_keys(normalize_address(a), keys)
            ).astype(bool)
            df_out = df_out[mask]
            print(f"📍 Filter: {before} → {len(df_out)}")

        df_out = df_out.drop_duplicates(subset=['title', 'address'], keep='first')
        df_out = df_out.dropna(subset=['title'], how='all')
        df_out = df_out[df_out['title'].astype(str).str.strip().astype(bool)]
        print(f"✅ Final ({'gefiltert' if apply_filter else 'alle'}): {df_out.shape}")
        return df_out
    except Exception as e:
        print(f"💥 process_result_csv: {e}")
        return None


# ──────────────────────────────────────────────
# MAIN WORKER
# ──────────────────────────────────────────────
def _build_queries(df_input):
    """Build one cleaned 'Firmen <PLZ Stadt Straße Nr Zusatz>' query per input row.

    Empty/'nan' parts are skipped; queries of 10 chars or fewer are dropped.
    """
    queries = []
    for _, row in df_input.iterrows():
        parts = [str(row.get(col, '')).strip() for col in _QUERY_COLS]
        q = f"Firmen {' '.join(p for p in parts if p and p != 'nan')}".strip()
        q = clean_query(q)
        if len(q) > 10:
            queries.append(q)
    return queries


def _poll_scraper_job(scraper_url, scraper_id, batch_idx, attempt, df_input, input_addresses):
    """Poll one scraper job until done; return (df_filtered, df_raw) or None.

    None means the attempt failed: stuck in 'pending' (job deleted, scraper
    restarted), reported 'failed'/'error', result unparseable, or POLL_MAX reached.
    """
    job_url = f"{scraper_url}/api/v1/jobs/{scraper_id}"
    stuck_counter = 0
    for poll_i in range(1, POLL_MAX + 1):
        data = requests.get(job_url, timeout=15).json()
        status = data.get('Status', data.get('status', '?'))
        print(f"⏳ Poll {poll_i}: {status}")

        # Auto-recovery when the job never leaves 'pending'.
        if status == 'pending':
            stuck_counter += 1
            if stuck_counter >= STUCK_THRESHOLD:
                print(f"⚠️ Job {scraper_id} hängt – abbrechen + Neustart")
                try:
                    requests.delete(job_url, timeout=10)
                except requests.RequestException as e:
                    print(f"⚠️ Job-Abbruch fehlgeschlagen: {e}")
                restart_scraper(scraper_url)  # restart only the affected scraper
                return None
        else:
            stuck_counter = 0

        if status in ('ok', 'completed', 'scraped'):
            dl = requests.get(f"{job_url}/download", timeout=90)
            if dl.status_code == 200:
                df_filtered = process_result_csv(dl.content, df_input, True, input_addresses)
                if df_filtered is None:
                    # Re-downloading identical content would fail the same way.
                    return None
                df_raw = process_result_csv(dl.content, df_input, False)
                return df_filtered, df_raw
            print(f"⚠️ Download {dl.status_code} – neuer Versuch beim nächsten Poll")
        elif status in ('failed', 'error'):
            print(f"💥 Batch {batch_idx+1}: {status} (Versuch {attempt})")
            if attempt < MAX_RETRIES:
                time.sleep(10)
            return None

        time.sleep(random.uniform(POLL_DELAY_MIN, POLL_DELAY_MAX))

    print(f"⌛ Batch {batch_idx+1}: kein Ergebnis nach {POLL_MAX} Polls")
    return None


def _run_batch(scraper_url, payload, batch_idx, df_input, input_addresses):
    """Submit one batch with up to MAX_RETRIES attempts; return (df_filtered, df_raw) or None.

    A blocked response aborts the batch immediately (no retry).
    """
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            resp = requests.post(f"{scraper_url}/api/v1/jobs", json=payload, timeout=45)
            print(f"📤 {resp.status_code} (Versuch {attempt} | {scraper_url})")

            if is_blocked(resp.text, resp.status_code):
                print("🚫 Batch übersprungen (blocked)")
                return None

            if resp.status_code != 201:
                print(f"⚠️ Batch {batch_idx+1} fehlgeschlagen: {resp.text[:100]}")
                if attempt < MAX_RETRIES:
                    time.sleep(10)
                continue

            scraper_id = resp.json()['id']
            print(f"✅ Scraper: {scraper_id}")

            result = _poll_scraper_job(scraper_url, scraper_id, batch_idx, attempt,
                                       df_input, input_addresses)
            if result is not None:
                return result
        except Exception as e:
            print(f"💥 Batch {batch_idx+1} Versuch {attempt}: {e}")
            if attempt < MAX_RETRIES:
                time.sleep(10)
    return None


def process_file(filename, job_id, app):
    """Run the full crawl for one uploaded CSV and record the outcome on its ``Job``.

    Args:
        filename: Name of the uploaded file inside UPLOAD_FOLDER (';'-separated,
            ISO-8859-1; columns PLZ/Stadt/Straße/Hausnummer/Zusatz are read).
        job_id: Primary key of the ``Job`` row to update.
        app: Application object providing ``app_context()`` (presumably Flask).

    Side effects: updates ``job.status`` throughout and, on success,
    ``job.result_filename``/``job.result_filename_raw``; writes result,
    progress and partial files under RESULT_FOLDER. Any exception is caught
    and stored as ``"Failed: ..."`` in ``job.status``.
    """
    print(f"🎯 {filename} Job#{job_id} START!")
    with app.app_context():
        job = Job.query.get(job_id)
        if not job:
            print("❌ Job missing")
            return

        try:
            # Parse input and build all queries.
            job.status = "📊 parsing CSV"
            db.session.commit()

            filepath = os.path.join(UPLOAD_FOLDER, filename)
            print(f"📁 {filepath} | {os.path.getsize(filepath)}b")
            # dtype=str keeps leading zeros in PLZ ("01067") and avoids float
            # artefacts such as "12.0" for house numbers in columns with blanks.
            df_input = pd.read_csv(filepath, sep=';', encoding='ISO-8859-1', dtype=str)
            print(f"📊 {df_input.shape}")

            queries = _build_queries(df_input)
            total_queries = len(queries)
            print(f"🔍 {total_queries} Queries | Samples: {queries[:3]}")
            if total_queries == 0:
                raise ValueError("Keine gültigen Adressen")

            # The address filter depends only on the input, so build it once.
            input_addresses = build_input_addresses(df_input)

            # Batched processing.
            batches = (total_queries + BATCH_SIZE - 1) // BATCH_SIZE

            # Resume: load saved progress if it matches this input.
            os.makedirs(RESULT_FOLDER, exist_ok=True)
            progress = load_progress(job_id)
            if progress is not None and progress['total_batches'] != batches:
                print(f"⚠️ Progress passt nicht ({progress['total_batches']} != {batches} Batches) – starte neu")
                progress = None
            if progress is None:
                # Fresh start: remove stale partial CSVs so they aren't appended to.
                cleanup_progress(job_id)
                start_batch = 0
                all_results_filtered, all_results_raw = [], []
            else:
                start_batch = progress['last_completed_batch'] + 1
                all_results_filtered, all_results_raw = load_partial(job_id)

            # Rough per-batch estimate: average pause plus the scraper's time budget.
            avg_batch_seconds = (BATCH_DELAY_MIN + BATCH_DELAY_MAX) / 2 + MAX_TIME
            eta_initial = format_eta(max(batches - start_batch, 0) * avg_batch_seconds)
            print(f"📦 {batches} Batches à {BATCH_SIZE} | 2x Scraper | Start: {start_batch} | ETA: ~{eta_initial}")

            job_start_time = time.time()
            job.status = f"🔄 Batch {min(start_batch + 1, batches)}/{batches} | ⏱️ ~{eta_initial}"
            db.session.commit()

            failed_batches = 0
            for batch_idx in range(start_batch, batches):
                batch_start = batch_idx * BATCH_SIZE
                batch_end = min(batch_start + BATCH_SIZE, total_queries)
                batch_queries = queries[batch_start:batch_end]

                # Alternate between the scrapers.
                scraper_url = SCRAPER_URLS[batch_idx % len(SCRAPER_URLS)]
                print(f"\n🔄 BATCH {batch_idx+1}/{batches} ({batch_start+1}-{batch_end}/{total_queries}) → {scraper_url}")

                # Random pause between batches.
                delay = random.uniform(BATCH_DELAY_MIN, BATCH_DELAY_MAX)
                print(f"😴 Delay: {delay:.0f}s")
                time.sleep(delay)

                payload = {
                    "name": f"{filename.replace('.csv','')}-{job_id}-B{batch_idx+1:03d}",
                    "keywords": batch_queries,
                    "lang": "de",
                    "depth": 1,
                    "zoom": 15,
                    "radius": 50,
                    "max_time": MAX_TIME,
                    "fast_mode": False,
                    "proxies": [PROXY_URL],
                }

                result = _run_batch(scraper_url, payload, batch_idx, df_input, input_addresses)
                if result is None:
                    failed_batches += 1
                else:
                    df_filtered, df_raw = result
                    all_results_filtered.append(df_filtered)
                    if df_raw is not None:  # pd.concat fails if every entry is None
                        all_results_raw.append(df_raw)
                    append_partial(job_id, df_filtered, df_raw)  # persist before checkpoint
                    print(f"📊 Batch {batch_idx+1}: {len(df_filtered)} filtered")

                # Checkpoint after every batch (failed batches are not retried on resume).
                save_progress(job_id, batch_idx, batches)

                # ETA from the measured average batch duration of this run.
                elapsed = time.time() - job_start_time
                done_so_far = batch_idx - start_batch + 1
                remaining = (batches - batch_idx - 1) * (elapsed / done_so_far)
                eta_str = format_eta(remaining)
                job.status = f"🔄 Batch {min(batch_idx + 2, batches)}/{batches} | ⏱️ ~{eta_str}"
                db.session.commit()

            # Merge and save.
            job.status = "🔧 merging results"
            db.session.commit()

            base = filename.replace('.csv', '')
            if all_results_filtered:
                df_final_filtered = pd.concat(all_results_filtered, ignore_index=True)
                df_final_filtered = df_final_filtered.drop_duplicates(subset=['title', 'address'])
                out_filtered = f"results_{base}_filtered.csv"
                df_final_filtered.to_csv(
                    os.path.join(RESULT_FOLDER, out_filtered),
                    index=False, encoding='utf-8-sig', sep=';'
                )

                # Raw results can be absent (e.g. only the filtered partial survived);
                # this used to raise NameError. NOTE(review): assumes the
                # result_filename_raw column accepts NULL - verify.
                out_raw = None
                if all_results_raw:
                    df_final_raw = pd.concat(all_results_raw, ignore_index=True)
                    out_raw = f"results_{base}_all.csv"
                    df_final_raw.to_csv(
                        os.path.join(RESULT_FOLDER, out_raw),
                        index=False, encoding='utf-8-sig', sep=';'
                    )

                job.result_filename = out_filtered
                job.result_filename_raw = out_raw
                job.status = f"✅ Fertig: {len(df_final_filtered)} Firmen"
            else:
                job.status = "❌ Keine Ergebnisse"
            db.session.commit()

            # Job is finished either way: drop checkpoints (after the commit, so a
            # failed commit still leaves them for a resume).
            cleanup_progress(job_id)
            if failed_batches:
                print(f"⚠️ {failed_batches} Batch(es) ohne Ergebnis")
            print(f"🎉 Job {job_id} komplett!")

        except Exception as e:
            # After a failed flush/commit the session must be rolled back before reuse.
            db.session.rollback()
            job.status = f"Failed: {str(e)[:50]}"
            print(f"💥 FATAL: {e}")
            traceback.print_exc()
            db.session.commit()

        print(f"✅ DONE! Status: {job.status}")