import os
import re
import time
import random
import traceback
from io import StringIO

import pandas as pd
import requests

from app.models import db, Job

print("🆕 MODERN webcrawler LOADED!")

UPLOAD_FOLDER = '/app/uploads'
RESULT_FOLDER = '/app/results'
SCRAPER_URL = "http://gmaps-scraper:8080"
OUTPUT_COLS = ['title', 'category', 'address', 'open_hours', 'website', 'phone', 'link']

# ──────────────────────────────────────────────
# Helper functions
# ──────────────────────────────────────────────

def get_batch_size(total_rows):
    """Number of queries sent per scraper job, scaled down for large inputs."""
    if total_rows < 50:
        return 10
    elif total_rows < 200:
        return 10
    elif total_rows < 500:
        return 5
    else:
        return 5

def get_delay(total_rows):
    """(min, max) pre-request delay in seconds, scaled up for large inputs."""
    if total_rows < 50:
        return (5, 10)
    elif total_rows < 200:
        return (10, 20)
    else:
        return (20, 40)

def is_blocked(data):
    """Heuristic check for rate limiting / blocking in a response body."""
    text = str(data).lower()
    blocked = any(kw in text for kw in ['captcha', 'blocked', 'rate limit', 'too many', '429'])
    if blocked:
        print(f"🚫 BLOCKED: {str(data)[:100]}")
    return blocked

def fix_encoding(text):
    """Repair broken ISO→UTF-8 characters (e.g. IndustriestraÃŸe → Industriestraße)."""
    if not isinstance(text, str):
        return text
    # Try latin-1 first, then cp1252 (cp1252 covers mojibake containing Ÿ, €, etc.)
    for enc in ('latin-1', 'cp1252'):
        try:
            return text.encode(enc).decode('utf-8')
        except (UnicodeEncodeError, UnicodeDecodeError):
            continue
    return text

def build_input_addresses(df):
    """Build normalized addresses from the input CSV for matching."""
    addresses = set()
    for _, row in df.iterrows():
        plz = str(row.get('PLZ', '')).strip()
        stadt = str(row.get('Stadt', '')).strip()
        str_ = str(row.get('Straße', '')).strip()
        nr = str(row.get('Hausnummer', '')).strip()
        zusatz = str(row.get('Zusatz', '')).strip()
        full = f"{str_} {nr} {zusatz} {plz} {stadt}".lower().strip()
        full = ' '.join(full.split())
        addresses.add(full)
    return addresses

def normalize_address(addr):
    """Normalize a scraped address for matching."""
    if not isinstance(addr, str):
        return ''
    addr = fix_encoding(addr)
    return ' '.join(addr.lower().strip().split())

def address_in_input(result_addr, input_addresses):
    """Check whether a result's postcode + street name appear in the input."""
    norm = normalize_address(result_addr)
    for inp_addr in input_addresses:
        plz_match = re.search(r'\b\d{5}\b', inp_addr)
        if plz_match:
            plz = plz_match.group()
            if plz in norm:
                street = inp_addr.split()[0] if inp_addr else ''
                if len(street) > 3 and street[:4].lower() in norm:
                    return True
    return False

# ──────────────────────────────────────────────
# CSV post-processing (apply_filter toggles the address match)
# ──────────────────────────────────────────────

def process_result_csv(raw_bytes, input_df, apply_filter=True):
    """
    Raw CSV → cleaned result:
    - keep only OUTPUT_COLS
    - fix encoding
    - optional: input/output address matching
    - de-duplication (always)
    """
    try:
        content = raw_bytes.decode('utf-8', errors='replace')
        df_out = pd.read_csv(StringIO(content))
        print(f"📄 Raw result: {df_out.shape} | Columns: {list(df_out.columns)[:8]}")

        # Keep only the expected columns
        available = [c for c in OUTPUT_COLS if c in df_out.columns]
        missing = [c for c in OUTPUT_COLS if c not in df_out.columns]
        if missing:
            print(f"⚠️ Missing columns: {missing}")
        df_out = df_out[available]

        # 🔤 Encoding fix
        for col in df_out.columns:
            df_out[col] = df_out[col].apply(fix_encoding)
        print("🔤 Encoding fix: done")

        if apply_filter:
            # 📍 Input/output address matching
            input_addresses = build_input_addresses(input_df)
            before = len(df_out)
            df_out = df_out[
                df_out['address'].apply(
                    lambda a: address_in_input(a, input_addresses)
                )
            ]
            print(f"📍 Address filter: {before} → {len(df_out)} rows")

        # 🔁 Remove duplicates (always, also for the raw version)
        before_dedup = len(df_out)
        df_out = df_out.drop_duplicates(subset=['title', 'address'], keep='first')
        print(f"🔁 Duplicates: {before_dedup} → {len(df_out)} rows")

        # Drop rows without a title
        df_out = df_out.dropna(subset=['title'], how='all')
        df_out = df_out[df_out['title'].str.strip().astype(bool)]

        print(f"✅ Final ({'filtered' if apply_filter else 'all'}): {df_out.shape}")
        return df_out

    except Exception as e:
        print(f"💥 process_result_csv: {e}")
        traceback.print_exc()
        return None

# ──────────────────────────────────────────────
# Main worker
# ──────────────────────────────────────────────
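# Expected input CSV (read below with sep=';' and encoding='ISO-8859-1'): one
# address per row, with the columns PLZ, Stadt, Straße, Hausnummer, Zusatz.
# The sample line is purely illustrative:
#
#   PLZ;Stadt;Straße;Hausnummer;Zusatz
#   12345;Musterstadt;Industriestraße;1;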
def process_file(filename, job_id, app):
    print(f"🎯 {filename} Job#{job_id} START!")

    with app.app_context():
        job = Job.query.get(job_id)
        if not job:
            print("❌ Job missing")
            return

        try:
            # 1️⃣ Parse CSV
            job.status = "📊 parsing CSV"
            db.session.commit()

            filepath = os.path.join(UPLOAD_FOLDER, filename)
            print(f"📁 {filepath} | {os.path.getsize(filepath)}b")

            df_input = pd.read_csv(filepath, sep=';', encoding='ISO-8859-1')
            print(f"📊 {df_input.shape} | Columns: {list(df_input.columns)}")

            queries = []
            for _, row in df_input.iterrows():
                parts = [
                    str(row.get('PLZ', '')).strip(),
                    str(row.get('Stadt', '')).strip(),
                    str(row.get('Straße', '')).strip(),
                    str(row.get('Hausnummer', '')).strip(),
                    str(row.get('Zusatz', '')).strip(),
                ]
                # "Firmen" ("companies") keeps the Maps search focused on businesses
                q = f"Firmen {' '.join(p for p in parts if p and p != 'nan')}".strip()
                if len(q) > 10:
                    queries.append(q)

            total = len(queries)
            print(f"🔍 {total} queries | Samples: {queries[:3]}")
            if not queries:
                raise ValueError("No valid addresses in CSV")

            # 2️⃣ Batch + delay
            batch_size = get_batch_size(total)
            delay_min, delay_max = get_delay(total)
            batch = queries[:batch_size]
            pre_delay = random.uniform(delay_min, delay_max)
            print(f"📦 Batch {len(batch)}/{total} | 😴 {pre_delay:.1f}s delay")
            time.sleep(pre_delay)

            # 3️⃣ API call
            job.status = "📤 sending to scraper"
            db.session.commit()

            payload = {
                "name": f"{filename.replace('.csv','')}-{job_id}",
                "keywords": batch,
                "lang": "de",
                "depth": 1,
                "zoom": 17,
                "radius": 50,
                "max_time": 60,
                "fast_mode": False
            }
            print(f"🌐 POST {SCRAPER_URL}/api/v1/jobs | {payload['name']}")

            resp = requests.post(f"{SCRAPER_URL}/api/v1/jobs", json=payload, timeout=30)
            print(f"📤 {resp.status_code}: {resp.text[:300]}")

            if is_blocked(resp.text):
                raise ValueError("🚫 IP blocked! Configure a proxy.")
            if resp.status_code != 201:
                raise ValueError(f"API {resp.status_code}: {resp.text[:200]}")
            # 4️⃣ Polling
            scraper_id = resp.json()['id']
            job.scraper_job_id = scraper_id
            job.status = "⏳ scraping"
            db.session.commit()
            print(f"✅ Scraper job: {scraper_id}")

            for i in range(1, 61):  # up to 60 polls, 8-15 s apart
                try:
                    r = requests.get(
                        f"{SCRAPER_URL}/api/v1/jobs/{scraper_id}",
                        timeout=10
                    )
                    data = r.json()
                    status = data.get('Status', data.get('status', '?'))
                    print(f"⏳ {i}/60: {status}")

                    if is_blocked(data):
                        raise ValueError("🚫 IP blocked during scraping!")

                    if status in ('ok', 'completed', 'scraped'):
                        dl = requests.get(
                            f"{SCRAPER_URL}/api/v1/jobs/{scraper_id}/download",
                            timeout=60
                        )
                        if dl.status_code != 200:
                            raise ValueError(f"Download {dl.status_code}")
                        if is_blocked(dl.text[:200]):
                            raise ValueError("🚫 IP blocked during download!")

                        # 5️⃣ Post-processing → two versions
                        job.status = "🔧 processing result"
                        db.session.commit()

                        base = filename.replace('.csv', '')
                        os.makedirs(RESULT_FOLDER, exist_ok=True)

                        # ── Version A: filtered (address match + de-duplication) ──
                        df_filtered = process_result_csv(dl.content, df_input, apply_filter=True)
                        outname_filtered = f"results_{base}_filtered.csv"
                        outpath_filtered = os.path.join(RESULT_FOLDER, outname_filtered)

                        if df_filtered is not None and len(df_filtered) > 0:
                            df_filtered.to_csv(
                                outpath_filtered, index=False,
                                encoding='utf-8-sig', sep=';'
                            )
                            print(f"🎯 Filtered: {outname_filtered} → {len(df_filtered)} companies")
                        else:
                            print("⚠️ No matches after filtering, writing an empty file")
                            pd.DataFrame(columns=OUTPUT_COLS).to_csv(
                                outpath_filtered, index=False,
                                encoding='utf-8-sig', sep=';'
                            )

                        # ── Version B: all rows (columns + encoding only, no filter) ──
                        df_raw = process_result_csv(dl.content, df_input, apply_filter=False)
                        outname_raw = f"results_{base}_all.csv"
                        outpath_raw = os.path.join(RESULT_FOLDER, outname_raw)

                        if df_raw is not None:
                            df_raw.to_csv(
                                outpath_raw, index=False,
                                encoding='utf-8-sig', sep=';'
                            )
                            print(f"📋 All: {outname_raw} → {len(df_raw)} companies")
                        else:
                            print("⚠️ df_raw is None, saving raw download instead")
                            with open(outpath_raw, 'wb') as f:
                                f.write(dl.content)

                        # ── Save to DB ──
                        job.status = "✅ Done"
                        job.result_filename = outname_filtered   # 🎯 filtered
                        job.result_filename_raw = outname_raw    # 📋 all
                        db.session.commit()
                        print("🎉 Both files saved!")
                        break

                    elif status in ('failed', 'cancelled', 'error'):
                        raise ValueError(f"Scraper: {status}")

                except requests.RequestException as e:
                    print(f"⚠️ Poll {i}: {e}")

                time.sleep(random.uniform(8, 15))
            else:
                raise ValueError("Timeout: scraper job did not finish after 60 polls")

        except Exception as e:
            job.status = "Failed"
            job.result_filename = str(e)
            print(f"💥 ERROR: {e}")
            traceback.print_exc()
            db.session.commit()

        print(f"✅ DONE! Status: {job.status}\n")
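
if __name__ == "__main__":
    # Minimal offline smoke test for process_result_csv (a sketch only: running
    # this module standalone still requires `app.models` to be importable, e.g.
    # inside the app container). All sample data below is made up.
    sample_input = pd.DataFrame([{
        'PLZ': '12345', 'Stadt': 'Musterstadt',
        'Straße': 'Industriestraße', 'Hausnummer': '1', 'Zusatz': ''
    }])
    sample_result = (
        "title,category,address,open_hours,website,phone,link\n"
        "Muster GmbH,Manufacturing,Industriestraße 1 12345 Musterstadt,Mo-Fr,example.com,+49 123 456,maps\n"
        "Anderswo AG,Retail,Andere Straße 9 99999 Weitweg,,,,maps\n"
    ).encode('utf-8')
    df_filtered = process_result_csv(sample_result, sample_input, apply_filter=True)
    df_all = process_result_csv(sample_result, sample_input, apply_filter=False)
    print(f"Smoke test: filtered={0 if df_filtered is None else len(df_filtered)} "
          f"| all={0 if df_all is None else len(df_all)}")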