import os
import re
import unicodedata
import json
import threading
import pandas as pd
import requests
import time
import random
from io import StringIO
from concurrent.futures import ThreadPoolExecutor, as_completed
from app.models import db, Job

print("πŸ†• MODERN webcrawler LOADED! – BATCHED + PROXY + RESUME + ETA + 4x SCRAPER CHUNK-PARALLEL")

UPLOAD_FOLDER = '/app/uploads'
RESULT_FOLDER = '/app/results'
SCRAPER_URLS = [
    "http://gmaps-scraper-1:8080",
    "http://gmaps-scraper-2:8080",
    "http://gmaps-scraper-3:8080",
    "http://gmaps-scraper-4:8080",
]
OUTPUT_COLS = ['title', 'category', 'address', 'website', 'phone', 'link']
# SECURITY FIX: proxy credentials were hard-coded in source. Read from the
# environment first; the old literal remains as fallback so existing
# deployments keep working unchanged.
PROXY_URL = os.environ.get(
    "WEBCRAWLER_PROXY_URL",
    "http://bitlleuv-rotate:s5hzse6hz74b@p.webshare.io:80",
)
API_PROXIES = {"http": PROXY_URL, "https": PROXY_URL}

# Only one crawl job runs at a time; others wait on this semaphore.
_job_semaphore = threading.Semaphore(1)

# ──────────────────────────────────────────────
# Tuning
# ──────────────────────────────────────────────
BATCH_SIZE = 30          # keywords per scraper job
BATCH_DELAY_MIN = 3      # min pause between chunks (seconds)
BATCH_DELAY_MAX = 6      # max pause between chunks (seconds)
MAX_TIME = 60            # scraper time budget per batch (seconds)
POLL_MAX = 90            # max poll attempts per batch
POLL_DELAY_MIN = 2       # min delay between polls (seconds)
POLL_DELAY_MAX = 5       # max delay between polls (seconds)
STUCK_TIMEOUT = 300      # seconds of 'pending' before the scraper is restarted (5 min)
MAX_RETRIES = 2          # retry attempts per batch on failure
PARALLEL_WORKERS = len(SCRAPER_URLS)

# Serializes appends to the partial-result CSV files from worker threads.
_partial_lock = threading.Lock()


# ──────────────────────────────────────────────
# Helpers
# ──────────────────────────────────────────────
def is_blocked(data):
    """Heuristically detect captcha / rate-limit responses in *data*."""
    text = str(data).lower()
    blocked = any(kw in text for kw in ['captcha', 'blocked', 'rate limit', 'too many', '429'])
    if blocked:
        print(f"🚫 BLOCKED: {str(data)[:100]}")
    return blocked


def fix_encoding(text):
    """Repair UTF-8 text that was mis-decoded as latin-1 (mojibake).

    Non-strings pass through untouched; strings that do not round-trip
    are returned unchanged.
    """
    if not isinstance(text, str):
        return text
    try:
        return text.encode('latin-1').decode('utf-8')
    except (UnicodeEncodeError, UnicodeDecodeError):
        return text


def clean_query(q):
    """Strip control characters and collapse whitespace in a search query."""
    q = ''.join(c for c in q if unicodedata.category(c) != 'Cc')
    q = ' '.join(q.split())
    return q.strip()


def build_input_addresses(df):
    """Return the set of normalized full addresses from the input DataFrame.

    Expected columns: PLZ, Stadt, Straße, Hausnummer, Zusatz (missing ones
    become empty strings).
    """
    addresses = set()
    for _, row in df.iterrows():
        plz = str(row.get('PLZ', '')).strip()
        stadt = str(row.get('Stadt', '')).strip()
        str_ = str(row.get('Straße', '')).strip()
        nr = str(row.get('Hausnummer', '')).strip()
        zusatz = str(row.get('Zusatz', '')).strip()
        full = f"{str_} {nr} {zusatz} {plz} {stadt}".lower().strip()
        full = ' '.join(full.split())
        addresses.add(full)
    return addresses


def normalize_address(addr):
    """Lower-case, encoding-fix and whitespace-normalize an address string."""
    if not isinstance(addr, str):
        return ''
    addr = fix_encoding(addr)
    return ' '.join(addr.lower().strip().split())


def address_in_input(result_addr, input_addresses):
    """Fuzzy-match a scraped address against the normalized input set.

    A match requires: same 5-digit PLZ, first ~5 chars of the street name
    present, and (if given) the house number as a whole word.
    """
    norm = normalize_address(result_addr)
    for inp_addr in input_addresses:
        plz_match = re.search(r'\b\d{5}\b', inp_addr)
        if not plz_match:
            continue
        plz = plz_match.group()
        if plz not in norm:
            continue
        parts = inp_addr.split()
        street = parts[0] if parts else ''
        if len(street) < 4 or street[:5].lower() not in norm:
            continue
        hausnr = parts[1] if len(parts) > 1 else ''
        if hausnr and not re.search(rf'\b{re.escape(hausnr)}\b', norm):
            continue
        return True
    return False


def format_eta(seconds):
    """Render a duration in seconds as '42s', '7min' or '1h 05min'."""
    if seconds < 60:
        return f"{int(seconds)}s"
    h, rem = divmod(int(seconds), 3600)
    m = rem // 60
    return f"{h}h {m:02d}min" if h > 0 else f"{m}min"


# ──────────────────────────────────────────────
# Scraper job cleanup
# ──────────────────────────────────────────────
def _cleanup_scraper_job(scraper_url, scraper_id):
    """Best-effort deletion of a scraper job we no longer need."""
    try:
        requests.delete(f"{scraper_url}/api/v1/jobs/{scraper_id}", timeout=10)
        print(f"πŸ—‘οΈ Scraper-Job {scraper_id} gelΓΆscht")
    except Exception as e:
        print(f"⚠️ Cleanup fehlgeschlagen: {e}")


# ──────────────────────────────────────────────
# Scraper restart via Docker SDK
# ──────────────────────────────────────────────
def restart_scraper(scraper_url):
    """Restart the Docker container behind *scraper_url*; True on success.

    The container name is derived from the URL's host part. Waits 15s
    after restart so the scraper can come back up.
    """
    try:
        import docker
        container_name = scraper_url.split("//")[1].split(":")[0]
        print(f"πŸ”„ Starte {container_name} neu...")
        client = docker.from_env()
        container = client.containers.get(container_name)
        container.restart()
        print(f"βœ… {container_name} neu gestartet – warte 15s...")
        time.sleep(15)
        return True
    except Exception as e:
        print(f"⚠️ Scraper-Neustart fehlgeschlagen: {e}")
        return False


# ──────────────────────────────────────────────
# Resume: progress-file helpers
# ──────────────────────────────────────────────
def get_progress_path(job_id):
    """Path of the JSON progress file for *job_id*."""
    return os.path.join(RESULT_FOLDER, f"progress_{job_id}.json")


def get_partial_path(job_id, suffix):
    """Path of a partial-result CSV ('filtered' or 'raw') for *job_id*."""
    return os.path.join(RESULT_FOLDER, f"partial_{job_id}_{suffix}.csv")


def load_progress(job_id):
    """Load resume info, or None when absent.

    BUGFIX: a corrupt/truncated progress file no longer crashes the job —
    it is treated as 'no progress' and the crawl restarts from batch 0.
    """
    path = get_progress_path(job_id)
    if os.path.exists(path):
        try:
            with open(path, 'r') as f:
                data = json.load(f)
            print(f"πŸ” RESUME: ab Batch {data['last_completed_batch'] + 1}/{data['total_batches']}")
            return data
        except (json.JSONDecodeError, OSError, KeyError, TypeError) as e:
            print(f"⚠️ Progress-Datei defekt – Neustart von Batch 0: {e}")
            return None
    return None


def save_progress(job_id, last_completed_batch, total_batches):
    """Persist the highest completed batch index for crash-resume."""
    path = get_progress_path(job_id)
    with open(path, 'w') as f:
        json.dump({"last_completed_batch": last_completed_batch, "total_batches": total_batches}, f)


def append_partial(job_id, df_filtered, df_raw):
    """Append batch results to the partial CSVs (thread-safe; None skipped)."""
    with _partial_lock:
        for suffix, df in [('filtered', df_filtered), ('raw', df_raw)]:
            if df is None:
                continue
            path = get_partial_path(job_id, suffix)
            header = not os.path.exists(path)
            df.to_csv(path, mode='a', index=False, header=header, encoding='utf-8-sig', sep=';')


def load_partial(job_id):
    """Load previously written partial CSVs as two one-element DataFrame lists."""
    results_filtered, results_raw = [], []
    for suffix, lst in [('filtered', results_filtered), ('raw', results_raw)]:
        path = get_partial_path(job_id, suffix)
        if os.path.exists(path):
            try:
                df = pd.read_csv(path, sep=';', encoding='utf-8-sig')
                lst.append(df)
                print(f"πŸ“‚ Partial {suffix}: {len(df)} Zeilen geladen")
            except Exception as e:
                print(f"⚠️ Partial {suffix} Ladefehler: {e}")
    return results_filtered, results_raw


def cleanup_progress(job_id):
    """Remove progress + partial files once a job finished successfully."""
    for path in [
        get_progress_path(job_id),
        get_partial_path(job_id, 'filtered'),
        get_partial_path(job_id, 'raw'),
    ]:
        if os.path.exists(path):
            os.remove(path)


# ──────────────────────────────────────────────
# CSV post-processing
# ──────────────────────────────────────────────
def process_result_csv(raw_bytes, input_df, apply_filter=True):
    """Parse a scraper result CSV and return a cleaned DataFrame (or None).

    Keeps only OUTPUT_COLS, repairs mojibake, optionally filters rows to
    addresses present in *input_df*, then de-duplicates and drops rows
    without a title. Returns None on any parse/processing failure.
    """
    try:
        content = raw_bytes.decode('utf-8', errors='replace')
        df_out = pd.read_csv(StringIO(content))
        print(f"πŸ“„ Raw result: {df_out.shape}")
        available = [c for c in OUTPUT_COLS if c in df_out.columns]
        df_out = df_out[available]
        # BUGFIX: previously a missing 'title'/'address' column raised a
        # KeyError that was silently swallowed below; fail explicitly.
        missing = [c for c in ('title', 'address') if c not in df_out.columns]
        if missing:
            print(f"⚠️ process_result_csv: Pflichtspalten fehlen: {missing}")
            return None
        for col in df_out.columns:
            df_out[col] = df_out[col].apply(fix_encoding)
        if apply_filter:
            input_addresses = build_input_addresses(input_df)
            before = len(df_out)
            df_out = df_out[
                df_out['address'].apply(lambda a: address_in_input(a, input_addresses))
            ]
            print(f"πŸ“ Filter: {before} β†’ {len(df_out)}")
        df_out = df_out.drop_duplicates(subset=['title', 'address'], keep='first')
        df_out = df_out.dropna(subset=['title'], how='all')
        df_out = df_out[df_out['title'].str.strip().astype(bool)]
        print(f"βœ… Final ({'gefiltert' if apply_filter else 'alle'}): {df_out.shape}")
        return df_out
    except Exception as e:
        print(f"πŸ’₯ process_result_csv: {e}")
        return None
# ──────────────────────────────────────────────
# Parallel: process a single batch
# ──────────────────────────────────────────────
def process_batch(batch_idx, batch_queries, scraper_url, filename, job_id, df_input):
    """Submit one keyword batch to a scraper, poll it to completion.

    Returns (df_filtered, df_raw) on success, (None, None) on failure.
    Retries up to MAX_RETRIES times; restarts a stuck scraper container
    when a job stays 'pending' longer than STUCK_TIMEOUT.
    """
    payload = {
        "name": f"{filename.replace('.csv','')}-{job_id}-B{batch_idx+1:03d}",
        "keywords": batch_queries,
        "lang": "de",
        "depth": 1,
        "zoom": 17,
        "radius": 100,
        "max_time": MAX_TIME,
        "fast_mode": False,
        "proxies": [PROXY_URL],
    }
    for attempt in range(1, MAX_RETRIES + 1):
        scraper_id = None
        try:
            resp = requests.post(f"{scraper_url}/api/v1/jobs", json=payload, timeout=45)
            print(f"πŸ“€ Batch {batch_idx+1} β†’ {scraper_url} | {resp.status_code} (Versuch {attempt})")
            if is_blocked(resp.text):
                print(f"🚫 Batch {batch_idx+1} blocked")
                return None, None
            if resp.status_code != 201:
                print(f"⚠️ Batch {batch_idx+1} fehlgeschlagen: {resp.text[:100]}")
                if attempt < MAX_RETRIES:
                    time.sleep(10)
                    continue
                # BUGFIX: previously fell through and called resp.json()['id']
                # on the error response; give up explicitly instead.
                return None, None
            scraper_id = resp.json()['id']
            print(f"βœ… Batch {batch_idx+1} Scraper-ID: {scraper_id}")
            batch_start_time = time.time()
            for poll_i in range(1, POLL_MAX + 1):
                r = requests.get(f"{scraper_url}/api/v1/jobs/{scraper_id}", timeout=15)
                data = r.json()
                # some scraper versions capitalize the status key
                status = data.get('Status', data.get('status', '?'))
                elapsed = time.time() - batch_start_time
                print(f"⏳ Batch {batch_idx+1} Poll {poll_i}: {status} | {int(elapsed)}s")
                if status == 'pending' and elapsed > STUCK_TIMEOUT:
                    print(f"⚠️ Batch {batch_idx+1} hΓ€ngt seit {int(elapsed)}s – Neustart {scraper_url}")
                    _cleanup_scraper_job(scraper_url, scraper_id)
                    scraper_id = None
                    restart_scraper(scraper_url)
                    break
                if status in ('ok', 'completed', 'scraped'):
                    dl = requests.get(f"{scraper_url}/api/v1/jobs/{scraper_id}/download", timeout=90)
                    scraper_id = None
                    if dl.status_code == 200:
                        df_filtered = process_result_csv(dl.content, df_input, True)
                        df_raw = process_result_csv(dl.content, df_input, False)
                        print(f"πŸ“Š Batch {batch_idx+1}: {len(df_filtered) if df_filtered is not None else 0} filtered")
                        return df_filtered, df_raw
                    return None, None
                elif status in ('failed', 'error'):
                    print(f"πŸ’₯ Batch {batch_idx+1}: {status} (Versuch {attempt})")
                    _cleanup_scraper_job(scraper_url, scraper_id)
                    scraper_id = None
                    if attempt < MAX_RETRIES:
                        time.sleep(10)
                    break
                time.sleep(random.uniform(POLL_DELAY_MIN, POLL_DELAY_MAX))
            else:
                # BUGFIX: poll budget exhausted without a terminal status –
                # delete the orphaned scraper job before retrying, instead
                # of leaking it on the scraper instance.
                _cleanup_scraper_job(scraper_url, scraper_id)
                scraper_id = None
        except Exception as e:
            print(f"πŸ’₯ Batch {batch_idx+1} Versuch {attempt}: {e}")
            if scraper_id:
                _cleanup_scraper_job(scraper_url, scraper_id)
                scraper_id = None
            if attempt < MAX_RETRIES:
                time.sleep(10)
    return None, None


def _build_queries(df_input):
    """Build 'Firmen <address>' search queries from the input rows.

    Skips queries shorter than 11 chars (address essentially empty).
    """
    queries = []
    for _, row in df_input.iterrows():
        parts = [
            str(row.get('PLZ', '')).strip(),
            str(row.get('Stadt', '')).strip(),
            str(row.get('Straße', '')).strip(),
            str(row.get('Hausnummer', '')).strip(),
            str(row.get('Zusatz', '')).strip(),
        ]
        q = f"Firmen {' '.join(p for p in parts if p and p != 'nan')}".strip()
        q = clean_query(q)
        if len(q) > 10:
            queries.append(q)
    return queries


# ──────────────────────────────────────────────
# MAIN WORKER
# ──────────────────────────────────────────────
def process_file(filename, job_id, app):
    """Main worker: crawl all addresses of an uploaded CSV in parallel batches.

    Serialized via _job_semaphore (one job at a time). Supports crash
    resume through progress/partial files and writes two result CSVs
    (address-filtered and raw). Status updates go to the Job DB row.
    """
    with app.app_context():
        job = Job.query.get(job_id)
        if job:
            job.status = "⏳ Wartet auf anderen Job..."
            db.session.commit()
    with _job_semaphore:
        print(f"🎯 (unknown) Job#{job_id} START!")
        with app.app_context():
            job = Job.query.get(job_id)
            if not job:
                print("❌ Job missing")
                return
            try:
                job.status = "πŸ“Š parsing CSV"
                db.session.commit()
                filepath = os.path.join(UPLOAD_FOLDER, filename)
                print(f"πŸ“ {filepath} | {os.path.getsize(filepath)}b")
                df_input = pd.read_csv(filepath, sep=';', encoding='ISO-8859-1')
                print(f"πŸ“Š {df_input.shape}")
                queries = _build_queries(df_input)
                total_queries = len(queries)
                print(f"πŸ” {total_queries} Queries | Samples: {queries[:3]}")
                if total_queries == 0:
                    raise ValueError("Keine gΓΌltigen Adressen")
                batches = (total_queries + BATCH_SIZE - 1) // BATCH_SIZE
                os.makedirs(RESULT_FOLDER, exist_ok=True)

                # resume support: continue after the last fully completed batch
                progress = load_progress(job_id)
                start_batch = progress['last_completed_batch'] + 1 if progress else 0
                all_results_filtered, all_results_raw = load_partial(job_id) if progress else ([], [])

                eta_initial = format_eta((batches - start_batch) * ((BATCH_DELAY_MAX + MAX_TIME) / 2) / PARALLEL_WORKERS)
                print(f"πŸ“¦ {batches} Batches Γ  {BATCH_SIZE} | {PARALLEL_WORKERS}x parallel (Chunk) | Start: {start_batch} | ETA: ~{eta_initial}")
                job_start_time = time.time()
                job.status = f"πŸ”„ Batch {start_batch+1}/{batches} | ⏱️ ~{eta_initial}"
                db.session.commit()

                completed_count = 0
                batch_indices = list(range(start_batch, batches))
                chunks = [
                    batch_indices[i:i + PARALLEL_WORKERS]
                    for i in range(0, len(batch_indices), PARALLEL_WORKERS)
                ]
                with ThreadPoolExecutor(max_workers=PARALLEL_WORKERS) as executor:
                    for chunk_idx, chunk in enumerate(chunks):
                        futures = {}
                        for batch_idx in chunk:
                            batch_start_q = batch_idx * BATCH_SIZE
                            batch_end_q = min(batch_start_q + BATCH_SIZE, total_queries)
                            batch_queries = queries[batch_start_q:batch_end_q]
                            scraper_url = SCRAPER_URLS[batch_idx % len(SCRAPER_URLS)]
                            print(f"\nπŸš€ Chunk {chunk_idx+1} | Batch {batch_idx+1}/{batches} β†’ {scraper_url}")
                            # small jitter so the scrapers don't get hit simultaneously
                            time.sleep(random.uniform(1, 2))
                            future = executor.submit(
                                process_batch, batch_idx, batch_queries,
                                scraper_url, filename, job_id, df_input
                            )
                            futures[future] = batch_idx
                        for future in as_completed(futures):
                            batch_idx = futures[future]
                            completed_count += 1
                            try:
                                df_filtered, df_raw = future.result()
                                if df_filtered is not None:
                                    all_results_filtered.append(df_filtered)
                                    # BUGFIX: df_raw can be None even when
                                    # df_filtered is not; appending None
                                    # breaks pd.concat at merge time.
                                    if df_raw is not None:
                                        all_results_raw.append(df_raw)
                                    append_partial(job_id, df_filtered, df_raw)
                            except Exception as e:
                                print(f"πŸ’₯ Batch {batch_idx+1} Exception: {e}")
                        # BUGFIX: save progress only once the WHOLE chunk is
                        # done. Saving per future recorded out-of-order
                        # completions, so a crash mid-chunk could make the
                        # resume skip batches that never finished.
                        save_progress(job_id, max(chunk), batches)
                        elapsed = time.time() - job_start_time
                        if completed_count > 0:
                            # elapsed/completed is already wall-clock per batch
                            # (parallelism included) – BUGFIX: do not divide by
                            # PARALLEL_WORKERS a second time.
                            avg_per_batch = elapsed / completed_count
                            remaining = (batches - start_batch - completed_count) * avg_per_batch
                            eta_str = format_eta(remaining)
                        else:
                            eta_str = "?"
                        job.status = f"πŸ”„ {completed_count}/{batches - start_batch} fertig | ⏱️ ~{eta_str}"
                        db.session.commit()
                        if chunk_idx < len(chunks) - 1:
                            delay = random.uniform(BATCH_DELAY_MIN, BATCH_DELAY_MAX)
                            print(f"⏸️ Chunk {chunk_idx+1} fertig – warte {delay:.1f}s...")
                            time.sleep(delay)

                # ── MERGE & SAVE ──
                job.status = "πŸ”§ merging results"
                db.session.commit()
                base = filename.replace('.csv', '')
                if all_results_filtered:
                    df_final_filtered = pd.concat(all_results_filtered, ignore_index=True)
                    df_final_filtered = df_final_filtered.drop_duplicates(subset=['title', 'address'])
                    out_filtered = f"results_{base}_filtered.csv"
                    df_final_filtered.to_csv(
                        os.path.join(RESULT_FOLDER, out_filtered),
                        index=False, encoding='utf-8-sig', sep=';'
                    )
                    out_raw = None
                    if all_results_raw:
                        df_final_raw = pd.concat(all_results_raw, ignore_index=True)
                        out_raw = f"results_{base}_all.csv"
                        df_final_raw.to_csv(
                            os.path.join(RESULT_FOLDER, out_raw),
                            index=False, encoding='utf-8-sig', sep=';'
                        )
                    job.result_filename = out_filtered
                    job.result_filename_raw = out_raw
                    job.status = f"βœ… Fertig: {len(df_final_filtered)} Firmen"
                    cleanup_progress(job_id)
                else:
                    # NOTE(review): raw-only results are dropped here (as
                    # before); progress files are kept so a re-run can resume.
                    job.status = "❌ Keine Ergebnisse"
                db.session.commit()
                print(f"πŸŽ‰ Job {job_id} komplett!")
            except Exception as e:
                job.status = f"Failed: {str(e)[:50]}"
                print(f"πŸ’₯ FATAL: {e}")
                import traceback
                traceback.print_exc()
                db.session.commit()
            print(f"βœ… DONE! Status: {job.status}")