# gpg_api.py from flask import Flask, Request, request, jsonify, send_file import gnupg import tempfile import os import logging import re from werkzeug.utils import secure_filename import io app = Flask(__name__) app.config UPLOAD_DIR = os.environ.get("GPG_UPLOAD_DIR", "tmp/gpg_files") os.makedirs(UPLOAD_DIR, exist_ok=True) os.chmod(UPLOAD_DIR, 0o700) # Ensure proper permissions GPG_KEYRING_DIR = os.environ.get("GPG_KEYRING_DIR", "tmp/gpg_keyring") os.makedirs(GPG_KEYRING_DIR, exist_ok=True) os.chmod(GPG_KEYRING_DIR, 0o700) # Ensure proper permissions # Setup logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Initialize GPG instance gpg_instance = gnupg.GPG( gnupghome=GPG_KEYRING_DIR, ) gpg_instance.encoding = "utf-8" def valid_email(email): """Validate email format using regex pattern""" pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$" return re.match(pattern, email) is not None def cleanup_file(filepath): """Safely cleanup a file with error handling""" try: if os.path.exists(filepath): os.remove(filepath) logger.info(f"Cleaned up file: {filepath}") except OSError as e: logger.warning(f"Failed to clean up {filepath}: {e}") def get_gpg_context(armor=False, passphrase=None): """Create a GPG context with common settings""" try: # For python-gnupg, we return the global instance # Armor and passphrase are handled per operation return gpg_instance except Exception as e: logger.error(f"Failed to create GPG context: {e}") raise def find_key_by_email(email, secret=False): """Find a GPG key by email address""" try: keys = gpg_instance.list_keys(secret=secret) for key in keys: for uid in key["uids"]: if email.lower() in uid.lower(): return key return None except Exception as e: logger.error(f"Failed to find key for {email}: {e}") return None # @app.route("/api/setup/gnupg", methods=["GET"]) # def setup_gnupg(): # """Check if GnuPG is available through python-gnupg""" # try: # # Try to get version info # version_info = gpg_instance.version # logger.info("GnuPG accessible through python-gnupg") # return jsonify( # { # "status": "GnuPG is available through python-gnupg", # "version": version_info, # } # ) # except Exception as e: # logger.warning(f"GnuPG not accessible: {e}") # return ( # jsonify( # { # "error": "GnuPG is not accessible. Please ensure it's properly installed." # } # ), # 404, # ) @app.route("/api/generate-key", methods=["POST"]) def generate_key(): data = request.json # Input validation if not data or not all(key in data for key in ["name", "email", "passphrase"]): return jsonify({"error": "Missing required fields"}), 400 name = data["name"].strip() email = data["email"].strip() comment = data.get("comment", "").strip() passphrase = data["passphrase"] # Validate inputs if not valid_email(email): return jsonify({"error": "Invalid email format"}), 400 if len(name) < 2 or len(name) > 100: return jsonify({"error": "Name must be between 2 and 100 characters"}), 400 if len(passphrase) < 8: return jsonify({"error": "Passphrase must be at least 8 characters"}), 400 try: # Generate key using python-gnupg input_data = gpg_instance.gen_key_input( name_real=name, name_email=email, name_comment=comment if comment else None, key_type="RSA", key_length=4096, expire_date=0, # No expiration passphrase=passphrase, ) result = gpg_instance.gen_key(input_data) if result.status == "ok": logger.info(f"GPG key generated successfully for {email}") return jsonify( { "status": "Key generated successfully", "fingerprint": str(result.fingerprint), } ) else: logger.error(f"Key generation failed for {email}: {result.stderr}") return ( jsonify({"error": "Key generation failed", "details": result.stderr}), 500, ) except Exception as e: logger.error(f"Key generation failed for {email}: {e}") return jsonify({"error": "Key generation failed", "details": str(e)}), 500 @app.route("/api/download/public-key", methods=["POST"]) def export_key(): email = request.json.get("email") if not email: return jsonify({"error": "Email parameter is required"}), 400 if not valid_email(email): return jsonify({"error": "Invalid email format"}), 400 try: # Find the key by email key = find_key_by_email(email, secret=False) if not key: return jsonify({"error": "No public key found for this email"}), 404 # Export the key to a string exported_key = gpg_instance.export_keys(key["fingerprint"], armor=True) if not exported_key: return jsonify({"error": "Failed to export key"}), 500 # Create an in-memory stream file_stream = io.BytesIO(exported_key.encode("utf-8")) filename = f"{secure_filename(email)}_public.asc" logger.info(f"Public key exported for {email}") return send_file( file_stream, as_attachment=True, download_name=filename, mimetype="application/pgp-keys", ) except Exception as e: logger.error(f"Export failed for {email}: {e}") return jsonify({"error": "Export failed"}), 500 @app.route("/api/upload-decrypt", methods=["POST"]) def upload_and_decrypt(): if "file" not in request.files: return jsonify({"error": "Missing file"}), 400 file = request.files["file"] passphrase = request.form.get("passphrase") if not file.filename: return jsonify({"error": "No file selected"}), 400 if not passphrase: return jsonify({"error": "Passphrase is required"}), 400 filename = secure_filename(file.filename) if not filename.endswith(".gpg"): return jsonify({"error": "File must have .gpg extension"}), 400 try: # Read the encrypted file directly into memory encrypted_data = file.read() # Decrypt the data using python-gnupg result = gpg_instance.decrypt(encrypted_data, passphrase=passphrase) if not result.ok: logger.error(f"Decryption failed for {filename}: {result.stderr}") return ( jsonify({"error": "Decryption failed", "details": result.stderr}), 500, ) # Create an in-memory stream for the decrypted data base_name = filename[:-4] if filename.endswith(".gpg") else filename output_filename = f"decrypted_{base_name}.zip" decrypted_stream = io.BytesIO(result.data) logger.info(f"File {filename} decrypted successfully") return send_file( decrypted_stream, as_attachment=True, download_name=output_filename, mimetype="application/zip", ) except Exception as e: logger.error(f"Decryption failed for {filename}: {e}") return jsonify({"error": "Decryption failed", "details": str(e)}), 500 @app.route("/api/encrypt-file", methods=["POST"]) def encrypt_file(): if "file" not in request.files: return jsonify({"error": "Missing file"}), 400 file = request.files["file"] email = request.form.get("email") if not file.filename: return jsonify({"error": "No file selected"}), 400 if not email: return jsonify({"error": "Email parameter is required"}), 400 if not valid_email(email): return jsonify({"error": "Invalid email format"}), 400 try: # Find the public key by email key = find_key_by_email(email, secret=False) if not key: return jsonify({"error": "No public key found for this email"}), 404 # Read the file data file_data = file.read() # Encrypt the file using the public key result = gpg_instance.encrypt( file_data, recipients=[key["fingerprint"]], armor=False, always_trust=True, ) if not result.ok: logger.error(f"Encryption failed for {file.filename}: {result.stderr}") return ( jsonify({"error": "Encryption failed", "details": result.stderr}), 500, ) # Create filename for encrypted file filename = secure_filename(file.filename) encrypted_filename = f"{filename}.gpg" # Create an in-memory stream for the encrypted data encrypted_stream = io.BytesIO(result.data) logger.info(f"File {filename} encrypted successfully for {email}") return send_file( encrypted_stream, as_attachment=True, download_name=encrypted_filename, mimetype="application/pgp-encrypted", ) except Exception as e: logger.error(f"Encryption failed for {file.filename}: {e}") return jsonify({"error": "Encryption failed", "details": str(e)}), 500 # @app.route("/api/list-keys", methods=["GET"]) # def list_keys(): # """List all available GPG keys""" # try: # keys = [] # public_keys = gpg_instance.list_keys() # private_keys = gpg_instance.list_keys(secret=True) # for key in public_keys: # key_info = { # "fingerprint": key.get("fingerprint", ""), # "keyid": key.get("keyid", ""), # "type": key.get("type", ""), # "length": key.get("length", ""), # "date": key.get("date", ""), # "expires": key.get("expires", ""), # "uids": key.get("uids", []), # "trust": key.get("trust", ""), # "secret": any( # priv_key["fingerprint"] == key.get("fingerprint", "") # for priv_key in private_keys # ), # } # keys.append(key_info) # logger.info(f"Listed {len(keys)} keys") # return jsonify({"keys": keys, "count": len(keys)}) # except Exception as e: # logger.error(f"Failed to list keys: {e}") # return jsonify({"error": "Failed to list keys", "details": str(e)}), 500 if __name__ == "__main__": app.run(host="0.0.0.0", port=8080, debug=True)