Initial commit

This commit is contained in:
mkrieger 2026-03-10 11:33:18 +01:00
parent 387bc056b9
commit df8c2313a9
275 changed files with 12939 additions and 263 deletions

View file

@ -0,0 +1,429 @@
import os
import re
import unicodedata
import json
import pandas as pd
import requests
import time
import random
from io import StringIO
from app.models import db, Job
print("🆕 MODERN webcrawler LOADED! BATCHED + PROXY + RESUME + ETA + 2x SCRAPER")
UPLOAD_FOLDER = '/app/uploads'
RESULT_FOLDER = '/app/results'
# 2x Scraper abwechselnd genutzt
SCRAPER_URLS = [
"http://gmaps-scraper-1:8080",
"http://gmaps-scraper-2:8080",
]
OUTPUT_COLS = ['title', 'category', 'address', 'open_hours', 'website', 'phone', 'link']
PROXY_URL = "http://bitlleuv-rotate:s5hzse6hz74b@p.webshare.io:80"
API_PROXIES = {"http": PROXY_URL, "https": PROXY_URL}
# ──────────────────────────────────────────────
# Tuning
# ──────────────────────────────────────────────
BATCH_SIZE = 30 # Keywords pro Scraper-Job
BATCH_DELAY_MIN = 3 # Sekunden Pause zwischen Batches (min)
BATCH_DELAY_MAX = 6 # Sekunden Pause zwischen Batches (max)
MAX_TIME = 60 # Sekunden die der Scraper pro Batch hat
POLL_MAX = 90 # Max. Poll-Versuche pro Batch
POLL_DELAY_MIN = 2 # Sekunden zwischen Polls (min)
POLL_DELAY_MAX = 5 # Sekunden zwischen Polls (max)
STUCK_THRESHOLD = 8 # Polls auf 'pending' bis Auto-Restart
MAX_RETRIES = 2 # Wiederholversuche pro Batch bei Fehler
# ──────────────────────────────────────────────
# Hilfsfunktionen
# ──────────────────────────────────────────────
def is_blocked(data):
text = str(data).lower()
blocked = any(kw in text for kw in ['captcha', 'blocked', 'rate limit', 'too many', '429'])
if blocked:
print(f"🚫 BLOCKED: {str(data)[:100]}")
return blocked
def fix_encoding(text):
if not isinstance(text, str):
return text
try:
return text.encode('latin-1').decode('utf-8')
except (UnicodeEncodeError, UnicodeDecodeError):
return text
# Fix 1: Sonderzeichen in Queries bereinigen
def clean_query(q):
"""Steuerzeichen + fehlerhafte Bytes entfernen für saubere Google Maps URLs"""
q = ''.join(c for c in q if unicodedata.category(c) != 'Cc')
q = ' '.join(q.split())
return q.strip()
def build_input_addresses(df):
addresses = set()
for _, row in df.iterrows():
plz = str(row.get('PLZ', '')).strip()
stadt = str(row.get('Stadt', '')).strip()
str_ = str(row.get('Straße', '')).strip()
nr = str(row.get('Hausnummer', '')).strip()
zusatz = str(row.get('Zusatz', '')).strip()
full = f"{str_} {nr} {zusatz} {plz} {stadt}".lower().strip()
full = ' '.join(full.split())
addresses.add(full)
return addresses
def normalize_address(addr):
if not isinstance(addr, str):
return ''
addr = fix_encoding(addr)
return ' '.join(addr.lower().strip().split())
def address_in_input(result_addr, input_addresses):
norm = normalize_address(result_addr)
for inp_addr in input_addresses:
plz_match = re.search(r'\b\d{5}\b', inp_addr)
if plz_match:
plz = plz_match.group()
if plz in norm:
street = inp_addr.split()[0] if inp_addr else ''
if len(street) > 3 and street[:4].lower() in norm:
return True
return False
def format_eta(seconds):
"""Sekunden → lesbares ETA-Format"""
if seconds < 60:
return f"{int(seconds)}s"
h, rem = divmod(int(seconds), 3600)
m = rem // 60
return f"{h}h {m:02d}min" if h > 0 else f"{m}min"
# ──────────────────────────────────────────────
# Fix 3: Scraper-Neustart bei Inactivity
# ──────────────────────────────────────────────
def restart_scraper(scraper_url):
"""Den betroffenen Scraper-Container neu starten"""
try:
import subprocess
# Container-Name aus URL ableiten: http://gmaps-scraper-1:8080 → gmaps-scraper-1
container = scraper_url.split("//")[1].split(":")[0]
print(f"🔄 Starte {container} neu...")
subprocess.run(["docker", "restart", container], timeout=30, capture_output=True)
print(f"✅ {container} neu gestartet warte 15s...")
time.sleep(15)
return True
except Exception as e:
print(f"⚠️ Scraper-Neustart fehlgeschlagen: {e}")
return False
# ──────────────────────────────────────────────
# Resume: Progress-File Hilfsfunktionen
# ──────────────────────────────────────────────
def get_progress_path(job_id):
return os.path.join(RESULT_FOLDER, f"progress_{job_id}.json")
def get_partial_path(job_id, suffix):
return os.path.join(RESULT_FOLDER, f"partial_{job_id}_{suffix}.csv")
def load_progress(job_id):
"""Gespeicherten Fortschritt laden (falls vorhanden)"""
path = get_progress_path(job_id)
if os.path.exists(path):
with open(path, 'r') as f:
data = json.load(f)
print(f"🔁 RESUME: ab Batch {data['last_completed_batch'] + 1}/{data['total_batches']}")
return data
return None
def save_progress(job_id, last_completed_batch, total_batches):
"""Fortschritt nach jedem Batch speichern"""
path = get_progress_path(job_id)
with open(path, 'w') as f:
json.dump({"last_completed_batch": last_completed_batch, "total_batches": total_batches}, f)
def append_partial(job_id, df_filtered, df_raw):
"""Batch-Ergebnis an Partial-CSV anhängen"""
for suffix, df in [('filtered', df_filtered), ('raw', df_raw)]:
if df is None:
continue
path = get_partial_path(job_id, suffix)
header = not os.path.exists(path)
df.to_csv(path, mode='a', index=False, header=header, encoding='utf-8-sig', sep=';')
def load_partial(job_id):
"""Bestehende Partial-CSVs laden"""
results_filtered, results_raw = [], []
for suffix, lst in [('filtered', results_filtered), ('raw', results_raw)]:
path = get_partial_path(job_id, suffix)
if os.path.exists(path):
try:
df = pd.read_csv(path, sep=';', encoding='utf-8-sig')
lst.append(df)
print(f"📂 Partial {suffix}: {len(df)} Zeilen geladen")
except Exception as e:
print(f"⚠️ Partial {suffix} Ladefehler: {e}")
return results_filtered, results_raw
def cleanup_progress(job_id):
"""Progress + Partial-Files nach Abschluss löschen"""
for path in [
get_progress_path(job_id),
get_partial_path(job_id, 'filtered'),
get_partial_path(job_id, 'raw'),
]:
if os.path.exists(path):
os.remove(path)
# ──────────────────────────────────────────────
# CSV Nachbearbeitung
# ──────────────────────────────────────────────
def process_result_csv(raw_bytes, input_df, apply_filter=True):
try:
content = raw_bytes.decode('utf-8', errors='replace')
df_out = pd.read_csv(StringIO(content))
print(f"📄 Raw result: {df_out.shape}")
available = [c for c in OUTPUT_COLS if c in df_out.columns]
df_out = df_out[available]
for col in df_out.columns:
df_out[col] = df_out[col].apply(fix_encoding)
if apply_filter:
input_addresses = build_input_addresses(input_df)
before = len(df_out)
df_out = df_out[
df_out['address'].apply(lambda a: address_in_input(a, input_addresses))
]
print(f"📍 Filter: {before} → {len(df_out)}")
df_out = df_out.drop_duplicates(subset=['title', 'address'], keep='first')
df_out = df_out.dropna(subset=['title'], how='all')
df_out = df_out[df_out['title'].str.strip().astype(bool)]
print(f"✅ Final ({'gefiltert' if apply_filter else 'alle'}): {df_out.shape}")
return df_out
except Exception as e:
print(f"💥 process_result_csv: {e}")
return None
# ──────────────────────────────────────────────
# HAUPT-WORKER
# ──────────────────────────────────────────────
def process_file(filename, job_id, app):
print(f"🎯 {filename} Job#{job_id} START!")
with app.app_context():
job = Job.query.get(job_id)
if not job:
print("❌ Job missing")
return
try:
#Parse + ALLE Queries
job.status = "📊 parsing CSV"
db.session.commit()
filepath = os.path.join(UPLOAD_FOLDER, filename)
print(f"📁 {filepath} | {os.path.getsize(filepath)}b")
df_input = pd.read_csv(filepath, sep=';', encoding='ISO-8859-1')
print(f"📊 {df_input.shape}")
queries = []
for _, row in df_input.iterrows():
parts = [
str(row.get('PLZ', '')).strip(),
str(row.get('Stadt', '')).strip(),
str(row.get('Straße', '')).strip(),
str(row.get('Hausnummer', '')).strip(),
str(row.get('Zusatz', '')).strip(),
]
q = f"Firmen {' '.join(p for p in parts if p and p != 'nan')}".strip()
q = clean_query(q) # Fix 1: Sonderzeichen bereinigen
if len(q) > 10:
queries.append(q)
total_queries = len(queries)
print(f"🔍 {total_queries} Queries | Samples: {queries[:3]}")
if total_queries == 0:
raise ValueError("Keine gültigen Adressen")
#BATCHED Processing
batches = (total_queries + BATCH_SIZE - 1) // BATCH_SIZE
# Resume: Fortschritt laden falls vorhanden
os.makedirs(RESULT_FOLDER, exist_ok=True)
progress = load_progress(job_id)
start_batch = progress['last_completed_batch'] + 1 if progress else 0
all_results_filtered, all_results_raw = load_partial(job_id) if progress else ([], [])
eta_initial = format_eta((batches - start_batch) * ((BATCH_DELAY_MAX + MAX_TIME) / 2))
print(f"📦 {batches} Batches à {BATCH_SIZE} | 2x Scraper | Start: {start_batch} | ETA: ~{eta_initial}")
job_start_time = time.time()
job.status = f"🔄 Batch {start_batch+1}/{batches} | ⏱️ ~{eta_initial}"
db.session.commit()
for batch_idx in range(start_batch, batches):
batch_start = batch_idx * BATCH_SIZE
batch_end = min(batch_start + BATCH_SIZE, total_queries)
batch_queries = queries[batch_start:batch_end]
# 2x Scraper: abwechselnd nutzen
scraper_url = SCRAPER_URLS[batch_idx % len(SCRAPER_URLS)]
print(f"\n🔄 BATCH {batch_idx+1}/{batches} ({batch_start+1}-{batch_end}/{total_queries}) → {scraper_url}")
#Random Delay
delay = random.uniform(BATCH_DELAY_MIN, BATCH_DELAY_MAX)
print(f"😴 Delay: {delay:.0f}s")
time.sleep(delay)
#API Call
payload = {
"name": f"{filename.replace('.csv','')}-{job_id}-B{batch_idx+1:03d}",
"keywords": batch_queries,
"lang": "de",
"depth": 1,
"zoom": 15,
"radius": 50,
"max_time": MAX_TIME,
"fast_mode": False,
"proxies": [PROXY_URL]
}
batch_success = False
# Fix 2: Retry-Logik bei Scraper-Fehler
for attempt in range(1, MAX_RETRIES + 1):
try:
resp = requests.post(f"{scraper_url}/api/v1/jobs", json=payload, timeout=45)
print(f"📤 {resp.status_code} (Versuch {attempt} | {scraper_url})")
if is_blocked(resp.text):
print("🚫 Batch übersprungen (blocked)")
break
if resp.status_code != 201:
print(f"⚠️ Batch {batch_idx+1} fehlgeschlagen: {resp.text[:100]}")
if attempt < MAX_RETRIES:
time.sleep(10)
continue
scraper_id = resp.json()['id']
print(f"✅ Scraper: {scraper_id}")
stuck_counter = 0
for poll_i in range(1, POLL_MAX + 1):
r = requests.get(f"{scraper_url}/api/v1/jobs/{scraper_id}", timeout=15)
data = r.json()
status = data.get('Status', data.get('status', '?'))
print(f"⏳ Poll {poll_i}: {status}")
# Fix 4: Auto-Recovery bei Pending-Stuck
if status == 'pending':
stuck_counter += 1
if stuck_counter >= STUCK_THRESHOLD:
print(f"⚠️ Job {scraper_id} hängt abbrechen + Neustart")
requests.delete(f"{scraper_url}/api/v1/jobs/{scraper_id}", timeout=10)
restart_scraper(scraper_url) # Fix 3: Nur betroffenen Scraper neu starten
break
else:
stuck_counter = 0
if status in ('ok', 'completed', 'scraped'):
dl = requests.get(f"{scraper_url}/api/v1/jobs/{scraper_id}/download", timeout=90)
if dl.status_code == 200:
df_filtered = process_result_csv(dl.content, df_input, True)
df_raw = process_result_csv(dl.content, df_input, False)
if df_filtered is not None:
all_results_filtered.append(df_filtered)
all_results_raw.append(df_raw)
append_partial(job_id, df_filtered, df_raw) # Resume: sofort speichern
print(f"📊 Batch {batch_idx+1}: {len(df_filtered)} filtered")
batch_success = True
break
# Fix 2: Scraper-Fehler → Retry
elif status in ('failed', 'error'):
print(f"💥 Batch {batch_idx+1}: {status} (Versuch {attempt})")
if attempt < MAX_RETRIES:
time.sleep(10)
break
time.sleep(random.uniform(POLL_DELAY_MIN, POLL_DELAY_MAX))
if batch_success:
break
except Exception as e:
print(f"💥 Batch {batch_idx+1} Versuch {attempt}: {e}")
if attempt < MAX_RETRIES:
time.sleep(10)
# Resume: Fortschritt nach jedem Batch speichern
save_progress(job_id, batch_idx, batches)
# ETA berechnen
elapsed = time.time() - job_start_time
done_so_far = batch_idx - start_batch + 1
if done_so_far > 0:
avg_per_batch = elapsed / done_so_far
remaining = (batches - batch_idx - 1) * avg_per_batch
eta_str = format_eta(remaining)
else:
eta_str = "?"
job.status = f"🔄 Batch {batch_idx+2}/{batches} | ⏱️ ~{eta_str}"
db.session.commit()
#MERGE & SAVE
job.status = "🔧 merging results"
db.session.commit()
base = filename.replace('.csv', '')
if all_results_filtered:
df_final_filtered = pd.concat(all_results_filtered, ignore_index=True)
df_final_filtered = df_final_filtered.drop_duplicates(subset=['title', 'address'])
out_filtered = f"results_{base}_filtered.csv"
df_final_filtered.to_csv(
os.path.join(RESULT_FOLDER, out_filtered),
index=False, encoding='utf-8-sig', sep=';'
)
if all_results_raw:
df_final_raw = pd.concat(all_results_raw, ignore_index=True)
out_raw = f"results_{base}_all.csv"
df_final_raw.to_csv(
os.path.join(RESULT_FOLDER, out_raw),
index=False, encoding='utf-8-sig', sep=';'
)
job.result_filename = out_filtered
job.result_filename_raw = out_raw
job.status = f"✅ Fertig: {len(df_final_filtered)} Firmen"
# Resume: Cleanup nach Abschluss
cleanup_progress(job_id)
else:
job.status = "❌ Keine Ergebnisse"
db.session.commit()
print(f"🎉 Job {job_id} komplett!")
except Exception as e:
job.status = f"Failed: {str(e)[:50]}"
print(f"💥 FATAL: {e}")
import traceback
traceback.print_exc()
db.session.commit()
print(f"✅ DONE! Status: {job.status}")