From aaabe8337951cb72fedfea1441c038b724398aa6 Mon Sep 17 00:00:00 2001 From: Julio Cesar Date: Wed, 9 Jul 2025 13:18:27 +0200 Subject: [PATCH] Replace gpg_key_ui_backend.py with a new implementation in app.py for GPG key management and encryption features --- app.py | 336 ++++++++++++++++++++++++++++++++++++++++++ gpg_key_ui_backend.py | 90 ----------- 2 files changed, 336 insertions(+), 90 deletions(-) create mode 100644 app.py delete mode 100644 gpg_key_ui_backend.py diff --git a/app.py b/app.py new file mode 100644 index 0000000..0e4eb4b --- /dev/null +++ b/app.py @@ -0,0 +1,336 @@ +# gpg_api.py + +from flask import Flask, Request, request, jsonify, send_file +import gnupg +import tempfile +import os +import logging +import re +from werkzeug.utils import secure_filename +import io + +app = Flask(__name__) +app.config + +UPLOAD_DIR = os.environ.get("GPG_UPLOAD_DIR", "tmp/gpg_files") +os.makedirs(UPLOAD_DIR, exist_ok=True) +os.chmod(UPLOAD_DIR, 0o700) # Ensure proper permissions + +GPG_KEYRING_DIR = os.environ.get("GPG_KEYRING_DIR", "tmp/gpg_keyring") +os.makedirs(GPG_KEYRING_DIR, exist_ok=True) +os.chmod(GPG_KEYRING_DIR, 0o700) # Ensure proper permissions + +# Setup logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Initialize GPG instance +gpg_instance = gnupg.GPG( + gnupghome=GPG_KEYRING_DIR, +) +gpg_instance.encoding = "utf-8" + + +def valid_email(email): + """Validate email format using regex pattern""" + pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$" + return re.match(pattern, email) is not None + + +def cleanup_file(filepath): + """Safely cleanup a file with error handling""" + try: + if os.path.exists(filepath): + os.remove(filepath) + logger.info(f"Cleaned up file: {filepath}") + except OSError as e: + logger.warning(f"Failed to clean up {filepath}: {e}") + + +def get_gpg_context(armor=False, passphrase=None): + """Create a GPG context with common settings""" + try: + # For python-gnupg, we return the global instance + # Armor and passphrase are handled per operation + return gpg_instance + except Exception as e: + logger.error(f"Failed to create GPG context: {e}") + raise + + +def find_key_by_email(email, secret=False): + """Find a GPG key by email address""" + try: + keys = gpg_instance.list_keys(secret=secret) + for key in keys: + for uid in key["uids"]: + if email.lower() in uid.lower(): + return key + return None + except Exception as e: + logger.error(f"Failed to find key for {email}: {e}") + return None + + +# @app.route("/api/setup/gnupg", methods=["GET"]) +# def setup_gnupg(): +# """Check if GnuPG is available through python-gnupg""" +# try: +# # Try to get version info +# version_info = gpg_instance.version +# logger.info("GnuPG accessible through python-gnupg") +# return jsonify( +# { +# "status": "GnuPG is available through python-gnupg", +# "version": version_info, +# } +# ) +# except Exception as e: +# logger.warning(f"GnuPG not accessible: {e}") +# return ( +# jsonify( +# { +# "error": "GnuPG is not accessible. Please ensure it's properly installed." +# } +# ), +# 404, +# ) + + +@app.route("/api/generate-key", methods=["POST"]) +def generate_key(): + data = request.json + + # Input validation + if not data or not all(key in data for key in ["name", "email", "passphrase"]): + return jsonify({"error": "Missing required fields"}), 400 + + name = data["name"].strip() + email = data["email"].strip() + comment = data.get("comment", "").strip() + passphrase = data["passphrase"] + + # Validate inputs + if not valid_email(email): + return jsonify({"error": "Invalid email format"}), 400 + + if len(name) < 2 or len(name) > 100: + return jsonify({"error": "Name must be between 2 and 100 characters"}), 400 + + if len(passphrase) < 8: + return jsonify({"error": "Passphrase must be at least 8 characters"}), 400 + + try: + # Generate key using python-gnupg + input_data = gpg_instance.gen_key_input( + name_real=name, + name_email=email, + name_comment=comment if comment else None, + key_type="RSA", + key_length=4096, + expire_date=0, # No expiration + passphrase=passphrase, + ) + + result = gpg_instance.gen_key(input_data) + + if result.status == "ok": + logger.info(f"GPG key generated successfully for {email}") + return jsonify( + { + "status": "Key generated successfully", + "fingerprint": str(result.fingerprint), + } + ) + else: + logger.error(f"Key generation failed for {email}: {result.stderr}") + return ( + jsonify({"error": "Key generation failed", "details": result.stderr}), + 500, + ) + except Exception as e: + logger.error(f"Key generation failed for {email}: {e}") + return jsonify({"error": "Key generation failed", "details": str(e)}), 500 + + +@app.route("/api/download/public-key", methods=["POST"]) +def export_key(): + email = request.json.get("email") + + if not email: + return jsonify({"error": "Email parameter is required"}), 400 + + if not valid_email(email): + return jsonify({"error": "Invalid email format"}), 400 + + try: + # Find the key by email + key = find_key_by_email(email, secret=False) + if not key: + return jsonify({"error": "No public key found for this email"}), 404 + + # Export the key to a string + exported_key = gpg_instance.export_keys(key["fingerprint"], armor=True) + + if not exported_key: + return jsonify({"error": "Failed to export key"}), 500 + + # Create an in-memory stream + file_stream = io.BytesIO(exported_key.encode("utf-8")) + filename = f"{secure_filename(email)}_public.asc" + + logger.info(f"Public key exported for {email}") + return send_file( + file_stream, + as_attachment=True, + download_name=filename, + mimetype="application/pgp-keys", + ) + except Exception as e: + logger.error(f"Export failed for {email}: {e}") + return jsonify({"error": "Export failed"}), 500 + + +@app.route("/api/upload-decrypt", methods=["POST"]) +def upload_and_decrypt(): + if "file" not in request.files: + return jsonify({"error": "Missing file"}), 400 + file = request.files["file"] + passphrase = request.form.get("passphrase") + + if not file.filename: + return jsonify({"error": "No file selected"}), 400 + + if not passphrase: + return jsonify({"error": "Passphrase is required"}), 400 + + filename = secure_filename(file.filename) + if not filename.endswith(".gpg"): + return jsonify({"error": "File must have .gpg extension"}), 400 + + try: + # Read the encrypted file directly into memory + encrypted_data = file.read() + + # Decrypt the data using python-gnupg + result = gpg_instance.decrypt(encrypted_data, passphrase=passphrase) + + if not result.ok: + logger.error(f"Decryption failed for {filename}: {result.stderr}") + return ( + jsonify({"error": "Decryption failed", "details": result.stderr}), + 500, + ) + + # Create an in-memory stream for the decrypted data + base_name = filename[:-4] if filename.endswith(".gpg") else filename + output_filename = f"decrypted_{base_name}.zip" + + decrypted_stream = io.BytesIO(result.data) + + logger.info(f"File {filename} decrypted successfully") + return send_file( + decrypted_stream, + as_attachment=True, + download_name=output_filename, + mimetype="application/zip", + ) + except Exception as e: + logger.error(f"Decryption failed for {filename}: {e}") + return jsonify({"error": "Decryption failed", "details": str(e)}), 500 + + +@app.route("/api/encrypt-file", methods=["POST"]) +def encrypt_file(): + if "file" not in request.files: + return jsonify({"error": "Missing file"}), 400 + + file = request.files["file"] + email = request.form.get("email") + + if not file.filename: + return jsonify({"error": "No file selected"}), 400 + + if not email: + return jsonify({"error": "Email parameter is required"}), 400 + + if not valid_email(email): + return jsonify({"error": "Invalid email format"}), 400 + + try: + # Find the public key by email + key = find_key_by_email(email, secret=False) + if not key: + return jsonify({"error": "No public key found for this email"}), 404 + + # Read the file data + file_data = file.read() + + # Encrypt the file using the public key + result = gpg_instance.encrypt( + file_data, + recipients=[key["fingerprint"]], + armor=False, + always_trust=True, + ) + + if not result.ok: + logger.error(f"Encryption failed for {file.filename}: {result.stderr}") + return ( + jsonify({"error": "Encryption failed", "details": result.stderr}), + 500, + ) + + # Create filename for encrypted file + filename = secure_filename(file.filename) + encrypted_filename = f"{filename}.gpg" + + # Create an in-memory stream for the encrypted data + encrypted_stream = io.BytesIO(result.data) + + logger.info(f"File {filename} encrypted successfully for {email}") + return send_file( + encrypted_stream, + as_attachment=True, + download_name=encrypted_filename, + mimetype="application/pgp-encrypted", + ) + except Exception as e: + logger.error(f"Encryption failed for {file.filename}: {e}") + return jsonify({"error": "Encryption failed", "details": str(e)}), 500 + + +# @app.route("/api/list-keys", methods=["GET"]) +# def list_keys(): +# """List all available GPG keys""" +# try: +# keys = [] +# public_keys = gpg_instance.list_keys() +# private_keys = gpg_instance.list_keys(secret=True) + +# for key in public_keys: +# key_info = { +# "fingerprint": key.get("fingerprint", ""), +# "keyid": key.get("keyid", ""), +# "type": key.get("type", ""), +# "length": key.get("length", ""), +# "date": key.get("date", ""), +# "expires": key.get("expires", ""), +# "uids": key.get("uids", []), +# "trust": key.get("trust", ""), +# "secret": any( +# priv_key["fingerprint"] == key.get("fingerprint", "") +# for priv_key in private_keys +# ), +# } +# keys.append(key_info) + +# logger.info(f"Listed {len(keys)} keys") +# return jsonify({"keys": keys, "count": len(keys)}) +# except Exception as e: +# logger.error(f"Failed to list keys: {e}") +# return jsonify({"error": "Failed to list keys", "details": str(e)}), 500 + + +if __name__ == "__main__": + app.run(host="0.0.0.0", port=8080, debug=True) diff --git a/gpg_key_ui_backend.py b/gpg_key_ui_backend.py deleted file mode 100644 index 2f5d777..0000000 --- a/gpg_key_ui_backend.py +++ /dev/null @@ -1,90 +0,0 @@ -# gpg_api.py - -from flask import Flask, request, jsonify, send_file -import subprocess -import tempfile -import os -from werkzeug.utils import secure_filename - -app = Flask(__name__) -UPLOAD_DIR = "/tmp/gpg_files" -os.makedirs(UPLOAD_DIR, exist_ok=True) - -@app.route("/api/setup/gnupg", methods=["GET"]) -def setup_gnupg(): - try: - subprocess.run(["gpg", "--version"], check=True) - return jsonify({"status": "GnuPG is already installed"}) - except Exception: - subprocess.run(["apt", "update"], check=True) - subprocess.run(["apt", "install", "-y", "gnupg"], check=True) - return jsonify({"status": "GnuPG installed"}) - -@app.route("/api/generate-key", methods=["POST"]) -def generate_key(): - data = request.json - name = data['name'] - email = data['email'] - comment = data.get('comment', '') - passphrase = data['passphrase'] - - key_input = f""" - %echo Generating GPG Key - Key-Type: RSA - Key-Length: 4096 - Name-Real: {name} - Name-Email: {email} - Name-Comment: {comment} - Expire-Date: 0 - Passphrase: {passphrase} - %commit - %echo Done - """ - - with tempfile.NamedTemporaryFile(mode='w+', delete=False) as f: - f.write(key_input) - keyfile_path = f.name - - try: - subprocess.run(["gpg", "--batch", "--generate-key", keyfile_path], check=True) - return jsonify({"status": "Key generated successfully"}) - except subprocess.CalledProcessError as e: - return jsonify({"error": "Key generation failed", "details": str(e)}), 500 - finally: - os.remove(keyfile_path) - -@app.route("/api/download/public-key", methods=["GET"]) -def export_key(): - email = request.args.get('email') - filename = os.path.join(UPLOAD_DIR, f"{secure_filename(email)}_public.asc") - try: - subprocess.run(["gpg", "--armor", "--output", filename, "--export", email], check=True) - return send_file(filename, as_attachment=True) - except Exception as e: - return jsonify({"error": "Export failed", "details": str(e)}), 500 - -@app.route("/api/upload-decrypt", methods=["POST"]) -def upload_and_decrypt(): - if 'file' not in request.files: - return jsonify({"error": "Missing file"}), 400 - file = request.files['file'] - passphrase = request.form.get('passphrase') - - filename = secure_filename(file.filename) - gpg_path = os.path.join(UPLOAD_DIR, filename) - output_path = os.path.join(UPLOAD_DIR, f"decrypted_{filename[:-4]}.zip") - file.save(gpg_path) - - try: - subprocess.run([ - "gpg", "--batch", "--yes", - "--passphrase", passphrase, - "--output", output_path, - "--decrypt", gpg_path - ], check=True) - return send_file(output_path, as_attachment=True) - except subprocess.CalledProcessError as e: - return jsonify({"error": "Decryption failed", "details": str(e)}), 500 - -if __name__ == '__main__': - app.run(host='0.0.0.0', port=8080, debug=True)