diff --git a/Classical detector.py b/Classical detector.py
new file mode 100644
index 0000000..b5ae669
--- /dev/null
+++ b/Classical detector.py
@@ -0,0 +1,117 @@
+"""
+MemoryWatch - Week 2 (Prachin)
+Task: Agree array interface; stub model class against contract.
+Dependency: W2 model interface contract (Esala).
+
+This stubs the classical detector as a class so it can plug into the shared
+evaluation harness (Vignesh, Week 3+) and the entry point (Esala) the same
+way the quantum layer eventually will. No real training logic yet — that's
+Week 3. This file exists to lock the SHAPE of inputs/outputs early so
+nobody downstream has to guess.
+
+Assumed interface (numpy-array in, numpy-array out — matches what Week 1's
+prototype already validated against sklearn):
+
+    model = ClassicalDetector(**hyperparams)
+    model.fit(X_train)  # X_train: (n_samples, n_features) normal-only data
+    scores = model.score(X)  # -> (n_samples,) float array, higher = more anomalous
+    labels = model.predict(X)  # -> (n_samples,) int array, {0, 1}: 1 = flagged as attack
+
+NOTE: sklearn's IsolationForest natively returns 1=normal/-1=anomaly and
+"lower score = more anomalous". This stub flips both so the class's public
+API matches what the evaluation harness will expect everywhere in the
+codebase (0/1 labels, higher score = more anomalous is more intuitive for
+FPR/recall/F1/AUROC reporting). This mapping is the actual "contract"
+decision below -- flag it in review if Esala's harness expects sklearn's
+raw convention instead.
+
+NOTE(review): this file's name contains a space, so it cannot be imported
+with a plain `import` statement, while the harness and tests import
+`classical_detector` -- confirm which module is the canonical one.
+"""
+
+from __future__ import annotations
+
+import numpy as np
+from sklearn.ensemble import IsolationForest
+
+
+class ClassicalDetector:
+    """Thin wrapper around sklearn's IsolationForest with a MemoryWatch-shaped API."""
+
+    def __init__(
+        self,
+        n_estimators: int = 100,
+        contamination: float | str = "auto",
+        random_state: int = 42,
+    ):
+        self.n_estimators = n_estimators
+        self.contamination = contamination
+        self.random_state = random_state
+        self._model: IsolationForest | None = None
+        self._threshold: float | None = None  # set in Week 3 (dynamic threshold, Shamim et al.)
+
+    def fit(self, X: np.ndarray) -> "ClassicalDetector":
+        """Fit on normal-only behaviour data. X shape: (n_samples, n_features)."""
+        X = np.asarray(X)
+        if X.ndim != 2:
+            raise ValueError(f"Expected 2D array (n_samples, n_features), got shape {X.shape}")
+
+        self._model = IsolationForest(
+            n_estimators=self.n_estimators,
+            contamination=self.contamination,
+            random_state=self.random_state,
+        )
+        self._model.fit(X)
+        return self
+
+    def score(self, X: np.ndarray) -> np.ndarray:
+        """Anomaly score per sample. Higher = more anomalous (flipped from sklearn's raw sign)."""
+        self._check_fitted()
+        X = np.asarray(X)
+        if X.ndim != 2:
+            raise ValueError(f"Expected 2D array (n_samples, n_features), got shape {X.shape}")
+        # sklearn: higher decision_function = more normal -> flip sign so
+        # higher = more anomalous, which is the convention the rest of the
+        # pipeline (dynamic threshold, FPR/recall/AUROC) will use.
+        return -self._model.decision_function(X)
+
+    def predict(self, X: np.ndarray, threshold: float | None = None) -> np.ndarray:
+        """
+        Binary labels: 1 = flagged as attack, 0 = normal.
+        threshold: score cutoff. If None, uses self._threshold (set by Week 3's
+        dynamic thresholding) and falls back to sklearn's own contamination-based
+        cutoff (score > 0) if no threshold has been set yet.
+        """
+        self._check_fitted()
+        scores = self.score(X)
+        cutoff = threshold if threshold is not None else (self._threshold if self._threshold is not None else 0.0)
+        return (scores > cutoff).astype(int)
+
+    def _check_fitted(self) -> None:
+        """Raise RuntimeError if fit() has not been called yet."""
+        if self._model is None:
+            raise RuntimeError("Call fit(X) before score()/predict().")
+
+
+def _smoke_test():
+    """Quick self-check that the stub's shapes/types behave as declared. Not the real Week 3 test suite."""
+    rng = np.random.default_rng(0)
+    X_train = rng.normal(size=(100, 4))
+    X_test = np.vstack([rng.normal(size=(20, 4)), rng.uniform(-8, 8, size=(5, 4))])
+
+    model = ClassicalDetector().fit(X_train)
+    scores = model.score(X_test)
+    labels = model.predict(X_test)
+
+    assert scores.shape == (25,), f"score() shape wrong: {scores.shape}"
+    assert labels.shape == (25,), f"predict() shape wrong: {labels.shape}"
+    assert set(np.unique(labels)).issubset({0, 1}), "predict() must return {0, 1} labels"
+
+    print("ClassicalDetector stub OK.")
+    print(f" score() range: [{scores.min():.3f}, {scores.max():.3f}]")
+    print(f" predict() flagged: {int(labels.sum())} / {len(labels)}")
+
+
+if __name__ == "__main__":
+    _smoke_test()
diff --git a/MemoryWatch_Week6_Outline_Prachin.docx b/MemoryWatch_Week6_Outline_Prachin.docx
new file mode 100644
index 0000000..7d1c11a
Binary files /dev/null and b/MemoryWatch_Week6_Outline_Prachin.docx differ
diff --git a/evaluation_harness.py b/evaluation_harness.py
new file mode 100644
index 0000000..1a0b733
--- /dev/null
+++ b/evaluation_harness.py
@@ -0,0 +1,146 @@
+"""
+MemoryWatch - Week 5 (Prachin)
+Task: Feed classical scores to harness; tune threshold (FPR vs recall).
+Dependency: W4 + W3 comparison scores (Vignesh's shared evaluation harness).
+
+STATUS: Vignesh's real evaluation harness isn't in the repo yet (Week 2/3
+tasks: "Implement metrics module: FPR, recall, F1" / "Wrap classical model
+in evaluation harness"). This file is a STAND-IN with the same metric
+surface (FPR, recall, F1, AUROC -- matching the thesis's stated evaluation
+methodology and Layman & Roden's finding that accuracy alone is misleading).
+Swap `evaluate()` below for Vignesh's real harness call once it exists --
+the ClassicalDetector output shapes won't need to change.
+
+Two things happen here:
+  1. evaluate(y_true, y_pred, scores) -> dict of FPR/recall/F1/AUROC,
+     mirroring the exact metric set from Section 5.6 of the thesis.
+  2. tune_threshold(...) sweeps the dynamic-threshold sensitivity
+     parameter k and reports the FPR/recall/F1/AUROC trade-off at each
+     value, so a k can be chosen deliberately rather than guessed.
+"""
+
+import numpy as np
+from sklearn.metrics import roc_auc_score
+
+# NOTE(review): fit_dynamic_threshold() / predict_dynamic() used below are not
+# defined in the Week 2 stub; this import presumably resolves to the Week 3
+# classical_detector.py -- confirm that module is committed.
+from classical_detector import ClassicalDetector
+
+
+def evaluate(y_true: np.ndarray, y_pred: np.ndarray, scores: np.ndarray) -> dict:
+    """
+    Compute FPR, recall, F1, AUROC -- the four metrics the thesis commits to
+    (Section 5.6). Accuracy is deliberately NOT included, following Layman
+    and Roden (2023) / Hesford et al. (2024): high accuracy can hide a
+    useless model on imbalanced attack/normal data.
+
+    y_true: (n,) ground truth, 1 = attack, 0 = normal
+    y_pred: (n,) predicted labels, 1 = flagged, 0 = normal
+    scores: (n,) continuous anomaly scores (higher = more anomalous),
+            used for AUROC since it's threshold-independent
+
+    Raises ValueError if the three arrays do not have the same length.
+    """
+    # Flatten first: an (n, 1) column compared against an (n,) vector would
+    # silently broadcast to (n, n) and inflate every confusion-matrix count.
+    y_true = np.asarray(y_true).ravel()
+    y_pred = np.asarray(y_pred).ravel()
+    scores = np.asarray(scores).ravel()
+    if not (y_true.shape == y_pred.shape == scores.shape):
+        raise ValueError(
+            "y_true, y_pred and scores must have the same length, got "
+            f"{y_true.shape[0]}, {y_pred.shape[0]}, {scores.shape[0]}"
+        )
+
+    tp = int(((y_pred == 1) & (y_true == 1)).sum())
+    fp = int(((y_pred == 1) & (y_true == 0)).sum())
+    tn = int(((y_pred == 0) & (y_true == 0)).sum())
+    fn = int(((y_pred == 0) & (y_true == 1)).sum())
+
+    fpr = fp / (fp + tn) if (fp + tn) > 0 else 0.0
+    recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
+    precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
+    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0
+
+    # AUROC needs both classes present in y_true, and is threshold-independent
+    # (uses scores, not y_pred) -- this is why it's reported separately.
+    try:
+        auroc = roc_auc_score(y_true, scores)
+    except ValueError:
+        auroc = float("nan")  # only one class present in this slice
+
+    return {"FPR": fpr, "Recall": recall, "F1": f1, "AUROC": auroc, "TP": tp, "FP": fp, "TN": tn, "FN": fn}
+
+
+def tune_threshold(X_train_normal, groups_train, X_test, y_test, groups_test, k_values=None):
+    """
+    Sweep the dynamic-threshold sensitivity parameter k (std multiplier
+    from Week 3's fit_dynamic_threshold) and report metrics at each value.
+    Lower k -> more sensitive -> higher recall, higher FPR.
+    Higher k -> stricter -> lower recall, lower FPR.
+    """
+    if k_values is None:
+        k_values = [1.5, 2.0, 2.5, 3.0, 3.5, 4.0]
+
+    results = []
+    for k in k_values:
+        model = ClassicalDetector(random_state=42).fit(X_train_normal)
+        model.fit_dynamic_threshold(X_train_normal, groups_train, k=k)
+
+        scores = model.score(X_test)
+        y_pred = model.predict_dynamic(X_test, groups_test)
+
+        metrics = evaluate(y_test, y_pred, scores)
+        metrics["k"] = k
+        results.append(metrics)
+
+    return results
+
+
+def print_tuning_table(results):
+    print(f"{'k':>5} | {'FPR':>7} | {'Recall':>7} | {'F1':>7} | {'AUROC':>7}")
+    print("-" * 45)
+    for r in results:
+        print(f"{r['k']:>5.1f} | {r['FPR']:>7.3f} | {r['Recall']:>7.3f} | {r['F1']:>7.3f} | {r['AUROC']:>7.3f}")
+
+
+def _demo():
+    """
+    Toy stand-in for real UNSW-NB15 data (Ghita's Week 2 preprocessing
+    output isn't in the repo yet). Same shape as the real pipeline:
+    train on normal-only data, evaluate on a held-out mix of normal +
+    attack-like points, grouped by a placeholder 'process class'.
+    """
+    rng = np.random.default_rng(0)
+
+    quiet_normal = rng.normal(loc=0, scale=0.4, size=(150, 4))
+    noisy_normal = rng.normal(loc=0, scale=2.0, size=(150, 4))
+    X_train = np.vstack([quiet_normal, noisy_normal])
+    groups_train = np.array(["quiet"] * 150 + ["noisy"] * 150)
+
+    quiet_test_normal = rng.normal(loc=0, scale=0.4, size=(60, 4))
+    noisy_test_normal = rng.normal(loc=0, scale=2.0, size=(60, 4))
+    quiet_attacks = rng.uniform(-10, 10, size=(15, 4))
+    noisy_attacks = rng.uniform(-15, 15, size=(15, 4))
+
+    X_test = np.vstack([quiet_test_normal, noisy_test_normal, quiet_attacks, noisy_attacks])
+    groups_test = np.array(["quiet"] * 60 + ["noisy"] * 60 + ["quiet"] * 15 + ["noisy"] * 15)
+    y_test = np.array([0] * 120 + [1] * 30)
+
+    print("=== Week 5: threshold tuning (FPR vs Recall trade-off) ===\n")
+    results = tune_threshold(X_train, groups_train, X_test, y_test, groups_test)
+    print_tuning_table(results)
+
+    # pick the k with the best F1 as a starting recommendation -- but flag
+    # that the real choice depends on operational priorities (Layman &
+    # Roden: high FPR degrades analyst performance more than it seems)
+    best = max(results, key=lambda r: r["F1"])
+    print(f"\nBest F1 at k={best['k']}: FPR={best['FPR']:.3f}, Recall={best['Recall']:.3f}, "
+          f"F1={best['F1']:.3f}, AUROC={best['AUROC']:.3f}")
+    print("\nNOTE: this is a toy synthetic dataset, not UNSW-NB15. Re-run this")
+    print("exact sweep once Ghita's preprocessed data lands to get real numbers.")
+
+
+if __name__ == "__main__":
+    _demo()
diff --git a/isolation_forest_prototype.py b/isolation_forest_prototype.py
new file mode 100644
index 0000000..ec1e9fb
--- /dev/null
+++ b/isolation_forest_prototype.py
@@ -0,0 +1,83 @@
+"""
+MemoryWatch - Week 1 (Prachin)
+Task: Prototype minimal Isolation Forest on toy data to validate sklearn setup.
+
+This script does NOT touch UNSW-NB15. It only proves that:
+  1. scikit-learn's IsolationForest trains and scores correctly in this env.
+  2. Unsupervised "fit on normal, flag anomalies" pattern behaves as expected
+     (this is the same pattern MemoryWatch will use on syscall/proc features).
+  3. The output shape/API matches what the shared evaluation harness (Vignesh,
+     Week 2+) will expect: fit(), decision_function(), predict().
+
+Toy data: 2D Gaussian blob = "normal" behaviour, plus a handful of far-out
+points = "attack-like" behaviour. This mirrors the real setup where
+IsolationForest is trained on normal process behaviour and has to flag
+memory-access anomalies (heap spray bursts, /proc reads) as outliers.
+"""
+
+import numpy as np
+from sklearn.ensemble import IsolationForest
+
+RANDOM_STATE = 42
+
+
+def make_toy_data(n_normal: int = 200, n_anomalies: int = 10):
+    """Synthetic stand-in for 'normal process behaviour' vs 'attack-like' points."""
+    rng = np.random.default_rng(RANDOM_STATE)
+
+    # "Normal" cluster: tight Gaussian blob (e.g. typical syscall-rate / /proc-access-rate pair)
+    normal = rng.normal(loc=[0, 0], scale=1.0, size=(n_normal, 2))
+
+    # "Anomalies": scattered far from the normal cluster (e.g. heap-spray burst,
+    # unexpected /proc/[pid]/mem read pattern)
+    anomalies = rng.uniform(low=-10, high=10, size=(n_anomalies, 2))
+    # push them away from the origin so they're unambiguously outliers
+    anomalies += np.sign(anomalies) * 6
+
+    X = np.vstack([normal, anomalies])
+    y_true = np.array([1] * n_normal + [-1] * n_anomalies)  # sklearn convention: 1=normal, -1=anomaly
+    return X, y_true
+
+
+def main():
+    X, y_true = make_toy_data()
+
+    # Train UNSUPERVISED (labels never touch .fit()). NOTE: unlike the real
+    # pipeline, which will fit on normal-only data, this toy fit sees the
+    # anomalies too; contamination is an assumption, not a learned quantity.
+    model = IsolationForest(
+        n_estimators=100,
+        contamination=0.05,
+        random_state=RANDOM_STATE,
+    )
+    model.fit(X)
+
+    # decision_function: higher = more normal, lower/negative = more anomalous
+    scores = model.decision_function(X)
+    # predict: 1 = normal (inlier), -1 = anomaly (outlier)
+    preds = model.predict(X)
+
+    n_flagged = int((preds == -1).sum())
+    true_anomalies = int((y_true == -1).sum())
+    caught = int(((preds == -1) & (y_true == -1)).sum())
+
+    print("=== IsolationForest sanity check ===")
+    print(f"sklearn setup OK. Samples: {len(X)}")
+    print(f"Score range: min={scores.min():.3f}, max={scores.max():.3f}")
+    print(f"Flagged as anomaly: {n_flagged} / {len(X)}")
+    print(f"True anomalies in toy set: {true_anomalies}")
+    print(f"Correctly flagged: {caught} / {true_anomalies}")
+
+    # Basic assertions -> fail loudly if the environment/API doesn't behave
+    # the way the rest of the pipeline will assume.
+    assert scores.shape == (len(X),), "decision_function output shape mismatch"
+    assert set(np.unique(preds)).issubset({1, -1}), "predict() labels not in {1, -1}"
+    assert n_flagged > 0, "Isolation Forest flagged zero anomalies on an obvious toy set"
+    assert caught >= true_anomalies * 0.5, "Recall on an easy toy set is suspiciously low"
+
+    print("\nAll checks passed. sklearn + IsolationForest setup is validated.")
+    print("Interface confirmed for Week 2: fit(X) -> decision_function(X) -> scores, predict(X) -> {1,-1}.")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/test_classical_detector.py b/test_classical_detector.py
new file mode 100644
index 0000000..a3113d2
--- /dev/null
+++ b/test_classical_detector.py
@@ -0,0 +1,204 @@
+"""
+MemoryWatch - Week 4 (Prachin)
+Task: Stabilise baseline; add unit tests; document score ranges.
+Dependency: W3 baseline (classical_detector.py).
+
+Run with:
+    pytest test_classical_detector.py -v
+
+These tests cover:
+  - the plain fit/score/predict path (Week 2 interface)
+  - the dynamic per-group threshold path (Week 3)
+  - error handling for misuse (unfitted model, bad shapes, mismatched arrays)
+  - reproducibility (same random_state -> same output)
+  - documented score-range expectations (see SCORE_RANGE_NOTES below)
+"""
+
+import numpy as np
+import pytest
+
+from classical_detector import ClassicalDetector
+
+
+# ---------------------------------------------------------------------------
+# Score range documentation (part of the Week 4 task, not just a comment).
+#
+# ClassicalDetector.score() = -sklearn_IsolationForest.decision_function(X)
+#
+# sklearn's decision_function() is centred so that:
+#   - 0 is roughly the boundary between "normal" and "anomalous" learned
+#     from the training contamination assumption
+#   - typical values fall in roughly [-0.5, 0.5], though this is NOT a hard
+#     mathematical bound -- it comes from how path-length averaging works
+#     across the forest, and can exceed that range for very extreme outliers
+#     or very small/unusual trees.
+#
+# Because ClassicalDetector flips the sign, in OUR convention:
+#   - higher score = more anomalous
+#   - lower score = more normal
+#   - values are NOT guaranteed to be in [0, 1] or any fixed interval --
+#     do not assume a fixed range when setting a manual threshold; use
+#     fit_dynamic_threshold() or calibrate against a validation split.
+# ---------------------------------------------------------------------------
+SCORE_RANGE_NOTES = """
+score() range is data-dependent (NOT a fixed [0,1] or [-1,1] interval).
+Empirically, well-behaved datasets fall roughly within [-0.5, 0.5] before
+the sign flip. Do not hardcode a threshold based on this range -- always
+compute it from the normal-training-data distribution
+(fit_dynamic_threshold) or validate empirically on held-out data.
+"""
+
+
+@pytest.fixture
+def rng():
+    return np.random.default_rng(0)
+
+
+@pytest.fixture
+def normal_data(rng):
+    return rng.normal(size=(100, 4))
+
+
+@pytest.fixture
+def fitted_model(normal_data):
+    return ClassicalDetector(random_state=42).fit(normal_data)
+
+
+# --- fit / score / predict (Week 2 interface) -------------------------------
+
+def test_fit_returns_self(normal_data):
+    model = ClassicalDetector()
+    result = model.fit(normal_data)
+    assert result is model
+
+
+def test_fit_rejects_1d_input():
+    model = ClassicalDetector()
+    with pytest.raises(ValueError):
+        model.fit(np.array([1, 2, 3]))
+
+
+def test_score_shape_matches_input(fitted_model, rng):
+    X_test = rng.normal(size=(15, 4))
+    scores = fitted_model.score(X_test)
+    assert scores.shape == (15,)
+    assert scores.dtype.kind == "f"
+
+
+def test_predict_returns_binary_labels(fitted_model, rng):
+    X_test = rng.normal(size=(15, 4))
+    labels = fitted_model.predict(X_test)
+    assert labels.shape == (15,)
+    assert set(np.unique(labels)).issubset({0, 1})
+
+
+def test_score_before_fit_raises():
+    model = ClassicalDetector()
+    with pytest.raises(RuntimeError):
+        model.score(np.zeros((5, 4)))
+
+
+def test_predict_before_fit_raises():
+    model = ClassicalDetector()
+    with pytest.raises(RuntimeError):
+        model.predict(np.zeros((5, 4)))
+
+
+def test_obvious_outliers_score_higher_than_normal(fitted_model, rng):
+    """Attack-like points should score as MORE anomalous than in-distribution points."""
+    normal_like = rng.normal(size=(20, 4))
+    far_outliers = rng.uniform(-20, 20, size=(20, 4))
+
+    normal_scores = fitted_model.score(normal_like)
+    outlier_scores = fitted_model.score(far_outliers)
+
+    assert outlier_scores.mean() > normal_scores.mean()
+
+
+# --- dynamic per-group threshold (Week 3) -----------------------------------
+
+def test_dynamic_threshold_mismatched_lengths_raises(fitted_model, normal_data):
+    bad_groups = np.array(["a"] * (len(normal_data) - 1))  # one short
+    with pytest.raises(ValueError):
+        fitted_model.fit_dynamic_threshold(normal_data, bad_groups)
+
+
+def test_predict_dynamic_without_fit_dynamic_raises(fitted_model, rng):
+    X_test = rng.normal(size=(10, 4))
+    groups = np.array(["a"] * 10)
+    with pytest.raises(RuntimeError):
+        fitted_model.predict_dynamic(X_test, groups)
+
+
+def test_dynamic_thresholds_differ_across_groups(rng):
+    """Two process classes with different natural variance should get
+    different thresholds -- this is the whole point of per-group
+    thresholding (Shamim et al.) over a single global cutoff."""
+    quiet = rng.normal(loc=0, scale=0.3, size=(60, 4))
+    noisy = rng.normal(loc=0, scale=3.0, size=(60, 4))
+    X = np.vstack([quiet, noisy])
+    groups = np.array(["quiet"] * 60 + ["noisy"] * 60)
+
+    model = ClassicalDetector(random_state=1).fit(X)
+    model.fit_dynamic_threshold(X, groups, k=3.0)
+
+    assert model._group_thresholds["quiet"] != model._group_thresholds["noisy"]
+    # the noisier class should tolerate a higher raw score before flagging
+    assert model._group_thresholds["noisy"] > model._group_thresholds["quiet"]
+
+
+def test_predict_dynamic_low_false_positive_rate_on_normal_data(rng):
+    """On in-distribution data from the SAME groups used to fit the
+    threshold, false positive rate should be low (not necessarily zero --
+    a k=3 std threshold still has some tail probability)."""
+    quiet = rng.normal(loc=0, scale=0.3, size=(100, 4))
+    noisy = rng.normal(loc=0, scale=3.0, size=(100, 4))
+    X_train = np.vstack([quiet, noisy])
+    groups_train = np.array(["quiet"] * 100 + ["noisy"] * 100)
+
+    model = ClassicalDetector(random_state=1).fit(X_train)
+    model.fit_dynamic_threshold(X_train, groups_train, k=3.0)
+
+    # fresh in-distribution test data, same groups
+    quiet_test = rng.normal(loc=0, scale=0.3, size=(50, 4))
+    noisy_test = rng.normal(loc=0, scale=3.0, size=(50, 4))
+    X_test = np.vstack([quiet_test, noisy_test])
+    groups_test = np.array(["quiet"] * 50 + ["noisy"] * 50)
+
+    labels = model.predict_dynamic(X_test, groups_test)
+    false_positive_rate = labels.mean()
+    assert false_positive_rate < 0.15  # generous bound for a toy random dataset
+
+
+def test_predict_dynamic_unseen_group_falls_back_to_global(rng):
+    X_train = rng.normal(size=(80, 4))
+    groups_train = np.array(["known"] * 80)
+
+    model = ClassicalDetector(random_state=1).fit(X_train)
+    model.fit_dynamic_threshold(X_train, groups_train, k=3.0)
+
+    X_test = rng.normal(size=(5, 4))
+    groups_test = np.array(["never_seen_before"] * 5)
+
+    # should not raise -- falls back to self._threshold
+    labels = model.predict_dynamic(X_test, groups_test)
+    assert labels.shape == (5,)
+
+
+# --- reproducibility ---------------------------------------------------------
+
+def test_same_random_state_gives_same_scores(normal_data, rng):
+    X_test = rng.normal(size=(10, 4))
+    model_a = ClassicalDetector(random_state=7).fit(normal_data)
+    model_b = ClassicalDetector(random_state=7).fit(normal_data)
+    np.testing.assert_array_equal(model_a.score(X_test), model_b.score(X_test))
+
+
+def test_different_random_state_can_differ(normal_data, rng):
+    X_test = rng.normal(size=(10, 4))
+    model_a = ClassicalDetector(random_state=1).fit(normal_data)
+    model_b = ClassicalDetector(random_state=2).fit(normal_data)
+    # not asserting inequality strictly (could coincidentally match), just
+    # documenting that random_state controls reproducibility, not a fixed
+    # deterministic-regardless-of-seed algorithm
+    assert model_a.score(X_test).shape == model_b.score(X_test).shape