Compare commits
4 Commits
4be72c9a5a
...
276fceee68
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
276fceee68 | ||
|
|
4dc05294ff | ||
|
|
bcc6ea2951 | ||
|
|
aaabe83379 |
3
.gitignore
vendored
3
.gitignore
vendored
@@ -168,3 +168,6 @@ cython_debug/
|
|||||||
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
|
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
|
||||||
#.idea/
|
#.idea/
|
||||||
|
|
||||||
|
|
||||||
|
tmp/
|
||||||
|
tests/resources/*.gpg
|
||||||
|
|||||||
336
app.py
Normal file
336
app.py
Normal file
@@ -0,0 +1,336 @@
|
|||||||
|
# gpg_api.py
|
||||||
|
|
||||||
|
from flask import Flask, Request, request, jsonify, send_file
|
||||||
|
import gnupg
|
||||||
|
import tempfile
|
||||||
|
import os
|
||||||
|
import logging
|
||||||
|
import re
|
||||||
|
from werkzeug.utils import secure_filename
|
||||||
|
import io
|
||||||
|
|
||||||
|
app = Flask(__name__)
|
||||||
|
app.config
|
||||||
|
|
||||||
|
UPLOAD_DIR = os.environ.get("GPG_UPLOAD_DIR", "tmp/gpg_files")
|
||||||
|
os.makedirs(UPLOAD_DIR, exist_ok=True)
|
||||||
|
os.chmod(UPLOAD_DIR, 0o700) # Ensure proper permissions
|
||||||
|
|
||||||
|
GPG_KEYRING_DIR = os.environ.get("GPG_KEYRING_DIR", "tmp/gpg_keyring")
|
||||||
|
os.makedirs(GPG_KEYRING_DIR, exist_ok=True)
|
||||||
|
os.chmod(GPG_KEYRING_DIR, 0o700) # Ensure proper permissions
|
||||||
|
|
||||||
|
# Setup logging
|
||||||
|
logging.basicConfig(level=logging.INFO)
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Initialize GPG instance
|
||||||
|
gpg_instance = gnupg.GPG(
|
||||||
|
gnupghome=GPG_KEYRING_DIR,
|
||||||
|
)
|
||||||
|
gpg_instance.encoding = "utf-8"
|
||||||
|
|
||||||
|
|
||||||
|
def valid_email(email):
|
||||||
|
"""Validate email format using regex pattern"""
|
||||||
|
pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
|
||||||
|
return re.match(pattern, email) is not None
|
||||||
|
|
||||||
|
|
||||||
|
def cleanup_file(filepath):
|
||||||
|
"""Safely cleanup a file with error handling"""
|
||||||
|
try:
|
||||||
|
if os.path.exists(filepath):
|
||||||
|
os.remove(filepath)
|
||||||
|
logger.info(f"Cleaned up file: {filepath}")
|
||||||
|
except OSError as e:
|
||||||
|
logger.warning(f"Failed to clean up {filepath}: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
def get_gpg_context(armor=False, passphrase=None):
|
||||||
|
"""Create a GPG context with common settings"""
|
||||||
|
try:
|
||||||
|
# For python-gnupg, we return the global instance
|
||||||
|
# Armor and passphrase are handled per operation
|
||||||
|
return gpg_instance
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to create GPG context: {e}")
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
def find_key_by_email(email, secret=False):
|
||||||
|
"""Find a GPG key by email address"""
|
||||||
|
try:
|
||||||
|
keys = gpg_instance.list_keys(secret=secret)
|
||||||
|
for key in keys:
|
||||||
|
for uid in key["uids"]:
|
||||||
|
if email.lower() in uid.lower():
|
||||||
|
return key
|
||||||
|
return None
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to find key for {email}: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
# @app.route("/api/setup/gnupg", methods=["GET"])
|
||||||
|
# def setup_gnupg():
|
||||||
|
# """Check if GnuPG is available through python-gnupg"""
|
||||||
|
# try:
|
||||||
|
# # Try to get version info
|
||||||
|
# version_info = gpg_instance.version
|
||||||
|
# logger.info("GnuPG accessible through python-gnupg")
|
||||||
|
# return jsonify(
|
||||||
|
# {
|
||||||
|
# "status": "GnuPG is available through python-gnupg",
|
||||||
|
# "version": version_info,
|
||||||
|
# }
|
||||||
|
# )
|
||||||
|
# except Exception as e:
|
||||||
|
# logger.warning(f"GnuPG not accessible: {e}")
|
||||||
|
# return (
|
||||||
|
# jsonify(
|
||||||
|
# {
|
||||||
|
# "error": "GnuPG is not accessible. Please ensure it's properly installed."
|
||||||
|
# }
|
||||||
|
# ),
|
||||||
|
# 404,
|
||||||
|
# )
|
||||||
|
|
||||||
|
|
||||||
|
@app.route("/api/generate-key", methods=["POST"])
|
||||||
|
def generate_key():
|
||||||
|
data = request.json
|
||||||
|
|
||||||
|
# Input validation
|
||||||
|
if not data or not all(key in data for key in ["name", "email", "passphrase"]):
|
||||||
|
return jsonify({"error": "Missing required fields"}), 400
|
||||||
|
|
||||||
|
name = data["name"].strip()
|
||||||
|
email = data["email"].strip()
|
||||||
|
comment = data.get("comment", "").strip()
|
||||||
|
passphrase = data["passphrase"]
|
||||||
|
|
||||||
|
# Validate inputs
|
||||||
|
if not valid_email(email):
|
||||||
|
return jsonify({"error": "Invalid email format"}), 400
|
||||||
|
|
||||||
|
if len(name) < 2 or len(name) > 100:
|
||||||
|
return jsonify({"error": "Name must be between 2 and 100 characters"}), 400
|
||||||
|
|
||||||
|
if len(passphrase) < 8:
|
||||||
|
return jsonify({"error": "Passphrase must be at least 8 characters"}), 400
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Generate key using python-gnupg
|
||||||
|
input_data = gpg_instance.gen_key_input(
|
||||||
|
name_real=name,
|
||||||
|
name_email=email,
|
||||||
|
name_comment=comment if comment else None,
|
||||||
|
key_type="RSA",
|
||||||
|
key_length=4096,
|
||||||
|
expire_date=0, # No expiration
|
||||||
|
passphrase=passphrase,
|
||||||
|
)
|
||||||
|
|
||||||
|
result = gpg_instance.gen_key(input_data)
|
||||||
|
|
||||||
|
if result.status == "ok":
|
||||||
|
logger.info(f"GPG key generated successfully for {email}")
|
||||||
|
return jsonify(
|
||||||
|
{
|
||||||
|
"status": "Key generated successfully",
|
||||||
|
"fingerprint": str(result.fingerprint),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.error(f"Key generation failed for {email}: {result.stderr}")
|
||||||
|
return (
|
||||||
|
jsonify({"error": "Key generation failed", "details": result.stderr}),
|
||||||
|
500,
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Key generation failed for {email}: {e}")
|
||||||
|
return jsonify({"error": "Key generation failed", "details": str(e)}), 500
|
||||||
|
|
||||||
|
|
||||||
|
@app.route("/api/download/public-key", methods=["POST"])
|
||||||
|
def export_key():
|
||||||
|
email = request.json.get("email")
|
||||||
|
|
||||||
|
if not email:
|
||||||
|
return jsonify({"error": "Email parameter is required"}), 400
|
||||||
|
|
||||||
|
if not valid_email(email):
|
||||||
|
return jsonify({"error": "Invalid email format"}), 400
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Find the key by email
|
||||||
|
key = find_key_by_email(email, secret=False)
|
||||||
|
if not key:
|
||||||
|
return jsonify({"error": "No public key found for this email"}), 404
|
||||||
|
|
||||||
|
# Export the key to a string
|
||||||
|
exported_key = gpg_instance.export_keys(key["fingerprint"], armor=True)
|
||||||
|
|
||||||
|
if not exported_key:
|
||||||
|
return jsonify({"error": "Failed to export key"}), 500
|
||||||
|
|
||||||
|
# Create an in-memory stream
|
||||||
|
file_stream = io.BytesIO(exported_key.encode("utf-8"))
|
||||||
|
filename = f"{secure_filename(email)}_public.asc"
|
||||||
|
|
||||||
|
logger.info(f"Public key exported for {email}")
|
||||||
|
return send_file(
|
||||||
|
file_stream,
|
||||||
|
as_attachment=True,
|
||||||
|
download_name=filename,
|
||||||
|
mimetype="application/pgp-keys",
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Export failed for {email}: {e}")
|
||||||
|
return jsonify({"error": "Export failed"}), 500
|
||||||
|
|
||||||
|
|
||||||
|
@app.route("/api/upload-decrypt", methods=["POST"])
|
||||||
|
def upload_and_decrypt():
|
||||||
|
if "file" not in request.files:
|
||||||
|
return jsonify({"error": "Missing file"}), 400
|
||||||
|
file = request.files["file"]
|
||||||
|
passphrase = request.form.get("passphrase")
|
||||||
|
|
||||||
|
if not file.filename:
|
||||||
|
return jsonify({"error": "No file selected"}), 400
|
||||||
|
|
||||||
|
if not passphrase:
|
||||||
|
return jsonify({"error": "Passphrase is required"}), 400
|
||||||
|
|
||||||
|
filename = secure_filename(file.filename)
|
||||||
|
if not filename.endswith(".gpg"):
|
||||||
|
return jsonify({"error": "File must have .gpg extension"}), 400
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Read the encrypted file directly into memory
|
||||||
|
encrypted_data = file.read()
|
||||||
|
|
||||||
|
# Decrypt the data using python-gnupg
|
||||||
|
result = gpg_instance.decrypt(encrypted_data, passphrase=passphrase)
|
||||||
|
|
||||||
|
if not result.ok:
|
||||||
|
logger.error(f"Decryption failed for {filename}: {result.stderr}")
|
||||||
|
return (
|
||||||
|
jsonify({"error": "Decryption failed", "details": result.stderr}),
|
||||||
|
500,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create an in-memory stream for the decrypted data
|
||||||
|
base_name = filename[:-4] if filename.endswith(".gpg") else filename
|
||||||
|
output_filename = f"decrypted_{base_name}.zip"
|
||||||
|
|
||||||
|
decrypted_stream = io.BytesIO(result.data)
|
||||||
|
|
||||||
|
logger.info(f"File {filename} decrypted successfully")
|
||||||
|
return send_file(
|
||||||
|
decrypted_stream,
|
||||||
|
as_attachment=True,
|
||||||
|
download_name=output_filename,
|
||||||
|
mimetype="application/zip",
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Decryption failed for {filename}: {e}")
|
||||||
|
return jsonify({"error": "Decryption failed", "details": str(e)}), 500
|
||||||
|
|
||||||
|
|
||||||
|
@app.route("/api/encrypt-file", methods=["POST"])
|
||||||
|
def encrypt_file():
|
||||||
|
if "file" not in request.files:
|
||||||
|
return jsonify({"error": "Missing file"}), 400
|
||||||
|
|
||||||
|
file = request.files["file"]
|
||||||
|
email = request.form.get("email")
|
||||||
|
|
||||||
|
if not file.filename:
|
||||||
|
return jsonify({"error": "No file selected"}), 400
|
||||||
|
|
||||||
|
if not email:
|
||||||
|
return jsonify({"error": "Email parameter is required"}), 400
|
||||||
|
|
||||||
|
if not valid_email(email):
|
||||||
|
return jsonify({"error": "Invalid email format"}), 400
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Find the public key by email
|
||||||
|
key = find_key_by_email(email, secret=False)
|
||||||
|
if not key:
|
||||||
|
return jsonify({"error": "No public key found for this email"}), 404
|
||||||
|
|
||||||
|
# Read the file data
|
||||||
|
file_data = file.read()
|
||||||
|
|
||||||
|
# Encrypt the file using the public key
|
||||||
|
result = gpg_instance.encrypt(
|
||||||
|
file_data,
|
||||||
|
recipients=[key["fingerprint"]],
|
||||||
|
armor=False,
|
||||||
|
always_trust=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
if not result.ok:
|
||||||
|
logger.error(f"Encryption failed for {file.filename}: {result.stderr}")
|
||||||
|
return (
|
||||||
|
jsonify({"error": "Encryption failed", "details": result.stderr}),
|
||||||
|
500,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create filename for encrypted file
|
||||||
|
filename = secure_filename(file.filename)
|
||||||
|
encrypted_filename = f"{filename}.gpg"
|
||||||
|
|
||||||
|
# Create an in-memory stream for the encrypted data
|
||||||
|
encrypted_stream = io.BytesIO(result.data)
|
||||||
|
|
||||||
|
logger.info(f"File {filename} encrypted successfully for {email}")
|
||||||
|
return send_file(
|
||||||
|
encrypted_stream,
|
||||||
|
as_attachment=True,
|
||||||
|
download_name=encrypted_filename,
|
||||||
|
mimetype="application/pgp-encrypted",
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Encryption failed for {file.filename}: {e}")
|
||||||
|
return jsonify({"error": "Encryption failed", "details": str(e)}), 500
|
||||||
|
|
||||||
|
|
||||||
|
# @app.route("/api/list-keys", methods=["GET"])
|
||||||
|
# def list_keys():
|
||||||
|
# """List all available GPG keys"""
|
||||||
|
# try:
|
||||||
|
# keys = []
|
||||||
|
# public_keys = gpg_instance.list_keys()
|
||||||
|
# private_keys = gpg_instance.list_keys(secret=True)
|
||||||
|
|
||||||
|
# for key in public_keys:
|
||||||
|
# key_info = {
|
||||||
|
# "fingerprint": key.get("fingerprint", ""),
|
||||||
|
# "keyid": key.get("keyid", ""),
|
||||||
|
# "type": key.get("type", ""),
|
||||||
|
# "length": key.get("length", ""),
|
||||||
|
# "date": key.get("date", ""),
|
||||||
|
# "expires": key.get("expires", ""),
|
||||||
|
# "uids": key.get("uids", []),
|
||||||
|
# "trust": key.get("trust", ""),
|
||||||
|
# "secret": any(
|
||||||
|
# priv_key["fingerprint"] == key.get("fingerprint", "")
|
||||||
|
# for priv_key in private_keys
|
||||||
|
# ),
|
||||||
|
# }
|
||||||
|
# keys.append(key_info)
|
||||||
|
|
||||||
|
# logger.info(f"Listed {len(keys)} keys")
|
||||||
|
# return jsonify({"keys": keys, "count": len(keys)})
|
||||||
|
# except Exception as e:
|
||||||
|
# logger.error(f"Failed to list keys: {e}")
|
||||||
|
# return jsonify({"error": "Failed to list keys", "details": str(e)}), 500
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
app.run(host="0.0.0.0", port=8080, debug=True)
|
||||||
@@ -1,90 +0,0 @@
|
|||||||
# gpg_api.py
|
|
||||||
|
|
||||||
from flask import Flask, request, jsonify, send_file
|
|
||||||
import subprocess
|
|
||||||
import tempfile
|
|
||||||
import os
|
|
||||||
from werkzeug.utils import secure_filename
|
|
||||||
|
|
||||||
app = Flask(__name__)
|
|
||||||
UPLOAD_DIR = "/tmp/gpg_files"
|
|
||||||
os.makedirs(UPLOAD_DIR, exist_ok=True)
|
|
||||||
|
|
||||||
@app.route("/api/setup/gnupg", methods=["GET"])
|
|
||||||
def setup_gnupg():
|
|
||||||
try:
|
|
||||||
subprocess.run(["gpg", "--version"], check=True)
|
|
||||||
return jsonify({"status": "GnuPG is already installed"})
|
|
||||||
except Exception:
|
|
||||||
subprocess.run(["apt", "update"], check=True)
|
|
||||||
subprocess.run(["apt", "install", "-y", "gnupg"], check=True)
|
|
||||||
return jsonify({"status": "GnuPG installed"})
|
|
||||||
|
|
||||||
@app.route("/api/generate-key", methods=["POST"])
|
|
||||||
def generate_key():
|
|
||||||
data = request.json
|
|
||||||
name = data['name']
|
|
||||||
email = data['email']
|
|
||||||
comment = data.get('comment', '')
|
|
||||||
passphrase = data['passphrase']
|
|
||||||
|
|
||||||
key_input = f"""
|
|
||||||
%echo Generating GPG Key
|
|
||||||
Key-Type: RSA
|
|
||||||
Key-Length: 4096
|
|
||||||
Name-Real: {name}
|
|
||||||
Name-Email: {email}
|
|
||||||
Name-Comment: {comment}
|
|
||||||
Expire-Date: 0
|
|
||||||
Passphrase: {passphrase}
|
|
||||||
%commit
|
|
||||||
%echo Done
|
|
||||||
"""
|
|
||||||
|
|
||||||
with tempfile.NamedTemporaryFile(mode='w+', delete=False) as f:
|
|
||||||
f.write(key_input)
|
|
||||||
keyfile_path = f.name
|
|
||||||
|
|
||||||
try:
|
|
||||||
subprocess.run(["gpg", "--batch", "--generate-key", keyfile_path], check=True)
|
|
||||||
return jsonify({"status": "Key generated successfully"})
|
|
||||||
except subprocess.CalledProcessError as e:
|
|
||||||
return jsonify({"error": "Key generation failed", "details": str(e)}), 500
|
|
||||||
finally:
|
|
||||||
os.remove(keyfile_path)
|
|
||||||
|
|
||||||
@app.route("/api/download/public-key", methods=["GET"])
|
|
||||||
def export_key():
|
|
||||||
email = request.args.get('email')
|
|
||||||
filename = os.path.join(UPLOAD_DIR, f"{secure_filename(email)}_public.asc")
|
|
||||||
try:
|
|
||||||
subprocess.run(["gpg", "--armor", "--output", filename, "--export", email], check=True)
|
|
||||||
return send_file(filename, as_attachment=True)
|
|
||||||
except Exception as e:
|
|
||||||
return jsonify({"error": "Export failed", "details": str(e)}), 500
|
|
||||||
|
|
||||||
@app.route("/api/upload-decrypt", methods=["POST"])
|
|
||||||
def upload_and_decrypt():
|
|
||||||
if 'file' not in request.files:
|
|
||||||
return jsonify({"error": "Missing file"}), 400
|
|
||||||
file = request.files['file']
|
|
||||||
passphrase = request.form.get('passphrase')
|
|
||||||
|
|
||||||
filename = secure_filename(file.filename)
|
|
||||||
gpg_path = os.path.join(UPLOAD_DIR, filename)
|
|
||||||
output_path = os.path.join(UPLOAD_DIR, f"decrypted_{filename[:-4]}.zip")
|
|
||||||
file.save(gpg_path)
|
|
||||||
|
|
||||||
try:
|
|
||||||
subprocess.run([
|
|
||||||
"gpg", "--batch", "--yes",
|
|
||||||
"--passphrase", passphrase,
|
|
||||||
"--output", output_path,
|
|
||||||
"--decrypt", gpg_path
|
|
||||||
], check=True)
|
|
||||||
return send_file(output_path, as_attachment=True)
|
|
||||||
except subprocess.CalledProcessError as e:
|
|
||||||
return jsonify({"error": "Decryption failed", "details": str(e)}), 500
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
app.run(host='0.0.0.0', port=8080, debug=True)
|
|
||||||
3
requirements.txt
Normal file
3
requirements.txt
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
Flask==3.1.1
|
||||||
|
Werkzeug==3.1.3
|
||||||
|
python-gnupg==0.4.5
|
||||||
1
tests/resources/testing_file.txt
Normal file
1
tests/resources/testing_file.txt
Normal file
@@ -0,0 +1 @@
|
|||||||
|
Test text
|
||||||
144
tests/test_api.py
Normal file
144
tests/test_api.py
Normal file
@@ -0,0 +1,144 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Test script for the GPG API using python-gpg library
|
||||||
|
"""
|
||||||
|
|
||||||
|
from app import UPLOAD_DIR, GPG_KEYRING_DIR, app
|
||||||
|
import unittest
|
||||||
|
from pathlib import Path
|
||||||
|
import os
|
||||||
|
import shutil
|
||||||
|
import gnupg
|
||||||
|
|
||||||
|
|
||||||
|
class TestGPGAPI(unittest.TestCase):
|
||||||
|
"""Test class for GPG API endpoints"""
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls):
|
||||||
|
"""Set up test fixtures before each test method."""
|
||||||
|
cls.test_name = "Test User"
|
||||||
|
cls.test_email = "test@example.com"
|
||||||
|
cls.test_passphrase = "testpassphrase123"
|
||||||
|
cls.app = app.test_client() # Create a test client for the Flask app
|
||||||
|
cls.app.testing = True
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def tearDownClass(cls):
|
||||||
|
"""Clean up after all tests in this class."""
|
||||||
|
print("\nTearing down test class...")
|
||||||
|
try:
|
||||||
|
if os.path.exists(UPLOAD_DIR):
|
||||||
|
shutil.rmtree(UPLOAD_DIR) # Remove directory and all contents
|
||||||
|
if os.path.exists(GPG_KEYRING_DIR):
|
||||||
|
shutil.rmtree(GPG_KEYRING_DIR) # Remove directory and all contents
|
||||||
|
except OSError as e:
|
||||||
|
print(f"Warning: Failed to clean up directories: {e}")
|
||||||
|
print("Test class teardown complete.")
|
||||||
|
|
||||||
|
def test_1_key_generation(self):
|
||||||
|
"""Test key generation"""
|
||||||
|
print("\nTesting key generation...")
|
||||||
|
|
||||||
|
key_data = {
|
||||||
|
"name": "Test User",
|
||||||
|
"email": self.test_email,
|
||||||
|
"comment": "Test Key",
|
||||||
|
"passphrase": self.test_passphrase,
|
||||||
|
}
|
||||||
|
|
||||||
|
response = self.app.post("/api/generate-key", json=key_data)
|
||||||
|
self.assertEqual(response.status_code, 200)
|
||||||
|
|
||||||
|
def test_2_export_key(self):
|
||||||
|
"""Test key export"""
|
||||||
|
print("\nTesting key export...")
|
||||||
|
response = self.app.post(
|
||||||
|
"/api/download/public-key", json={"email": self.test_email}
|
||||||
|
)
|
||||||
|
self.assertEqual(response.status_code, 200)
|
||||||
|
|
||||||
|
def test_3_encrypt_file(self):
|
||||||
|
"""Test file encryption using the API"""
|
||||||
|
print("\nTesting file encryption...")
|
||||||
|
|
||||||
|
# Test file path
|
||||||
|
test_file_path = Path(__file__).parent / "resources" / "testing_file.txt"
|
||||||
|
if not test_file_path.exists():
|
||||||
|
print(f"Warning: Test file not found at {test_file_path}")
|
||||||
|
self.skipTest("Test file not found")
|
||||||
|
|
||||||
|
# Encrypt file using the API
|
||||||
|
with open(test_file_path, "rb") as f:
|
||||||
|
data = {"email": self.test_email, "file": (f, "testing_file.txt")}
|
||||||
|
response = self.app.post("/api/encrypt-file", data=data)
|
||||||
|
|
||||||
|
if response.status_code != 200:
|
||||||
|
print(f"Encryption Status: {response.status_code}")
|
||||||
|
print(f"Response: {response.get_json()}")
|
||||||
|
|
||||||
|
self.assertEqual(response.status_code, 200)
|
||||||
|
|
||||||
|
# Save the encrypted file for the decrypt test
|
||||||
|
encrypted_file_path = (
|
||||||
|
Path(__file__).parent / "resources" / "testing_encrypted.gpg"
|
||||||
|
)
|
||||||
|
with open(encrypted_file_path, "wb") as f:
|
||||||
|
f.write(response.data)
|
||||||
|
|
||||||
|
return encrypted_file_path
|
||||||
|
|
||||||
|
def test_4_decrypt_file(self):
|
||||||
|
"""Test file decryption"""
|
||||||
|
print("\nTesting file decryption...")
|
||||||
|
|
||||||
|
# First encrypt a file using the API
|
||||||
|
encrypted_file_path = self.test_3_encrypt_file()
|
||||||
|
|
||||||
|
# Now decrypt it using the decrypt endpoint
|
||||||
|
with open(encrypted_file_path, "rb") as f:
|
||||||
|
data = {
|
||||||
|
"passphrase": self.test_passphrase,
|
||||||
|
"file": (f, "api_encrypted.gpg"),
|
||||||
|
}
|
||||||
|
response = self.app.post("/api/upload-decrypt", data=data)
|
||||||
|
|
||||||
|
print(f"Decryption Status: {response.status_code}")
|
||||||
|
if response.status_code != 200:
|
||||||
|
print(f"Response: {response.get_json()}")
|
||||||
|
|
||||||
|
self.assertEqual(response.status_code, 200)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main(verbosity=2)
|
||||||
|
# tests = [
|
||||||
|
# test_setup,
|
||||||
|
# test_key_generation,
|
||||||
|
# test_list_keys,
|
||||||
|
# test_export_key,
|
||||||
|
# test_text_encryption,
|
||||||
|
# ]
|
||||||
|
#
|
||||||
|
# passed = 0
|
||||||
|
# for test in tests:
|
||||||
|
# try:
|
||||||
|
# if test():
|
||||||
|
# passed += 1
|
||||||
|
# print("✓ PASSED")
|
||||||
|
# else:
|
||||||
|
# print("✗ FAILED")
|
||||||
|
# except Exception as e:
|
||||||
|
# print(f"✗ ERROR: {e}")
|
||||||
|
# print("-" * 50)
|
||||||
|
#
|
||||||
|
# print(f"\nTest Results: {passed}/{len(tests)} tests passed")
|
||||||
|
#
|
||||||
|
# if passed == len(tests):
|
||||||
|
# print("🎉 All tests passed!")
|
||||||
|
# else:
|
||||||
|
# print("❌ Some tests failed")
|
||||||
|
#
|
||||||
|
#
|
||||||
|
# if __name__ == "__main__":
|
||||||
|
# sys.exit(main())
|
||||||
Reference in New Issue
Block a user