This advanced module prepares students to deploy and operate AI systems at enterprise scale. Students will master distributed inference, implement sophisticated optimization strategies, manage compliance and governance, and build systems that balance performance with cost-effectiveness. This level focuses on real-world production challenges and solutions.
By the end of this module, students will be able to:
- Deploy distributed inference across multiple GPUs and nodes
- Apply optimization strategies for throughput, memory, and cost
- Implement privacy protection, compliance checks, and audit logging
- Balance performance and cost-effectiveness in production AI systems
Framework Comparison (2024-2025):
| Framework | Key Features | Best For | Performance |
|---|---|---|---|
| vLLM | PyTorch ecosystem, FP8 quantization | High throughput | Excellent |
| Ray Serve | Distributed computing, auto-scaling | Complex pipelines | Very Good |
| TGI (Text Generation Inference) | Hugging Face native | Easy deployment | Good |
| Triton Inference Server | Multi-framework support | Heterogeneous models | Excellent |
| Dynamo | Rust-based, engine agnostic | Flexibility | Excellent |
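Several of these frameworks (vLLM and TGI in particular) can expose an OpenAI-compatible HTTP endpoint, which keeps client code framework-agnostic. Below is a minimal client sketch, assuming such a server is already running at a hypothetical local URL and serving the model named in the call:

```python
# Minimal client sketch against an OpenAI-compatible serving endpoint
# (assumes a vLLM or TGI server is already running at the URL below).
from openai import OpenAI

client = OpenAI(
    base_url="http://localhost:8000/v1",  # hypothetical local endpoint
    api_key="EMPTY",                      # local servers typically ignore the key
)

response = client.chat.completions.create(
    model="meta-llama/Llama-2-7b-chat-hf",  # model name is an assumption
    messages=[{"role": "user", "content": "Summarize continuous batching in one sentence."}],
    max_tokens=128,
    temperature=0.7,
)
print(response.choices[0].message.content)
```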
from vllm import LLM, SamplingParams
import torch
from typing import List, Dict, Optional
import asyncio
class DistributedInferenceEngine:
def __init__(self, model_name: str, tensor_parallel_size: int = 2,
pipeline_parallel_size: int = 1):
self.model_name = model_name
self.tensor_parallel_size = tensor_parallel_size
self.pipeline_parallel_size = pipeline_parallel_size
self.setup_engine()
def setup_engine(self):
"""Initialize vLLM with distributed settings"""
# Configure for distributed inference
self.llm = LLM(
model=self.model_name,
tensor_parallel_size=self.tensor_parallel_size,
pipeline_parallel_size=self.pipeline_parallel_size,
trust_remote_code=True,
dtype="float16", # or "bfloat16" for newer models
max_model_len=32768, # Adjust based on model
gpu_memory_utilization=0.95, # Maximize GPU usage
swap_space=4, # GB of CPU swap space
enforce_eager=False, # Use CUDA graphs for speed
enable_prefix_caching=True, # Cache common prefixes
enable_chunked_prefill=True, # Better handling of long contexts
max_num_batched_tokens=8192,
max_num_seqs=256 # Maximum concurrent sequences
)
# Default sampling parameters
self.default_sampling = SamplingParams(
temperature=0.7,
top_p=0.95,
max_tokens=2048,
stop=["</s>", "\n\n"]
)
def generate(self, prompts: List[str],
sampling_params: Optional[SamplingParams] = None) -> List[str]:
"""Generate responses for batch of prompts"""
if sampling_params is None:
sampling_params = self.default_sampling
outputs = self.llm.generate(prompts, sampling_params)
responses = []
for output in outputs:
response = output.outputs[0].text
responses.append(response)
return responses
async def async_generate(self, prompts: List[str]) -> List[str]:
"""Asynchronous generation for better concurrency"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, self.generate, prompts)
def profile_performance(self, test_prompts: List[str]) -> Dict:
"""Profile inference performance"""
import time
import numpy as np
metrics = {
"throughput": [],
"latency": [],
"tokens_per_second": []
}
for batch_size in [1, 8, 16, 32, 64]:
batch = test_prompts[:batch_size]
start_time = time.time()
outputs = self.llm.generate(batch, self.default_sampling)
end_time = time.time()
total_time = end_time - start_time
total_tokens = sum(len(o.outputs[0].token_ids) for o in outputs)
metrics["throughput"].append(batch_size / total_time)
metrics["latency"].append(total_time / batch_size)
metrics["tokens_per_second"].append(total_tokens / total_time)
return {
"avg_throughput": np.mean(metrics["throughput"]),
"avg_latency": np.mean(metrics["latency"]),
"avg_tokens_per_second": np.mean(metrics["tokens_per_second"]),
"details": metrics
}
import ray
from ray import serve
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from typing import List, Dict
import asyncio
# Initialize Ray
ray.init(address="auto", ignore_reinit_error=True)
serve.start(detached=True)
@serve.deployment(
# num_replicas is omitted because autoscaling_config below controls replica count
ray_actor_options={
"num_cpus": 2,
"num_gpus": 1,
"memory": 16 * 1024 * 1024 * 1024 # 16GB per replica
},
autoscaling_config={
"min_replicas": 2,
"max_replicas": 10,
"target_num_ongoing_requests_per_replica": 5,
"upscale_delay_s": 10,
"downscale_delay_s": 60
},
health_check_period_s=10,
health_check_timeout_s=30
)
class ModelServer:
def __init__(self, model_name: str):
self.device = "cuda" if torch.cuda.is_available() else "cpu"
self.model = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype=torch.float16,
device_map="auto"
)
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.tokenizer.pad_token = self.tokenizer.eos_token
async def __call__(self, request: Dict) -> Dict:
"""Handle inference request"""
prompt = request["prompt"]
params = request.get("params", {})
# Tokenize input
inputs = self.tokenizer(
prompt,
return_tensors="pt",
padding=True,
truncation=True,
max_length=2048
).to(self.device)
# Generate
with torch.no_grad():
outputs = self.model.generate(
**inputs,
max_new_tokens=params.get("max_tokens", 256),
temperature=params.get("temperature", 0.7),
top_p=params.get("top_p", 0.95),
do_sample=True
)
# Decode
response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
return {
"response": response,
"model": self.model.config.name_or_path,
"tokens_generated": len(outputs[0]) - len(inputs["input_ids"][0])
}
# Deploy the model
deployment = ModelServer.bind(model_name="meta-llama/Llama-2-7b-chat-hf")
serve.run(deployment, name="llm_service", route_prefix="/generate")
class LoadBalancer:
def __init__(self, endpoints: List[str]):
self.endpoints = endpoints
self.current_index = 0
self.health_status = {ep: True for ep in endpoints}
self.request_counts = {ep: 0 for ep in endpoints}
async def route_request(self, request: Dict) -> Dict:
"""Route request to healthy endpoint with least load"""
# Get healthy endpoints sorted by load
healthy_endpoints = [
ep for ep in self.endpoints
if self.health_status[ep]
]
if not healthy_endpoints:
raise Exception("No healthy endpoints available")
# Select endpoint with least requests
selected = min(healthy_endpoints, key=lambda x: self.request_counts[x])
self.request_counts[selected] += 1
try:
# Make request to selected endpoint
response = await self.make_request(selected, request)
return response
except Exception as e:
# Mark endpoint as unhealthy
self.health_status[selected] = False
# Retry with different endpoint
return await self.route_request(request)
finally:
self.request_counts[selected] -= 1
async def health_check(self):
"""Periodic health check for all endpoints"""
while True:
for endpoint in self.endpoints:
try:
# Simple health check
await self.make_request(endpoint, {"prompt": "test", "params": {"max_tokens": 1}})
self.health_status[endpoint] = True
except Exception:
self.health_status[endpoint] = False
await asyncio.sleep(30) # Check every 30 seconds
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Any
import time
import threading
from queue import PriorityQueue
@dataclass(order=True)
class InferenceRequest:
priority: int
request_id: str = field(compare=False)
prompt: str = field(compare=False)
params: Dict = field(compare=False)
timestamp: float = field(compare=False)
callback: Any = field(compare=False)
class DynamicBatcher:
def __init__(self, model, max_batch_size: int = 32,
max_wait_time: float = 0.1):
self.model = model
self.max_batch_size = max_batch_size
self.max_wait_time = max_wait_time
self.request_queue = PriorityQueue()
self.processing = False
self.start_processing()
def add_request(self, request: InferenceRequest):
"""Add request to batching queue"""
request.timestamp = time.time()
self.request_queue.put(request)
def start_processing(self):
"""Start batch processing thread"""
thread = threading.Thread(target=self._process_batches, daemon=True)
thread.start()
def _process_batches(self):
"""Process batches continuously"""
while True:
batch = self._collect_batch()
if batch:
self._process_batch(batch)
else:
time.sleep(0.01) # Small sleep if no requests
def _collect_batch(self) -> List[InferenceRequest]:
"""Collect requests into a batch"""
batch = []
deadline = time.time() + self.max_wait_time
while len(batch) < self.max_batch_size and time.time() < deadline:
if not self.request_queue.empty():
try:
request = self.request_queue.get_nowait()
batch.append(request)
except Exception:
break
else:
# Wait a bit for more requests
time.sleep(0.001)
return batch
def _process_batch(self, batch: List[InferenceRequest]):
"""Process a batch of requests"""
# Sort by sequence length for efficient padding
batch.sort(key=lambda x: len(x.prompt))
# Extract prompts and parameters
prompts = [r.prompt for r in batch]
# Batch inference
responses = self.model.generate_batch(prompts)
# Return results via callbacks
for request, response in zip(batch, responses):
if request.callback:
request.callback(response)
class ContinuousBatching:
"""
Implement continuous batching for optimal throughput
"""
def __init__(self, model, max_batch_size: int = 128):
self.model = model
self.max_batch_size = max_batch_size
self.active_sequences = {}
self.kv_cache = {}
def add_sequence(self, seq_id: str, prompt: str):
"""Add new sequence to active batch"""
self.active_sequences[seq_id] = {
"prompt": prompt,
"generated": [],
"done": False,
"position": 0
}
def step(self) -> Dict[str, str]:
"""Perform one generation step for all active sequences"""
if not self.active_sequences:
return {}
# Prepare batch
batch_inputs = []
seq_ids = []
for seq_id, seq_data in self.active_sequences.items():
if not seq_data["done"]:
batch_inputs.append(seq_data)
seq_ids.append(seq_id)
# Generate next tokens
next_tokens = self.model.generate_next_tokens(
batch_inputs,
kv_cache=self.kv_cache
)
# Update sequences
results = {}
for seq_id, next_token in zip(seq_ids, next_tokens):
self.active_sequences[seq_id]["generated"].append(next_token)
# Check if sequence is done
if next_token in ["</s>", "<|endoftext|>"]:
self.active_sequences[seq_id]["done"] = True
results[seq_id] = "".join(self.active_sequences[seq_id]["generated"])
# Remove completed sequences
for seq_id in results:
del self.active_sequences[seq_id]
if seq_id in self.kv_cache:
del self.kv_cache[seq_id]
return results
class ContextWindowManager:
def __init__(self, max_context_length: int = 32768):
self.max_context_length = max_context_length
self.compression_strategies = {
"summarization": self.compress_by_summarization,
"selective": self.selective_context_retention,
"hierarchical": self.hierarchical_compression,
"sliding": self.sliding_window_compression
}
def manage_context(self, context: str, new_input: str,
strategy: str = "hierarchical") -> str:
"""Manage context to fit within window"""
total_length = len(context) + len(new_input)
if total_length <= self.max_context_length:
return context + "\n" + new_input
# Apply compression strategy
compression_func = self.compression_strategies[strategy]
compressed_context = compression_func(context, new_input)
return compressed_context
def compress_by_summarization(self, context: str, new_input: str) -> str:
"""Compress context by summarizing older parts"""
# Split context into chunks
chunks = self.split_into_chunks(context, chunk_size=1000)
# Keep recent chunks as-is, summarize older ones
cutoff = len(chunks) // 3
summarized = []
for i, chunk in enumerate(chunks):
if i < cutoff:
# Summarize older chunks more aggressively
summary = self.summarize_chunk(chunk, max_length=100)
summarized.append(f"[Summarized]: {summary}")
else:
# Keep recent chunks
summarized.append(chunk)
compressed = "\n".join(summarized) + "\n" + new_input
return self.truncate_to_fit(compressed)
def selective_context_retention(self, context: str, new_input: str) -> str:
"""Keep only relevant parts of context"""
# Extract key information
entities = self.extract_entities(context)
facts = self.extract_facts(context)
recent_exchanges = self.get_recent_exchanges(context, n=3)
# Reconstruct compressed context
compressed_parts = [
f"Key entities: {', '.join(entities)}",
f"Important facts: {' '.join(facts)}",
"Recent context:",
"\n".join(recent_exchanges)
]
compressed = "\n".join(compressed_parts) + "\n" + new_input
return self.truncate_to_fit(compressed)
def hierarchical_compression(self, context: str, new_input: str) -> str:
"""Multi-level compression based on importance"""
# Parse context into segments with importance scores
segments = self.segment_and_score(context)
# Sort by importance
segments.sort(key=lambda x: x["importance"], reverse=True)
# Build compressed context within limit
compressed_parts = []
current_length = len(new_input)
for segment in segments:
segment_text = segment["text"]
# Apply compression based on importance
if segment["importance"] > 0.8:
# Keep as-is
compressed_text = segment_text
elif segment["importance"] > 0.5:
# Light compression
compressed_text = self.light_compress(segment_text)
else:
# Heavy compression
compressed_text = self.heavy_compress(segment_text)
if current_length + len(compressed_text) < self.max_context_length:
compressed_parts.append(compressed_text)
current_length += len(compressed_text)
else:
break
return "\n".join(compressed_parts) + "\n" + new_input
def sliding_window_compression(self, context: str, new_input: str) -> str:
"""Maintain sliding window with decay"""
window_size = self.max_context_length - len(new_input) - 100 # Buffer
if len(context) <= window_size:
return context + "\n" + new_input
# Keep most recent content
return context[-window_size:] + "\n" + new_input
def segment_and_score(self, context: str) -> List[Dict]:
"""Segment context and assign importance scores"""
segments = []
# Simple heuristic-based scoring
lines = context.split("\n")
for line in lines:
importance = 0.5 # Base score
# Adjust based on content
if any(keyword in line.lower() for keyword in ["important", "key", "critical"]):
importance += 0.3
if any(keyword in line.lower() for keyword in ["user:", "assistant:"]):
importance += 0.2
if len(line) > 200: # Longer content might be more detailed
importance += 0.1
segments.append({
"text": line,
"importance": min(1.0, importance)
})
return segments
class MemoryEfficientLoader:
def __init__(self):
self.loaded_models = {}
self.memory_monitor = MemoryMonitor()
def load_model_sharded(self, model_name: str, num_shards: int = 4):
"""Load model in shards across multiple GPUs"""
from accelerate import init_empty_weights, load_checkpoint_and_dispatch
from transformers import AutoConfig, AutoModelForCausalLM
# Initialize model with empty weights
with init_empty_weights():
model = AutoModelForCausalLM.from_config(
AutoConfig.from_pretrained(model_name)
)
# Load and dispatch across devices
model = load_checkpoint_and_dispatch(
model,
model_name,
device_map="auto",
max_memory={
0: "10GiB",
1: "10GiB",
2: "10GiB",
3: "10GiB",
"cpu": "30GiB"
},
no_split_module_classes=["Block"],
dtype=torch.float16
)
return model
def quantize_model(self, model_name: str, quantization_config: Dict):
"""Apply quantization for memory reduction (bitsandbytes quantization is applied at load time)"""
from transformers import AutoModelForCausalLM, BitsAndBytesConfig
if quantization_config["method"] == "int8":
# 8-bit quantization
bnb_config = BitsAndBytesConfig(load_in_8bit=True)
elif quantization_config["method"] == "int4":
# 4-bit quantization (NF4 with double quantization)
bnb_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_compute_dtype=torch.float16,
bnb_4bit_use_double_quant=True,
bnb_4bit_quant_type="nf4"
)
elif quantization_config["method"] == "fp8":
# FP8 quantization (for newest GPUs) is handled by a separate helper
return self.apply_fp8_quantization(model_name)
else:
raise ValueError(f"Unknown quantization method: {quantization_config['method']}")
# Quantized weights are created while loading the checkpoint
model = AutoModelForCausalLM.from_pretrained(
model_name,
quantization_config=bnb_config,
device_map="auto"
)
return model
def offload_to_cpu(self, model, offload_config: Dict):
"""Offload model layers to CPU when not in use"""
from accelerate import cpu_offload_with_hook
# Offload specific layers
offload_layers = offload_config.get("layers", [])
hooks = []
for layer_name in offload_layers:
layer = getattr(model, layer_name)
# cpu_offload_with_hook returns the hooked module and its offload hook
layer, hook = cpu_offload_with_hook(layer, execution_device="cuda:0")
hooks.append(hook)
return model, hooks
def implement_gradient_checkpointing(self, model):
"""Enable gradient checkpointing to save memory during training"""
model.gradient_checkpointing_enable()
# Custom checkpointing for specific layers
from torch.utils.checkpoint import checkpoint
def make_checkpointed_forward(module):
# Bind each layer's original forward here to avoid the late-binding
# pitfall of lambdas defined inside a loop
original_forward = module.forward
def checkpointed_forward(*args, **kwargs):
return checkpoint(
original_forward,
*args,
use_reentrant=False,
**kwargs
)
return checkpointed_forward
# Apply to transformer layers
for layer in model.transformer.h:
layer.forward = make_checkpointed_forward(layer)
return model
class MemoryMonitor:
def __init__(self):
self.gpu_memory_reserved = {}
self.gpu_memory_allocated = {}
def get_memory_stats(self) -> Dict:
"""Get current memory statistics"""
stats = {}
if torch.cuda.is_available():
for i in range(torch.cuda.device_count()):
stats[f"gpu_{i}"] = {
"allocated": torch.cuda.memory_allocated(i) / 1024**3, # GB
"reserved": torch.cuda.memory_reserved(i) / 1024**3, # GB
"free": (torch.cuda.get_device_properties(i).total_memory -
torch.cuda.memory_allocated(i)) / 1024**3 # GB
}
# CPU memory
import psutil
mem = psutil.virtual_memory()
stats["cpu"] = {
"used": mem.used / 1024**3,
"available": mem.available / 1024**3,
"percent": mem.percent
}
return stats
def optimize_memory(self):
"""Optimize memory usage"""
# Clear cache
if torch.cuda.is_available():
torch.cuda.empty_cache()
# Garbage collection
import gc
gc.collect()
# Note: deleting references obtained from gc.get_objects() does not free GPU
# memory; releasing tensors requires dropping the caller's own references
# before calling gc.collect() and torch.cuda.empty_cache() above.
class CostOptimizer:
def __init__(self):
self.model_registry = self.initialize_model_registry()
self.cost_tracker = CostTracker()
self.performance_monitor = PerformanceMonitor()
def initialize_model_registry(self) -> Dict:
"""Registry of available models with costs and capabilities"""
return {
"gpt-4o": {
"cost_per_1k_input": 0.005,
"cost_per_1k_output": 0.015,
"quality_score": 0.95,
"latency": 2.5,
"capabilities": ["complex_reasoning", "creative", "analysis"]
},
"gpt-4o-mini": {
"cost_per_1k_input": 0.00015,
"cost_per_1k_output": 0.0006,
"quality_score": 0.80,
"latency": 1.0,
"capabilities": ["general", "simple_tasks"]
},
"claude-3-5-sonnet": {
"cost_per_1k_input": 0.003,
"cost_per_1k_output": 0.015,
"quality_score": 0.93,
"latency": 2.0,
"capabilities": ["analysis", "coding", "long_context"]
},
"llama-3-70b": {
"cost_per_1k_input": 0.0008,
"cost_per_1k_output": 0.0008,
"quality_score": 0.85,
"latency": 1.5,
"capabilities": ["general", "multilingual"]
},
"mixtral-8x7b": {
"cost_per_1k_input": 0.0005,
"cost_per_1k_output": 0.0005,
"quality_score": 0.78,
"latency": 1.2,
"capabilities": ["general", "fast"]
}
}
def select_optimal_model(self, task: Dict, constraints: Dict) -> str:
"""Select most cost-effective model for task"""
required_quality = constraints.get("min_quality", 0.7)
max_latency = constraints.get("max_latency", 5.0)
max_cost = constraints.get("max_cost", float('inf'))
required_capabilities = set(task.get("capabilities", []))
candidates = []
for model_name, model_info in self.model_registry.items():
# Check constraints
if model_info["quality_score"] < required_quality:
continue
if model_info["latency"] > max_latency:
continue
# Check capabilities
model_capabilities = set(model_info["capabilities"])
if required_capabilities and not required_capabilities.issubset(model_capabilities):
continue
# Estimate cost for this task
estimated_cost = self.estimate_task_cost(task, model_info)
if estimated_cost > max_cost:
continue
# Calculate efficiency score
efficiency = model_info["quality_score"] / estimated_cost
candidates.append({
"model": model_name,
"cost": estimated_cost,
"quality": model_info["quality_score"],
"efficiency": efficiency
})
if not candidates:
# Fallback to cheapest model if no candidates meet criteria
return "mixtral-8x7b"
# Select model with best efficiency
best = max(candidates, key=lambda x: x["efficiency"])
return best["model"]
def estimate_task_cost(self, task: Dict, model_info: Dict) -> float:
"""Estimate cost for specific task"""
# Estimate token counts
input_tokens = task.get("estimated_input_tokens", 500)
output_tokens = task.get("estimated_output_tokens", 500)
input_cost = (input_tokens / 1000) * model_info["cost_per_1k_input"]
output_cost = (output_tokens / 1000) * model_info["cost_per_1k_output"]
return input_cost + output_cost
def implement_cascading_strategy(self, task: str) -> Dict:
"""Implement cost-efficient cascading strategy"""
# Start with cheapest model
models_tried = []
total_cost = 0
for model_name in sorted(self.model_registry.keys(),
key=lambda x: self.model_registry[x]["cost_per_1k_input"]):
model_info = self.model_registry[model_name]
# Try current model
result = self.try_model(model_name, task)
cost = self.calculate_actual_cost(result, model_info)
total_cost += cost
models_tried.append({
"model": model_name,
"cost": cost,
"quality": result.get("quality_score", 0)
})
# Check if quality is sufficient
if result.get("quality_score", 0) >= 0.8:
return {
"final_model": model_name,
"result": result,
"total_cost": total_cost,
"models_tried": models_tried
}
# Return best result if quality threshold not met
best_result = max(models_tried, key=lambda x: x["quality"])
return {
"final_model": best_result["model"],
"total_cost": total_cost,
"models_tried": models_tried,
"quality_warning": "Quality threshold not met"
}
class CostTracker:
def __init__(self):
self.usage_history = []
self.cost_by_model = {}
self.cost_by_user = {}
self.daily_budget = 1000 # $1000 per day
def track_usage(self, model: str, input_tokens: int,
output_tokens: int, user_id: str):
"""Track API usage and costs"""
timestamp = time.time()
# Calculate cost
model_costs = {
"gpt-4o": {"input": 0.005, "output": 0.015},
"gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
# Add more models
}
costs = model_costs.get(model, {"input": 0.001, "output": 0.001})
total_cost = (input_tokens / 1000) * costs["input"] + \
(output_tokens / 1000) * costs["output"]
# Record usage
usage = {
"timestamp": timestamp,
"model": model,
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"cost": total_cost,
"user_id": user_id
}
self.usage_history.append(usage)
# Update aggregates
self.cost_by_model[model] = self.cost_by_model.get(model, 0) + total_cost
self.cost_by_user[user_id] = self.cost_by_user.get(user_id, 0) + total_cost
# Check budget
self.check_budget_alerts()
def check_budget_alerts(self):
"""Monitor and alert on budget usage"""
# Calculate today's spending
today_start = time.time() - (time.time() % 86400)
today_cost = sum(
u["cost"] for u in self.usage_history
if u["timestamp"] >= today_start
)
if today_cost > self.daily_budget * 0.8:
self.send_alert(f"Daily budget 80% consumed: ${today_cost:.2f}")
if today_cost > self.daily_budget:
self.send_alert(f"Daily budget exceeded: ${today_cost:.2f}")
self.implement_throttling()
def generate_cost_report(self, period: str = "daily") -> Dict:
"""Generate cost analysis report"""
report = {
"period": period,
"total_cost": sum(u["cost"] for u in self.usage_history),
"by_model": self.cost_by_model,
"by_user": self.cost_by_user,
"top_users": sorted(
self.cost_by_user.items(),
key=lambda x: x[1],
reverse=True
)[:10],
"optimization_opportunities": self.identify_optimizations()
}
return report
def identify_optimizations(self) -> List[Dict]:
"""Identify cost optimization opportunities"""
optimizations = []
# Analyze model usage patterns
model_usage = {}
for usage in self.usage_history:
model = usage["model"]
if model not in model_usage:
model_usage[model] = {"count": 0, "total_cost": 0}
model_usage[model]["count"] += 1
model_usage[model]["total_cost"] += usage["cost"]
# Suggest model downgrades for simple tasks
for model, stats in model_usage.items():
if model == "gpt-4o" and stats["count"] > 100:
avg_tokens = sum(u["output_tokens"] for u in self.usage_history
if u["model"] == model) / stats["count"]
if avg_tokens < 200:
optimizations.append({
"type": "model_downgrade",
"current_model": model,
"suggested_model": "gpt-4o-mini",
"potential_savings": stats["total_cost"] * 0.8
})
return optimizations
class PrivacyPreservingAI:
def __init__(self):
self.setup_privacy_tools()
self.compliance_standards = ["GDPR", "HIPAA", "SOC2", "CCPA"]
def setup_privacy_tools(self):
"""Initialize privacy protection tools"""
self.pii_detector = PIIDetector()
self.data_anonymizer = DataAnonymizer()
self.differential_privacy = DifferentialPrivacy()
self.audit_logger = AuditLogger()
def process_with_privacy(self, data: str, user_consent: Dict) -> str:
"""Process data with privacy protection"""
# Log access
self.audit_logger.log_access(data, user_consent)
# Detect and handle PII
pii_scan = self.pii_detector.scan(data)
if pii_scan["contains_pii"]:
if not user_consent.get("allow_pii_processing", False):
# Anonymize PII
data = self.data_anonymizer.anonymize(data, pii_scan["entities"])
# Apply differential privacy for aggregations
if user_consent.get("require_differential_privacy", False):
data = self.differential_privacy.add_noise(data)
return data
class PIIDetector:
def __init__(self):
import spacy
self.nlp = spacy.load("en_core_web_sm")
self.patterns = {
"ssn": r'\b\d{3}-\d{2}-\d{4}\b',
"credit_card": r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b',
"email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
"phone": r'\b(?:\+?1[-.]?)?\(?[0-9]{3}\)?[-.]?[0-9]{3}[-.]?[0-9]{4}\b',
"ip_address": r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b',
"medical_record": r'\b[A-Z]{2}\d{6,8}\b',
"passport": r'\b[A-Z][0-9]{8}\b'
}
def scan(self, text: str) -> Dict:
"""Scan text for PII"""
results = {
"contains_pii": False,
"entities": [],
"risk_level": "low"
}
# Named entity recognition
doc = self.nlp(text)
for ent in doc.ents:
if ent.label_ in ["PERSON", "ORG", "GPE", "DATE"]:
results["entities"].append({
"text": ent.text,
"type": ent.label_,
"start": ent.start_char,
"end": ent.end_char
})
# Pattern matching
import re
for pii_type, pattern in self.patterns.items():
matches = re.finditer(pattern, text)
for match in matches:
results["entities"].append({
"text": match.group(),
"type": pii_type,
"start": match.start(),
"end": match.end()
})
if results["entities"]:
results["contains_pii"] = True
results["risk_level"] = self.assess_risk_level(results["entities"])
return results
def assess_risk_level(self, entities: List[Dict]) -> str:
"""Assess privacy risk level"""
high_risk_types = ["ssn", "credit_card", "medical_record", "passport"]
medium_risk_types = ["email", "phone", "ip_address"]
for entity in entities:
if entity["type"] in high_risk_types:
return "high"
for entity in entities:
if entity["type"] in medium_risk_types:
return "medium"
return "low"
class DataAnonymizer:
def __init__(self):
self.replacement_strategies = {
"PERSON": self.anonymize_person,
"email": self.anonymize_email,
"phone": self.anonymize_phone,
"ssn": lambda x: "[SSN_REDACTED]",
"credit_card": lambda x: "[CC_REDACTED]"
}
def anonymize(self, text: str, entities: List[Dict]) -> str:
"""Anonymize PII in text"""
# Sort entities by position (reverse order for replacement)
entities.sort(key=lambda x: x["start"], reverse=True)
anonymized_text = text
for entity in entities:
entity_type = entity["type"]
if entity_type in self.replacement_strategies:
replacement = self.replacement_strategies[entity_type](entity["text"])
else:
replacement = f"[{entity_type}_REDACTED]"
anonymized_text = (
anonymized_text[:entity["start"]] +
replacement +
anonymized_text[entity["end"]:]
)
return anonymized_text
def anonymize_person(self, name: str) -> str:
"""Generate consistent anonymous name"""
import hashlib
# Generate consistent hash for name
name_hash = hashlib.md5(name.encode()).hexdigest()
# Use hash to select from anonymous names
first_names = ["Alex", "Jordan", "Casey", "Morgan", "Riley"]
last_names = ["Smith", "Johnson", "Williams", "Brown", "Jones"]
first_idx = int(name_hash[:8], 16) % len(first_names)
last_idx = int(name_hash[8:16], 16) % len(last_names)
return f"{first_names[first_idx]} {last_names[last_idx]}"
def anonymize_email(self, email: str) -> str:
"""Anonymize email while preserving domain"""
parts = email.split("@")
if len(parts) == 2:
return f"user_xxxx@{parts[1]}"
return "[EMAIL_REDACTED]"
def anonymize_phone(self, phone: str) -> str:
"""Anonymize phone number keeping area code"""
import re
# Extract area code if present
match = re.match(r'(\+?\d{1,3}[-.]?)?\(?(\d{3})\)?', phone)
if match:
area_code = match.group(2)
return f"({area_code}) XXX-XXXX"
return "[PHONE_REDACTED]"
class ComplianceManager:
def __init__(self):
self.compliance_checks = {
"GDPR": self.check_gdpr_compliance,
"HIPAA": self.check_hipaa_compliance,
"SOC2": self.check_soc2_compliance,
"CCPA": self.check_ccpa_compliance
}
self.data_retention_policies = self.load_retention_policies()
def check_compliance(self, data_processing: Dict) -> Dict:
"""Check compliance with regulations"""
results = {
"compliant": True,
"violations": [],
"warnings": [],
"recommendations": []
}
for regulation, check_func in self.compliance_checks.items():
check_result = check_func(data_processing)
if not check_result["compliant"]:
results["compliant"] = False
results["violations"].extend(check_result.get("violations", []))
results["warnings"].extend(check_result.get("warnings", []))
results["recommendations"].extend(check_result.get("recommendations", []))
return results
def check_gdpr_compliance(self, data_processing: Dict) -> Dict:
"""Check GDPR compliance"""
result = {"compliant": True, "violations": [], "warnings": []}
# Check for lawful basis
if not data_processing.get("lawful_basis"):
result["violations"].append("No lawful basis for processing")
result["compliant"] = False
# Check for user consent
if data_processing.get("personal_data") and not data_processing.get("user_consent"):
result["violations"].append("Processing personal data without consent")
result["compliant"] = False
# Check data minimization
if data_processing.get("data_collected_unnecessarily"):
result["warnings"].append("Potential violation of data minimization principle")
# Check right to erasure
if not data_processing.get("deletion_mechanism"):
result["warnings"].append("No mechanism for right to erasure")
return result
def implement_data_retention(self, data: Dict, policy: str):
"""Implement data retention policies"""
retention_period = self.data_retention_policies.get(policy, 90) # days
# Schedule deletion
deletion_date = time.time() + (retention_period * 86400)
return {
"data_id": data.get("id"),
"retention_period": retention_period,
"deletion_scheduled": deletion_date,
"policy": policy
}
class AuditLogger:
def __init__(self, log_path: str = "/var/log/ai_audit.log"):
self.log_path = log_path
self.setup_logger()
def setup_logger(self):
"""Setup secure audit logging"""
import logging
from logging.handlers import RotatingFileHandler
self.logger = logging.getLogger("ai_audit")
self.logger.setLevel(logging.INFO)
# Rotating file handler (add encryption or ship to a secure log store in production)
handler = RotatingFileHandler(
self.log_path,
maxBytes=100 * 1024 * 1024, # 100MB
backupCount=10
)
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def log_access(self, data: Any, user_context: Dict):
"""Log data access for audit trail"""
import json
import hashlib
log_entry = {
"timestamp": time.time(),
"user_id": user_context.get("user_id"),
"action": user_context.get("action", "access"),
"data_hash": hashlib.sha256(str(data).encode()).hexdigest(),
"ip_address": user_context.get("ip_address"),
"session_id": user_context.get("session_id"),
"compliance_flags": user_context.get("compliance_flags", [])
}
self.logger.info(json.dumps(log_entry))
def generate_audit_report(self, start_date: str, end_date: str) -> Dict:
"""Generate audit report for compliance"""
import json
# Parse logs and generate report
report = {
"period": f"{start_date} to {end_date}",
"total_accesses": 0,
"unique_users": set(),
"actions": {},
"compliance_violations": []
}
# Read and parse logs
with open(self.log_path, 'r') as f:
for line in f:
try:
# Parse log entry
entry = json.loads(line.split(" - ")[-1])
# Update statistics
report["total_accesses"] += 1
report["unique_users"].add(entry.get("user_id"))
action = entry.get("action")
report["actions"][action] = report["actions"].get(action, 0) + 1
# Check for violations
if entry.get("compliance_flags"):
report["compliance_violations"].append(entry)
except json.JSONDecodeError:
# Skip lines that are not JSON audit entries
continue
report["unique_users"] = len(report["unique_users"])
return report
Hands-on exercises:
1. Build a complete distributed inference system.
2. Create a cost optimization system.
3. Implement a privacy-preserving AI system.
4. Build a monitoring system (a starter sketch follows this list).
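As a starting point for the monitoring exercise, here is a minimal sketch using the prometheus_client library; the metric names and the instrumented handle_request function are illustrative assumptions, not part of the systems above:

```python
# Minimal monitoring sketch (assumes prometheus_client is installed;
# metric names and handle_request are illustrative placeholders).
import time
import random
from prometheus_client import Counter, Histogram, Gauge, start_http_server

REQUESTS = Counter("llm_requests_total", "Total inference requests", ["model"])
LATENCY = Histogram("llm_request_latency_seconds", "End-to-end request latency")
ACTIVE = Gauge("llm_active_requests", "Requests currently in flight")

def handle_request(model: str, prompt: str) -> str:
    """Placeholder for a real inference call, instrumented with metrics."""
    REQUESTS.labels(model=model).inc()
    ACTIVE.inc()
    start = time.time()
    try:
        time.sleep(random.uniform(0.05, 0.2))  # stand-in for model latency
        return f"response to: {prompt[:20]}"
    finally:
        LATENCY.observe(time.time() - start)
        ACTIVE.dec()

if __name__ == "__main__":
    start_http_server(9100)  # expose /metrics for Prometheus to scrape
    while True:
        handle_request("gpt-4o-mini", "health check prompt")
```

From here, the exercise can extend the same pattern with per-model cost counters, GPU memory gauges fed by the MemoryMonitor above, and alerting rules on latency percentiles.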
Level 4 represents the pinnacle of AI engineering education, preparing students to deploy and operate AI systems at enterprise scale. Students master distributed computing, advanced optimization, compliance requirements, and operational excellence. This level transforms students into AI architects capable of building and maintaining production systems that serve millions of users while balancing performance, cost, and compliance requirements.