heko_webcrawler/app/webcrawler.py
2026-03-10 16:55:55 +01:00

487 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import re
import unicodedata
import json
import threading
import pandas as pd
import requests
import time
import random
from io import StringIO
from concurrent.futures import ThreadPoolExecutor, as_completed
from app.models import db, Job
print("🆕 MODERN webcrawler LOADED! BATCHED + PROXY + RESUME + ETA + 4x SCRAPER CHUNK-PARALLEL")

# Folders for uploaded input CSVs and for result / progress / partial files.
UPLOAD_FOLDER = '/app/uploads'
RESULT_FOLDER = '/app/results'

# Base URLs of the scraper services; batches are assigned to them round-robin.
SCRAPER_URLS = [
    "http://gmaps-scraper-1:8080",
    "http://gmaps-scraper-2:8080",
    "http://gmaps-scraper-3:8080",
    "http://gmaps-scraper-4:8080",
]

# Columns kept from the scraper's CSV output (those that are present).
OUTPUT_COLS = ['title', 'category', 'address', 'website', 'phone', 'link']

# SECURITY NOTE(review): the fallback below embeds proxy credentials in source
# control. Set WEBCRAWLER_PROXY_URL in the environment instead; the literal is
# kept only so existing deployments keep working and should be rotated and
# removed once the environment variable is configured everywhere.
PROXY_URL = (
    os.environ.get("WEBCRAWLER_PROXY_URL")
    or "http://bitlleuv-rotate:s5hzse6hz74b@p.webshare.io:80"
)
# Proxy mapping keyed by URL scheme.
API_PROXIES = {"http": PROXY_URL, "https": PROXY_URL}

# Held by process_file for the whole run, so only one job is processed at a time.
_job_semaphore = threading.Semaphore(1)

# ──────────────────────────────────────────────
# Tuning
# ──────────────────────────────────────────────
BATCH_SIZE = 30          # keywords per scraper job
BATCH_DELAY_MIN = 3      # seconds to pause between chunks (min)
BATCH_DELAY_MAX = 6      # seconds to pause between chunks (max)
MAX_TIME = 60            # seconds the scraper gets per batch
POLL_MAX = 90            # max. poll attempts per batch
POLL_DELAY_MIN = 2       # seconds between polls (min)
POLL_DELAY_MAX = 5       # seconds between polls (max)
STUCK_TIMEOUT = 300      # seconds in 'pending' before the scraper is restarted (5 min)
MAX_RETRIES = 2          # attempts per batch
PARALLEL_WORKERS = len(SCRAPER_URLS)  # one worker thread per scraper instance

# Serializes appends to the partial-result CSV files.
_partial_lock = threading.Lock()
# ──────────────────────────────────────────────
# Hilfsfunktionen
# ──────────────────────────────────────────────
def is_blocked(data):
    """Return True if the stringified *data* contains a block/rate-limit marker.

    The check is a case-insensitive substring search for 'captcha',
    'blocked', 'rate limit', 'too many' or '429'. On a hit, the first
    100 characters of the payload are logged.
    """
    raw = str(data)
    haystack = raw.lower()
    for marker in ('captcha', 'blocked', 'rate limit', 'too many', '429'):
        if marker in haystack:
            print(f"🚫 BLOCKED: {raw[:100]}")
            return True
    return False
def fix_encoding(text):
    """Undo UTF-8-read-as-Latin-1 mojibake in *text* where possible.

    Non-string values are returned unchanged. A string is re-encoded as
    Latin-1 and decoded as UTF-8; if either step fails, the original string
    is returned as-is.
    """
    if isinstance(text, str):
        try:
            repaired = text.encode('latin-1').decode('utf-8')
        except (UnicodeEncodeError, UnicodeDecodeError):
            repaired = text
        return repaired
    return text
def clean_query(q):
    """Strip control characters from *q* and collapse whitespace runs.

    Characters in Unicode category 'Cc' are removed outright (not replaced
    by a space); the remaining text is split on whitespace and re-joined
    with single spaces, which also trims both ends.
    """
    kept = [ch for ch in q if unicodedata.category(ch) != 'Cc']
    tokens = ''.join(kept).split()
    return ' '.join(tokens)
def build_input_addresses(df):
    """Collect one normalized address string per row of *df* into a set.

    Each string is "<Straße> <Hausnummer> <Zusatz> <PLZ> <Stadt>", lower-cased
    with whitespace collapsed. Missing columns contribute an empty string;
    cell values are stringified as-is (so a pandas NaN becomes the token
    'nan').
    """
    column_order = ('Straße', 'Hausnummer', 'Zusatz', 'PLZ', 'Stadt')
    normalized = set()
    for _, record in df.iterrows():
        fields = [str(record.get(column, '')).strip() for column in column_order]
        joined = ' '.join(fields).lower()
        normalized.add(' '.join(joined.split()))
    return normalized
def normalize_address(addr):
    """Return *addr* encoding-repaired, lower-cased and whitespace-collapsed.

    Anything that is not a string (e.g. None or NaN) yields ''.
    """
    if isinstance(addr, str):
        repaired = fix_encoding(addr)
        return ' '.join(repaired.lower().split())
    return ''
def address_in_input(result_addr, input_addresses):
    """Tell whether a scraped address matches any of the input addresses.

    *result_addr* is normalized first. An input address matches when all of
    the following hold:
      * it contains a 5-digit postcode that also occurs in the result,
      * its first token (the street) has at least 4 characters and its
        first 5 characters occur in the result,
      * its second token (the house number), if there is one, occurs in the
        result as a whole word.
    """
    haystack = normalize_address(result_addr)

    def _matches(candidate):
        plz_hit = re.search(r'\b\d{5}\b', candidate)
        if plz_hit is None or plz_hit.group() not in haystack:
            return False
        tokens = candidate.split()
        street = tokens[0] if tokens else ''
        if len(street) < 4 or street[:5].lower() not in haystack:
            return False
        house_no = tokens[1] if len(tokens) > 1 else ''
        if not house_no:
            return True
        return re.search(rf'\b{re.escape(house_no)}\b', haystack) is not None

    return any(_matches(candidate) for candidate in input_addresses)
def format_eta(seconds):
    """Format a duration in seconds as 'Ns', 'Mmin' or 'Hh MMmin'.

    Values below 60 are shown in whole seconds; from one hour upwards the
    minutes are zero-padded to two digits. Fractions are truncated.
    """
    if seconds < 60:
        return f"{int(seconds)}s"
    total_minutes = int(seconds) // 60
    hours, minutes = divmod(total_minutes, 60)
    if hours > 0:
        return f"{hours}h {minutes:02d}min"
    return f"{minutes}min"
# ──────────────────────────────────────────────
# Scraper-Job Cleanup
# ──────────────────────────────────────────────
def _cleanup_scraper_job(scraper_url, scraper_id):
    """Delete a scraper job we no longer need; failures are only logged.

    Sends DELETE {scraper_url}/api/v1/jobs/{scraper_id} with a 10 s timeout.
    The HTTP status of the response is not inspected.
    """
    try:
        endpoint = f"{scraper_url}/api/v1/jobs/{scraper_id}"
        requests.delete(endpoint, timeout=10)
    except Exception as exc:
        print(f"⚠️ Cleanup fehlgeschlagen: {exc}")
    else:
        print(f"🗑️ Scraper-Job {scraper_id} gelöscht")
# ──────────────────────────────────────────────
# Scraper-Neustart via Docker SDK
# ──────────────────────────────────────────────
def restart_scraper(scraper_url):
    """Restart the Docker container behind *scraper_url*.

    The container name is taken to be the host part of the URL (the text
    between '//' and the first ':'). After a successful restart the function
    sleeps 15 seconds before returning.

    Returns:
        True if the restart call went through, False on any error (including
        the docker SDK not being importable).
    """
    try:
        import docker
        host_and_port = scraper_url.split("//")[1]
        container_name = host_and_port.split(":")[0]
        print(f"🔄 Starte {container_name} neu...")
        docker.from_env().containers.get(container_name).restart()
        print(f"{container_name} neu gestartet warte 15s...")
        time.sleep(15)
    except Exception as exc:
        print(f"⚠️ Scraper-Neustart fehlgeschlagen: {exc}")
        return False
    return True
# ──────────────────────────────────────────────
# Resume: Progress-File Hilfsfunktionen
# ──────────────────────────────────────────────
def get_progress_path(job_id):
    """Return the path of the resume checkpoint (JSON) for *job_id*."""
    checkpoint_name = f"progress_{job_id}.json"
    return os.path.join(RESULT_FOLDER, checkpoint_name)
def get_partial_path(job_id, suffix):
    """Return the path of the partial-result CSV for *job_id* and *suffix*."""
    partial_name = f"partial_{job_id}_{suffix}.csv"
    return os.path.join(RESULT_FOLDER, partial_name)
def load_progress(job_id):
    """Load the resume checkpoint for *job_id*.

    Returns:
        The decoded JSON content of the checkpoint file, or None when no
        checkpoint file exists. The resume position is logged when loaded.
    """
    checkpoint = get_progress_path(job_id)
    if not os.path.exists(checkpoint):
        return None
    with open(checkpoint, 'r') as handle:
        state = json.load(handle)
    print(f"🔁 RESUME: ab Batch {state['last_completed_batch'] + 1}/{state['total_batches']}")
    return state
def save_progress(job_id, last_completed_batch, total_batches):
    """Persist the resume checkpoint for *job_id*.

    The checkpoint is a JSON object with the keys 'last_completed_batch' and
    'total_batches'.

    The data is written to a sibling temp file first and then moved over the
    real checkpoint with os.replace, so an interruption during the write
    cannot leave a truncated checkpoint that a later resume would fail to
    parse.
    """
    path = get_progress_path(job_id)
    tmp_path = f"{path}.tmp"
    with open(tmp_path, 'w') as f:
        json.dump({"last_completed_batch": last_completed_batch, "total_batches": total_batches}, f)
    os.replace(tmp_path, path)
def append_partial(job_id, df_filtered, df_raw):
    """Append one batch's frames to the job's partial-result CSV files.

    The filtered frame goes to the 'filtered' file, the raw frame to the
    'raw' file; a frame that is None is skipped. The header row is written
    only when the target file does not exist yet. The whole operation runs
    under the module's partial-file lock.
    """
    frames = {'filtered': df_filtered, 'raw': df_raw}
    with _partial_lock:
        for suffix, frame in frames.items():
            if frame is None:
                continue
            target = get_partial_path(job_id, suffix)
            write_header = not os.path.exists(target)
            frame.to_csv(
                target,
                mode='a',
                index=False,
                header=write_header,
                encoding='utf-8-sig',
                sep=';',
            )
def load_partial(job_id):
    """Read back the partial-result CSV files of *job_id*.

    Returns:
        A pair (filtered_frames, raw_frames). Each list holds the loaded
        DataFrame when the corresponding file exists and could be parsed,
        and is empty otherwise. Parse errors are logged, not raised.
    """
    buckets = {'filtered': [], 'raw': []}
    for suffix, bucket in buckets.items():
        source = get_partial_path(job_id, suffix)
        if not os.path.exists(source):
            continue
        try:
            frame = pd.read_csv(source, sep=';', encoding='utf-8-sig')
            bucket.append(frame)
            print(f"📂 Partial {suffix}: {len(frame)} Zeilen geladen")
        except Exception as exc:
            print(f"⚠️ Partial {suffix} Ladefehler: {exc}")
    return buckets['filtered'], buckets['raw']
def cleanup_progress(job_id):
    """Remove the checkpoint and both partial-result files of *job_id*.

    Files that do not exist are silently skipped.
    """
    leftovers = [get_progress_path(job_id)]
    leftovers += [get_partial_path(job_id, suffix) for suffix in ('filtered', 'raw')]
    for leftover in leftovers:
        if os.path.exists(leftover):
            os.remove(leftover)
# ──────────────────────────────────────────────
# CSV Nachbearbeitung
# ──────────────────────────────────────────────
def process_result_csv(raw_bytes, input_df, apply_filter=True):
    """Parse a scraper result CSV and clean it up.

    Steps: decode the bytes as UTF-8 (undecodable bytes are replaced), keep
    only the OUTPUT_COLS columns that are present, repair mojibake in every
    cell, optionally keep only rows whose address matches one of the input
    addresses, drop duplicate (title, address) pairs and drop rows without a
    non-blank title.

    Args:
        raw_bytes: CSV content as downloaded from the scraper.
        input_df: the input address table; only used when apply_filter is set.
        apply_filter: when True, rows not matching an input address are dropped.

    Returns:
        The cleaned DataFrame, or None if anything went wrong (the error is
        logged, not raised).
    """
    try:
        content = raw_bytes.decode('utf-8', errors='replace')
        df_out = pd.read_csv(StringIO(content))
        print(f"📄 Raw result: {df_out.shape}")
        available = [c for c in OUTPUT_COLS if c in df_out.columns]
        # Explicit copy: the per-column assignments below must operate on an
        # independent frame rather than on a selection of the parsed one.
        df_out = df_out[available].copy()
        for col in available:
            df_out[col] = df_out[col].apply(fix_encoding)
        if apply_filter:
            input_addresses = build_input_addresses(input_df)
            before = len(df_out)
            # astype(bool) keeps the mask a boolean indexer even when the
            # frame has no rows (apply then yields an object-dtype Series).
            keep = df_out['address'].apply(
                lambda a: address_in_input(a, input_addresses)
            ).astype(bool)
            df_out = df_out[keep]
            # Separator added: the two counts were previously printed fused together.
            print(f"📍 Filter: {before} → {len(df_out)}")
        df_out = df_out.drop_duplicates(subset=['title', 'address'], keep='first')
        df_out = df_out.dropna(subset=['title'], how='all')
        df_out = df_out[df_out['title'].str.strip().astype(bool)]
        print(f"✅ Final ({'gefiltert' if apply_filter else 'alle'}): {df_out.shape}")
        return df_out
    except Exception as e:
        print(f"💥 process_result_csv: {e}")
        return None
# ──────────────────────────────────────────────
# Parallel: Einzelnen Batch verarbeiten
# ──────────────────────────────────────────────
def process_batch(batch_idx, batch_queries, scraper_url, filename, job_id, df_input):
    """Run one batch of queries on one scraper and return the parsed results.

    The batch is submitted as a job to ``scraper_url``, polled until it
    reaches a terminal status, and its CSV is downloaded and post-processed
    twice (address-filtered and unfiltered). Up to MAX_RETRIES attempts are
    made.

    Per attempt:
      * a response that looks blocked (see is_blocked) aborts the batch
        immediately;
      * a non-201 submit response, a 'failed'/'error' status or any exception
        leads to the next attempt;
      * a job still 'pending' after STUCK_TIMEOUT seconds is deleted and the
        scraper container is restarted;
      * a job that reaches no terminal status within POLL_MAX polls is
        deleted before the next attempt.

    Args:
        batch_idx: zero-based batch index (used for naming and logging).
        batch_queries: list of search keywords for this batch.
        scraper_url: base URL of the scraper instance to use.
        filename: input file name, used to build the scraper job name.
        job_id: id of the crawl job, used to build the scraper job name.
        df_input: input address table, passed on to the result filter.

    Returns:
        (df_filtered, df_raw) after a successful download — either may be
        None if post-processing failed — or (None, None) when the batch was
        blocked, the download failed or all attempts were used up.
    """
    payload = {
        "name": f"{filename.replace('.csv','')}-{job_id}-B{batch_idx+1:03d}",
        "keywords": batch_queries,
        "lang": "de",
        "depth": 1,
        "zoom": 17,
        "radius": 100,
        "max_time": MAX_TIME,
        "fast_mode": False,
        "proxies": [PROXY_URL]
    }
    for attempt in range(1, MAX_RETRIES + 1):
        # Id of the scraper job we still own; None once it is finished or deleted.
        scraper_id = None
        try:
            resp = requests.post(f"{scraper_url}/api/v1/jobs", json=payload, timeout=45)
            # Separator added: batch number and URL were previously printed fused together.
            print(f"📤 Batch {batch_idx+1} → {scraper_url} | {resp.status_code} (Versuch {attempt})")
            if is_blocked(resp.text):
                print(f"🚫 Batch {batch_idx+1} blocked")
                return None, None
            if resp.status_code != 201:
                print(f"⚠️ Batch {batch_idx+1} fehlgeschlagen: {resp.text[:100]}")
                if attempt < MAX_RETRIES:
                    time.sleep(10)
                continue
            scraper_id = resp.json()['id']
            print(f"✅ Batch {batch_idx+1} Scraper-ID: {scraper_id}")
            batch_start_time = time.time()
            for poll_i in range(1, POLL_MAX + 1):
                r = requests.get(f"{scraper_url}/api/v1/jobs/{scraper_id}", timeout=15)
                data = r.json()
                # The status key is looked up in both capitalizations.
                status = data.get('Status', data.get('status', '?'))
                elapsed = time.time() - batch_start_time
                print(f"⏳ Batch {batch_idx+1} Poll {poll_i}: {status} | {int(elapsed)}s")
                if status == 'pending' and elapsed > STUCK_TIMEOUT:
                    print(f"⚠️ Batch {batch_idx+1} hängt seit {int(elapsed)}s Neustart {scraper_url}")
                    _cleanup_scraper_job(scraper_url, scraper_id)
                    scraper_id = None
                    restart_scraper(scraper_url)
                    break
                if status in ('ok', 'completed', 'scraped'):
                    dl = requests.get(f"{scraper_url}/api/v1/jobs/{scraper_id}/download", timeout=90)
                    # Finished job: from here on the except handler must not delete it.
                    scraper_id = None
                    if dl.status_code == 200:
                        df_filtered = process_result_csv(dl.content, df_input, True)
                        df_raw = process_result_csv(dl.content, df_input, False)
                        print(f"📊 Batch {batch_idx+1}: {len(df_filtered) if df_filtered is not None else 0} filtered")
                        return df_filtered, df_raw
                    return None, None
                elif status in ('failed', 'error'):
                    print(f"💥 Batch {batch_idx+1}: {status} (Versuch {attempt})")
                    _cleanup_scraper_job(scraper_url, scraper_id)
                    scraper_id = None
                    if attempt < MAX_RETRIES:
                        time.sleep(10)
                    break
                time.sleep(random.uniform(POLL_DELAY_MIN, POLL_DELAY_MAX))
            else:
                # Poll budget used up without a terminal status. Delete the
                # job like every other failure path does; otherwise it would
                # stay behind on the scraper while the next attempt submits
                # a new one.
                print(f"⚠️ Batch {batch_idx+1} Poll-Limit ({POLL_MAX}) erreicht (Versuch {attempt})")
                _cleanup_scraper_job(scraper_url, scraper_id)
                scraper_id = None
        except Exception as e:
            print(f"💥 Batch {batch_idx+1} Versuch {attempt}: {e}")
            if scraper_id:
                _cleanup_scraper_job(scraper_url, scraper_id)
                scraper_id = None
            if attempt < MAX_RETRIES:
                time.sleep(10)
    return None, None
# ──────────────────────────────────────────────
# HAUPT-WORKER
# ──────────────────────────────────────────────
def _build_queries(df_input):
    """Build one cleaned 'Firmen <address>' search query per usable input row."""
    queries = []
    for _, row in df_input.iterrows():
        parts = [
            str(row.get('PLZ', '')).strip(),
            str(row.get('Stadt', '')).strip(),
            str(row.get('Straße', '')).strip(),
            str(row.get('Hausnummer', '')).strip(),
            str(row.get('Zusatz', '')).strip(),
        ]
        # Empty cells and stringified NaN ('nan') are left out of the query.
        q = f"Firmen {' '.join(p for p in parts if p and p != 'nan')}".strip()
        q = clean_query(q)
        if len(q) > 10:
            queries.append(q)
    return queries


def process_file(filename, job_id, app):
    """Run one crawl job end to end and record its outcome on the Job row.

    Steps:
      1. Mark the job as waiting, then wait for the global job semaphore.
      2. Read the input CSV from UPLOAD_FOLDER and build the search queries.
      3. Split the queries into batches of BATCH_SIZE; resume after the last
         checkpointed batch if a checkpoint exists.
      4. Process the batches in chunks of PARALLEL_WORKERS, one batch per
         scraper, updating the job status with an ETA after every batch.
      5. Merge all results, write the filtered and the unfiltered result CSV
         to RESULT_FOLDER and store their names on the job.

    Any exception during steps 2-5 is caught, logged with a traceback and
    stored (truncated) in the job status.

    Args:
        filename: name of the uploaded CSV inside UPLOAD_FOLDER.
        job_id: id of the Job row to update.
        app: object providing ``app_context()`` (presumably the Flask app —
            confirm against the caller).
    """
    with app.app_context():
        job = Job.query.get(job_id)
        if job:
            job.status = "⏳ Wartet auf anderen Job..."
            db.session.commit()

    with _job_semaphore:
        print(f"🎯 {filename} Job#{job_id} START!")
        with app.app_context():
            job = Job.query.get(job_id)
            if not job:
                print("❌ Job missing")
                return
            try:
                job.status = "📊 parsing CSV"
                db.session.commit()

                filepath = os.path.join(UPLOAD_FOLDER, filename)
                print(f"📁 {filepath} | {os.path.getsize(filepath)}b")
                df_input = pd.read_csv(filepath, sep=';', encoding='ISO-8859-1')
                print(f"📊 {df_input.shape}")

                queries = _build_queries(df_input)
                total_queries = len(queries)
                print(f"🔍 {total_queries} Queries | Samples: {queries[:3]}")
                if total_queries == 0:
                    raise ValueError("Keine gültigen Adressen")

                # Ceiling division: number of batches needed for all queries.
                batches = (total_queries + BATCH_SIZE - 1) // BATCH_SIZE
                os.makedirs(RESULT_FOLDER, exist_ok=True)

                progress = load_progress(job_id)
                start_batch = progress['last_completed_batch'] + 1 if progress else 0
                all_results_filtered, all_results_raw = load_partial(job_id) if progress else ([], [])

                eta_initial = format_eta((batches - start_batch) * ((BATCH_DELAY_MAX + MAX_TIME) / 2) / PARALLEL_WORKERS)
                print(f"📦 {batches} Batches à {BATCH_SIZE} | {PARALLEL_WORKERS}x parallel (Chunk) | Start: {start_batch} | ETA: ~{eta_initial}")
                job_start_time = time.time()
                job.status = f"🔄 Batch {start_batch+1}/{batches} | ⏱️ ~{eta_initial}"
                db.session.commit()

                completed_count = 0
                batch_indices = list(range(start_batch, batches))
                chunks = [
                    batch_indices[i:i + PARALLEL_WORKERS]
                    for i in range(0, len(batch_indices), PARALLEL_WORKERS)
                ]

                with ThreadPoolExecutor(max_workers=PARALLEL_WORKERS) as executor:
                    for chunk_idx, chunk in enumerate(chunks):
                        futures = {}
                        for batch_idx in chunk:
                            batch_start_q = batch_idx * BATCH_SIZE
                            batch_end_q = min(batch_start_q + BATCH_SIZE, total_queries)
                            batch_queries = queries[batch_start_q:batch_end_q]
                            scraper_url = SCRAPER_URLS[batch_idx % len(SCRAPER_URLS)]
                            # Separator added: batch counter and URL were previously printed fused together.
                            print(f"\n🚀 Chunk {chunk_idx+1} | Batch {batch_idx+1}/{batches} → {scraper_url}")
                            # Stagger the submissions a little.
                            time.sleep(random.uniform(1, 2))
                            future = executor.submit(
                                process_batch,
                                batch_idx, batch_queries, scraper_url,
                                filename, job_id, df_input
                            )
                            futures[future] = batch_idx

                        chunk_results = []
                        for future in as_completed(futures):
                            batch_idx = futures[future]
                            completed_count += 1
                            try:
                                df_filtered, df_raw = future.result()
                                if df_filtered is not None:
                                    all_results_filtered.append(df_filtered)
                                    # The raw frame can be None when only its
                                    # post-processing failed; keep None out of
                                    # the list that is concatenated later.
                                    if df_raw is not None:
                                        all_results_raw.append(df_raw)
                                    chunk_results.append((df_filtered, df_raw))
                            except Exception as e:
                                print(f"💥 Batch {batch_idx+1} Exception: {e}")

                            elapsed = time.time() - job_start_time
                            # elapsed is wall-clock time with all workers
                            # running, so elapsed / completed_count already
                            # reflects the parallelism; it must not be divided
                            # by the worker count a second time.
                            avg_per_batch = elapsed / completed_count
                            remaining = (batches - start_batch - completed_count) * avg_per_batch
                            eta_str = format_eta(remaining)
                            job.status = f"🔄 {completed_count}/{batches - start_batch} fertig | ⏱️ ~{eta_str}"
                            db.session.commit()

                        # Persist only once the whole chunk is done. Futures
                        # finish in arbitrary order, so checkpointing per
                        # future could record a later batch as "last completed"
                        # while earlier batches of the chunk were still running
                        # — a resume would then skip them. Writing the partial
                        # results together with the checkpoint keeps both in
                        # step: an interrupted chunk is redone as a whole.
                        for df_filtered, df_raw in chunk_results:
                            append_partial(job_id, df_filtered, df_raw)
                        save_progress(job_id, chunk[-1], batches)

                        if chunk_idx < len(chunks) - 1:
                            delay = random.uniform(BATCH_DELAY_MIN, BATCH_DELAY_MAX)
                            print(f"⏸️ Chunk {chunk_idx+1} fertig warte {delay:.1f}s...")
                            time.sleep(delay)

                # ── MERGE & SAVE ──
                job.status = "🔧 merging results"
                db.session.commit()
                base = filename.replace('.csv', '')
                if all_results_filtered:
                    df_final_filtered = pd.concat(all_results_filtered, ignore_index=True)
                    df_final_filtered = df_final_filtered.drop_duplicates(subset=['title', 'address'])
                    out_filtered = f"results_{base}_filtered.csv"
                    df_final_filtered.to_csv(
                        os.path.join(RESULT_FOLDER, out_filtered),
                        index=False, encoding='utf-8-sig', sep=';'
                    )
                    out_raw = None
                    if all_results_raw:
                        df_final_raw = pd.concat(all_results_raw, ignore_index=True)
                        out_raw = f"results_{base}_all.csv"
                        df_final_raw.to_csv(
                            os.path.join(RESULT_FOLDER, out_raw),
                            index=False, encoding='utf-8-sig', sep=';'
                        )
                    job.result_filename = out_filtered
                    job.result_filename_raw = out_raw
                    job.status = f"✅ Fertig: {len(df_final_filtered)} Firmen"
                    # Checkpoint and partial files are only removed on success.
                    cleanup_progress(job_id)
                else:
                    job.status = "❌ Keine Ergebnisse"
                db.session.commit()
                print(f"🎉 Job {job_id} komplett!")
            except Exception as e:
                job.status = f"Failed: {str(e)[:50]}"
                print(f"💥 FATAL: {e}")
                import traceback
                traceback.print_exc()
                db.session.commit()
            print(f"✅ DONE! Status: {job.status}")