Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions Classical detector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
"""
MemoryWatch - Week 2 (Prachin)
Task: Agree array interface; stub model class against contract.
Dependency: W2 model interface contract (Esala).

This stubs the classical detector as a class so it can plug into the shared
evaluation harness (Vignesh, Week 3+) and the entry point (Esala) the same
way the quantum layer eventually will. No real training logic yet — that's
Week 3. This file exists to lock the SHAPE of inputs/outputs early so
nobody downstream has to guess.

Assumed interface (numpy-array in, numpy-array out — matches what Week 1's
prototype already validated against sklearn):

model = ClassicalDetector(**hyperparams)
model.fit(X_train) # X_train: (n_samples, n_features) normal-only data
scores = model.score(X) # -> (n_samples,) float array, higher = more anomalous
labels = model.predict(X) # -> (n_samples,) int array, {0, 1}: 1 = flagged as attack

NOTE: sklearn's IsolationForest natively returns 1=normal/-1=anomaly and
"lower score = more anomalous". This stub flips both so the class's public
API matches what the evaluation harness will expect everywhere in the
codebase: 0/1 labels and "higher score = more anomalous", which is more
intuitive for FPR/recall/F1/AUROC reporting. This mapping is the actual
"contract" decision below -- flag it in review if the shared harness or
Esala's interface contract expects sklearn's raw convention instead.
"""

from __future__ import annotations

import numpy as np
from sklearn.ensemble import IsolationForest


class ClassicalDetector:
"""Thin wrapper around sklearn's IsolationForest with a MemoryWatch-shaped API."""

def __init__(
self,
n_estimators: int = 100,
contamination: float | str = "auto",
random_state: int = 42,
):
self.n_estimators = n_estimators
self.contamination = contamination
self.random_state = random_state
self._model: IsolationForest | None = None
self._threshold: float | None = None # set in Week 3 (dynamic threshold, Shamim et al.)

def fit(self, X: np.ndarray) -> "ClassicalDetector":
"""Fit on normal-only behaviour data. X shape: (n_samples, n_features)."""
X = np.asarray(X)
if X.ndim != 2:
raise ValueError(f"Expected 2D array (n_samples, n_features), got shape {X.shape}")

self._model = IsolationForest(
n_estimators=self.n_estimators,
contamination=self.contamination,
random_state=self.random_state,
)
self._model.fit(X)
return self

def score(self, X: np.ndarray) -> np.ndarray:
"""Anomaly score per sample. Higher = more anomalous (flipped from sklearn's raw sign)."""
self._check_fitted()
X = np.asarray(X)
# sklearn: higher decision_function = more normal -> flip sign so
# higher = more anomalous, which is the convention the rest of the
# pipeline (dynamic threshold, FPR/recall/AUROC) will use.
return -self._model.decision_function(X)

def predict(self, X: np.ndarray, threshold: float | None = None) -> np.ndarray:
"""
Binary labels: 1 = flagged as attack, 0 = normal.
threshold: score cutoff. If None, uses self._threshold (set by Week 3's
dynamic thresholding) and falls back to sklearn's own contamination-based
cutoff (score > 0) if no threshold has been set yet.
"""
self._check_fitted()
scores = self.score(X)
cutoff = threshold if threshold is not None else (self._threshold if self._threshold is not None else 0.0)
return (scores > cutoff).astype(int)

def _check_fitted(self):
if self._model is None:
raise RuntimeError("Call fit(X) before score()/predict().")


def _smoke_test():
    """Run fit/score/predict once on synthetic data and check the declared output contract.

    This is only a shape/type sanity check for the stub, not the real Week 3 test suite.
    """
    rng = np.random.default_rng(0)
    X_train = rng.normal(size=(100, 4))
    # Test set: 20 rows drawn like the training data, then 5 rows spread over [-8, 8).
    inlier_rows = rng.normal(size=(20, 4))
    scattered_rows = rng.uniform(-8, 8, size=(5, 4))
    X_test = np.vstack([inlier_rows, scattered_rows])

    model = ClassicalDetector().fit(X_train)
    scores = model.score(X_test)
    labels = model.predict(X_test)

    # Contract: one score and one label per test row, labels restricted to {0, 1}.
    assert scores.shape == (25,), f"score() shape wrong: {scores.shape}"
    assert labels.shape == (25,), f"predict() shape wrong: {labels.shape}"
    assert set(np.unique(labels)) <= {0, 1}, "predict() must return {0, 1} labels"

    print("ClassicalDetector stub OK.")
    print(f" score() range: [{scores.min():.3f}, {scores.max():.3f}]")
    print(f" predict() flagged: {int(labels.sum())} / {len(labels)}")


if __name__ == "__main__":
    _smoke_test()
Binary file added MemoryWatch_Week6_Outline_Prachin.docx
Binary file not shown.
134 changes: 134 additions & 0 deletions evaluation_harness.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
"""
MemoryWatch - Week 5 (Prachin)
Task: Feed classical scores to harness; tune threshold (FPR vs recall).
Dependency: W4 + W3 comparison scores (Vignesh's shared evaluation harness).

STATUS: Vignesh's real evaluation harness isn't in the repo yet (Week 2/3
tasks: "Implement metrics module: FPR, recall, F1" / "Wrap classical model
in evaluation harness"). This file is a STAND-IN with the same metric
surface (FPR, recall, F1, AUROC -- matching the thesis's stated evaluation
methodology and Layman & Roden's finding that accuracy alone is misleading).
Swap `evaluate()` below for Vignesh's real harness call once it exists --
the ClassicalDetector output shapes won't need to change.

Two things happen here:
1. evaluate(y_true, y_pred, scores) -> dict of FPR/recall/F1/AUROC,
mirroring the exact metric set from Section 5.6 of the thesis.
2. tune_threshold(...) sweeps the dynamic-threshold sensitivity
parameter k and reports the FPR/recall/F1/AUROC trade-off at each
value, so a k can be chosen deliberately rather than guessed.
"""

import numpy as np
from sklearn.metrics import roc_auc_score

from classical_detector import ClassicalDetector


def evaluate(y_true: np.ndarray, y_pred: np.ndarray, scores: np.ndarray) -> dict:
    """
    Compute FPR, recall, F1, AUROC -- the four metrics the thesis commits to
    (Section 5.6). Accuracy is deliberately NOT included, following Layman
    and Roden (2023) / Hesford et al. (2024): high accuracy can hide a
    useless model on imbalanced attack/normal data.

    y_true: (n,) ground truth, 1 = attack, 0 = normal
    y_pred: (n,) predicted labels, 1 = flagged, 0 = normal
    scores: (n,) continuous anomaly scores (higher = more anomalous),
            used for AUROC since it's threshold-independent

    Returns a dict with keys "FPR", "Recall", "F1", "AUROC" plus the raw
    confusion-matrix counts "TP", "FP", "TN", "FN". Any ratio whose
    denominator is zero is reported as 0.0. "AUROC" is NaN when y_true
    holds fewer than two distinct classes.

    Raises ValueError if the three inputs do not share the same shape.
    Errors raised by roc_auc_score for other malformed input are propagated
    rather than being reported as NaN.
    """
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    scores = np.asarray(scores)

    # Without this check NumPy would broadcast mismatched inputs in the
    # boolean masks below and return plausible-looking but wrong counts.
    if not (y_true.shape == y_pred.shape == scores.shape):
        raise ValueError(
            "y_true, y_pred and scores must have the same shape, got "
            f"{y_true.shape}, {y_pred.shape} and {scores.shape}"
        )

    flagged = y_pred == 1
    cleared = y_pred == 0
    is_attack = y_true == 1
    is_normal = y_true == 0

    tp = int((flagged & is_attack).sum())
    fp = int((flagged & is_normal).sum())
    tn = int((cleared & is_normal).sum())
    fn = int((cleared & is_attack).sum())

    fpr = fp / (fp + tn) if (fp + tn) > 0 else 0.0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0

    # AUROC needs both classes present in y_true, and is threshold-independent
    # (uses scores, not y_pred) -- this is why it's reported separately.
    # The single-class case is tested explicitly instead of catching every
    # ValueError, so unrelated input problems are not silently turned into NaN.
    if np.unique(y_true).size < 2:
        auroc = float("nan")  # only one class (or no samples) in this slice
    else:
        auroc = roc_auc_score(y_true, scores)

    return {"FPR": fpr, "Recall": recall, "F1": f1, "AUROC": auroc, "TP": tp, "FP": fp, "TN": tn, "FN": fn}


def tune_threshold(X_train_normal, groups_train, X_test, y_test, groups_test, k_values=None):
    """
    Evaluate the detector once per sensitivity value k and collect the metrics.

    For every k, a new ClassicalDetector(random_state=42) is fitted on
    X_train_normal, its dynamic threshold is fitted on the same data and
    groups_train with that k, and X_test is then scored and labelled
    (using groups_test) before being passed to evaluate().

    k is described as the std multiplier of Week 3's fit_dynamic_threshold:
      Lower k  -> more sensitive -> higher recall, higher FPR.
      Higher k -> stricter       -> lower recall, lower FPR.

    k_values: iterable of k values to try; None selects
              [1.5, 2.0, 2.5, 3.0, 3.5, 4.0].

    Returns a list with one evaluate() dict per k, in sweep order, each
    extended with a "k" entry.

    NOTE(review): fit_dynamic_threshold() and predict_dynamic() are not
    defined on the ClassicalDetector shown elsewhere in this diff, and that
    file is listed as "Classical detector.py" while this module imports
    `classical_detector` -- confirm the Week 3 version of the class is the
    one that actually gets imported.
    """
    sweep = k_values if k_values is not None else [1.5, 2.0, 2.5, 3.0, 3.5, 4.0]

    def _metrics_for(k):
        # A fresh detector per k, so no state carries over between sweep points.
        detector = ClassicalDetector(random_state=42).fit(X_train_normal)
        detector.fit_dynamic_threshold(X_train_normal, groups_train, k=k)

        test_scores = detector.score(X_test)
        test_labels = detector.predict_dynamic(X_test, groups_test)

        row = evaluate(y_test, test_labels, test_scores)
        row["k"] = k
        return row

    return [_metrics_for(k) for k in sweep]


def print_tuning_table(results):
    """Print the sweep results to stdout as a fixed-width table.

    results: iterable of dicts providing numeric "k", "FPR", "Recall", "F1"
             and "AUROC" entries (the rows returned by tune_threshold()).

    Output is a header line, a 45-character dashed rule, then one line per
    row with k shown to one decimal place and the metrics to three.
    """
    header = f"{'k':>5} | {'FPR':>7} | {'Recall':>7} | {'F1':>7} | {'AUROC':>7}"
    rule = "-" * 45
    # Lazily formatted, so each row is rendered only when it is about to be printed.
    rendered_rows = (
        f"{r['k']:>5.1f} | {r['FPR']:>7.3f} | {r['Recall']:>7.3f} | {r['F1']:>7.3f} | {r['AUROC']:>7.3f}"
        for r in results
    )

    print(header)
    print(rule)
    for line in rendered_rows:
        print(line)


def _demo():
    """
    Run the threshold sweep end-to-end on synthetic data and print the results.

    The data is a toy stand-in for real UNSW-NB15 data (Ghita's Week 2
    preprocessing output isn't in the repo yet) with the same shape as the
    real pipeline: train on normal-only points, evaluate on a held-out mix
    of normal and attack-like points, each tagged with a placeholder
    'process class' ("quiet" or "noisy").
    """
    rng = np.random.default_rng(0)

    # Training set: 150 low-variance ("quiet") rows, then 150 high-variance
    # ("noisy") rows. All draws below happen in a fixed order on one
    # generator, so the demo is reproducible.
    train_quiet = rng.normal(loc=0, scale=0.4, size=(150, 4))
    train_noisy = rng.normal(loc=0, scale=2.0, size=(150, 4))
    X_train = np.vstack([train_quiet, train_noisy])
    groups_train = np.repeat(["quiet", "noisy"], 150)

    # Test set: 60 + 60 normal rows, followed by 15 + 15 attack-like rows
    # drawn uniformly from a much wider range than the normal data.
    test_quiet = rng.normal(loc=0, scale=0.4, size=(60, 4))
    test_noisy = rng.normal(loc=0, scale=2.0, size=(60, 4))
    attack_quiet = rng.uniform(-10, 10, size=(15, 4))
    attack_noisy = rng.uniform(-15, 15, size=(15, 4))

    X_test = np.vstack([test_quiet, test_noisy, attack_quiet, attack_noisy])
    groups_test = np.repeat(["quiet", "noisy", "quiet", "noisy"], [60, 60, 15, 15])
    y_test = np.repeat([0, 1], [120, 30])  # 0 = normal, 1 = attack

    print("=== Week 5: threshold tuning (FPR vs Recall trade-off) ===\n")
    results = tune_threshold(X_train, groups_train, X_test, y_test, groups_test)
    print_tuning_table(results)

    # The k with the highest F1 is only a starting recommendation: the real
    # choice depends on operational priorities (Layman & Roden: high FPR
    # degrades analyst performance more than it seems).
    best = max(results, key=lambda row: row["F1"])
    print(f"\nBest F1 at k={best['k']}: FPR={best['FPR']:.3f}, Recall={best['Recall']:.3f}, "
          f"F1={best['F1']:.3f}, AUROC={best['AUROC']:.3f}")
    print("\nNOTE: this is a toy synthetic dataset, not UNSW-NB15. Re-run this")
    print("exact sweep once Ghita's preprocessed data lands to get real numbers.")


if __name__ == "__main__":
    _demo()
83 changes: 83 additions & 0 deletions isolation_forest_prototype.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
"""
MemoryWatch - Week 1 (Prachin)
Task: Prototype minimal Isolation Forest on toy data to validate sklearn setup.

This script does NOT touch UNSW-NB15. It only proves that:
1. scikit-learn's IsolationForest trains and scores correctly in this env.
2. Unsupervised "fit on normal, flag anomalies" pattern behaves as expected
(this is the same pattern MemoryWatch will use on syscall/proc features).
3. The output shape/API matches what the shared evaluation harness (Vignesh,
Week 2+) will expect: fit(), decision_function(), predict().

Toy data: 2D Gaussian blob = "normal" behaviour, plus a handful of far-out
points = "attack-like" behaviour. This mirrors the real setup where
IsolationForest is trained on normal process behaviour and has to flag
memory-access anomalies (heap spray bursts, /proc reads) as outliers.
"""

import numpy as np
from sklearn.ensemble import IsolationForest

# Single seed shared by the toy-data generator and the model in this script.
RANDOM_STATE = 42


def make_toy_data(n_normal: int = 200, n_anomalies: int = 10):
    """Build a reproducible 2D toy set of 'normal' and 'attack-like' points.

    n_normal:    number of points drawn from a unit Gaussian around the origin
                 (stand-in for e.g. a typical syscall-rate / /proc-access-rate pair).
    n_anomalies: number of far-out points (stand-in for e.g. a heap-spray
                 burst or an unexpected /proc/[pid]/mem read pattern).

    Returns (X, y_true): X has shape (n_normal + n_anomalies, 2) with the
    normal points first; y_true follows the sklearn convention,
    1 = normal and -1 = anomaly.
    """
    rng = np.random.default_rng(RANDOM_STATE)

    inliers = rng.normal(loc=[0, 0], scale=1.0, size=(n_normal, 2))

    # Start from uniform points in [-10, 10), then move every coordinate a
    # further 6 units away from zero so the points are unambiguously outliers.
    outliers = rng.uniform(low=-10, high=10, size=(n_anomalies, 2))
    outliers = outliers + 6 * np.sign(outliers)

    points = np.vstack([inliers, outliers])
    truth = [1] * n_normal
    truth.extend([-1] * n_anomalies)
    return points, np.array(truth)


def main():
    """Fit an IsolationForest on the toy set, print a summary and assert the expected API behaviour.

    The assertions fail loudly if the environment or sklearn's API does not
    behave the way the rest of the pipeline will assume.
    """
    X, y_true = make_toy_data()

    # Unsupervised training: y_true is never passed to fit(). The
    # contamination value is an assumption supplied up front, not something
    # the model learns.
    forest = IsolationForest(
        n_estimators=100,
        contamination=0.05,
        random_state=RANDOM_STATE,
    )
    forest.fit(X)

    # decision_function: higher = more normal, lower/negative = more anomalous.
    scores = forest.decision_function(X)
    # predict: 1 = normal (inlier), -1 = anomaly (outlier).
    preds = forest.predict(X)

    flagged_mask = preds == -1
    anomaly_mask = y_true == -1
    n_flagged = int(flagged_mask.sum())
    true_anomalies = int(anomaly_mask.sum())
    caught = int((flagged_mask & anomaly_mask).sum())

    print("=== IsolationForest sanity check ===")
    print(f"sklearn setup OK. Samples: {len(X)}")
    print(f"Score range: min={scores.min():.3f}, max={scores.max():.3f}")
    print(f"Flagged as anomaly: {n_flagged} / {len(X)}")
    print(f"True anomalies in toy set: {true_anomalies}")
    print(f"Correctly flagged: {caught} / {true_anomalies}")

    # Output contract first, then two loose quality checks on the easy toy set.
    assert scores.shape == (len(X),), "decision_function output shape mismatch"
    assert set(np.unique(preds)) <= {1, -1}, "predict() labels not in {1, -1}"
    assert n_flagged > 0, "Isolation Forest flagged zero anomalies on an obvious toy set"
    assert caught >= true_anomalies * 0.5, "Recall on an easy toy set is suspiciously low"

    print("\nAll checks passed. sklearn + IsolationForest setup is validated.")
    print("Interface confirmed for Week 2: fit(X) -> decision_function(X) -> scores, predict(X) -> {1,-1}.")


if __name__ == "__main__":
    main()
Loading