diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a24f49f --- /dev/null +++ b/.gitignore @@ -0,0 +1,163 @@ +### Flask ### +instance/* +!instance/.gitignore +.webassets-cache +.env + +### Flask.Python Stack ### +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/#use-with-ide +.pdm.toml + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# Uploads / Results +uploads/ +results/ +scraper-data-*/ diff --git a/Dockerfile b/Dockerfile index 2dc2c9d..6be6427 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,12 +7,14 @@ WORKDIR /app # Abhängigkeiten installieren COPY requirements.txt requirements.txt RUN pip install --no-cache-dir -r requirements.txt +RUN apt update +RUN apt install curl -y # App-Dateien kopieren COPY . . -# Flask Umgebungsvariable setzen ENV FLASK_APP=app +ENV FLASK_ENV=production -# Flask starten +EXPOSE 5000 CMD ["flask", "run", "--host=0.0.0.0", "--port=5000"] diff --git a/app/__init__.py b/app/__init__.py index 23efef7..70abb2b 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -1,52 +1,88 @@ import os -from flask import Flask, redirect, url_for, request +from flask import Flask, redirect, url_for, request, current_app from flask_sqlalchemy import SQLAlchemy from flask_login import LoginManager, current_user -from .models import db, User +from flask_migrate import Migrate +from sqlalchemy import text -# Konfiguration für Upload- und Ergebnis-Ordner +# ✅ Docker-Pfade UPLOAD_FOLDER = '/app/uploads' RESULT_FOLDER = '/app/results' +db = SQLAlchemy() +login_manager = LoginManager() +migrate = Migrate() + + +def _run_migrations(app): + """Fehlende DB-Spalten automatisch hinzufügen – übersteht jeden Neustart""" + migrations = [ + ("job", "result_filename_raw", "VARCHAR(150)"), + ("job", "scraper_job_id", "VARCHAR(255)"), + ("user", "is_admin", "BOOLEAN DEFAULT 0"), + ] + with app.app_context(): + for table, column, col_type in migrations: + try: + db.session.execute(text(f"ALTER TABLE {table} ADD COLUMN {column} {col_type}")) + db.session.commit() + print(f"✅ Migration: {table}.{column} hinzugefügt") + except Exception: + db.session.rollback() + + def create_app(): app = Flask(__name__) + + # 🔑 Configs app.config['SECRET_KEY'] = '008e7369b075886d5f494c8813efdfb17155da6af12b3fe8ee' app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///users.db' + app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER app.config['RESULT_FOLDER'] = RESULT_FOLDER + app.config['ALLOW_USER_SIGNUP'] = False + # DB + Tools db.init_app(app) - - # Flask-Login Setup - login_manager = LoginManager() - login_manager.login_view = 'auth.login' + migrate.init_app(app, db) login_manager.init_app(app) + login_manager.login_view = 'auth.login' + # User Loader @login_manager.user_loader def load_user(user_id): + from .models import User return User.query.get(int(user_id)) - # Umleitung nicht authentifizierter Benutzer, statische Dateien und bestimmte Routen ausnehmen + # Protected Routes @app.before_request def require_login(): - allowed_routes = ['auth.login', 'auth.signup'] - - # Prüfen, ob der Benutzer authentifiziert ist oder eine erlaubte Route anfragt - if (not current_user.is_authenticated - and request.endpoint not in allowed_routes - and not request.path.startswith('/static/')): + allowed = ['auth.login', 'auth.signup', 'static'] + if (not current_user.is_authenticated and + request.endpoint not in allowed and + not request.path.startswith('/static')): return redirect(url_for('auth.login')) - # Erstellen Sie die Ordner, falls sie noch nicht existieren + # Ordner os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) os.makedirs(app.config['RESULT_FOLDER'], exist_ok=True) - # Registrieren der Routen + # Routes from . import routes app.register_blueprint(routes.bp) - # Erstellen der Tabellen in der Datenbank + # Index Redirect + @app.route('/') + def index(): + return redirect(url_for('auth.job_status')) + + # DB Tables + Auto-Migration with app.app_context(): db.create_all() + _run_migrations(app) return app + +if __name__ == '__main__': + app = create_app() + app.run(host='0.0.0.0', port=5000, debug=False) diff --git a/app/__pycache__/__init__.cpython-310.pyc b/app/__pycache__/__init__.cpython-310.pyc index 41c64c9..3995c36 100644 Binary files a/app/__pycache__/__init__.cpython-310.pyc and b/app/__pycache__/__init__.cpython-310.pyc differ diff --git a/app/__pycache__/models.cpython-310.pyc b/app/__pycache__/models.cpython-310.pyc index 641a174..0c6ace9 100644 Binary files a/app/__pycache__/models.cpython-310.pyc and b/app/__pycache__/models.cpython-310.pyc differ diff --git a/app/__pycache__/routes.cpython-310.pyc b/app/__pycache__/routes.cpython-310.pyc index 38d5d36..9db4eec 100644 Binary files a/app/__pycache__/routes.cpython-310.pyc and b/app/__pycache__/routes.cpython-310.pyc differ diff --git a/app/__pycache__/webcrawler.cpython-310.pyc b/app/__pycache__/webcrawler.cpython-310.pyc index b2349e9..9e8bb2e 100644 Binary files a/app/__pycache__/webcrawler.cpython-310.pyc and b/app/__pycache__/webcrawler.cpython-310.pyc differ diff --git a/app/init.py.bak b/app/init.py.bak new file mode 100644 index 0000000..7e77ff6 --- /dev/null +++ b/app/init.py.bak @@ -0,0 +1,68 @@ +import os +from flask import Flask, redirect, url_for, request, current_app +from flask_sqlalchemy import SQLAlchemy +from flask_login import LoginManager, current_user +from flask_migrate import Migrate + +# ✅ Docker-Pfade +UPLOAD_FOLDER = '/app/uploads' +RESULT_FOLDER = '/app/results' + +db = SQLAlchemy() +login_manager = LoginManager() +migrate = Migrate() + +def create_app(): + app = Flask(__name__) + + # 🔑 Configs + app.config['SECRET_KEY'] = '008e7369b075886d5f494c8813efdfb17155da6af12b3fe8ee' + app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///users.db' + app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False + app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER + app.config['RESULT_FOLDER'] = RESULT_FOLDER + app.config['ALLOW_USER_SIGNUP'] = True # ✅ Aktiviert! + + # DB + Tools + db.init_app(app) + migrate.init_app(app, db) + login_manager.init_app(app) + login_manager.login_view = 'auth.login' + + # User Loader + @login_manager.user_loader + def load_user(user_id): + from .models import User + return User.query.get(int(user_id)) + + # Protected Routes + @app.before_request + def require_login(): + allowed = ['auth.login', 'auth.signup', 'static'] + if (not current_user.is_authenticated and + request.endpoint not in allowed and + not request.path.startswith('/static')): + return redirect(url_for('auth.login')) + + # Ordner + os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) + os.makedirs(app.config['RESULT_FOLDER'], exist_ok=True) + + # Routes + from . import routes + app.register_blueprint(routes.bp) + + # Index Redirect + @app.route('/') + def index(): + return redirect(url_for('auth.job_status')) + + # DB Tables + with app.app_context(): + db.create_all() + + return app + +if __name__ == '__main__': + app = create_app() + app.run(host='0.0.0.0', port=5000, debug=False) diff --git a/app/models.py b/app/models.py index 6a5c7f4..203a390 100644 --- a/app/models.py +++ b/app/models.py @@ -1,20 +1,25 @@ -from flask_sqlalchemy import SQLAlchemy from flask_login import UserMixin from datetime import datetime - -db = SQLAlchemy() +from . import db class User(UserMixin, db.Model): id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(150), unique=True, nullable=False) password = db.Column(db.String(150), nullable=False) + is_admin = db.Column(db.Boolean, default=False) class Job(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False) filename = db.Column(db.String(150), nullable=False) - status = db.Column(db.String(50), default="Pending") # Status: Pending, In Progress, Completed + status = db.Column(db.String(50), default="Pending") created_at = db.Column(db.DateTime, default=datetime.utcnow) result_filename = db.Column(db.String(150), nullable=True) - + result_filename_raw = db.Column(db.String(150), nullable=True) + user = db.relationship('User', backref=db.backref('jobs', lazy=True)) + +class AppConfig(db.Model): + id = db.Column(db.Integer, primary_key=True) + key = db.Column(db.String(100), unique=True, nullable=False) + value = db.Column(db.String(100), nullable=False, default='false') diff --git a/app/routes.orig b/app/routes.orig new file mode 100644 index 0000000..378517f --- /dev/null +++ b/app/routes.orig @@ -0,0 +1,223 @@ +import time +import csv +import os +import threading +from flask import Blueprint, request, redirect, url_for, flash, render_template, send_file, current_app +from flask_login import login_user, logout_user, login_required, current_user +from werkzeug.utils import secure_filename +from werkzeug.security import generate_password_hash, check_password_hash +from .models import db, User, Job +from .webcrawler import process_file # Importiere die Funktion für das Webscraping + +UPLOAD_FOLDER = 'uploads' +RESULT_FOLDER = 'results' + +# Blueprint für auth erstellen +bp = Blueprint('auth', __name__) + +@bp.route('/login', methods=['GET', 'POST']) +def login(): + if request.method == 'POST': + username = request.form['username'] + password = request.form['password'] + user = User.query.filter_by(username=username).first() + if user and check_password_hash(user.password, password): + login_user(user) + return redirect(url_for('auth.job_status')) + flash('Login fehlgeschlagen. Überprüfen Sie Benutzername und Passwort.') + return render_template('login.html') + +@bp.route('/signup', methods=['GET', 'POST']) +def signup(): + if not current_app.config['ALLOW_USER_SIGNUP']: + flash("Registrierung ist derzeit deaktiviert.") + return redirect(url_for('auth.login')) + + if request.method == 'POST': + username = request.form['username'] + password = generate_password_hash(request.form['password'], method='sha256') + new_user = User(username=username, password=password) + db.session.add(new_user) + db.session.commit() + flash('Benutzer erfolgreich erstellt! Sie können sich jetzt einloggen.') + return redirect(url_for('auth.login')) + + return render_template('signup.html') + +@bp.route('/logout') +@login_required +def logout(): + logout_user() + return redirect(url_for('auth.login')) + +@bp.route('/jobs') +@login_required +def job_status(): + jobs = Job.query.filter_by(user_id=current_user.id).all() + return render_template('jobs.html', jobs=jobs) + +@bp.route('/upload', methods=['GET', 'POST']) +@login_required +def upload(): + if request.method == 'POST': + file = request.files['file'] + filename = secure_filename(file.filename) + + # Überprüfen, ob eine Datei mit dem gleichen Namen bereits existiert + file_path = os.path.join(current_app.config['UPLOAD_FOLDER'], filename) + if os.path.exists(file_path): + # Wenn eine Datei mit dem gleichen Namen existiert, einen Zeitstempel hinzufügen + name, ext = os.path.splitext(filename) + timestamp = time.strftime("%Y%m%d-%H%M%S") # Zeitstempel im Format JahrMonatTag-StundenMinutenSekunden + filename = f"{name}_{timestamp}{ext}" + file_path = os.path.join(current_app.config['UPLOAD_FOLDER'], filename) + flash(f"Eine Datei mit gleichem Namen existierte bereits. Die Datei wurde als '{filename}' gespeichert.") + + # Speichern der Datei + file.save(file_path) + flash('Datei erfolgreich hochgeladen und Job gestartet') + + # Neuen Job erstellen + new_job = Job(user_id=current_user.id, filename=filename, status="Pending") + db.session.add(new_job) + db.session.commit() + + # Debugging-Ausgabe zur Überprüfung der Thread-Erstellung + print(f"Starte Scraping-Thread für Job-ID: {new_job.id}") + + # Starten des Scraping im Hintergrund-Thread und Übergeben des aktuellen Anwendungskontexts + thread = threading.Thread(target=process_file, args=(filename, new_job.id, current_app._get_current_object())) + thread.start() + + # Debugging-Ausgabe, nachdem der Thread gestartet wurde + print(f"Thread für Job {new_job.id} erfolgreich gestartet.") + + return redirect(url_for('auth.job_status')) + + return render_template('upload.html') + +@bp.route('/download/', methods=['GET']) +@login_required +def download_result(job_id): + job = Job.query.get_or_404(job_id) + print(f"Job ID: {job.id} - User ID: {job.user_id} - Current User ID: {current_user.id}") + + # Überprüfen, ob der Job dem aktuellen Benutzer gehört + if job.user_id != current_user.id: + flash("Sie haben keine Berechtigung, dieses Ergebnis herunterzuladen.") + return redirect(url_for('auth.job_status')) + + # Überprüfen, ob das Ergebnis vorhanden ist + if not job.result_filename: + flash("Das Ergebnis ist noch nicht verfügbar.") + return redirect(url_for('auth.job_status')) + + # Überprüfen, ob die Datei im angegebenen Pfad existiert + result_path = os.path.join(current_app.config['RESULT_FOLDER'], job.result_filename) + print(f"Versuche, Datei herunterzuladen von: {result_path}") + + if os.path.exists(result_path): + print("Datei existiert und wird zum Download bereitgestellt.") + return send_file(result_path, as_attachment=True) + else: + print("Datei nicht gefunden. Ergebnisverzeichnis oder Pfad prüfen.") + flash("Ergebnisdatei nicht gefunden.") + return redirect(url_for('auth.job_status')) + + +@bp.route('/delete_job/', methods=['POST']) +@login_required +def delete_job(job_id): + job = Job.query.get_or_404(job_id) + if job.user_id != current_user.id: + flash("Sie haben keine Berechtigung, diesen Job zu löschen.") + return redirect(url_for('auth.job_status')) + + # Löschen der Upload-Datei + upload_path = os.path.join(current_app.config['UPLOAD_FOLDER'], job.filename) + if os.path.exists(upload_path): + os.remove(upload_path) + print(f"Upload-Datei gelöscht: {upload_path}") + else: + print(f"Upload-Datei nicht gefunden: {upload_path}") + + # Löschen der Results-Datei, falls vorhanden + if job.result_filename: + result_path = os.path.join(current_app.config['RESULT_FOLDER'], job.result_filename) + print(f"Versuche Ergebnisdatei zu löschen: {result_path}") + + if os.path.exists(result_path): + try: + os.remove(result_path) + print(f"Ergebnisdatei gelöscht: {result_path}") + except Exception as e: + print(f"Fehler beim Löschen der Ergebnisdatei: {e}") + else: + print(f"Ergebnisdatei nicht gefunden im Pfad: {result_path}") + + # Job aus der Datenbank löschen + db.session.delete(job) + db.session.commit() + flash("Job erfolgreich gelöscht.") + return redirect(url_for('auth.job_status')) + +@bp.route('/admin', methods=['GET']) +@login_required +def admin_panel(): + if not current_user.is_admin: + flash("Keine Berechtigung.") + return redirect(url_for('auth.job_status')) + + users = User.query.all() + return render_template('admin_panel.html', users=users) + +@bp.route('/admin/create_user', methods=['POST']) +@login_required +def create_user(): + if not current_user.is_admin: + flash("Keine Berechtigung.") + return redirect(url_for('auth.admin_panel')) + + username = request.form['username'] + password = request.form['password'] + is_admin = 'is_admin' in request.form # Checkbox für Adminrechte + + hashed_password = generate_password_hash(password, method='sha256') + new_user = User(username=username, password=hashed_password, is_admin=is_admin) + db.session.add(new_user) + db.session.commit() + + flash(f"Benutzer {username} wurde erstellt.") + return redirect(url_for('auth.admin_panel')) + +@bp.route('/admin/reset_password/', methods=['POST']) +@login_required +def reset_password(user_id): + if not current_user.is_admin: + flash("Keine Berechtigung.") + return redirect(url_for('auth.admin_panel')) + + user = User.query.get_or_404(user_id) + new_password = request.form['new_password'] + user.password = generate_password_hash(new_password, method='sha256') + db.session.commit() + + flash(f"Passwort für Benutzer {user.username} wurde zurückgesetzt.") + return redirect(url_for('auth.admin_panel')) + +@bp.route('/admin/delete_user/', methods=['POST']) +@login_required +def delete_user(user_id): + if not current_user.is_admin: + flash("Keine Berechtigung.") + return redirect(url_for('auth.admin_panel')) + + user = User.query.get_or_404(user_id) + if user.is_admin: + flash("Administratoren können nicht gelöscht werden.") + return redirect(url_for('auth.admin_panel')) + + db.session.delete(user) + db.session.commit() + flash(f"Benutzer {user.username} wurde gelöscht.") + return redirect(url_for('auth.admin_panel')) diff --git a/app/routes.py b/app/routes.py index 779d11c..1d2c52f 100644 --- a/app/routes.py +++ b/app/routes.py @@ -1,17 +1,16 @@ -import csv +import time import os import threading -from flask import Blueprint, request, redirect, url_for, flash, render_template, send_file, current_app +from flask import Blueprint, request, redirect, url_for, flash, render_template, send_file, jsonify, current_app from flask_login import login_user, logout_user, login_required, current_user from werkzeug.utils import secure_filename from werkzeug.security import generate_password_hash, check_password_hash -from .models import db, User, Job -from .webcrawler import process_file # Importiere die Funktion für das Webscraping +from .models import db, User, Job, AppConfig +from .webcrawler import process_file -UPLOAD_FOLDER = 'uploads' -RESULT_FOLDER = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'results') +UPLOAD_FOLDER = '/app/uploads' +RESULT_FOLDER = '/app/results' -# Blueprint für auth erstellen bp = Blueprint('auth', __name__) @bp.route('/login', methods=['GET', 'POST']) @@ -28,13 +27,18 @@ def login(): @bp.route('/signup', methods=['GET', 'POST']) def signup(): + cfg = AppConfig.query.filter_by(key='allow_signup').first() + if not cfg or cfg.value != 'true': + flash("Registrierung ist derzeit deaktiviert.") + return redirect(url_for('auth.login')) + if request.method == 'POST': username = request.form['username'] - password = generate_password_hash(request.form['password'], method='sha256') + password = generate_password_hash(request.form['password']) # ✅ Fix new_user = User(username=username, password=password) db.session.add(new_user) db.session.commit() - flash('Benutzer erfolgreich erstellt! Sie können sich jetzt einloggen.') + flash('Benutzer erfolgreich erstellt!') return redirect(url_for('auth.login')) return render_template('signup.html') @@ -47,102 +51,203 @@ def logout(): @bp.route('/jobs') @login_required def job_status(): - jobs = Job.query.filter_by(user_id=current_user.id).all() + jobs = Job.query.filter_by(user_id=current_user.id).order_by(Job.created_at.desc()).all() return render_template('jobs.html', jobs=jobs) -# Hochladen und Verarbeiten der Datei im Hintergrund @bp.route('/upload', methods=['GET', 'POST']) @login_required def upload(): if request.method == 'POST': - file = request.files['file'] - filename = secure_filename(file.filename) - if not filename.endswith('.csv'): - flash('Bitte eine CSV-Datei hochladen') + if 'file' not in request.files: + flash('Keine Datei ausgewählt.') return redirect(url_for('auth.upload')) - file_path = os.path.join(UPLOAD_FOLDER, filename) - file.save(file_path) - flash('Datei erfolgreich hochgeladen und Job gestartet') + file = request.files['file'] + if not file or file.filename == '': + flash('Keine gültige Datei.') + return redirect(url_for('auth.upload')) - # Neuen Job erstellen - new_job = Job(user_id=current_user.id, filename=filename, status="Pending") + filename = secure_filename(file.filename) + name, ext = os.path.splitext(filename) + + timestamp = time.strftime("%Y%m%d_%H%M%S") + unique_filename = f"{name}_{timestamp}{ext}" if os.path.exists(os.path.join(UPLOAD_FOLDER, filename)) else filename + + filepath = os.path.join(UPLOAD_FOLDER, unique_filename) + file.save(filepath) + print(f"💾 UPLOAD: {filepath}") + + new_job = Job( + user_id=current_user.id, + filename=unique_filename, + status="Pending" + ) db.session.add(new_job) db.session.commit() + print(f"🆕 JOB #{new_job.id} für User {current_user.id}") - # Debugging-Ausgabe zur Überprüfung der Thread-Erstellung - print(f"Starte Scraping-Thread für Job-ID: {new_job.id}") - - # Starten des Scraping im Hintergrund-Thread und Übergeben des aktuellen Anwendungskontexts - thread = threading.Thread(target=process_file, args=(filename, new_job.id, current_app._get_current_object())) + thread = threading.Thread( + target=process_file, + args=(unique_filename, new_job.id, current_app._get_current_object()) + ) + thread.daemon = True thread.start() - - # Debugging-Ausgabe, nachdem der Thread gestartet wurde - print(f"Thread für Job {new_job.id} erfolgreich gestartet.") + print(f"🔄 THREAD STARTED Job {new_job.id}") + flash(f'"{unique_filename}" → Job #{new_job.id} läuft!') return redirect(url_for('auth.job_status')) return render_template('upload.html') -@bp.route('/download/', methods=['GET']) +@bp.route('/download/') @login_required def download_result(job_id): - job = Job.query.get_or_404(job_id) - print(f"Job ID: {job.id} - User ID: {job.user_id} - Current User ID: {current_user.id}") + job = Job.query.filter_by(id=job_id, user_id=current_user.id).first_or_404() - # Überprüfen, ob der Job dem aktuellen Benutzer gehört - if job.user_id != current_user.id: - flash("Sie haben keine Berechtigung, dieses Ergebnis herunterzuladen.") + if not job.result_filename or not job.status.startswith('✅'): + flash('Ergebnis nicht bereit.') return redirect(url_for('auth.job_status')) - # Überprüfen, ob das Ergebnis vorhanden ist - if not job.result_filename: - flash("Das Ergebnis ist noch nicht verfügbar.") - return redirect(url_for('auth.job_status')) - - # Überprüfen, ob die Datei im angegebenen Pfad existiert - result_path = os.path.join(current_app.config['RESULT_FOLDER'], job.result_filename) - print(f"Versuche, Datei herunterzuladen von: {result_path}") - + result_path = os.path.join(RESULT_FOLDER, job.result_filename) if os.path.exists(result_path): - print("Datei existiert und wird zum Download bereitgestellt.") return send_file(result_path, as_attachment=True) - else: - print("Datei nicht gefunden. Ergebnisverzeichnis oder Pfad prüfen.") - flash("Ergebnisdatei nicht gefunden.") + flash('Datei fehlt.') + return redirect(url_for('auth.job_status')) + +@bp.route('/download_raw/') +@login_required +def download_result_raw(job_id): + job = Job.query.filter_by(id=job_id, user_id=current_user.id).first_or_404() + + if not job.result_filename_raw: + flash('Rohdaten nicht verfügbar.') return redirect(url_for('auth.job_status')) + result_path = os.path.join(RESULT_FOLDER, job.result_filename_raw) + if os.path.exists(result_path): + return send_file(result_path, as_attachment=True) + flash('Datei fehlt.') + return redirect(url_for('auth.job_status')) @bp.route('/delete_job/', methods=['POST']) @login_required def delete_job(job_id): - job = Job.query.get_or_404(job_id) - if job.user_id != current_user.id: - flash("Sie haben keine Berechtigung, diesen Job zu löschen.") - return redirect(url_for('auth.job_status')) + job = Job.query.filter_by(id=job_id, user_id=current_user.id).first_or_404() - # Löschen der Upload-Datei upload_path = os.path.join(UPLOAD_FOLDER, job.filename) if os.path.exists(upload_path): os.remove(upload_path) - print(f"Upload-Datei gelöscht: {upload_path}") - else: - print(f"Upload-Datei nicht gefunden: {upload_path}") - # Löschen der Results-Datei, falls vorhanden if job.result_filename: result_path = os.path.join(RESULT_FOLDER, job.result_filename) if os.path.exists(result_path): - try: - os.remove(result_path) - print(f"Ergebnisdatei gelöscht: {result_path}") - except Exception as e: - print(f"Fehler beim Löschen der Ergebnisdatei: {e}") - else: - print(f"Ergebnisdatei nicht gefunden: {result_path}") + os.remove(result_path) + + if job.result_filename_raw: # ✅ Raw auch löschen + raw_path = os.path.join(RESULT_FOLDER, job.result_filename_raw) + if os.path.exists(raw_path): + os.remove(raw_path) - # Job aus der Datenbank löschen db.session.delete(job) db.session.commit() - flash("Job erfolgreich gelöscht.") + flash('Job gelöscht.') return redirect(url_for('auth.job_status')) + +@bp.route('/job_status/') +@login_required +def job_status_api(job_id): + job = Job.query.filter_by(id=job_id, user_id=current_user.id).first() + if not job: + return jsonify({'error': 'Not found'}), 404 + return jsonify({ + 'id': job.id, + 'status': job.status, + 'result_filename': job.result_filename, + 'result_filename_raw': getattr(job, 'result_filename_raw', None), + 'scraper_job_id': getattr(job, 'scraper_job_id', None) + }) + +@bp.route('/resume_job/', methods=['POST']) +@login_required +def resume_job(job_id): + job = Job.query.filter_by(id=job_id, user_id=current_user.id).first_or_404() + + thread = threading.Thread( + target=process_file, + args=(job.filename, job.id, current_app._get_current_object()) + ) + thread.daemon = True + thread.start() + flash(f'Job #{job_id} wird fortgesetzt...') + return redirect(url_for('auth.job_status')) + + +# ── ADMIN ────────────────────────────────────────── +@bp.route('/admin', methods=['GET']) +@login_required +def admin_panel(): + if not current_user.is_admin: + flash("Keine Berechtigung.") + return redirect(url_for('auth.job_status')) + users = User.query.all() + cfg = AppConfig.query.filter_by(key='allow_signup').first() + signup_allowed = cfg and cfg.value == 'true' + return render_template('admin_panel.html', users=users, signup_allowed=signup_allowed) + +@bp.route('/admin/create_user', methods=['POST']) +@login_required +def create_user(): + if not current_user.is_admin: + return redirect(url_for('auth.admin_panel')) + username = request.form['username'] + password = generate_password_hash(request.form['password']) # ✅ Fix + is_admin = 'is_admin' in request.form + new_user = User(username=username, password=password, is_admin=is_admin) + db.session.add(new_user) + db.session.commit() + flash(f'{username} erstellt.') + return redirect(url_for('auth.admin_panel')) + +@bp.route('/admin/reset_password/', methods=['POST']) +@login_required +def reset_password(user_id): + if not current_user.is_admin: + return redirect(url_for('auth.admin_panel')) + user = User.query.get_or_404(user_id) + new_password = request.form['new_password'] + user.password = generate_password_hash(new_password) # ✅ Fix + db.session.commit() + flash(f'Passwort {user.username} zurückgesetzt.') + return redirect(url_for('auth.admin_panel')) + +@bp.route('/admin/delete_user/', methods=['POST']) +@login_required +def delete_user(user_id): + if not current_user.is_admin: + return redirect(url_for('auth.admin_panel')) + user = User.query.get_or_404(user_id) + if user.is_admin: + flash('Admin nicht löschbar.') + return redirect(url_for('auth.admin_panel')) + db.session.delete(user) + db.session.commit() + flash(f'{user.username} gelöscht.') + return redirect(url_for('auth.admin_panel')) + +@bp.route('/admin/toggle_signup', methods=['POST']) +@login_required +def toggle_signup(): + if not current_user.is_admin: + return redirect(url_for('auth.admin_panel')) + + cfg = AppConfig.query.filter_by(key='allow_signup').first() + if not cfg: + cfg = AppConfig(key='allow_signup', value='true') + db.session.add(cfg) + else: + cfg.value = 'false' if cfg.value == 'true' else 'true' + + db.session.commit() + state = '✅ aktiviert' if cfg.value == 'true' else '🔒 deaktiviert' + flash(f'Registrierung {state}.') + return redirect(url_for('auth.admin_panel')) diff --git a/app/static/Adressliste_vorlage.csv b/app/static/Adressliste_vorlage.csv new file mode 100755 index 0000000..252bc02 --- /dev/null +++ b/app/static/Adressliste_vorlage.csv @@ -0,0 +1,6 @@ +PLZ;Stadt;Strae;Hausnummer;Zusatz +53175;Bonn;Godesberger Str.;27; +50667;Kln;Hohenzollernring;52;A +10115;Berlin;Chausseestraße;125; +80331;Mnchen;Marienplatz;1;B +20095;Hamburg;Ostwall;5; diff --git a/app/static/styles.css b/app/static/styles.css index 20a6174..09f3efc 100644 --- a/app/static/styles.css +++ b/app/static/styles.css @@ -164,3 +164,186 @@ tr:nth-child(even) td { .delete-btn:hover { background-color: #e60000; } + +/* Flash-Badge Styling */ +.flash-badge { + position: fixed; + top: 20px; + right: 20px; + background-color: #f44336; /* Material Design Rot */ + color: white; + padding: 12px 24px; + border-radius: 8px; + font-family: 'Roboto', sans-serif; + font-weight: 500; + box-shadow: 0px 4px 8px rgba(0, 0, 0, 0.2); + z-index: 1000; + opacity: 0; + transform: translateY(-20px); + transition: opacity 0.4s ease, transform 0.4s ease; +} + +/* Einblend-Animation */ +.flash-badge.show { + opacity: 1; + transform: translateY(0); +} + +/* Ausblend-Animation */ +.flash-badge.hide { + opacity: 0; + transform: translateY(-20px); +} + +.admin-panel { + max-width: 800px; + margin: 2em auto; + padding: 2em; + background: white; + border-radius: 8px; + box-shadow: 0 4px 12px rgba(0, 0, 0, 0.1); +} + +.admin-panel h2 { + font-weight: 500; + color: #1d1d1f; + margin-bottom: 1em; +} + +.user-table { + width: 100%; + border-collapse: collapse; + margin-bottom: 2em; +} + +.user-table th, .user-table td { + padding: 0.75em; + text-align: left; + border: 1px solid #d1d1d6; +} + +.user-table th { + background-color: #f1f1f1; + color: #333; +} + +.user-table td { + background-color: white; +} + +.user-table tr:nth-child(even) td { + background-color: #f9f9f9; +} + +.reset-btn, .delete-btn, .create-btn { + padding: 0.5em 1em; + font-size: 0.9em; + font-weight: 500; + border: none; + border-radius: 4px; + cursor: pointer; + transition: background-color 0.2s ease-in-out; +} + +.reset-btn { + background-color: #4caf50; + color: white; +} + +.reset-btn:hover { + background-color: #388e3c; +} + +.delete-btn { + background-color: #f44336; + color: white; +} + +.delete-btn:hover { + background-color: #d32f2f; +} + +.create-btn { + background-color: #007aff; + color: white; + padding: 0.75em; + margin-top: 1em; + display: block; + width: 100%; + font-size: 1em; +} + +.create-btn:hover { + background-color: #005bb5; +} + +.create-user-form { + margin-top: 1.5em; +} + +.create-user-form input[type="text"], +.create-user-form input[type="password"] { + width: 100%; + padding: 0.75em; + margin-bottom: 1em; + border: 1px solid #d1d1d6; + border-radius: 8px; +} + +.create-user-form label { + font-size: 0.9em; + color: #6e6e73; + display: block; + margin-bottom: 1em; +} + +.logo-container { + text-align: center; + margin-bottom: 1em; +} + +.logo-container img { + max-width: 100%; + height: auto; + width: 150px; /* Standardbreite für das Logo */ + transition: width 0.3s ease; +} + +@media (max-width: 768px) { + .logo-container img { + width: 120px; /* Kleinere Breite auf kleineren Bildschirmen */ + } +} + +@media (max-width: 480px) { + .logo-container img { + width: 100px; /* Noch kleinere Breite auf sehr kleinen Bildschirmen */ + } +} + +.template-info { + background-color: #f9f9f9; + padding: 1em; + border-radius: 8px; + margin-bottom: 1.5em; + text-align: center; +} + +.template-info p { + margin: 0.5em 0; +} + +.template-download { + display: inline-block; + padding: 0.5em 1em; + margin-top: 0.5em; + background-color: #007aff; + color: white; + border-radius: 4px; + text-decoration: none; + transition: background-color 0.2s ease; +} + +.template-download:hover { + background-color: #005bb5; +} diff --git a/app/templates/admin_panel.html b/app/templates/admin_panel.html new file mode 100644 index 0000000..4fa3528 --- /dev/null +++ b/app/templates/admin_panel.html @@ -0,0 +1,78 @@ +{% extends "base.html" %} + +{% block content %} +
+

Benutzerverwaltung

+ + + + + + + + + + + + + {% for user in users %} + + + + + + + {% endfor %} + +
IDBenutzernameAdminAktionen
{{ user.id }}{{ user.username }}{{ 'Ja' if user.is_admin else 'Nein' }} +
+ + +
+ {% if not user.is_admin %} +
+ +
+ {% endif %} +
+ + +

Neuen Benutzer erstellen

+
+ + + + +
+
+ +
+

⚙️ Einstellungen

+
+
+ Benutzer-Registrierung: + {% if signup_allowed %} + ✅ Aktiv + + {% else %} + 🔒 Deaktiviert + + {% endif %} +
+
+
+ + +{% endblock %} diff --git a/app/templates/base.html b/app/templates/base.html index 1f06c06..f1f1aa7 100644 --- a/app/templates/base.html +++ b/app/templates/base.html @@ -14,14 +14,50 @@ {% endif %} - + + + {% with messages = get_flashed_messages() %} + {% if messages %} +
+ {% for message in messages %} +
{{ message }}
+ {% endfor %} +
+ {% endif %} + {% endwith %} +
{% block content %}{% endblock %}
+ + + diff --git a/app/templates/jobs.bck b/app/templates/jobs.bck new file mode 100644 index 0000000..2615e7c --- /dev/null +++ b/app/templates/jobs.bck @@ -0,0 +1,121 @@ +{% extends "base.html" %} + +{% block content %} +
+

Ihre Aufträge

+ + + + + + + + + + + + {% for job in jobs %} + + + + + + + + {% endfor %} + +
DateinameStatusErstellt amErgebnisAktionen
{{ job.filename }}{{ job.status }}{{ job.created_at.strftime('%Y-%m-%d %H:%M:%S') }} + {% if job.result_filename and 'Failed' not in job.status %} + + 🎯 Gefiltert + + {% if job.result_filename_raw %} +   + + 📋 Alle + + {% endif %} + {% elif 'Failed' in job.status %} + ❌ {{ job.result_filename or 'Fehler' }} + {% else %} + ⏳ Noch nicht verfügbar + {% endif %} + +
+ +
+
+
+ + + + +{% endblock %} diff --git a/app/templates/jobs.html b/app/templates/jobs.html index 34c8bd0..aa15c97 100644 --- a/app/templates/jobs.html +++ b/app/templates/jobs.html @@ -15,20 +15,38 @@ {% for job in jobs %} - + {{ job.filename }} - {{ job.status }} + + {{ job.status }} + {{ job.created_at.strftime('%Y-%m-%d %H:%M:%S') }} - - {% if job.status == "Completed" %} - Download + + {% if job.result_filename and 'Failed' not in job.status %} + + 🎯 Gefiltert + + {% if job.result_filename_raw %} +   + + 📋 Alle + + {% endif %} + {% elif 'Failed' in job.status %} + ❌ {{ job.result_filename or 'Fehler' }} {% else %} - Noch nicht verfügbar + ⏳ Noch nicht verfügbar {% endif %} + {% if 'Failed' in job.status %} + +
+ +
+ {% endif %}
- +
@@ -37,25 +55,101 @@ + + {% endblock %} diff --git a/app/templates/jobs.orig b/app/templates/jobs.orig new file mode 100644 index 0000000..34c8bd0 --- /dev/null +++ b/app/templates/jobs.orig @@ -0,0 +1,61 @@ +{% extends "base.html" %} + +{% block content %} +
+

Ihre Aufträge

+ + + + + + + + + + + + {% for job in jobs %} + + + + + + + + {% endfor %} + +
DateinameStatusErstellt amErgebnisAktionen
{{ job.filename }}{{ job.status }}{{ job.created_at.strftime('%Y-%m-%d %H:%M:%S') }} + {% if job.status == "Completed" %} + Download + {% else %} + Noch nicht verfügbar + {% endif %} + +
+ +
+
+
+ + +{% endblock %} diff --git a/app/templates/login.html b/app/templates/login.html index 905a1f2..a75d980 100644 --- a/app/templates/login.html +++ b/app/templates/login.html @@ -2,6 +2,9 @@ {% block content %}

Anmelden

+
+ Logo +
diff --git a/app/templates/upload.html b/app/templates/upload.html index 279c7af..fb267cf 100644 --- a/app/templates/upload.html +++ b/app/templates/upload.html @@ -1,11 +1,22 @@ {% extends "base.html" %} + {% block content %} -
-

Datei hochladen

- - - - - -
+
+

Datei hochladen

+ + +
+

Bitte verwenden Sie die folgende Vorlage für Ihre Adressliste:

+ Adressliste Vorlage herunterladen +

Die Vorlage enthält die folgenden Spalten: PLZ, Stadt, Straße, Hausnummer, Zusatz. Bitte verwenden Sie das Format für eine reibungslose Verarbeitung.

+
+ +
+ + + +
+
+ + {% endblock %} diff --git a/app/webcrawler.bck02032026 b/app/webcrawler.bck02032026 new file mode 100644 index 0000000..32be52b --- /dev/null +++ b/app/webcrawler.bck02032026 @@ -0,0 +1,316 @@ +import os +import re +import pandas as pd +import requests +import time +import random +from io import StringIO +from app.models import db, Job + +print("🆕 MODERN webcrawler LOADED!") + +UPLOAD_FOLDER = '/app/uploads' +RESULT_FOLDER = '/app/results' +SCRAPER_URL = "http://gmaps-scraper:8080" + +OUTPUT_COLS = ['title', 'category', 'address', 'open_hours', 'website', 'phone', 'link'] + + +# ────────────────────────────────────────────── +# Hilfsfunktionen +# ────────────────────────────────────────────── + +def get_batch_size(total_rows): + if total_rows < 50: return 10 + elif total_rows < 200: return 10 + elif total_rows < 500: return 5 + else: return 5 + +def get_delay(total_rows): + if total_rows < 50: return (5, 10) + elif total_rows < 200: return (10, 20) + else: return (20, 40) + +def is_blocked(data): + text = str(data).lower() + blocked = any(kw in text for kw in ['captcha', 'blocked', 'rate limit', 'too many', '429']) + if blocked: + print(f"🚫 BLOCKED: {str(data)[:100]}") + return blocked + +def fix_encoding(text): + """Kaputte ISO→UTF8 Zeichen reparieren (z.B. Industriestraße → Industriestraße)""" + if not isinstance(text, str): + return text + try: + return text.encode('latin-1').decode('utf-8') + except (UnicodeEncodeError, UnicodeDecodeError): + return text + +def build_input_addresses(df): + """Normalisierte Adressen aus Input-CSV für Abgleich""" + addresses = set() + for _, row in df.iterrows(): + plz = str(row.get('PLZ', '')).strip() + stadt = str(row.get('Stadt', '')).strip() + str_ = str(row.get('Straße', '')).strip() + nr = str(row.get('Hausnummer', '')).strip() + zusatz = str(row.get('Zusatz', '')).strip() + + full = f"{str_} {nr} {zusatz} {plz} {stadt}".lower().strip() + full = ' '.join(full.split()) + addresses.add(full) + return addresses + +def normalize_address(addr): + """Output-Adresse normalisieren für Abgleich""" + if not isinstance(addr, str): + return '' + addr = fix_encoding(addr) + return ' '.join(addr.lower().strip().split()) + +def address_in_input(result_addr, input_addresses): + """Prüft ob PLZ + Straßenname aus Result im Input vorkommen""" + norm = normalize_address(result_addr) + for inp_addr in input_addresses: + plz_match = re.search(r'\b\d{5}\b', inp_addr) + if plz_match: + plz = plz_match.group() + if plz in norm: + street = inp_addr.split()[0] if inp_addr else '' + if len(street) > 3 and street[:4].lower() in norm: + return True + return False + + +# ────────────────────────────────────────────── +# CSV Nachbearbeitung (apply_filter umschaltbar) +# ────────────────────────────────────────────── + +def process_result_csv(raw_bytes, input_df, apply_filter=True): + """ + Raw CSV → bereinigt: + - Nur OUTPUT_COLS + - Encoding fix + - Optional: Input/Output Abgleich + Duplikate + """ + try: + content = raw_bytes.decode('utf-8', errors='replace') + df_out = pd.read_csv(StringIO(content)) + print(f"📄 Raw result: {df_out.shape} | Columns: {list(df_out.columns)[:8]}") + + # Spalten filtern + available = [c for c in OUTPUT_COLS if c in df_out.columns] + missing = [c for c in OUTPUT_COLS if c not in df_out.columns] + if missing: + print(f"⚠️ Fehlende Spalten: {missing}") + df_out = df_out[available] + + # 🔤 Encoding fix + for col in df_out.columns: + df_out[col] = df_out[col].apply(fix_encoding) + print(f"🔤 Encoding fix: done") + + if apply_filter: + # 📍 Input/Output Abgleich + input_addresses = build_input_addresses(input_df) + before = len(df_out) + df_out = df_out[ + df_out['address'].apply( + lambda a: address_in_input(a, input_addresses) + ) + ] + print(f"📍 Adress-Filter: {before} → {len(df_out)} Zeilen") + + # 🔁 Duplikate entfernen (immer, auch bei Raw) + before_dedup = len(df_out) + df_out = df_out.drop_duplicates(subset=['title', 'address'], keep='first') + print(f"🔁 Duplikate: {before_dedup} → {len(df_out)} Zeilen") + + # Leere Titel entfernen + df_out = df_out.dropna(subset=['title'], how='all') + df_out = df_out[df_out['title'].str.strip().astype(bool)] + + print(f"✅ Final ({'gefiltert' if apply_filter else 'alle'}): {df_out.shape}") + return df_out + + except Exception as e: + print(f"💥 process_result_csv: {e}") + import traceback + traceback.print_exc() + return None + + +# ────────────────────────────────────────────── +# Haupt-Worker +# ────────────────────────────────────────────── + +def process_file(filename, job_id, app): + print(f"🎯 {filename} Job#{job_id} START!") + + with app.app_context(): + job = Job.query.get(job_id) + if not job: + print("❌ Job missing") + return + + try: + # 1️⃣ CSV Parse + job.status = "📊 parsing CSV" + db.session.commit() + + filepath = os.path.join(UPLOAD_FOLDER, filename) + print(f"📁 {filepath} | {os.path.getsize(filepath)}b") + + df_input = pd.read_csv(filepath, sep=';', encoding='ISO-8859-1') + print(f"📊 {df_input.shape} | Columns: {list(df_input.columns)}") + + queries = [] + for _, row in df_input.iterrows(): + parts = [ + str(row.get('PLZ', '')).strip(), + str(row.get('Stadt', '')).strip(), + str(row.get('Straße', '')).strip(), + str(row.get('Hausnummer', '')).strip(), + str(row.get('Zusatz', '')).strip(), + ] + q = f"Firmen {' '.join(p for p in parts if p and p != 'nan')}".strip() + if len(q) > 10: + queries.append(q) + + total = len(queries) + print(f"🔍 {total} Queries | Samples: {queries[:3]}") + if not queries: + raise ValueError("Keine gültigen Adressen in CSV") + + # 2️⃣ Batch + Delay + batch_size = get_batch_size(total) + delay_min, delay_max = get_delay(total) + batch = queries[:batch_size] + pre_delay = random.uniform(delay_min, delay_max) + print(f"📦 Batch {len(batch)}/{total} | 😴 {pre_delay:.1f}s Delay") + time.sleep(pre_delay) + + # 3️⃣ API Call + job.status = "📤 sending to scraper" + db.session.commit() + + payload = { + "name": f"{filename.replace('.csv','')}-{job_id}", + "keywords": batch, + "lang": "de", + "depth": 1, + "zoom": 17, + "radius": 50, + "max_time": 60, + "fast_mode": False + } + + print(f"🌐 POST {SCRAPER_URL}/api/v1/jobs | {payload['name']}") + resp = requests.post(f"{SCRAPER_URL}/api/v1/jobs", json=payload, timeout=30) + print(f"📤 {resp.status_code}: {resp.text[:300]}") + + if is_blocked(resp.text): + raise ValueError("🚫 IP geblockt! Proxy konfigurieren.") + if resp.status_code != 201: + raise ValueError(f"API {resp.status_code}: {resp.text[:200]}") + + # 4️⃣ Polling + scraper_id = resp.json()['id'] + job.scraper_job_id = scraper_id + job.status = "⏳ scraping" + db.session.commit() + print(f"✅ Scraper Job: {scraper_id}") + + for i in range(1, 61): # Max 10min + try: + r = requests.get( + f"{SCRAPER_URL}/api/v1/jobs/{scraper_id}", + timeout=10 + ) + data = r.json() + status = data.get('Status', data.get('status', '?')) + print(f"⏳ {i}/60: {status}") + + if is_blocked(data): + raise ValueError("🚫 IP geblockt während scraping!") + + if status in ('ok', 'completed', 'scraped'): + dl = requests.get( + f"{SCRAPER_URL}/api/v1/jobs/{scraper_id}/download", + timeout=60 + ) + if dl.status_code != 200: + raise ValueError(f"Download {dl.status_code}") + if is_blocked(dl.text[:200]): + raise ValueError("🚫 IP geblockt beim Download!") + + # 5️⃣ Nachbearbeitung → zwei Versionen + job.status = "🔧 processing result" + db.session.commit() + + base = filename.replace('.csv', '') + os.makedirs(RESULT_FOLDER, exist_ok=True) + + # ── Version A: Gefiltert (Adressabgleich + Deduplizierung) ── + df_filtered = process_result_csv(dl.content, df_input, apply_filter=True) + outname_filtered = f"results_{base}_filtered.csv" + outpath_filtered = os.path.join(RESULT_FOLDER, outname_filtered) + + if df_filtered is not None and len(df_filtered) > 0: + df_filtered.to_csv( + outpath_filtered, index=False, + encoding='utf-8-sig', sep=';' + ) + print(f"🎯 Filtered: {outname_filtered} → {len(df_filtered)} Firmen") + else: + print("⚠️ Keine Treffer nach Filter – leere Datei wird erstellt") + pd.DataFrame(columns=OUTPUT_COLS).to_csv( + outpath_filtered, index=False, + encoding='utf-8-sig', sep=';' + ) + + # ── Version B: Alle (nur Spalten + Encoding, kein Filter) ── + df_raw = process_result_csv(dl.content, df_input, apply_filter=False) + outname_raw = f"results_{base}_all.csv" + outpath_raw = os.path.join(RESULT_FOLDER, outname_raw) + + if df_raw is not None: + df_raw.to_csv( + outpath_raw, index=False, + encoding='utf-8-sig', sep=';' + ) + print(f"📋 All: {outname_raw} → {len(df_raw)} Firmen") + else: + print("⚠️ df_raw None – Rohinhalt wird gespeichert") + with open(outpath_raw, 'wb') as f: + f.write(dl.content) + + # ── DB speichern ── + job.status = "✅ Fertig" + job.result_filename = outname_filtered # 🎯 Gefiltert + job.result_filename_raw = outname_raw # 📋 Alle + db.session.commit() + print(f"🎉 Beide Dateien gespeichert!") + break + + elif status in ('failed', 'cancelled', 'error'): + raise ValueError(f"Scraper: {status}") + + except requests.RequestException as e: + print(f"⚠️ Poll {i}: {e}") + + time.sleep(random.uniform(8, 15)) + + else: + raise ValueError("Timeout nach 10min") + + except Exception as e: + job.status = "Failed" + job.result_filename = str(e) + print(f"💥 ERROR: {e}") + import traceback + traceback.print_exc() + + db.session.commit() + print(f"✅ DONE! Status: {job.status}\n") diff --git a/app/webcrawler.bck04032026 b/app/webcrawler.bck04032026 new file mode 100644 index 0000000..8860ad6 --- /dev/null +++ b/app/webcrawler.bck04032026 @@ -0,0 +1,275 @@ +import os +import re +import pandas as pd +import requests +import time +import random +from io import StringIO +from app.models import db, Job + +print("🆕 MODERN webcrawler LOADED! – BATCHED + PROXY") + +UPLOAD_FOLDER = '/app/uploads' +RESULT_FOLDER = '/app/results' +SCRAPER_URL = "http://gmaps-scraper:8080" + +OUTPUT_COLS = ['title', 'category', 'address', 'open_hours', 'website', 'phone', 'link'] + +PROXY_URL = "http://bitlleuv-rotate:s5hzse6hz74b@p.webshare.io:80" +API_PROXIES = {"http": PROXY_URL, "https": PROXY_URL} + +# ────────────────────────────────────────────── +# Hilfsfunktionen +# ────────────────────────────────────────────── + +def is_blocked(data): + text = str(data).lower() + blocked = any(kw in text for kw in ['captcha', 'blocked', 'rate limit', 'too many', '429']) + if blocked: + print(f"🚫 BLOCKED: {str(data)[:100]}") + return blocked + +def fix_encoding(text): + if not isinstance(text, str): + return text + try: + return text.encode('latin-1').decode('utf-8') + except (UnicodeEncodeError, UnicodeDecodeError): + return text + +def build_input_addresses(df): + addresses = set() + for _, row in df.iterrows(): + plz = str(row.get('PLZ', '')).strip() + stadt = str(row.get('Stadt', '')).strip() + str_ = str(row.get('Straße', '')).strip() + nr = str(row.get('Hausnummer', '')).strip() + zusatz = str(row.get('Zusatz', '')).strip() + full = f"{str_} {nr} {zusatz} {plz} {stadt}".lower().strip() + full = ' '.join(full.split()) + addresses.add(full) + return addresses + +def normalize_address(addr): + if not isinstance(addr, str): + return '' + addr = fix_encoding(addr) + return ' '.join(addr.lower().strip().split()) + +def address_in_input(result_addr, input_addresses): + norm = normalize_address(result_addr) + for inp_addr in input_addresses: + plz_match = re.search(r'\b\d{5}\b', inp_addr) + if plz_match: + plz = plz_match.group() + if plz in norm: + street = inp_addr.split()[0] if inp_addr else '' + if len(street) > 3 and street[:4].lower() in norm: + return True + return False + +# ────────────────────────────────────────────── +# CSV Nachbearbeitung +# ────────────────────────────────────────────── + +def process_result_csv(raw_bytes, input_df, apply_filter=True): + try: + content = raw_bytes.decode('utf-8', errors='replace') + df_out = pd.read_csv(StringIO(content)) + print(f"📄 Raw result: {df_out.shape}") + + available = [c for c in OUTPUT_COLS if c in df_out.columns] + df_out = df_out[available] + + for col in df_out.columns: + df_out[col] = df_out[col].apply(fix_encoding) + + if apply_filter: + input_addresses = build_input_addresses(input_df) + before = len(df_out) + df_out = df_out[ + df_out['address'].apply( + lambda a: address_in_input(a, input_addresses) + ) + ] + print(f"📍 Filter: {before} → {len(df_out)}") + + df_out = df_out.drop_duplicates(subset=['title', 'address'], keep='first') + df_out = df_out.dropna(subset=['title'], how='all') + df_out = df_out[df_out['title'].str.strip().astype(bool)] + + print(f"✅ Final ({'gefiltert' if apply_filter else 'alle'}): {df_out.shape}") + return df_out + except Exception as e: + print(f"💥 process_result_csv: {e}") + return None + +# ────────────────────────────────────────────── +# HAUPT-WORKER +# ────────────────────────────────────────────── + +def process_file(filename, job_id, app): + print(f"🎯 {filename} Job#{job_id} START!") + + with app.app_context(): + job = Job.query.get(job_id) + if not job: + print("❌ Job missing") + return + + try: + #Parse + ALLE Queries + job.status = "📊 parsing CSV" + db.session.commit() + + filepath = os.path.join(UPLOAD_FOLDER, filename) + print(f"📁 {filepath} | {os.path.getsize(filepath)}b") + + df_input = pd.read_csv(filepath, sep=';', encoding='ISO-8859-1') + print(f"📊 {df_input.shape}") + + queries = [] + for _, row in df_input.iterrows(): + parts = [ + str(row.get('PLZ', '')).strip(), + str(row.get('Stadt', '')).strip(), + str(row.get('Straße', '')).strip(), + str(row.get('Hausnummer', '')).strip(), + str(row.get('Zusatz', '')).strip(), + ] + q = f"Firmen {' '.join(p for p in parts if p and p != 'nan')}".strip() + if len(q) > 10: + queries.append(q) + + total_queries = len(queries) + print(f"🔍 {total_queries} Queries | Samples: {queries[:3]}") + if total_queries == 0: + raise ValueError("Keine gültigen Adressen") + + #BATCHED Processing + BATCH_SIZE = 10 # Erhöht: 5 → 10 (paid proxy) + BATCH_DELAY_MIN, BATCH_DELAY_MAX = 10, 20 # Reduziert: 30-60s → 10-20s (paid proxy) + batches = (total_queries + BATCH_SIZE - 1) // BATCH_SIZE + print(f"📦 {batches} Batches à {BATCH_SIZE} | ETA: ~{batches*15//60:.0f}h") + + all_results_filtered = [] + all_results_raw = [] + job.status = f"🔄 Batch 1/{batches}" + db.session.commit() + + for batch_idx in range(batches): + batch_start = batch_idx * BATCH_SIZE + batch_end = min(batch_start + BATCH_SIZE, total_queries) + batch_queries = queries[batch_start:batch_end] + print(f"\n🔄 BATCH {batch_idx+1}/{batches} ({batch_start+1}-{batch_end}/{total_queries})") + + #Random Delay + delay = random.uniform(BATCH_DELAY_MIN, BATCH_DELAY_MAX) + print(f"😴 Delay: {delay:.0f}s | Proxy: {PROXY_URL}") + time.sleep(delay) + + #API Call + payload = { + "name": f"{filename.replace('.csv','')}-{job_id}-B{batch_idx+1:03d}", + "keywords": batch_queries, + "lang": "de", + "depth": 1, + "zoom": 17, + "radius": 50, + "max_time": 60, # Reduziert: 120 → 60 (paid proxy schneller) + "fast_mode": False, + "proxies": [PROXY_URL] + } + + try: + resp = requests.post( + f"{SCRAPER_URL}/api/v1/jobs", + json=payload, + timeout=45 + ) + print(f"📤 {resp.status_code}") + if is_blocked(resp.text): + print("🚫 Batch übersprungen (blocked)") + continue + if resp.status_code != 201: + print(f"⚠️ Batch {batch_idx+1} fehlgeschlagen: {resp.text[:100]}") + continue + + scraper_id = resp.json()['id'] + print(f"✅ Scraper: {scraper_id}") + + for poll_i in range(1, 61): # Reduziert: 121 → 61 (max_time 60s) + r = requests.get( + f"{SCRAPER_URL}/api/v1/jobs/{scraper_id}", + timeout=15 + ) + data = r.json() + status = data.get('Status', data.get('status', '?')) + + if status in ('ok', 'completed', 'scraped'): + dl = requests.get( + f"{SCRAPER_URL}/api/v1/jobs/{scraper_id}/download", + timeout=90 + ) + if dl.status_code == 200: + df_filtered = process_result_csv(dl.content, df_input, True) + df_raw = process_result_csv(dl.content, df_input, False) + if df_filtered is not None: + all_results_filtered.append(df_filtered) + all_results_raw.append(df_raw) + print(f"📊 Batch {batch_idx+1}: {len(df_filtered)} filtered") + break + elif status in ('failed', 'error'): + print(f"💥 Batch {batch_idx+1}: {status}") + break + + time.sleep(random.uniform(5, 10)) # Reduziert: 10-20s → 5-10s (paid proxy) + + except Exception as e: + print(f"💥 Batch {batch_idx+1}: {e}") + + job.status = f"🔄 Batch {batch_idx+2}/{batches}" + db.session.commit() + + #MERGE & SAVE + job.status = "🔧 merging results" + db.session.commit() + + base = filename.replace('.csv', '') + os.makedirs(RESULT_FOLDER, exist_ok=True) + + if all_results_filtered: + df_final_filtered = pd.concat(all_results_filtered, ignore_index=True) + df_final_filtered = df_final_filtered.drop_duplicates(subset=['title', 'address']) + + out_filtered = f"results_{base}_filtered.csv" + df_final_filtered.to_csv( + os.path.join(RESULT_FOLDER, out_filtered), + index=False, encoding='utf-8-sig', sep=';' + ) + + if all_results_raw: + df_final_raw = pd.concat(all_results_raw, ignore_index=True) + out_raw = f"results_{base}_all.csv" + df_final_raw.to_csv( + os.path.join(RESULT_FOLDER, out_raw), + index=False, encoding='utf-8-sig', sep=';' + ) + + job.result_filename = out_filtered + job.result_filename_raw = out_raw + job.status = f"✅ Fertig: {len(df_final_filtered)} Firmen" + else: + job.status = "❌ Keine Ergebnisse" + + db.session.commit() + print(f"🎉 Job {job_id} komplett!") + + except Exception as e: + job.status = f"Failed: {str(e)[:50]}" + print(f"💥 FATAL: {e}") + import traceback + traceback.print_exc() + db.session.commit() + + print(f"✅ DONE! Status: {job.status}") diff --git a/app/webcrawler.bck04032026_2 b/app/webcrawler.bck04032026_2 new file mode 100644 index 0000000..4a30b38 --- /dev/null +++ b/app/webcrawler.bck04032026_2 @@ -0,0 +1,429 @@ +import os +import re +import unicodedata +import json +import pandas as pd +import requests +import time +import random +from io import StringIO +from app.models import db, Job + +print("🆕 MODERN webcrawler LOADED! – BATCHED + PROXY + RESUME + ETA + 2x SCRAPER") + +UPLOAD_FOLDER = '/app/uploads' +RESULT_FOLDER = '/app/results' + +# 2x Scraper – abwechselnd genutzt +SCRAPER_URLS = [ + "http://gmaps-scraper-1:8080", + "http://gmaps-scraper-2:8080", +] + +OUTPUT_COLS = ['title', 'category', 'address', 'open_hours', 'website', 'phone', 'link'] + +PROXY_URL = "http://bitlleuv-rotate:s5hzse6hz74b@p.webshare.io:80" +API_PROXIES = {"http": PROXY_URL, "https": PROXY_URL} + +# ────────────────────────────────────────────── +# Tuning +# ────────────────────────────────────────────── +BATCH_SIZE = 30 # Keywords pro Scraper-Job +BATCH_DELAY_MIN = 3 # Sekunden Pause zwischen Batches (min) +BATCH_DELAY_MAX = 6 # Sekunden Pause zwischen Batches (max) +MAX_TIME = 60 # Sekunden die der Scraper pro Batch hat +POLL_MAX = 90 # Max. Poll-Versuche pro Batch +POLL_DELAY_MIN = 2 # Sekunden zwischen Polls (min) +POLL_DELAY_MAX = 5 # Sekunden zwischen Polls (max) +STUCK_THRESHOLD = 8 # Polls auf 'pending' bis Auto-Restart +MAX_RETRIES = 2 # Wiederholversuche pro Batch bei Fehler + +# ────────────────────────────────────────────── +# Hilfsfunktionen +# ────────────────────────────────────────────── + +def is_blocked(data): + text = str(data).lower() + blocked = any(kw in text for kw in ['captcha', 'blocked', 'rate limit', 'too many', '429']) + if blocked: + print(f"🚫 BLOCKED: {str(data)[:100]}") + return blocked + +def fix_encoding(text): + if not isinstance(text, str): + return text + try: + return text.encode('latin-1').decode('utf-8') + except (UnicodeEncodeError, UnicodeDecodeError): + return text + +# Fix 1: Sonderzeichen in Queries bereinigen +def clean_query(q): + """Steuerzeichen + fehlerhafte Bytes entfernen für saubere Google Maps URLs""" + q = ''.join(c for c in q if unicodedata.category(c) != 'Cc') + q = ' '.join(q.split()) + return q.strip() + +def build_input_addresses(df): + addresses = set() + for _, row in df.iterrows(): + plz = str(row.get('PLZ', '')).strip() + stadt = str(row.get('Stadt', '')).strip() + str_ = str(row.get('Straße', '')).strip() + nr = str(row.get('Hausnummer', '')).strip() + zusatz = str(row.get('Zusatz', '')).strip() + full = f"{str_} {nr} {zusatz} {plz} {stadt}".lower().strip() + full = ' '.join(full.split()) + addresses.add(full) + return addresses + +def normalize_address(addr): + if not isinstance(addr, str): + return '' + addr = fix_encoding(addr) + return ' '.join(addr.lower().strip().split()) + +def address_in_input(result_addr, input_addresses): + norm = normalize_address(result_addr) + for inp_addr in input_addresses: + plz_match = re.search(r'\b\d{5}\b', inp_addr) + if plz_match: + plz = plz_match.group() + if plz in norm: + street = inp_addr.split()[0] if inp_addr else '' + if len(street) > 3 and street[:4].lower() in norm: + return True + return False + +def format_eta(seconds): + """Sekunden → lesbares ETA-Format""" + if seconds < 60: + return f"{int(seconds)}s" + h, rem = divmod(int(seconds), 3600) + m = rem // 60 + return f"{h}h {m:02d}min" if h > 0 else f"{m}min" + +# ────────────────────────────────────────────── +# Fix 3: Scraper-Neustart bei Inactivity +# ────────────────────────────────────────────── + +def restart_scraper(scraper_url): + """Den betroffenen Scraper-Container neu starten""" + try: + import subprocess + # Container-Name aus URL ableiten: http://gmaps-scraper-1:8080 → gmaps-scraper-1 + container = scraper_url.split("//")[1].split(":")[0] + print(f"🔄 Starte {container} neu...") + subprocess.run(["docker", "restart", container], timeout=30, capture_output=True) + print(f"✅ {container} neu gestartet – warte 15s...") + time.sleep(15) + return True + except Exception as e: + print(f"⚠️ Scraper-Neustart fehlgeschlagen: {e}") + return False + +# ────────────────────────────────────────────── +# Resume: Progress-File Hilfsfunktionen +# ────────────────────────────────────────────── + +def get_progress_path(job_id): + return os.path.join(RESULT_FOLDER, f"progress_{job_id}.json") + +def get_partial_path(job_id, suffix): + return os.path.join(RESULT_FOLDER, f"partial_{job_id}_{suffix}.csv") + +def load_progress(job_id): + """Gespeicherten Fortschritt laden (falls vorhanden)""" + path = get_progress_path(job_id) + if os.path.exists(path): + with open(path, 'r') as f: + data = json.load(f) + print(f"🔁 RESUME: ab Batch {data['last_completed_batch'] + 1}/{data['total_batches']}") + return data + return None + +def save_progress(job_id, last_completed_batch, total_batches): + """Fortschritt nach jedem Batch speichern""" + path = get_progress_path(job_id) + with open(path, 'w') as f: + json.dump({"last_completed_batch": last_completed_batch, "total_batches": total_batches}, f) + +def append_partial(job_id, df_filtered, df_raw): + """Batch-Ergebnis an Partial-CSV anhängen""" + for suffix, df in [('filtered', df_filtered), ('raw', df_raw)]: + if df is None: + continue + path = get_partial_path(job_id, suffix) + header = not os.path.exists(path) + df.to_csv(path, mode='a', index=False, header=header, encoding='utf-8-sig', sep=';') + +def load_partial(job_id): + """Bestehende Partial-CSVs laden""" + results_filtered, results_raw = [], [] + for suffix, lst in [('filtered', results_filtered), ('raw', results_raw)]: + path = get_partial_path(job_id, suffix) + if os.path.exists(path): + try: + df = pd.read_csv(path, sep=';', encoding='utf-8-sig') + lst.append(df) + print(f"📂 Partial {suffix}: {len(df)} Zeilen geladen") + except Exception as e: + print(f"⚠️ Partial {suffix} Ladefehler: {e}") + return results_filtered, results_raw + +def cleanup_progress(job_id): + """Progress + Partial-Files nach Abschluss löschen""" + for path in [ + get_progress_path(job_id), + get_partial_path(job_id, 'filtered'), + get_partial_path(job_id, 'raw'), + ]: + if os.path.exists(path): + os.remove(path) + +# ────────────────────────────────────────────── +# CSV Nachbearbeitung +# ────────────────────────────────────────────── + +def process_result_csv(raw_bytes, input_df, apply_filter=True): + try: + content = raw_bytes.decode('utf-8', errors='replace') + df_out = pd.read_csv(StringIO(content)) + print(f"📄 Raw result: {df_out.shape}") + + available = [c for c in OUTPUT_COLS if c in df_out.columns] + df_out = df_out[available] + + for col in df_out.columns: + df_out[col] = df_out[col].apply(fix_encoding) + + if apply_filter: + input_addresses = build_input_addresses(input_df) + before = len(df_out) + df_out = df_out[ + df_out['address'].apply(lambda a: address_in_input(a, input_addresses)) + ] + print(f"📍 Filter: {before} → {len(df_out)}") + + df_out = df_out.drop_duplicates(subset=['title', 'address'], keep='first') + df_out = df_out.dropna(subset=['title'], how='all') + df_out = df_out[df_out['title'].str.strip().astype(bool)] + + print(f"✅ Final ({'gefiltert' if apply_filter else 'alle'}): {df_out.shape}") + return df_out + except Exception as e: + print(f"💥 process_result_csv: {e}") + return None + +# ────────────────────────────────────────────── +# HAUPT-WORKER +# ────────────────────────────────────────────── + +def process_file(filename, job_id, app): + print(f"🎯 {filename} Job#{job_id} START!") + + with app.app_context(): + job = Job.query.get(job_id) + if not job: + print("❌ Job missing") + return + + try: + #Parse + ALLE Queries + job.status = "📊 parsing CSV" + db.session.commit() + + filepath = os.path.join(UPLOAD_FOLDER, filename) + print(f"📁 {filepath} | {os.path.getsize(filepath)}b") + + df_input = pd.read_csv(filepath, sep=';', encoding='ISO-8859-1') + print(f"📊 {df_input.shape}") + + queries = [] + for _, row in df_input.iterrows(): + parts = [ + str(row.get('PLZ', '')).strip(), + str(row.get('Stadt', '')).strip(), + str(row.get('Straße', '')).strip(), + str(row.get('Hausnummer', '')).strip(), + str(row.get('Zusatz', '')).strip(), + ] + q = f"Firmen {' '.join(p for p in parts if p and p != 'nan')}".strip() + q = clean_query(q) # Fix 1: Sonderzeichen bereinigen + if len(q) > 10: + queries.append(q) + + total_queries = len(queries) + print(f"🔍 {total_queries} Queries | Samples: {queries[:3]}") + if total_queries == 0: + raise ValueError("Keine gültigen Adressen") + + #BATCHED Processing + batches = (total_queries + BATCH_SIZE - 1) // BATCH_SIZE + + # Resume: Fortschritt laden falls vorhanden + os.makedirs(RESULT_FOLDER, exist_ok=True) + progress = load_progress(job_id) + start_batch = progress['last_completed_batch'] + 1 if progress else 0 + all_results_filtered, all_results_raw = load_partial(job_id) if progress else ([], []) + + eta_initial = format_eta((batches - start_batch) * ((BATCH_DELAY_MAX + MAX_TIME) / 2)) + print(f"📦 {batches} Batches à {BATCH_SIZE} | 2x Scraper | Start: {start_batch} | ETA: ~{eta_initial}") + job_start_time = time.time() + job.status = f"🔄 Batch {start_batch+1}/{batches} | ⏱️ ~{eta_initial}" + db.session.commit() + + for batch_idx in range(start_batch, batches): + batch_start = batch_idx * BATCH_SIZE + batch_end = min(batch_start + BATCH_SIZE, total_queries) + batch_queries = queries[batch_start:batch_end] + + # 2x Scraper: abwechselnd nutzen + scraper_url = SCRAPER_URLS[batch_idx % len(SCRAPER_URLS)] + print(f"\n🔄 BATCH {batch_idx+1}/{batches} ({batch_start+1}-{batch_end}/{total_queries}) → {scraper_url}") + + #Random Delay + delay = random.uniform(BATCH_DELAY_MIN, BATCH_DELAY_MAX) + print(f"😴 Delay: {delay:.0f}s") + time.sleep(delay) + + #API Call + payload = { + "name": f"{filename.replace('.csv','')}-{job_id}-B{batch_idx+1:03d}", + "keywords": batch_queries, + "lang": "de", + "depth": 1, + "zoom": 15, + "radius": 50, + "max_time": MAX_TIME, + "fast_mode": False, + "proxies": [PROXY_URL] + } + + batch_success = False + # Fix 2: Retry-Logik bei Scraper-Fehler + for attempt in range(1, MAX_RETRIES + 1): + try: + resp = requests.post(f"{scraper_url}/api/v1/jobs", json=payload, timeout=45) + print(f"📤 {resp.status_code} (Versuch {attempt} | {scraper_url})") + + if is_blocked(resp.text): + print("🚫 Batch übersprungen (blocked)") + break + if resp.status_code != 201: + print(f"⚠️ Batch {batch_idx+1} fehlgeschlagen: {resp.text[:100]}") + if attempt < MAX_RETRIES: + time.sleep(10) + continue + + scraper_id = resp.json()['id'] + print(f"✅ Scraper: {scraper_id}") + + stuck_counter = 0 + for poll_i in range(1, POLL_MAX + 1): + r = requests.get(f"{scraper_url}/api/v1/jobs/{scraper_id}", timeout=15) + data = r.json() + status = data.get('Status', data.get('status', '?')) + print(f"⏳ Poll {poll_i}: {status}") + + # Fix 4: Auto-Recovery bei Pending-Stuck + if status == 'pending': + stuck_counter += 1 + if stuck_counter >= STUCK_THRESHOLD: + print(f"⚠️ Job {scraper_id} hängt – abbrechen + Neustart") + requests.delete(f"{scraper_url}/api/v1/jobs/{scraper_id}", timeout=10) + restart_scraper(scraper_url) # Fix 3: Nur betroffenen Scraper neu starten + break + else: + stuck_counter = 0 + + if status in ('ok', 'completed', 'scraped'): + dl = requests.get(f"{scraper_url}/api/v1/jobs/{scraper_id}/download", timeout=90) + if dl.status_code == 200: + df_filtered = process_result_csv(dl.content, df_input, True) + df_raw = process_result_csv(dl.content, df_input, False) + if df_filtered is not None: + all_results_filtered.append(df_filtered) + all_results_raw.append(df_raw) + append_partial(job_id, df_filtered, df_raw) # Resume: sofort speichern + print(f"📊 Batch {batch_idx+1}: {len(df_filtered)} filtered") + batch_success = True + break + + # Fix 2: Scraper-Fehler → Retry + elif status in ('failed', 'error'): + print(f"💥 Batch {batch_idx+1}: {status} (Versuch {attempt})") + if attempt < MAX_RETRIES: + time.sleep(10) + break + + time.sleep(random.uniform(POLL_DELAY_MIN, POLL_DELAY_MAX)) + + if batch_success: + break + + except Exception as e: + print(f"💥 Batch {batch_idx+1} Versuch {attempt}: {e}") + if attempt < MAX_RETRIES: + time.sleep(10) + + # Resume: Fortschritt nach jedem Batch speichern + save_progress(job_id, batch_idx, batches) + + # ETA berechnen + elapsed = time.time() - job_start_time + done_so_far = batch_idx - start_batch + 1 + if done_so_far > 0: + avg_per_batch = elapsed / done_so_far + remaining = (batches - batch_idx - 1) * avg_per_batch + eta_str = format_eta(remaining) + else: + eta_str = "?" + + job.status = f"🔄 Batch {batch_idx+2}/{batches} | ⏱️ ~{eta_str}" + db.session.commit() + + #MERGE & SAVE + job.status = "🔧 merging results" + db.session.commit() + + base = filename.replace('.csv', '') + + if all_results_filtered: + df_final_filtered = pd.concat(all_results_filtered, ignore_index=True) + df_final_filtered = df_final_filtered.drop_duplicates(subset=['title', 'address']) + + out_filtered = f"results_{base}_filtered.csv" + df_final_filtered.to_csv( + os.path.join(RESULT_FOLDER, out_filtered), + index=False, encoding='utf-8-sig', sep=';' + ) + + if all_results_raw: + df_final_raw = pd.concat(all_results_raw, ignore_index=True) + out_raw = f"results_{base}_all.csv" + df_final_raw.to_csv( + os.path.join(RESULT_FOLDER, out_raw), + index=False, encoding='utf-8-sig', sep=';' + ) + + job.result_filename = out_filtered + job.result_filename_raw = out_raw + job.status = f"✅ Fertig: {len(df_final_filtered)} Firmen" + + # Resume: Cleanup nach Abschluss + cleanup_progress(job_id) + else: + job.status = "❌ Keine Ergebnisse" + + db.session.commit() + print(f"🎉 Job {job_id} komplett!") + + except Exception as e: + job.status = f"Failed: {str(e)[:50]}" + print(f"💥 FATAL: {e}") + import traceback + traceback.print_exc() + db.session.commit() + + print(f"✅ DONE! Status: {job.status}") diff --git a/app/webcrawler.orig b/app/webcrawler.orig new file mode 100644 index 0000000..f73f061 --- /dev/null +++ b/app/webcrawler.orig @@ -0,0 +1,138 @@ +import csv +import os +import requests +from .models import db, Job +from flask import current_app + +UPLOAD_FOLDER = 'uploads' +RESULT_FOLDER = 'results' + +API_KEY = 'AIzaSyAIf0yXJTwo87VMWLBtq2m2LqE-OaPGbzw' + +processed_companies = set() + +def get_geocode(address): + url = f"https://maps.googleapis.com/maps/api/geocode/json" + params = {'address': address, 'key': API_KEY} + + try: + response = requests.get(url, params=params, timeout=5) + if response.status_code == 200: + data = response.json() + if data['status'] == 'OK': + location = data['results'][0]['geometry']['location'] + return location['lat'], location['lng'] + except requests.RequestException as e: + print(f"Geocode API Fehler für {address}: {e}") + return None, None + +def get_nearby_places(lat, lng): + places_url = f"https://maps.googleapis.com/maps/api/place/nearbysearch/json" + params = { + 'location': f"{lat},{lng}", + 'radius': 10, + 'type': 'point_of_interest', + 'key': API_KEY + } + + try: + response = requests.get(places_url, params=params, timeout=5) + if response.status_code == 200: + return response.json().get('results', []) + except requests.RequestException as e: + print(f"Nearby Places API Fehler für Standort {lat},{lng}: {e}") + return [] + +def get_place_details(place_id): + details_url = f"https://maps.googleapis.com/maps/api/place/details/json" + params = { + 'place_id': place_id, + 'fields': 'formatted_phone_number,website', + 'key': API_KEY + } + + try: + response = requests.get(details_url, params=params, timeout=5) + if response.status_code == 200: + result = response.json().get('result', {}) + return result.get('formatted_phone_number', 'N/A'), result.get('website', 'N/A') + except requests.RequestException as e: + print(f"Place Details API Fehler für Place ID {place_id}: {e}") + return 'N/A', 'N/A' + +def process_file(filename, job_id, app): + with app.app_context(): + filepath = os.path.join(UPLOAD_FOLDER, filename) + results = [] + + job = Job.query.get(job_id) + if not job: + print("Job wurde abgebrochen.") + return + job.status = "In Progress" + db.session.commit() + + with open(filepath, newline='', encoding='ISO-8859-1') as csvfile: + reader = csv.DictReader(csvfile, delimiter=';') + headers = reader.fieldnames + + if not all(field in headers for field in ['PLZ', 'Straße', 'Hausnummer']): + print("CSV-Datei enthält nicht alle notwendigen Spalten.") + job.status = "Failed" + db.session.commit() + return + + for row in reader: + plz = row.get('PLZ', '').strip() + city = row.get('Stadt', row.get('Bezirk', '')).strip() + street = row.get('Straße', '').strip() + house_number = row.get('Hausnummer', '').strip() + additional = row.get('Zusatz', '').strip() + + if not all([plz, city, street, house_number]): + continue + + full_address = f"{street} {house_number} {additional}, {plz} {city}" + lat, lng = get_geocode(full_address) + if lat is None or lng is None: + continue + + nearby_places = get_nearby_places(lat, lng) + for place in nearby_places: + company_name = place['name'] + if company_name in processed_companies: + continue + + processed_companies.add(company_name) + company_address = place.get('vicinity', 'N/A').split(',')[0] + place_id = place.get('place_id') + company_phone, company_website = get_place_details(place_id) if place_id else ('N/A', 'N/A') + + results.append({ + 'PLZ': plz, + 'Stadt': city, + 'Straße': street, + 'Hausnummer': house_number, + 'Zusatz': additional, + 'Company Name': company_name, + 'Company Address': company_address, + 'Company Phone': company_phone, + 'Company Website': company_website + }) + + if results: + result_file = f"results_{os.path.splitext(filename)[0]}.csv" + result_path = os.path.join(RESULT_FOLDER, result_file) + with open(result_path, 'w', newline='', encoding='utf-8-sig') as csvfile: + writer = csv.DictWriter(csvfile, fieldnames=[ + 'PLZ', 'Stadt', 'Straße', 'Hausnummer', 'Zusatz', + 'Company Name', 'Company Address', 'Company Phone', 'Company Website' + ]) + writer.writeheader() + writer.writerows(results) + job.status = "Completed" + job.result_filename = result_file + db.session.commit() + else: + job.status = "Failed" + db.session.commit() diff --git a/app/webcrawler.py b/app/webcrawler.py index c4b4aff..dc559ee 100644 --- a/app/webcrawler.py +++ b/app/webcrawler.py @@ -1,128 +1,487 @@ -import csv import os +import re +import unicodedata +import json +import threading +import pandas as pd import requests -from .models import db, Job -from flask import current_app +import time +import random +from io import StringIO +from concurrent.futures import ThreadPoolExecutor, as_completed +from app.models import db, Job -UPLOAD_FOLDER = 'uploads' -RESULT_FOLDER = 'results' -API_KEY = 'AIzaSyAIf0yXJTwo87VMWLBtq2m2LqE-OaPGbzw' +print("🆕 MODERN webcrawler LOADED! – BATCHED + PROXY + RESUME + ETA + 4x SCRAPER CHUNK-PARALLEL") -def get_place_details(street, city_zip): - address = f"{street}, {city_zip}" - url = f"https://maps.googleapis.com/maps/api/place/textsearch/json" - params = {'query': address, 'key': API_KEY} +UPLOAD_FOLDER = '/app/uploads' +RESULT_FOLDER = '/app/results' - results = [] +SCRAPER_URLS = [ + "http://gmaps-scraper-1:8080", + "http://gmaps-scraper-2:8080", + "http://gmaps-scraper-3:8080", + "http://gmaps-scraper-4:8080", +] + +OUTPUT_COLS = ['title', 'category', 'address', 'open_hours', 'website', 'phone', 'link'] + +PROXY_URL = "http://bitlleuv-rotate:s5hzse6hz74b@p.webshare.io:80" +API_PROXIES = {"http": PROXY_URL, "https": PROXY_URL} + +_job_semaphore = threading.Semaphore(1) + +# ────────────────────────────────────────────── +# Tuning +# ────────────────────────────────────────────── +BATCH_SIZE = 30 # Keywords pro Scraper-Job +BATCH_DELAY_MIN = 3 # Sekunden Pause zwischen Chunks (min) +BATCH_DELAY_MAX = 6 # Sekunden Pause zwischen Chunks (max) +MAX_TIME = 60 # Sekunden die der Scraper pro Batch hat +POLL_MAX = 90 # Max. Poll-Versuche pro Batch +POLL_DELAY_MIN = 2 # Sekunden zwischen Polls (min) +POLL_DELAY_MAX = 5 # Sekunden zwischen Polls (max) +STUCK_TIMEOUT = 300 # Sekunden bis Scraper-Neustart (5 Min) +MAX_RETRIES = 2 # Wiederholversuche pro Batch bei Fehler +PARALLEL_WORKERS = len(SCRAPER_URLS) + +_partial_lock = threading.Lock() + +# ────────────────────────────────────────────── +# Hilfsfunktionen +# ────────────────────────────────────────────── + +def is_blocked(data): + text = str(data).lower() + blocked = any(kw in text for kw in ['captcha', 'blocked', 'rate limit', 'too many', '429']) + if blocked: + print(f"🚫 BLOCKED: {str(data)[:100]}") + return blocked + +def fix_encoding(text): + if not isinstance(text, str): + return text try: - response = requests.get(url, params=params, timeout=5) - if response.status_code == 200: - data = response.json() - print(f"API Response Data for {address}: {data}") + return text.encode('latin-1').decode('utf-8') + except (UnicodeEncodeError, UnicodeDecodeError): + return text - for place in data.get('results', []): - name = place.get('name', 'N/A') - place_id = place.get('place_id') - formatted_address = place.get('formatted_address', 'N/A') +def clean_query(q): + q = ''.join(c for c in q if unicodedata.category(c) != 'Cc') + q = ' '.join(q.split()) + return q.strip() - # Zweite Anfrage für detailliertere Informationen - phone, website = 'N/A', 'N/A' - if place_id: - details_url = f"https://maps.googleapis.com/maps/api/place/details/json" - details_params = { - 'place_id': place_id, - 'fields': 'formatted_phone_number,website', - 'key': API_KEY - } - details_response = requests.get(details_url, params=details_params, timeout=5) - if details_response.status_code == 200: - details_data = details_response.json().get('result', {}) - phone = details_data.get('formatted_phone_number', 'N/A') - website = details_data.get('website', 'N/A') +def build_input_addresses(df): + addresses = set() + for _, row in df.iterrows(): + plz = str(row.get('PLZ', '')).strip() + stadt = str(row.get('Stadt', '')).strip() + str_ = str(row.get('Straße', '')).strip() + nr = str(row.get('Hausnummer', '')).strip() + zusatz = str(row.get('Zusatz', '')).strip() + full = f"{str_} {nr} {zusatz} {plz} {stadt}".lower().strip() + full = ' '.join(full.split()) + addresses.add(full) + return addresses - # Speichern nur, wenn Name und Telefonnummer vorhanden sind - if name != 'N/A' and phone != 'N/A': - results.append({ - 'Name': name, - 'Address': formatted_address, - 'Phone': phone, - 'Website': website - }) - else: - print(f"Fehler beim Abrufen der URL: {url} - Statuscode: {response.status_code}") - except requests.exceptions.RequestException as e: - print(f"Anfragefehler für {url}: {e}") +def normalize_address(addr): + if not isinstance(addr, str): + return '' + addr = fix_encoding(addr) + return ' '.join(addr.lower().strip().split()) - return results +def address_in_input(result_addr, input_addresses): + norm = normalize_address(result_addr) + for inp_addr in input_addresses: + plz_match = re.search(r'\b\d{5}\b', inp_addr) + if not plz_match: + continue + plz = plz_match.group() + if plz not in norm: + continue + + parts = inp_addr.split() + + street = parts[0] if parts else '' + if len(street) < 4 or street[:5].lower() not in norm: + continue + + hausnr = parts[1] if len(parts) > 1 else '' + if hausnr and not re.search(rf'\b{re.escape(hausnr)}\b', norm): + continue + + return True + return False + +def format_eta(seconds): + if seconds < 60: + return f"{int(seconds)}s" + h, rem = divmod(int(seconds), 3600) + m = rem // 60 + return f"{h}h {m:02d}min" if h > 0 else f"{m}min" + +# ────────────────────────────────────────────── +# Scraper-Job Cleanup +# ────────────────────────────────────────────── + +def _cleanup_scraper_job(scraper_url, scraper_id): + """Scraper-Job immer aufräumen wenn wir ihn nicht mehr brauchen""" + try: + requests.delete(f"{scraper_url}/api/v1/jobs/{scraper_id}", timeout=10) + print(f"🗑️ Scraper-Job {scraper_id} gelöscht") + except Exception as e: + print(f"⚠️ Cleanup fehlgeschlagen: {e}") + +# ────────────────────────────────────────────── +# Scraper-Neustart via Docker SDK +# ────────────────────────────────────────────── + +def restart_scraper(scraper_url): + try: + import docker + container_name = scraper_url.split("//")[1].split(":")[0] + print(f"🔄 Starte {container_name} neu...") + client = docker.from_env() + container = client.containers.get(container_name) + container.restart() + print(f"✅ {container_name} neu gestartet – warte 15s...") + time.sleep(15) + return True + except Exception as e: + print(f"⚠️ Scraper-Neustart fehlgeschlagen: {e}") + return False + +# ────────────────────────────────────────────── +# Resume: Progress-File Hilfsfunktionen +# ────────────────────────────────────────────── + +def get_progress_path(job_id): + return os.path.join(RESULT_FOLDER, f"progress_{job_id}.json") + +def get_partial_path(job_id, suffix): + return os.path.join(RESULT_FOLDER, f"partial_{job_id}_{suffix}.csv") + +def load_progress(job_id): + path = get_progress_path(job_id) + if os.path.exists(path): + with open(path, 'r') as f: + data = json.load(f) + print(f"🔁 RESUME: ab Batch {data['last_completed_batch'] + 1}/{data['total_batches']}") + return data + return None + +def save_progress(job_id, last_completed_batch, total_batches): + path = get_progress_path(job_id) + with open(path, 'w') as f: + json.dump({"last_completed_batch": last_completed_batch, "total_batches": total_batches}, f) + +def append_partial(job_id, df_filtered, df_raw): + with _partial_lock: + for suffix, df in [('filtered', df_filtered), ('raw', df_raw)]: + if df is None: + continue + path = get_partial_path(job_id, suffix) + header = not os.path.exists(path) + df.to_csv(path, mode='a', index=False, header=header, encoding='utf-8-sig', sep=';') + +def load_partial(job_id): + results_filtered, results_raw = [], [] + for suffix, lst in [('filtered', results_filtered), ('raw', results_raw)]: + path = get_partial_path(job_id, suffix) + if os.path.exists(path): + try: + df = pd.read_csv(path, sep=';', encoding='utf-8-sig') + lst.append(df) + print(f"📂 Partial {suffix}: {len(df)} Zeilen geladen") + except Exception as e: + print(f"⚠️ Partial {suffix} Ladefehler: {e}") + return results_filtered, results_raw + +def cleanup_progress(job_id): + for path in [ + get_progress_path(job_id), + get_partial_path(job_id, 'filtered'), + get_partial_path(job_id, 'raw'), + ]: + if os.path.exists(path): + os.remove(path) + +# ────────────────────────────────────────────── +# CSV Nachbearbeitung +# ────────────────────────────────────────────── + +def process_result_csv(raw_bytes, input_df, apply_filter=True): + try: + content = raw_bytes.decode('utf-8', errors='replace') + df_out = pd.read_csv(StringIO(content)) + print(f"📄 Raw result: {df_out.shape}") + + available = [c for c in OUTPUT_COLS if c in df_out.columns] + df_out = df_out[available] + + for col in df_out.columns: + df_out[col] = df_out[col].apply(fix_encoding) + + if apply_filter: + input_addresses = build_input_addresses(input_df) + before = len(df_out) + df_out = df_out[ + df_out['address'].apply(lambda a: address_in_input(a, input_addresses)) + ] + print(f"📍 Filter: {before} → {len(df_out)}") + + df_out = df_out.drop_duplicates(subset=['title', 'address'], keep='first') + df_out = df_out.dropna(subset=['title'], how='all') + df_out = df_out[df_out['title'].str.strip().astype(bool)] + + print(f"✅ Final ({'gefiltert' if apply_filter else 'alle'}): {df_out.shape}") + return df_out + except Exception as e: + print(f"💥 process_result_csv: {e}") + return None + +# ────────────────────────────────────────────── +# Parallel: Einzelnen Batch verarbeiten +# ────────────────────────────────────────────── + +def process_batch(batch_idx, batch_queries, scraper_url, filename, job_id, df_input): + payload = { + "name": f"{filename.replace('.csv','')}-{job_id}-B{batch_idx+1:03d}", + "keywords": batch_queries, + "lang": "de", + "depth": 1, + "zoom": 17, + "radius": 100, + "max_time": MAX_TIME, + "fast_mode": False, + "proxies": [PROXY_URL] + } + + for attempt in range(1, MAX_RETRIES + 1): + scraper_id = None + try: + resp = requests.post(f"{scraper_url}/api/v1/jobs", json=payload, timeout=45) + print(f"📤 Batch {batch_idx+1} → {scraper_url} | {resp.status_code} (Versuch {attempt})") + + if is_blocked(resp.text): + print(f"🚫 Batch {batch_idx+1} blocked") + return None, None + + if resp.status_code != 201: + print(f"⚠️ Batch {batch_idx+1} fehlgeschlagen: {resp.text[:100]}") + if attempt < MAX_RETRIES: + time.sleep(10) + continue + + scraper_id = resp.json()['id'] + print(f"✅ Batch {batch_idx+1} Scraper-ID: {scraper_id}") + + batch_start_time = time.time() + + for poll_i in range(1, POLL_MAX + 1): + r = requests.get(f"{scraper_url}/api/v1/jobs/{scraper_id}", timeout=15) + data = r.json() + status = data.get('Status', data.get('status', '?')) + elapsed = time.time() - batch_start_time + print(f"⏳ Batch {batch_idx+1} Poll {poll_i}: {status} | {int(elapsed)}s") + + if status == 'pending' and elapsed > STUCK_TIMEOUT: + print(f"⚠️ Batch {batch_idx+1} hängt seit {int(elapsed)}s – Neustart {scraper_url}") + _cleanup_scraper_job(scraper_url, scraper_id) + scraper_id = None + restart_scraper(scraper_url) + break + + if status in ('ok', 'completed', 'scraped'): + dl = requests.get(f"{scraper_url}/api/v1/jobs/{scraper_id}/download", timeout=90) + scraper_id = None + if dl.status_code == 200: + df_filtered = process_result_csv(dl.content, df_input, True) + df_raw = process_result_csv(dl.content, df_input, False) + print(f"📊 Batch {batch_idx+1}: {len(df_filtered) if df_filtered is not None else 0} filtered") + return df_filtered, df_raw + return None, None + + elif status in ('failed', 'error'): + print(f"💥 Batch {batch_idx+1}: {status} (Versuch {attempt})") + _cleanup_scraper_job(scraper_url, scraper_id) + scraper_id = None + if attempt < MAX_RETRIES: + time.sleep(10) + break + + time.sleep(random.uniform(POLL_DELAY_MIN, POLL_DELAY_MAX)) + + except Exception as e: + print(f"💥 Batch {batch_idx+1} Versuch {attempt}: {e}") + if scraper_id: + _cleanup_scraper_job(scraper_url, scraper_id) + scraper_id = None + if attempt < MAX_RETRIES: + time.sleep(10) + + return None, None + +# ────────────────────────────────────────────── +# HAUPT-WORKER +# ────────────────────────────────────────────── def process_file(filename, job_id, app): with app.app_context(): - print(f"Starte Prozess für Job-ID: {job_id}") - filepath = os.path.join(UPLOAD_FOLDER, filename) - results = [] - job = Job.query.get(job_id) - if not job: - print("Job wurde abgebrochen, bevor er starten konnte.") - return - job.status = "In Progress" - db.session.commit() - - with open(filepath, newline='', encoding='ISO-8859-1') as csvfile: - reader = csv.DictReader(csvfile, delimiter=';') - rows = list(reader) - total_rows = len(rows) - print(f"Insgesamt zu verarbeitende Zeilen: {total_rows}") - - for index, row in enumerate(rows): - # Job-Verfügbarkeit erneut prüfen - job = Job.query.get(job_id) - if not job: - print("Job wurde abgebrochen.") - return - - # Vollständige Adresse erstellen - street = f"{row.get('Straße', '')} {row.get('Hausnummer', '')}".strip() - city_zip = f"{row.get('PLZ', '')} {row.get('Stadt', '')}".strip() - print(f"Verarbeite Adresse: {street}, {city_zip}") - address_results = get_place_details(street, city_zip) - - for result in address_results: - # Ergebnisse nur speichern, wenn Name und Telefonnummer vorhanden sind - if result['Name'] != 'N/A' and result['Phone'] != 'N/A': - result.update({ - 'PLZ': row.get('PLZ', ''), - 'Stadt': row.get('Stadt', ''), - 'Straße': row.get('Straße', ''), - 'Hausnummer': row.get('Hausnummer', ''), - 'Zusatz': row.get('Zusatz', '') - }) - results.append(result) - - # Results-Dateiname basierend auf dem Upload-Dateinamen - result_file = f"results_{filename}" - result_path = os.path.join(RESULT_FOLDER, result_file) - - # Prüfen und erstellen des Ergebnisverzeichnisses - if not os.path.exists(RESULT_FOLDER): - os.makedirs(RESULT_FOLDER) - print(f"Erstelle Ergebnisverzeichnis: {RESULT_FOLDER}") - - try: - if results: # Nur speichern, wenn Ergebnisse vorhanden sind - with open(result_path, 'w', newline='', encoding='utf-8-sig') as csvfile: - writer = csv.DictWriter(csvfile, fieldnames=['Name', 'Address', 'Phone', 'Website', 'PLZ', 'Stadt', 'Straße', 'Hausnummer', 'Zusatz']) - writer.writeheader() - writer.writerows(results) - print(f"Ergebnisdatei erfolgreich gespeichert unter: {result_path}") - job.status = "Completed" - job.result_filename = result_file - db.session.commit() - else: - print("Keine relevanten Ergebnisse zum Speichern vorhanden. Markiere den Job als 'Failed'.") - job.status = "Failed" - db.session.commit() - except Exception as e: - print(f"Fehler beim Schreiben der Ergebnisdatei: {e}") - job.status = "Failed" + if job: + job.status = "⏳ Wartet auf anderen Job..." db.session.commit() + + with _job_semaphore: + print(f"🎯 {filename} Job#{job_id} START!") + + with app.app_context(): + job = Job.query.get(job_id) + if not job: + print("❌ Job missing") + return + + try: + job.status = "📊 parsing CSV" + db.session.commit() + + filepath = os.path.join(UPLOAD_FOLDER, filename) + print(f"📁 {filepath} | {os.path.getsize(filepath)}b") + + df_input = pd.read_csv(filepath, sep=';', encoding='ISO-8859-1') + print(f"📊 {df_input.shape}") + + queries = [] + for _, row in df_input.iterrows(): + parts = [ + str(row.get('PLZ', '')).strip(), + str(row.get('Stadt', '')).strip(), + str(row.get('Straße', '')).strip(), + str(row.get('Hausnummer', '')).strip(), + str(row.get('Zusatz', '')).strip(), + ] + q = f"Firmen {' '.join(p for p in parts if p and p != 'nan')}".strip() + q = clean_query(q) + if len(q) > 10: + queries.append(q) + + total_queries = len(queries) + print(f"🔍 {total_queries} Queries | Samples: {queries[:3]}") + if total_queries == 0: + raise ValueError("Keine gültigen Adressen") + + batches = (total_queries + BATCH_SIZE - 1) // BATCH_SIZE + + os.makedirs(RESULT_FOLDER, exist_ok=True) + progress = load_progress(job_id) + start_batch = progress['last_completed_batch'] + 1 if progress else 0 + all_results_filtered, all_results_raw = load_partial(job_id) if progress else ([], []) + + eta_initial = format_eta((batches - start_batch) * ((BATCH_DELAY_MAX + MAX_TIME) / 2) / PARALLEL_WORKERS) + print(f"📦 {batches} Batches à {BATCH_SIZE} | {PARALLEL_WORKERS}x parallel (Chunk) | Start: {start_batch} | ETA: ~{eta_initial}") + job_start_time = time.time() + job.status = f"🔄 Batch {start_batch+1}/{batches} | ⏱️ ~{eta_initial}" + db.session.commit() + + completed_count = 0 + + batch_indices = list(range(start_batch, batches)) + chunks = [ + batch_indices[i:i + PARALLEL_WORKERS] + for i in range(0, len(batch_indices), PARALLEL_WORKERS) + ] + + with ThreadPoolExecutor(max_workers=PARALLEL_WORKERS) as executor: + for chunk_idx, chunk in enumerate(chunks): + futures = {} + + for batch_idx in chunk: + batch_start_q = batch_idx * BATCH_SIZE + batch_end_q = min(batch_start_q + BATCH_SIZE, total_queries) + batch_queries = queries[batch_start_q:batch_end_q] + scraper_url = SCRAPER_URLS[batch_idx % len(SCRAPER_URLS)] + + print(f"\n🚀 Chunk {chunk_idx+1} | Batch {batch_idx+1}/{batches} → {scraper_url}") + time.sleep(random.uniform(1, 2)) + + future = executor.submit( + process_batch, + batch_idx, batch_queries, scraper_url, + filename, job_id, df_input + ) + futures[future] = batch_idx + + for future in as_completed(futures): + batch_idx = futures[future] + completed_count += 1 + try: + df_filtered, df_raw = future.result() + if df_filtered is not None: + all_results_filtered.append(df_filtered) + all_results_raw.append(df_raw) + append_partial(job_id, df_filtered, df_raw) + except Exception as e: + print(f"💥 Batch {batch_idx+1} Exception: {e}") + + save_progress(job_id, batch_idx, batches) + + elapsed = time.time() - job_start_time + if completed_count > 0: + avg_per_batch = elapsed / completed_count + remaining = (batches - start_batch - completed_count) * avg_per_batch / PARALLEL_WORKERS + eta_str = format_eta(remaining) + else: + eta_str = "?" + + job.status = f"🔄 {completed_count}/{batches - start_batch} fertig | ⏱️ ~{eta_str}" + db.session.commit() + + if chunk_idx < len(chunks) - 1: + delay = random.uniform(BATCH_DELAY_MIN, BATCH_DELAY_MAX) + print(f"⏸️ Chunk {chunk_idx+1} fertig – warte {delay:.1f}s...") + time.sleep(delay) + + # ── MERGE & SAVE ── + job.status = "🔧 merging results" + db.session.commit() + + base = filename.replace('.csv', '') + + if all_results_filtered: + df_final_filtered = pd.concat(all_results_filtered, ignore_index=True) + df_final_filtered = df_final_filtered.drop_duplicates(subset=['title', 'address']) + + out_filtered = f"results_{base}_filtered.csv" + df_final_filtered.to_csv( + os.path.join(RESULT_FOLDER, out_filtered), + index=False, encoding='utf-8-sig', sep=';' + ) + + out_raw = None + if all_results_raw: + df_final_raw = pd.concat(all_results_raw, ignore_index=True) + out_raw = f"results_{base}_all.csv" + df_final_raw.to_csv( + os.path.join(RESULT_FOLDER, out_raw), + index=False, encoding='utf-8-sig', sep=';' + ) + + job.result_filename = out_filtered + job.result_filename_raw = out_raw + job.status = f"✅ Fertig: {len(df_final_filtered)} Firmen" + + cleanup_progress(job_id) + else: + job.status = "❌ Keine Ergebnisse" + + db.session.commit() + print(f"🎉 Job {job_id} komplett!") + + except Exception as e: + job.status = f"Failed: {str(e)[:50]}" + print(f"💥 FATAL: {e}") + import traceback + traceback.print_exc() + db.session.commit() + + print(f"✅ DONE! Status: {job.status}") diff --git a/delete-crawl-jobs.py b/delete-crawl-jobs.py new file mode 100644 index 0000000..b58dc6b --- /dev/null +++ b/delete-crawl-jobs.py @@ -0,0 +1,21 @@ +import requests +import time + +base_url = "http://localhost:5001/api/v1/jobs" + +response = requests.get(base_url) +jobs = response.json() # Direkt Array +print(f"{len(jobs)} Jobs gefunden.") + +deleted = 0 +for job in jobs: + job_id = job["ID"] + del_res = requests.delete(f"{base_url}/{job_id}") + if del_res.status_code in [200, 204]: + print(f"✓ {job_id}") + deleted += 1 + else: + print(f"✗ {job_id}: {del_res.status_code}") + time.sleep(0.1) + +print(f"{deleted}/{len(jobs)} gelöscht.") diff --git a/docker-compose.yml b/docker-compose.yml index e11dd79..8e14ef1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,4 @@ -version: '3' +version: '3.8' services: web: build: . @@ -6,6 +6,114 @@ services: - "5000:5000" environment: - FLASK_APP=app - command: flask run --host=0.0.0.0 --port=5000 + - FLASK_ENV=production + - PYTHONUNBUFFERED=1 volumes: - - .:/app + - ./app:/app/app + - ./uploads:/app/uploads + - ./results:/app/results + - ./instance:/app/instance + - /var/run/docker.sock:/var/run/docker.sock + depends_on: + - gmaps-scraper-1 + - gmaps-scraper-2 + - gmaps-scraper-3 + - gmaps-scraper-4 + restart: always + networks: + - scraper-net + + gmaps-scraper-1: + image: gosom/google-maps-scraper:latest + container_name: gmaps-scraper-1 + environment: + - PLAYWRIGHT_BROWSERS_PATH=/ms-playwright + ports: + - "5001:8080" + volumes: + - ./scraper-data-1:/gmapsdata + command: + - "-web" + - "-data-folder=/gmapsdata" + restart: always + healthcheck: + test: ["CMD-SHELL", "wget -qO- http://localhost:8080/api/v1/jobs || exit 1"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 15s + networks: + - scraper-net + + + gmaps-scraper-2: + image: gosom/google-maps-scraper:latest + container_name: gmaps-scraper-2 + environment: + - PLAYWRIGHT_BROWSERS_PATH=/ms-playwright + ports: + - "5002:8080" + volumes: + - ./scraper-data-2:/gmapsdata + command: + - "-web" + - "-data-folder=/gmapsdata" + restart: always + healthcheck: + test: ["CMD-SHELL", "wget -qO- http://localhost:8080/api/v1/jobs || exit 1"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 15s + networks: + - scraper-net + + gmaps-scraper-3: + image: gosom/google-maps-scraper:latest + container_name: gmaps-scraper-3 + environment: + - PLAYWRIGHT_BROWSERS_PATH=/ms-playwright + ports: + - "5003:8080" + volumes: + - ./scraper-data-3:/gmapsdata + command: + - "-web" + - "-data-folder=/gmapsdata" + restart: always + healthcheck: + test: ["CMD-SHELL", "wget -qO- http://localhost:8080/api/v1/jobs || exit 1"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 15s + networks: + - scraper-net + + gmaps-scraper-4: + image: gosom/google-maps-scraper:latest + container_name: gmaps-scraper-4 + environment: + - PLAYWRIGHT_BROWSERS_PATH=/ms-playwright + ports: + - "5004:8080" + volumes: + - ./scraper-data-4:/gmapsdata + command: + - "-web" + - "-data-folder=/gmapsdata" + restart: always + healthcheck: + test: ["CMD-SHELL", "wget -qO- http://localhost:8080/api/v1/jobs || exit 1"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 15s + networks: + - scraper-net + + + +networks: + scraper-net: + driver: bridge diff --git a/instance/users.db b/instance/users.db index 5f9b034..df1c9b8 100644 Binary files a/instance/users.db and b/instance/users.db differ diff --git a/migrations/README b/migrations/README new file mode 100644 index 0000000..0e04844 --- /dev/null +++ b/migrations/README @@ -0,0 +1 @@ +Single-database configuration for Flask. diff --git a/migrations/alembic.ini b/migrations/alembic.ini new file mode 100644 index 0000000..ec9d45c --- /dev/null +++ b/migrations/alembic.ini @@ -0,0 +1,50 @@ +# A generic, single database configuration. + +[alembic] +# template used to generate migration files +# file_template = %%(rev)s_%%(slug)s + +# set to 'true' to run the environment during +# the 'revision' command, regardless of autogenerate +# revision_environment = false + + +# Logging configuration +[loggers] +keys = root,sqlalchemy,alembic,flask_migrate + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARN +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARN +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[logger_flask_migrate] +level = INFO +handlers = +qualname = flask_migrate + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S diff --git a/migrations/env.py b/migrations/env.py new file mode 100644 index 0000000..4c97092 --- /dev/null +++ b/migrations/env.py @@ -0,0 +1,113 @@ +import logging +from logging.config import fileConfig + +from flask import current_app + +from alembic import context + +# this is the Alembic Config object, which provides +# access to the values within the .ini file in use. +config = context.config + +# Interpret the config file for Python logging. +# This line sets up loggers basically. +fileConfig(config.config_file_name) +logger = logging.getLogger('alembic.env') + + +def get_engine(): + try: + # this works with Flask-SQLAlchemy<3 and Alchemical + return current_app.extensions['migrate'].db.get_engine() + except (TypeError, AttributeError): + # this works with Flask-SQLAlchemy>=3 + return current_app.extensions['migrate'].db.engine + + +def get_engine_url(): + try: + return get_engine().url.render_as_string(hide_password=False).replace( + '%', '%%') + except AttributeError: + return str(get_engine().url).replace('%', '%%') + + +# add your model's MetaData object here +# for 'autogenerate' support +# from myapp import mymodel +# target_metadata = mymodel.Base.metadata +config.set_main_option('sqlalchemy.url', get_engine_url()) +target_db = current_app.extensions['migrate'].db + +# other values from the config, defined by the needs of env.py, +# can be acquired: +# my_important_option = config.get_main_option("my_important_option") +# ... etc. + + +def get_metadata(): + if hasattr(target_db, 'metadatas'): + return target_db.metadatas[None] + return target_db.metadata + + +def run_migrations_offline(): + """Run migrations in 'offline' mode. + + This configures the context with just a URL + and not an Engine, though an Engine is acceptable + here as well. By skipping the Engine creation + we don't even need a DBAPI to be available. + + Calls to context.execute() here emit the given string to the + script output. + + """ + url = config.get_main_option("sqlalchemy.url") + context.configure( + url=url, target_metadata=get_metadata(), literal_binds=True + ) + + with context.begin_transaction(): + context.run_migrations() + + +def run_migrations_online(): + """Run migrations in 'online' mode. + + In this scenario we need to create an Engine + and associate a connection with the context. + + """ + + # this callback is used to prevent an auto-migration from being generated + # when there are no changes to the schema + # reference: http://alembic.zzzcomputing.com/en/latest/cookbook.html + def process_revision_directives(context, revision, directives): + if getattr(config.cmd_opts, 'autogenerate', False): + script = directives[0] + if script.upgrade_ops.is_empty(): + directives[:] = [] + logger.info('No changes in schema detected.') + + conf_args = current_app.extensions['migrate'].configure_args + if conf_args.get("process_revision_directives") is None: + conf_args["process_revision_directives"] = process_revision_directives + + connectable = get_engine() + + with connectable.connect() as connection: + context.configure( + connection=connection, + target_metadata=get_metadata(), + **conf_args + ) + + with context.begin_transaction(): + context.run_migrations() + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() diff --git a/migrations/script.py.mako b/migrations/script.py.mako new file mode 100644 index 0000000..2c01563 --- /dev/null +++ b/migrations/script.py.mako @@ -0,0 +1,24 @@ +"""${message} + +Revision ID: ${up_revision} +Revises: ${down_revision | comma,n} +Create Date: ${create_date} + +""" +from alembic import op +import sqlalchemy as sa +${imports if imports else ""} + +# revision identifiers, used by Alembic. +revision = ${repr(up_revision)} +down_revision = ${repr(down_revision)} +branch_labels = ${repr(branch_labels)} +depends_on = ${repr(depends_on)} + + +def upgrade(): + ${upgrades if upgrades else "pass"} + + +def downgrade(): + ${downgrades if downgrades else "pass"} diff --git a/migrations/versions/10331d61a25d_add_is_admin_column_to_user_model.py b/migrations/versions/10331d61a25d_add_is_admin_column_to_user_model.py new file mode 100644 index 0000000..0245f0b --- /dev/null +++ b/migrations/versions/10331d61a25d_add_is_admin_column_to_user_model.py @@ -0,0 +1,45 @@ +"""Add is_admin column to User model + +Revision ID: 10331d61a25d +Revises: +Create Date: 2024-11-14 08:36:27.125841 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '10331d61a25d' +down_revision = None +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_table('user') + op.drop_table('job') + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('job', + sa.Column('id', sa.INTEGER(), nullable=False), + sa.Column('user_id', sa.INTEGER(), nullable=False), + sa.Column('filename', sa.VARCHAR(length=150), nullable=False), + sa.Column('status', sa.VARCHAR(length=50), nullable=True), + sa.Column('created_at', sa.DATETIME(), nullable=True), + sa.Column('result_filename', sa.VARCHAR(length=150), nullable=True), + sa.ForeignKeyConstraint(['user_id'], ['user.id'], ), + sa.PrimaryKeyConstraint('id') + ) + op.create_table('user', + sa.Column('id', sa.INTEGER(), nullable=False), + sa.Column('username', sa.VARCHAR(length=150), nullable=False), + sa.Column('password', sa.VARCHAR(length=150), nullable=False), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('username') + ) + # ### end Alembic commands ### diff --git a/requirements.txt b/requirements.txt index 979acdc..6b31110 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,8 @@ -Flask==2.2.5 -Flask-Login==0.6.2 -Flask-SQLAlchemy==3.0.3 -Werkzeug==2.2.2 +flask +flask-sqlalchemy +flask-login +flask-migrate pandas requests -beautifulsoup4 +werkzeug +docker