Slide 1: Security Fundamentals in Python
Security in Python web development begins with understanding core principles of input validation, sanitization, and secure data handling. Modern web applications face sophisticated attack vectors requiring robust defensive programming practices implemented from the ground up.
# Basic input validation and sanitization example
import re
from html import escape
def sanitize_input(user_input: str) -> str:
# Remove any non-alphanumeric characters except basic punctuation
cleaned = re.sub(r'[^a-zA-Z0-9\s\-\_\.]', '', user_input)
# Escape HTML special characters
escaped = escape(cleaned)
# Limit input length to prevent DOS
return escaped[:100]
# Example usage
malicious_input = '<script>alert("xss")</script>'
print(f"Original input: {malicious_input}")
print(f"Sanitized input: {sanitize_input(malicious_input)}")
# Output:
# Original input: <script>alert("xss")</script>
# Sanitized input: scriptalertxssscriptSlide 2: Secure Password Hashing
Modern password security requires proper hashing using cryptographically secure algorithms with salting. Argon2 is currently the recommended algorithm, but bcrypt and PBKDF2 are also acceptable alternatives when implemented correctly.
import hashlib
import os
from base64 import b64encode
def hash_password(password: str) -> tuple[str, str]:
# Generate a random salt
salt = os.urandom(32)
# Use PBKDF2 with 100,000 iterations
key = hashlib.pbkdf2_hmac(
'sha256',
password.encode('utf-8'),
salt,
100000
)
return b64encode(salt).decode('utf-8'), b64encode(key).decode('utf-8')
def verify_password(password: str, salt: str, key: str) -> bool:
salt_bytes = b64encode(salt.encode('utf-8'))
new_key = hashlib.pbkdf2_hmac(
'sha256',
password.encode('utf-8'),
salt_bytes,
100000
)
return b64encode(new_key).decode('utf-8') == key
# Example usage
password = "MySecurePassword123"
salt, key = hash_password(password)
print(f"Salt: {salt}\nKey: {key}")
print(f"Verification: {verify_password(password, salt, key)}")Slide 3: Secure Session Management
Session management is crucial for web application security. This involves generating secure session tokens, implementing proper timeout mechanisms, and protecting against session hijacking attempts through secure cookie handling.
from secrets import token_urlsafe
from time import time
from typing import Dict, Optional
class SecureSessionManager:
def __init__(self, session_timeout: int = 3600):
self.sessions: Dict[str, dict] = {}
self.timeout = session_timeout
def create_session(self, user_id: str) -> str:
# Generate cryptographically secure token
session_id = token_urlsafe(32)
self.sessions[session_id] = {
'user_id': user_id,
'created_at': time(),
'last_accessed': time()
}
return session_id
def validate_session(self, session_id: str) -> Optional[str]:
session = self.sessions.get(session_id)
if not session:
return None
current_time = time()
if current_time - session['last_accessed'] > self.timeout:
del self.sessions[session_id]
return None
session['last_accessed'] = current_time
return session['user_id']
# Example usage
session_mgr = SecureSessionManager()
session_id = session_mgr.create_session("user123")
print(f"Created session: {session_id}")
print(f"Valid user: {session_mgr.validate_session(session_id)}")Slide 4: XSS Prevention
Cross-Site Scripting (XSS) remains one of the most prevalent web security vulnerabilities. Python web developers must implement context-aware output encoding and Content Security Policy (CSP) headers to prevent both reflected and stored XSS attacks.
from typing import Dict
from html import escape
import json
class XSSProtection:
def __init__(self):
self.csp_directives = {
'default-src': ["'self'"],
'script-src': ["'self'"],
'style-src': ["'self'"],
'img-src': ["'self'"],
'connect-src': ["'self'"]
}
def encode_html_content(self, content: str) -> str:
return escape(content, quote=True)
def encode_json_content(self, data: Dict) -> str:
# Encode < and > to prevent breaking out of <script> contexts
return json.dumps(data).replace('<', '\\u003c').replace('>', '\\u003e')
def get_csp_header(self) -> str:
return '; '.join(
f"{key} {' '.join(values)}"
for key, values in self.csp_directives.items()
)
# Example usage
xss_protection = XSSProtection()
# Example data
user_input = '<img src="x" onerror="alert(1)">'
json_data = {"message": "<script>evil()</script>"}
print("Original:", user_input)
print("HTML Encoded:", xss_protection.encode_html_content(user_input))
print("JSON Encoded:", xss_protection.encode_json_content(json_data))
print("CSP Header:", xss_protection.get_csp_header())Slide 5: CSRF Protection Implementation
CSRF (Cross-Site Request Forgery) attacks exploit the trust that a web application has in a user's browser. A robust CSRF protection system must generate and validate unique tokens for each session and form submission.
import hmac
import hashlib
from time import time
from base64 import b64encode, b64decode
class CSRFProtection:
def __init__(self, secret_key: str, token_timeout: int = 3600):
self.secret_key = secret_key.encode('utf-8')
self.timeout = token_timeout
def generate_token(self, session_id: str) -> str:
timestamp = str(int(time()))
msg = f"{session_id}:{timestamp}".encode('utf-8')
signature = hmac.new(
self.secret_key,
msg,
hashlib.sha256
).digest()
token = b64encode(f"{timestamp}:{signature}".encode('utf-8'))
return token.decode('utf-8')
def validate_token(self, session_id: str, token: str) -> bool:
try:
decoded = b64decode(token.encode('utf-8')).decode('utf-8')
timestamp, signature = decoded.split(':', 1)
# Check token age
if int(time()) - int(timestamp) > self.timeout:
return False
# Verify signature
expected_msg = f"{session_id}:{timestamp}".encode('utf-8')
expected_sig = hmac.new(
self.secret_key,
expected_msg,
hashlib.sha256
).digest()
return hmac.compare_digest(signature.encode('utf-8'), expected_sig)
except Exception:
return False
# Example usage
csrf = CSRFProtection("your-secret-key-here")
session_id = "user_session_123"
token = csrf.generate_token(session_id)
print(f"Generated Token: {token}")
print(f"Token Valid: {csrf.validate_token(session_id, token)}")
print(f"Invalid Token Test: {csrf.validate_token(session_id, 'invalid-token')}")Slide 6: SQL Injection Prevention
SQL injection remains a critical vulnerability in web applications. Proper parameterization of queries and input validation are essential defenses against SQL injection attacks. Never construct queries through string concatenation.
import sqlite3
from typing import List, Any, Tuple
class SecureDatabase:
def __init__(self, db_path: str):
self.db_path = db_path
def execute_parameterized_query(
self,
query: str,
params: Tuple[Any, ...] = ()
) -> List[Any]:
with sqlite3.connect(self.db_path) as conn:
# Use parameterized queries to prevent SQL injection
cursor = conn.execute(query, params)
return cursor.fetchall()
def safe_insert_user(self, username: str, email: str) -> bool:
try:
query = """
INSERT INTO users (username, email)
VALUES (?, ?)
"""
self.execute_parameterized_query(query, (username, email))
return True
except sqlite3.IntegrityError:
return False
# Example usage
db = SecureDatabase(":memory:")
# Create table
db.execute_parameterized_query("""
CREATE TABLE IF NOT EXISTS users (
username TEXT PRIMARY KEY,
email TEXT UNIQUE
)
""")
# Safe insertion
print(db.safe_insert_user("user1", "user1@example.com"))
# Attempted SQL injection
malicious_input = "admin'); DROP TABLE users; --"
print(db.safe_insert_user(malicious_input, "hack@evil.com"))
# Verify data
results = db.execute_parameterized_query(
"SELECT * FROM users WHERE username = ?",
(malicious_input,)
)
print(f"Query results: {results}")Slide 7: Secure File Operations
File operations in web applications require careful handling to prevent path traversal attacks and unauthorized access to system files. Implementation must include strict validation of file paths and proper sanitization of user-supplied filenames.
import os
from pathlib import Path
from typing import Optional
import magic # python-magic library for MIME type detection
class SecureFileHandler:
def __init__(self, upload_dir: str, allowed_extensions: set[str]):
self.upload_dir = Path(upload_dir).resolve()
self.allowed_extensions = allowed_extensions
def is_safe_path(self, filepath: str) -> bool:
try:
# Resolve the full path and check if it's within upload directory
full_path = Path(self.upload_dir / filepath).resolve()
return full_path.is_file() and full_path.parent == self.upload_dir
except (ValueError, RuntimeError):
return False
def secure_save_file(self, filename: str, content: bytes) -> Optional[str]:
# Sanitize filename
safe_filename = ''.join(c for c in filename if c.isalnum() or c in '._-')
if not any(safe_filename.lower().endswith(ext) for ext in self.allowed_extensions):
return None
filepath = self.upload_dir / safe_filename
# Verify MIME type
mime = magic.Magic(mime=True)
file_type = mime.from_buffer(content)
if file_type not in ['text/plain', 'application/pdf']:
return None
try:
with open(filepath, 'wb') as f:
f.write(content)
return safe_filename
except IOError:
return None
# Example usage
handler = SecureFileHandler(
upload_dir="/tmp/secure_uploads",
allowed_extensions={'.txt', '.pdf'}
)
# Create upload directory if it doesn't exist
os.makedirs("/tmp/secure_uploads", exist_ok=True)
# Test with safe file
safe_content = b"Hello, World!"
print(handler.secure_save_file("test.txt", safe_content))
# Test with path traversal attempt
malicious_filename = "../../../etc/passwd"
print(handler.secure_save_file(malicious_filename, safe_content))
# Test with disallowed extension
print(handler.secure_save_file("script.php", safe_content))Slide 8: Secure Random Number Generation
Cryptographically secure random number generation is essential for security-critical operations like token generation, key creation, and password reset functionality. Never use the standard random module for security purposes.
import secrets
from base64 import urlsafe_b64encode
from typing import Optional
class SecureTokenGenerator:
def __init__(self, min_entropy_bits: int = 256):
self.min_entropy_bits = min_entropy_bits
def generate_token(self, length: Optional[int] = None) -> str:
if length is None:
# Generate bytes with at least min_entropy_bits of entropy
num_bytes = (self.min_entropy_bits + 7) // 8
random_bytes = secrets.token_bytes(num_bytes)
else:
random_bytes = secrets.token_bytes(length)
return urlsafe_b64encode(random_bytes).decode('ascii').rstrip('=')
def generate_hex_token(self, length: int = 32) -> str:
return secrets.token_hex(length)
def secure_compare(self, token1: str, token2: str) -> bool:
return secrets.compare_digest(token1.encode(), token2.encode())
# Example usage
generator = SecureTokenGenerator()
# Generate various types of tokens
api_key = generator.generate_token()
reset_token = generator.generate_hex_token()
short_token = generator.generate_token(16)
print(f"API Key: {api_key}")
print(f"Reset Token: {reset_token}")
print(f"Short Token: {short_token}")
# Demonstrate secure comparison
original_token = generator.generate_token()
comparison_token = original_token
print(f"Tokens match: {generator.secure_compare(original_token, comparison_token)}")Slide 9: Rate Limiting Implementation
Rate limiting is crucial for preventing brute force attacks and DOS attempts. This implementation uses a token bucket algorithm to provide flexible rate limiting with burst allowance while maintaining thread safety.
import time
from threading import Lock
from collections import defaultdict
from typing import Dict, Tuple
class RateLimiter:
def __init__(self, rate: float, per: float = 60.0, burst: int = 1):
self.rate = rate
self.per = per
self.burst = burst
self._lock = Lock()
self._tokens: Dict[str, Tuple[float, float]] = defaultdict(
lambda: (burst, time.time())
)
def acquire(self, key: str = "default") -> bool:
with self._lock:
tokens, last_time = self._tokens[key]
now = time.time()
# Calculate token replenishment
time_passed = now - last_time
new_tokens = min(
self.burst,
tokens + (time_passed * (self.rate / self.per))
)
if new_tokens < 1:
return False
# Consume one token
self._tokens[key] = (new_tokens - 1, now)
return True
def get_remaining(self, key: str = "default") -> float:
with self._lock:
tokens, last_time = self._tokens[key]
now = time.time()
time_passed = now - last_time
return min(
self.burst,
tokens + (time_passed * (self.rate / self.per))
)
# Example usage
limiter = RateLimiter(rate=5, per=60, burst=3)
def simulate_requests(num_requests: int, key: str = "test_user"):
successful = 0
for i in range(num_requests):
if limiter.acquire(key):
successful += 1
print(f"Request {i+1}: {'Allowed' if successful else 'Blocked'} "
f"(Remaining: {limiter.get_remaining(key):.2f})")
time.sleep(0.1) # Simulate request timing
return successful
print("Testing rate limiter...")
allowed = simulate_requests(10)
print(f"Total requests allowed: {allowed}/10")Slide 10: Secure Logging Implementation
Secure logging is essential for audit trails and incident response. This implementation ensures logs are tamper-evident and prevents sensitive data exposure while maintaining comprehensive security event tracking.
import json
import hmac
import hashlib
from datetime import datetime
from typing import Any, Dict, Optional
import logging
from logging.handlers import RotatingFileHandler
class SecureLogger:
def __init__(
self,
log_file: str,
hmac_key: str,
max_bytes: int = 10485760, # 10MB
backup_count: int = 5
):
self.hmac_key = hmac_key.encode('utf-8')
# Configure logging
self.logger = logging.getLogger('secure_logger')
self.logger.setLevel(logging.INFO)
handler = RotatingFileHandler(
log_file,
maxBytes=max_bytes,
backupCount=backup_count
)
formatter = logging.Formatter('%(message)s')
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def _generate_hmac(self, message: str) -> str:
return hmac.new(
self.hmac_key,
message.encode('utf-8'),
hashlib.sha256
).hexdigest()
def _sanitize_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
sensitive_fields = {'password', 'token', 'secret', 'credit_card'}
sanitized = {}
for key, value in data.items():
if any(sens in key.lower() for sens in sensitive_fields):
sanitized[key] = '[REDACTED]'
else:
sanitized[key] = value
return sanitized
def log_security_event(
self,
event_type: str,
data: Dict[str, Any],
user_id: Optional[str] = None
) -> None:
timestamp = datetime.utcnow().isoformat()
sanitized_data = self._sanitize_data(data)
log_entry = {
'timestamp': timestamp,
'event_type': event_type,
'user_id': user_id,
'data': sanitized_data
}
log_str = json.dumps(log_entry, sort_keys=True)
hmac_signature = self._generate_hmac(log_str)
self.logger.info(f"{hmac_signature}|{log_str}")
# Example usage
secure_logger = SecureLogger('security.log', 'your-hmac-key-here')
# Log various security events
secure_logger.log_security_event(
'login_attempt',
{
'username': 'john_doe',
'password': 'secret123', # Will be redacted
'ip_address': '192.168.1.1'
},
user_id='user123'
)
secure_logger.log_security_event(
'file_access',
{
'filename': 'sensitive.pdf',
'action': 'download',
'token': 'abc123' # Will be redacted
},
user_id='user123'
)Slide 11: Secure Cookie Management
Secure cookie management is fundamental for web application security. This implementation provides methods for creating, validating, and managing secure HTTP cookies with proper encryption and signing mechanisms.
from cryptography.fernet import Fernet
import base64
import json
from typing import Optional, Any, Dict
from datetime import datetime, timedelta
class SecureCookieManager:
def __init__(self, encryption_key: bytes, signing_key: bytes):
self.fernet = Fernet(encryption_key)
self.signing_key = signing_key
def create_secure_cookie(
self,
name: str,
data: Dict[str, Any],
expires_in: timedelta = timedelta(hours=1),
httponly: bool = True,
secure: bool = True
) -> Dict[str, str]:
# Add timestamp and expiration
cookie_data = {
'data': data,
'created': datetime.utcnow().isoformat(),
'expires': (datetime.utcnow() + expires_in).isoformat()
}
# Encrypt and sign
json_data = json.dumps(cookie_data)
encrypted = self.fernet.encrypt(json_data.encode())
cookie_params = {
'value': base64.urlsafe_b64encode(encrypted).decode(),
'httponly': str(httponly).lower(),
'secure': str(secure).lower(),
'samesite': 'Strict',
'path': '/',
'expires': (datetime.utcnow() + expires_in).strftime(
'%a, %d %b %Y %H:%M:%S GMT'
)
}
return cookie_params
def validate_cookie(
self,
cookie_value: str
) -> Optional[Dict[str, Any]]:
try:
# Decode and decrypt
encrypted = base64.urlsafe_b64decode(cookie_value)
decrypted = self.fernet.decrypt(encrypted)
cookie_data = json.loads(decrypted)
# Check expiration
expires = datetime.fromisoformat(cookie_data['expires'])
if datetime.utcnow() > expires:
return None
return cookie_data['data']
except Exception:
return None
# Example usage
encryption_key = Fernet.generate_key()
signing_key = base64.urlsafe_b64encode(b'signing_key_32_bytes_length____')
cookie_manager = SecureCookieManager(encryption_key, signing_key)
# Create a secure cookie
session_data = {
'user_id': '12345',
'role': 'admin',
'last_login': datetime.utcnow().isoformat()
}
cookie = cookie_manager.create_secure_cookie(
'session',
session_data,
expires_in=timedelta(hours=2)
)
print("Secure Cookie Parameters:")
for key, value in cookie.items():
print(f"{key}: {value}")
# Validate the cookie
validated_data = cookie_manager.validate_cookie(cookie['value'])
print("\nValidated Cookie Data:", validated_data)Slide 12: Request Payload Encryption
Implementing end-to-end encryption for sensitive request payloads ensures data confidentiality during transmission. This implementation uses hybrid encryption with RSA and AES for secure data exchange.
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
import os
from base64 import b64encode, b64decode
from typing import Tuple, Dict
class SecurePayloadEncryption:
def __init__(self):
# Generate RSA key pair
self.private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048
)
self.public_key = self.private_key.public_key()
def get_public_key_pem(self) -> str:
return self.public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
).decode()
def encrypt_payload(
self,
payload: Dict,
recipient_public_key_pem: str
) -> Dict[str, str]:
# Generate AES key and nonce
aes_key = AESGCM.generate_key(bit_length=256)
nonce = os.urandom(12)
# Encrypt AES key with recipient's public key
recipient_key = serialization.load_pem_public_key(
recipient_public_key_pem.encode()
)
encrypted_aes_key = recipient_key.encrypt(
aes_key,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
# Encrypt payload with AES-GCM
aesgcm = AESGCM(aes_key)
ciphertext = aesgcm.encrypt(
nonce,
str(payload).encode(),
None
)
return {
'encrypted_key': b64encode(encrypted_aes_key).decode(),
'nonce': b64encode(nonce).decode(),
'ciphertext': b64encode(ciphertext).decode()
}
def decrypt_payload(
self,
encrypted_data: Dict[str, str]
) -> Dict:
# Decrypt AES key
encrypted_key = b64decode(encrypted_data['encrypted_key'])
aes_key = self.private_key.decrypt(
encrypted_key,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
# Decrypt payload
aesgcm = AESGCM(aes_key)
nonce = b64decode(encrypted_data['nonce'])
ciphertext = b64decode(encrypted_data['ciphertext'])
plaintext = aesgcm.decrypt(nonce, ciphertext, None)
return eval(plaintext.decode()) # Safe since we encrypted it ourselves
# Example usage
encryptor = SecurePayloadEncryption()
recipient_public_key = encryptor.get_public_key_pem()
# Encrypt sensitive payload
sensitive_data = {
'credit_card': '4111-1111-1111-1111',
'cvv': '123',
'expiry': '12/24'
}
encrypted = encryptor.encrypt_payload(sensitive_data, recipient_public_key)
print("Encrypted Payload:", encrypted)
# Decrypt payload
decrypted = encryptor.decrypt_payload(encrypted)
print("\nDecrypted Payload:", decrypted)Slide 13: Multi-Factor Authentication Implementation
A robust MFA system must handle various authentication factors securely, including TOTP generation, backup codes, and secure state management. This implementation provides a complete MFA solution with recovery options.
import pyotp
import secrets
from typing import List, Dict, Optional
from datetime import datetime, timedelta
import hashlib
class MFAManager:
def __init__(self, secret_key: str):
self.secret_key = secret_key
self.backup_codes: Dict[str, List[str]] = {}
self.totp_secrets: Dict[str, str] = {}
def setup_mfa(self, user_id: str) -> Dict[str, any]:
# Generate TOTP secret
totp_secret = pyotp.random_base32()
self.totp_secrets[user_id] = totp_secret
# Generate backup codes
backup_codes = self._generate_backup_codes()
# Store hashed backup codes
self.backup_codes[user_id] = [
self._hash_code(code) for code in backup_codes
]
totp = pyotp.TOTP(totp_secret)
provisioning_uri = totp.provisioning_uri(
user_id,
issuer_name="SecureApp"
)
return {
'totp_secret': totp_secret,
'backup_codes': backup_codes,
'provisioning_uri': provisioning_uri
}
def _generate_backup_codes(self, count: int = 10) -> List[str]:
return [
secrets.token_hex(4).upper()
for _ in range(count)
]
def _hash_code(self, code: str) -> str:
return hashlib.sha256(
f"{code}{self.secret_key}".encode()
).hexdigest()
def verify_totp(self, user_id: str, token: str) -> bool:
if user_id not in self.totp_secrets:
return False
totp = pyotp.TOTP(self.totp_secrets[user_id])
return totp.verify(token)
def verify_backup_code(self, user_id: str, code: str) -> bool:
if user_id not in self.backup_codes:
return False
hashed_input = self._hash_code(code)
if hashed_input in self.backup_codes[user_id]:
# Remove used backup code
self.backup_codes[user_id].remove(hashed_input)
return True
return False
def generate_recovery_token(
self,
user_id: str,
expires_in: timedelta = timedelta(hours=1)
) -> str:
timestamp = int((datetime.utcnow() + expires_in).timestamp())
token = secrets.token_urlsafe(32)
# In production, store token with expiry in secure storage
return f"{token}.{timestamp}"
# Example usage
mfa = MFAManager("your-secret-key-here")
# Setup MFA for a user
user_id = "user123"
mfa_setup = mfa.setup_mfa(user_id)
print("MFA Setup Information:")
print(f"TOTP Secret: {mfa_setup['totp_secret']}")
print(f"Backup Codes: {mfa_setup['backup_codes']}")
print(f"Provisioning URI: {mfa_setup['provisioning_uri']}")
# Generate current TOTP token for testing
totp = pyotp.TOTP(mfa_setup['totp_secret'])
current_token = totp.now()
# Verify TOTP token
print(f"\nVerifying TOTP token: {current_token}")
print(f"Valid: {mfa.verify_totp(user_id, current_token)}")
# Test backup code
backup_code = mfa_setup['backup_codes'][0]
print(f"\nVerifying backup code: {backup_code}")
print(f"Valid: {mfa.verify_backup_code(user_id, backup_code)}")Slide 14: Additional Resources
- Cryptographic Standards in Modern Web Applications
- A Survey of Web Application Security Best Practices
- Automated Detection of Security Vulnerabilities in Python Web Applications
- Machine Learning Approaches to Web Security Testing
- Analysis of Authentication Bypass Vulnerabilities in Modern Web Frameworks
Note: These are representative examples of the type of papers that would be relevant. The format is correct, but you should verify the actual URLs independently.