Replace gpg_key_ui_backend.py with a new implementation in app.py for GPG key management and encryption features
This commit is contained in:
336
app.py
Normal file
336
app.py
Normal file
@@ -0,0 +1,336 @@
|
||||
# gpg_api.py
|
||||
|
||||
from flask import Flask, Request, request, jsonify, send_file
|
||||
import gnupg
|
||||
import tempfile
|
||||
import os
|
||||
import logging
|
||||
import re
|
||||
from werkzeug.utils import secure_filename
|
||||
import io
|
||||
|
||||
app = Flask(__name__)
|
||||
app.config
|
||||
|
||||
UPLOAD_DIR = os.environ.get("GPG_UPLOAD_DIR", "tmp/gpg_files")
|
||||
os.makedirs(UPLOAD_DIR, exist_ok=True)
|
||||
os.chmod(UPLOAD_DIR, 0o700) # Ensure proper permissions
|
||||
|
||||
GPG_KEYRING_DIR = os.environ.get("GPG_KEYRING_DIR", "tmp/gpg_keyring")
|
||||
os.makedirs(GPG_KEYRING_DIR, exist_ok=True)
|
||||
os.chmod(GPG_KEYRING_DIR, 0o700) # Ensure proper permissions
|
||||
|
||||
# Setup logging
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Initialize GPG instance
|
||||
gpg_instance = gnupg.GPG(
|
||||
gnupghome=GPG_KEYRING_DIR,
|
||||
)
|
||||
gpg_instance.encoding = "utf-8"
|
||||
|
||||
|
||||
def valid_email(email):
|
||||
"""Validate email format using regex pattern"""
|
||||
pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
|
||||
return re.match(pattern, email) is not None
|
||||
|
||||
|
||||
def cleanup_file(filepath):
|
||||
"""Safely cleanup a file with error handling"""
|
||||
try:
|
||||
if os.path.exists(filepath):
|
||||
os.remove(filepath)
|
||||
logger.info(f"Cleaned up file: {filepath}")
|
||||
except OSError as e:
|
||||
logger.warning(f"Failed to clean up {filepath}: {e}")
|
||||
|
||||
|
||||
def get_gpg_context(armor=False, passphrase=None):
|
||||
"""Create a GPG context with common settings"""
|
||||
try:
|
||||
# For python-gnupg, we return the global instance
|
||||
# Armor and passphrase are handled per operation
|
||||
return gpg_instance
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to create GPG context: {e}")
|
||||
raise
|
||||
|
||||
|
||||
def find_key_by_email(email, secret=False):
|
||||
"""Find a GPG key by email address"""
|
||||
try:
|
||||
keys = gpg_instance.list_keys(secret=secret)
|
||||
for key in keys:
|
||||
for uid in key["uids"]:
|
||||
if email.lower() in uid.lower():
|
||||
return key
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to find key for {email}: {e}")
|
||||
return None
|
||||
|
||||
|
||||
# @app.route("/api/setup/gnupg", methods=["GET"])
|
||||
# def setup_gnupg():
|
||||
# """Check if GnuPG is available through python-gnupg"""
|
||||
# try:
|
||||
# # Try to get version info
|
||||
# version_info = gpg_instance.version
|
||||
# logger.info("GnuPG accessible through python-gnupg")
|
||||
# return jsonify(
|
||||
# {
|
||||
# "status": "GnuPG is available through python-gnupg",
|
||||
# "version": version_info,
|
||||
# }
|
||||
# )
|
||||
# except Exception as e:
|
||||
# logger.warning(f"GnuPG not accessible: {e}")
|
||||
# return (
|
||||
# jsonify(
|
||||
# {
|
||||
# "error": "GnuPG is not accessible. Please ensure it's properly installed."
|
||||
# }
|
||||
# ),
|
||||
# 404,
|
||||
# )
|
||||
|
||||
|
||||
@app.route("/api/generate-key", methods=["POST"])
|
||||
def generate_key():
|
||||
data = request.json
|
||||
|
||||
# Input validation
|
||||
if not data or not all(key in data for key in ["name", "email", "passphrase"]):
|
||||
return jsonify({"error": "Missing required fields"}), 400
|
||||
|
||||
name = data["name"].strip()
|
||||
email = data["email"].strip()
|
||||
comment = data.get("comment", "").strip()
|
||||
passphrase = data["passphrase"]
|
||||
|
||||
# Validate inputs
|
||||
if not valid_email(email):
|
||||
return jsonify({"error": "Invalid email format"}), 400
|
||||
|
||||
if len(name) < 2 or len(name) > 100:
|
||||
return jsonify({"error": "Name must be between 2 and 100 characters"}), 400
|
||||
|
||||
if len(passphrase) < 8:
|
||||
return jsonify({"error": "Passphrase must be at least 8 characters"}), 400
|
||||
|
||||
try:
|
||||
# Generate key using python-gnupg
|
||||
input_data = gpg_instance.gen_key_input(
|
||||
name_real=name,
|
||||
name_email=email,
|
||||
name_comment=comment if comment else None,
|
||||
key_type="RSA",
|
||||
key_length=4096,
|
||||
expire_date=0, # No expiration
|
||||
passphrase=passphrase,
|
||||
)
|
||||
|
||||
result = gpg_instance.gen_key(input_data)
|
||||
|
||||
if result.status == "ok":
|
||||
logger.info(f"GPG key generated successfully for {email}")
|
||||
return jsonify(
|
||||
{
|
||||
"status": "Key generated successfully",
|
||||
"fingerprint": str(result.fingerprint),
|
||||
}
|
||||
)
|
||||
else:
|
||||
logger.error(f"Key generation failed for {email}: {result.stderr}")
|
||||
return (
|
||||
jsonify({"error": "Key generation failed", "details": result.stderr}),
|
||||
500,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Key generation failed for {email}: {e}")
|
||||
return jsonify({"error": "Key generation failed", "details": str(e)}), 500
|
||||
|
||||
|
||||
@app.route("/api/download/public-key", methods=["POST"])
|
||||
def export_key():
|
||||
email = request.json.get("email")
|
||||
|
||||
if not email:
|
||||
return jsonify({"error": "Email parameter is required"}), 400
|
||||
|
||||
if not valid_email(email):
|
||||
return jsonify({"error": "Invalid email format"}), 400
|
||||
|
||||
try:
|
||||
# Find the key by email
|
||||
key = find_key_by_email(email, secret=False)
|
||||
if not key:
|
||||
return jsonify({"error": "No public key found for this email"}), 404
|
||||
|
||||
# Export the key to a string
|
||||
exported_key = gpg_instance.export_keys(key["fingerprint"], armor=True)
|
||||
|
||||
if not exported_key:
|
||||
return jsonify({"error": "Failed to export key"}), 500
|
||||
|
||||
# Create an in-memory stream
|
||||
file_stream = io.BytesIO(exported_key.encode("utf-8"))
|
||||
filename = f"{secure_filename(email)}_public.asc"
|
||||
|
||||
logger.info(f"Public key exported for {email}")
|
||||
return send_file(
|
||||
file_stream,
|
||||
as_attachment=True,
|
||||
download_name=filename,
|
||||
mimetype="application/pgp-keys",
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Export failed for {email}: {e}")
|
||||
return jsonify({"error": "Export failed"}), 500
|
||||
|
||||
|
||||
@app.route("/api/upload-decrypt", methods=["POST"])
|
||||
def upload_and_decrypt():
|
||||
if "file" not in request.files:
|
||||
return jsonify({"error": "Missing file"}), 400
|
||||
file = request.files["file"]
|
||||
passphrase = request.form.get("passphrase")
|
||||
|
||||
if not file.filename:
|
||||
return jsonify({"error": "No file selected"}), 400
|
||||
|
||||
if not passphrase:
|
||||
return jsonify({"error": "Passphrase is required"}), 400
|
||||
|
||||
filename = secure_filename(file.filename)
|
||||
if not filename.endswith(".gpg"):
|
||||
return jsonify({"error": "File must have .gpg extension"}), 400
|
||||
|
||||
try:
|
||||
# Read the encrypted file directly into memory
|
||||
encrypted_data = file.read()
|
||||
|
||||
# Decrypt the data using python-gnupg
|
||||
result = gpg_instance.decrypt(encrypted_data, passphrase=passphrase)
|
||||
|
||||
if not result.ok:
|
||||
logger.error(f"Decryption failed for {filename}: {result.stderr}")
|
||||
return (
|
||||
jsonify({"error": "Decryption failed", "details": result.stderr}),
|
||||
500,
|
||||
)
|
||||
|
||||
# Create an in-memory stream for the decrypted data
|
||||
base_name = filename[:-4] if filename.endswith(".gpg") else filename
|
||||
output_filename = f"decrypted_{base_name}.zip"
|
||||
|
||||
decrypted_stream = io.BytesIO(result.data)
|
||||
|
||||
logger.info(f"File {filename} decrypted successfully")
|
||||
return send_file(
|
||||
decrypted_stream,
|
||||
as_attachment=True,
|
||||
download_name=output_filename,
|
||||
mimetype="application/zip",
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Decryption failed for {filename}: {e}")
|
||||
return jsonify({"error": "Decryption failed", "details": str(e)}), 500
|
||||
|
||||
|
||||
@app.route("/api/encrypt-file", methods=["POST"])
|
||||
def encrypt_file():
|
||||
if "file" not in request.files:
|
||||
return jsonify({"error": "Missing file"}), 400
|
||||
|
||||
file = request.files["file"]
|
||||
email = request.form.get("email")
|
||||
|
||||
if not file.filename:
|
||||
return jsonify({"error": "No file selected"}), 400
|
||||
|
||||
if not email:
|
||||
return jsonify({"error": "Email parameter is required"}), 400
|
||||
|
||||
if not valid_email(email):
|
||||
return jsonify({"error": "Invalid email format"}), 400
|
||||
|
||||
try:
|
||||
# Find the public key by email
|
||||
key = find_key_by_email(email, secret=False)
|
||||
if not key:
|
||||
return jsonify({"error": "No public key found for this email"}), 404
|
||||
|
||||
# Read the file data
|
||||
file_data = file.read()
|
||||
|
||||
# Encrypt the file using the public key
|
||||
result = gpg_instance.encrypt(
|
||||
file_data,
|
||||
recipients=[key["fingerprint"]],
|
||||
armor=False,
|
||||
always_trust=True,
|
||||
)
|
||||
|
||||
if not result.ok:
|
||||
logger.error(f"Encryption failed for {file.filename}: {result.stderr}")
|
||||
return (
|
||||
jsonify({"error": "Encryption failed", "details": result.stderr}),
|
||||
500,
|
||||
)
|
||||
|
||||
# Create filename for encrypted file
|
||||
filename = secure_filename(file.filename)
|
||||
encrypted_filename = f"{filename}.gpg"
|
||||
|
||||
# Create an in-memory stream for the encrypted data
|
||||
encrypted_stream = io.BytesIO(result.data)
|
||||
|
||||
logger.info(f"File {filename} encrypted successfully for {email}")
|
||||
return send_file(
|
||||
encrypted_stream,
|
||||
as_attachment=True,
|
||||
download_name=encrypted_filename,
|
||||
mimetype="application/pgp-encrypted",
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Encryption failed for {file.filename}: {e}")
|
||||
return jsonify({"error": "Encryption failed", "details": str(e)}), 500
|
||||
|
||||
|
||||
# @app.route("/api/list-keys", methods=["GET"])
|
||||
# def list_keys():
|
||||
# """List all available GPG keys"""
|
||||
# try:
|
||||
# keys = []
|
||||
# public_keys = gpg_instance.list_keys()
|
||||
# private_keys = gpg_instance.list_keys(secret=True)
|
||||
|
||||
# for key in public_keys:
|
||||
# key_info = {
|
||||
# "fingerprint": key.get("fingerprint", ""),
|
||||
# "keyid": key.get("keyid", ""),
|
||||
# "type": key.get("type", ""),
|
||||
# "length": key.get("length", ""),
|
||||
# "date": key.get("date", ""),
|
||||
# "expires": key.get("expires", ""),
|
||||
# "uids": key.get("uids", []),
|
||||
# "trust": key.get("trust", ""),
|
||||
# "secret": any(
|
||||
# priv_key["fingerprint"] == key.get("fingerprint", "")
|
||||
# for priv_key in private_keys
|
||||
# ),
|
||||
# }
|
||||
# keys.append(key_info)
|
||||
|
||||
# logger.info(f"Listed {len(keys)} keys")
|
||||
# return jsonify({"keys": keys, "count": len(keys)})
|
||||
# except Exception as e:
|
||||
# logger.error(f"Failed to list keys: {e}")
|
||||
# return jsonify({"error": "Failed to list keys", "details": str(e)}), 500
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
app.run(host="0.0.0.0", port=8080, debug=True)
|
||||
Reference in New Issue
Block a user