From ffebaa4df3a479a4061b75611d3151d7ccdde67c Mon Sep 17 00:00:00 2001 From: Will Metcalf Date: Sun, 19 Apr 2026 21:38:33 -0500 Subject: [PATCH 1/6] Process-to-network attribution for sandbox analyses MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Tie every network artifact CAPE captures (suricata alerts/tls/http/files, network.tcp/udp/dns/hosts) back to the originating Windows process so analysts and downstream signatures can answer "which process did this?" without manual correlation work. Sources, in confidence order: 1. Sysmon Event ID 3 (NetworkConnect) from evtx.zip — full image path, destination hostname, src/dst 5-tuple 2. Microsoft-Windows-Kernel-Network ETW captured live by a new analyzer auxiliary, periodically uploaded so attribution survives crashes 3. Sigma EID 3 matched events — tertiary catch for late-fire flows 4. DNS-Client ETW (originating-process DNS) cross-referenced with suricata.dns/network.dns/network.hosts/sigma EID 22 QueryResults to attribute IPs we never saw a direct connect for. Avoids the "everything routes to svchost (dnscache)" failure mode. 5. Sysmon EID 22 (DnsQuery) — covers queries that fired before the DNS-ETW auxiliary subscribed (early CDN resolutions, etc.) 6. Sysmon EID 1 (ProcessCreate) — names processes that aren't monitored by capemon but show up in connection/DNS data A single AttributionIndex consumes all sources; per-target enrichment (suricata.alerts/tls/http/files, network.tcp/udp/dns/hosts, sigma detections) goes through one of four query methods. 5-tuple matching with src_port disambiguates multi-process flows to the same destination. New analyzer aux: analyzer/windows/modules/auxiliary/network_etw.py Captures TCP/UDP connect events from Microsoft-Windows-Kernel-Network with periodic full-snapshot uploads. Off by default in auxiliary.conf.default — opt-in with [Network_ETW] enabled = yes. 
Stop-priority on Network_ETW + Evtx: Set stop_priority = -20 so they shut down AFTER the capemon-related auxiliaries. Late-fire C2 callbacks that fire between the analysis- stop signal and VM teardown still get captured + attributed. Result-server transport: resultserver.py refactor (allowlisted RESULT_UPLOADABLE + RESULT_DIRECTORIES) tightens path-traversal protection and adds an is_replaceable_result_upload() helper so periodic re-uploads from the auxiliaries (tlsdump.log, dns_etw.json, network_etw.json, wmi_etw.json, sslkeylogfile/sslkeys.log) truncate-write instead of silently failing with EEXIST after the first upload. Decryption pipeline: decryptpcap.py wraps gogorobocap to produce dump_decrypted.pcap + dump_mixed.pcap. network.py + suricata.py honour a new pcapsrc config knob (auto/original/mixed/decrypted) so users explicitly choose which pcap variant to analyse. UI: _suricata_http.html, _suricata_files.html, _hosts.html render process attribution columns/rows, gated on the existing NETWORK_PROC_MAP setting. _hosts.html shows multi-process attribution as badges with hover tooltips that explain the attribution chain (direct connect / DNS-resolved / etc). Pre-existing "asn cell skipped when empty causes column shift" bug fixed in the same edit. Tests: tests/test_network_capture_integration.py covers AttributionIndex build + query, pcapsrc resolution, and replaceable-upload behaviour. 
--- analyzer/windows/modules/auxiliary/evtx.py | 9 + .../windows/modules/auxiliary/network_etw.py | 242 ++++++ conf/default/auxiliary.conf.default | 1 + conf/default/processing.conf.default | 17 +- lib/cuckoo/core/resultserver.py | 26 +- modules/processing/decryptpcap.py | 50 ++ modules/processing/network.py | 6 + modules/processing/network_etw.py | 737 ++++++++++++++++++ modules/processing/suricata.py | 6 + tests/test_network_capture_integration.py | 178 +++++ web/templates/analysis/network/_hosts.html | 19 +- .../analysis/network/_suricata_files.html | 4 + .../analysis/network/_suricata_http.html | 30 +- 13 files changed, 1301 insertions(+), 24 deletions(-) create mode 100644 analyzer/windows/modules/auxiliary/network_etw.py create mode 100644 modules/processing/network_etw.py create mode 100644 tests/test_network_capture_integration.py diff --git a/analyzer/windows/modules/auxiliary/evtx.py b/analyzer/windows/modules/auxiliary/evtx.py index ccd1fd6f847..e9cfc3469e6 100644 --- a/analyzer/windows/modules/auxiliary/evtx.py +++ b/analyzer/windows/modules/auxiliary/evtx.py @@ -12,6 +12,15 @@ class Evtx(Thread, Auxiliary): + # Stop AFTER capemon-related auxiliaries so the final EVTX snapshot + # captures sysmon events from late-fire callbacks that fire between + # the analysis-stopping signal and the VM teardown (e.g. C2 callbacks + # the malware schedules after a delay). Without this priority bump, + # those events happen after the last EVTX snapshot and never reach + # the host-side processing modules. 
+ start_priority = 0 + stop_priority = -20 + evtx_dump = "evtx.zip" # Event log channels to collect diff --git a/analyzer/windows/modules/auxiliary/network_etw.py b/analyzer/windows/modules/auxiliary/network_etw.py new file mode 100644 index 00000000000..283e9bf42f9 --- /dev/null +++ b/analyzer/windows/modules/auxiliary/network_etw.py @@ -0,0 +1,242 @@ +import json +import logging +import os +import shutil +import socket +import time +from threading import Thread + +from lib.common.results import upload_to_host +from lib.common.rand import random_string +from lib.core.config import Config +from lib.common.etw_utils import ( + ETWAuxiliaryWrapper, + ETWProviderWrapper, + HAVE_ETW, + ProviderInfo, + GUID, + et, + encode, +) + +log = logging.getLogger(__name__) + +__author__ = "DNS-GEE-O (@wmetcalf)" + +KERNEL_NETWORK_GUID = "{7DD42A49-5329-4832-8DFD-43D979153A88}" + +CONNECT_EVENT_IDS = [12, 15, 28, 31, 42, 58] + +EVENT_NAMES = { + 12: "tcp_connect_v4", + 15: "tcp_accept_v4", + 28: "tcp_connect_v6", + 31: "tcp_accept_v6", + 42: "udp_send_v4", + 58: "udp_send_v6", +} + +# Periodic upload interval in seconds +UPLOAD_INTERVAL = 15 + + +if HAVE_ETW: + + class NetworkETWProvider(ETWProviderWrapper): + def __init__( + self, + level=et.TRACE_LEVEL_INFORMATION, + logfile=None, + no_conout=False, + any_keywords=None, + all_keywords=None, + filter_ips=None, + filter_ports=None, + ): + self._filter_ips = filter_ips or set() + self._filter_ports = filter_ports or set() + + providers = [ + ProviderInfo( + "Microsoft-Windows-Kernel-Network", + GUID(KERNEL_NETWORK_GUID), + level, + any_keywords or 0x30, + all_keywords, + ) + ] + super().__init__( + session_name="ETW_KernelNetwork", + providers=providers, + event_id_filters=CONNECT_EVENT_IDS, + logfile=logfile, + no_conout=no_conout, + ) + + def _should_filter(self, event, event_id): + src_ip = str(event.get("saddr", "")) + dst_ip = str(event.get("daddr", "")) + src_port = event.get("sport", 0) + dst_port = event.get("dport", 0) + + 
# Try int conversion for port comparison + try: + src_port = int(src_port) + except (ValueError, TypeError): + pass + try: + dst_port = int(dst_port) + except (ValueError, TypeError): + pass + + if dst_ip in self._filter_ips: + return True + if event_id in (15, 31) and src_ip in self._filter_ips: + return True + if dst_port in self._filter_ports or src_port in self._filter_ports: + return True + if dst_ip in ("127.0.0.1", "::1", "0.0.0.0", ""): + return True + return False + + def on_event(self, event_tufo): + event_id, event = event_tufo + if event_id not in self.event_id_filters: + return + if self._should_filter(event, event_id): + return + if self.logfile: + self.write_to_log(self.logfile, event_id, event) + + def write_to_log(self, file_handle, event_id, event): + header = event.get("EventHeader", {}) + pid = event.get("PID") or header.get("ProcessId", 0) + proto = "TCP" if event_id in (12, 15, 28, 31) else "UDP" + direction = "outbound" if event_id in (12, 28, 42, 58) else "inbound" + + entry = { + "event_type": EVENT_NAMES.get(event_id, "unknown"), + "event_id": event_id, + "pid": pid, + "protocol": proto, + "direction": direction, + "src_ip": str(event.get("saddr", "")), + "src_port": event.get("sport", 0), + "dst_ip": str(event.get("daddr", "")), + "dst_port": event.get("dport", 0), + "timestamp": str(header.get("TimeStamp", "")), + } + connid = event.get("connid") + if connid: + entry["connid"] = connid + + json.dump(entry, file_handle) + file_handle.write("\n") + + +class Network_ETW(ETWAuxiliaryWrapper): + """Captures TCP/UDP connection events via Microsoft-Windows-Kernel-Network ETW. + + Provides process-to-network 5-tuple mapping. + Periodically uploads captured data to ensure availability if analysis + terminates unexpectedly. 
+ + Output: aux/network_etw.json (NDJSON) + """ + + # Stop AFTER capemon-related modules so late-firing network calls get attributed + start_priority = 0 + stop_priority = -20 + + def __init__(self, options, config): + super().__init__(options, config, "network_etw") + + self.output_dir = os.path.join("C:\\", random_string(5, 10)) + try: + os.mkdir(self.output_dir) + except FileExistsError: + pass + + self.log_file_path = os.path.join(self.output_dir, "%s.log" % random_string(5, 10)) + self.log_file = None + self._do_periodic = False + self._periodic_thread = None + + if HAVE_ETW and self.enabled: + filter_ips = set() + filter_ports = set() + + try: + analysis_cfg = Config(cfg="analysis.conf") + host_ip = getattr(analysis_cfg, "ip", "") + if host_ip: + filter_ips.add(host_ip) + rs_port = getattr(analysis_cfg, "port", 0) + if rs_port: + filter_ports.add(int(rs_port)) + except Exception as e: + log.debug("Could not read analysis config for filters: %s", e) + + filter_ports.add(8000) + filter_ports.add(53) + + log.info("NetworkETW filters: ips=%s ports=%s", filter_ips, filter_ports) + + try: + self.log_file = open(self.log_file_path, "w", encoding="utf-8") + self.capture = NetworkETWProvider( + logfile=self.log_file, + level=255, + no_conout=True, + filter_ips=filter_ips, + filter_ports=filter_ports, + ) + except Exception as e: + log.error("Failed to open Network ETW log file: %s", e) + + def start(self): + result = super().start() + # Start periodic upload thread + if self.enabled and self.log_file: + self._do_periodic = True + self._periodic_thread = Thread(target=self._periodic_upload, daemon=True) + self._periodic_thread.start() + return result + + def _periodic_upload(self): + """Periodically flush and upload current data.""" + while self._do_periodic: + for _ in range(UPLOAD_INTERVAL): + if not self._do_periodic: + break + time.sleep(1) + if self._do_periodic and self.log_file: + try: + self.log_file.flush() + # Copy the file so we don't interfere with ongoing 
writes + snap_path = self.log_file_path + ".snap" + shutil.copy2(self.log_file_path, snap_path) + upload_to_host(snap_path, os.path.join("aux", "network_etw.json")) + log.debug("Periodic network_etw upload: %d bytes", os.path.getsize(snap_path)) + os.remove(snap_path) + except Exception as e: + log.debug("Periodic network_etw upload failed: %s", e) + + def upload_results(self): + """Final upload on stop.""" + self._do_periodic = False + if self._periodic_thread: + self._periodic_thread.join(timeout=5) + + if self.log_file: + try: + self.log_file.close() + except Exception: + pass + self.log_file = None + + if os.path.isfile(self.log_file_path) and os.path.getsize(self.log_file_path) > 0: + try: + upload_to_host(self.log_file_path, os.path.join("aux", "network_etw.json")) + except Exception as e: + log.error("Final network_etw upload failed: %s", e) diff --git a/conf/default/auxiliary.conf.default b/conf/default/auxiliary.conf.default index 3e7b137f452..21018cdb1db 100644 --- a/conf/default/auxiliary.conf.default +++ b/conf/default/auxiliary.conf.default @@ -32,6 +32,7 @@ browsermonitor = no wmi_etw = no dns_etw = no amsi_etw = no +network_etw = no watchdownloads = no [AzSniffer] diff --git a/conf/default/processing.conf.default b/conf/default/processing.conf.default index 8fe384c0eaa..b453a1ede97 100644 --- a/conf/default/processing.conf.default +++ b/conf/default/processing.conf.default @@ -101,6 +101,9 @@ enabled = no [network] enabled = yes sort_pcap = no +# Which capture to analyze when decryptpcap produced additional outputs: +# auto | original | mixed | decrypted +pcapsrc = auto # Enable mapping of network events to specific processes using behavioral analysis data process_map = no # Adds network connections seen in behavior but not in PCAP. 
Requires process_map = yes @@ -181,6 +184,9 @@ urlscrub = (^http:\/\/serw\.clicksor\.com\/redir\.php\?url=|&InjectedParam=.+$) [suricata] enabled = no runmode = cli +# Which capture to analyze when decryptpcap produced additional outputs: +# auto | original | mixed | decrypted +pcapsrc = auto # Outputfiles # if evelog is specified, it will be used instead of the per-protocol log files evelog = eve.json @@ -248,6 +254,16 @@ file_cache = no # Store pefile objects for later usage? useful if you doing something in signatures/reporting pefile_store = no +[decryptpcap] +enabled = no +gogorobocap = data/gogorobocap/gogorobocap-linux-amd64 +# Select how decrypted captures are generated: +# auto | pcap_with_keylog | sslproxy_synth_pcap +pcapsrc = auto + +[network_etw] +enabled = no + # Deduplicate screenshots - You need to install dependency ImageHash>=4.3.1 [deduplication] # @@ -348,4 +364,3 @@ enabled = no # Enable when using the PolarProxy option during analysis. This will merge the tls.pcap containing # plain-text TLS streams into the task PCAP. 
enabled = no - diff --git a/lib/cuckoo/core/resultserver.py b/lib/cuckoo/core/resultserver.py index 7e675d335d2..fa98dc259e3 100644 --- a/lib/cuckoo/core/resultserver.py +++ b/lib/cuckoo/core/resultserver.py @@ -81,6 +81,14 @@ RESULT_DIRECTORIES = RESULT_UPLOADABLE + (b"reports", b"logs") +REPLACEABLE_RESULT_UPLOADS = ( + b"tlsdump/", + b"aux/dns_etw.json", + b"aux/network_etw.json", + b"aux/wmi_etw.json", + b"aux/sslkeylogfile/sslkeys.log", +) + def netlog_sanitize_fname(path): """Validate agent-provided path for result files""" @@ -90,11 +98,17 @@ def netlog_sanitize_fname(path): raise CuckooOperationalError(f"Netlog client requested banned path: {path}") if any(c in BANNED_PATH_CHARS for c in name): for c in BANNED_PATH_CHARS: - path.replace(bytes([c]), b"X") + path = path.replace(bytes([c]), b"X") return path +def is_replaceable_result_upload(path): + """Return True for result uploads that are expected to overwrite prior + content with a full snapshot rather than append a distinct artifact.""" + return path.startswith(REPLACEABLE_RESULT_UPLOADS) + + class Disconnect(Exception): pass @@ -254,7 +268,15 @@ def handle(self): try: if file_path.endswith("_script.log"): self.fd = open_inclusive(file_path) - elif not path_exists(file_path): + elif is_replaceable_result_upload(dump_path) and path_exists(file_path): + # Auxiliary modules (tlsdump, network_etw, sslkeylogfile…) + # upload the SAME dump_path periodically so accumulated + # key / connection data survives an unexpected analysis + # termination. Each upload is a full replacement of the + # prior content — truncate and rewrite rather than failing + # silently with EEXIST. 
+ self.fd = open(file_path, "wb") + else: # open_exclusive will fail if file_path already exists self.fd = open_exclusive(file_path) except OSError as e: diff --git a/modules/processing/decryptpcap.py b/modules/processing/decryptpcap.py index ae1adf71cd1..72e4bc9d3c5 100644 --- a/modules/processing/decryptpcap.py +++ b/modules/processing/decryptpcap.py @@ -14,6 +14,56 @@ PCAP_HEADER_SIZE = 24 +def _get_option(options, key, default=None): + """Read `key` from `options` whether it's a dict-like or an attribute bag. + + CAPE's processing modules receive `self.options` from different callers + in slightly different shapes (dict from task options, namespace from + config parsing). This helper handles both without TypeErrors.""" + if options is None: + return default + getter = getattr(options, "get", None) + if callable(getter): + try: + return getter(key, default) + except TypeError: + pass + return getattr(options, key, default) + + +def _is_usable_pcap(path): + return bool(path and os.path.exists(path) and os.path.getsize(path) > PCAP_HEADER_SIZE) + + +def resolve_processing_pcap_path(analysis_path, default_pcap_path, pcapsrc="auto"): + """Pick the best PCAP for downstream processing modules. + + `pcapsrc` may explicitly request `mixed`, `decrypted`, or `original`. + Any other value falls back to auto-selection: prefer `dump_mixed.pcap`, + then `dump_decrypted.pcap`, then the original capture. 
+ """ + mixed_path = os.path.join(analysis_path, "dump_mixed.pcap") + decrypted_path = os.path.join(analysis_path, "dump_decrypted.pcap") + requested = (pcapsrc or "auto").lower() + + explicit = { + "mixed": mixed_path, + "decrypted": decrypted_path, + "original": default_pcap_path, + "default": default_pcap_path, + "dump": default_pcap_path, + } + if requested in explicit: + candidate = explicit[requested] + return candidate if candidate == default_pcap_path or _is_usable_pcap(candidate) else default_pcap_path + + for candidate in (mixed_path, decrypted_path, default_pcap_path): + if candidate == default_pcap_path or _is_usable_pcap(candidate): + return candidate + + return default_pcap_path + + class DecryptPcap(Processing): """Generate decrypted pcaps from TLS traffic using GoGoRoboCap. diff --git a/modules/processing/network.py b/modules/processing/network.py index bb0b814a32a..372efd8605b 100644 --- a/modules/processing/network.py +++ b/modules/processing/network.py @@ -41,6 +41,7 @@ from lib.cuckoo.common.path_utils import path_delete, path_exists, path_mkdir, path_read_file, path_write_file from lib.cuckoo.common.safelist import is_safelisted_domain from lib.cuckoo.common.utils import convert_to_printable +from modules.processing.decryptpcap import resolve_processing_pcap_path # from lib.cuckoo.common.safelist import is_safelisted_ip log = logging.getLogger(__name__) @@ -1561,7 +1562,12 @@ def _merge_behavior_network(self, network): target_list = "udp" if port == 53 else "tcp" network.setdefault(target_list, []).append(entry) + def _resolve_pcap_path(self): + pcapsrc = self.options.get("pcapsrc", "auto") if self.options else "auto" + return resolve_processing_pcap_path(self.analysis_path, self.pcap_path, pcapsrc=pcapsrc) + def run(self): + self.pcap_path = self._resolve_pcap_path() if not path_exists(self.pcap_path): log.debug('The PCAP file does not exist at path "%s"', self.pcap_path) return {} diff --git a/modules/processing/network_etw.py 
b/modules/processing/network_etw.py new file mode 100644 index 00000000000..17ac65a8759 --- /dev/null +++ b/modules/processing/network_etw.py @@ -0,0 +1,737 @@ +# Process→Network attribution for CAPE. +# +# Consumes every process-to-network signal captured during analysis and feeds +# it into a single AttributionIndex. Each enrichment target (suricata alerts, +# tls, http, files; network.tcp/udp/dns/hosts; sigma detections) queries the +# index through one of four methods — no target-specific lookup tables. +# +# Signal sources, highest-confidence first: +# 1. Sysmon EID 3 (NetworkConnect) — from evtx.zip. Full image path. +# 2. Kernel-Network ETW (aux/network_etw.json) — periodic uploads. +# 3. Sigma EID 3 matched_events — tertiary (catches late-fire flows). +# 4. DNS-Client ETW (aux/dns_etw.json) — originating-process DNS. Used with +# resolution data (#5) to attribute by resolved IP. Avoids svchost bias. +# 5. Sysmon EID 22 (DnsQuery) — parallel to #4, covers queries that happened +# before DNS-Client ETW subscribed (common miss for early CDN resolutions). +# 6. Sigma EID 22 matched_events — subset of #5, has Image in the record. +# 7. Resolution data — suricata.dns, network.dns, network.hosts, sigma EID 22 +# QueryResults. Used to turn hostnames into IPs for the DNS cross-reference. +# 8. Sysmon EID 1 (ProcessCreate) — pid→image map for processes that made +# queries but aren't in behavior (capemon not attached). + +import json +import logging +import os +import shutil +import tempfile +import xml.etree.ElementTree as ET +import zipfile + +from lib.cuckoo.common.abstracts import Processing + +# Sysmon Event XML lives in this namespace; ElementTree returns tag names +# already qualified, so we strip the prefix when reading element names. 
+EVT_NS = "{http://schemas.microsoft.com/win/2004/08/events/event}" + +log = logging.getLogger(__name__) + +try: + import Evtx.Evtx as EvtxParser + HAVE_EVTX = True +except ImportError: + HAVE_EVTX = False + + +def _clean_ip(s): + if not s: + return "" + s = s.strip() + if s.startswith("::ffff:"): + s = s[7:] + return s + + +def _clean_host(s): + if not s: + return "" + return s.strip().rstrip(".").lower() + + +class AttributionIndex: + """Centralized process-to-network attribution index. + + Call order: + idx = AttributionIndex() + idx.add_pid_name(...) # any number of times + idx.add_connection(...) # any number of times + idx.add_dns_query(...) + idx.add_resolution(...) + idx.finalize() + idx.for_flow(...) # query API + ... + idx.set_http_owner(...) # after suricata.http enrichment + idx.for_http(...) # for files lookup + """ + + def __init__(self): + self._pid_to_name = {} # pid_str -> basename + self._by_ip = {} # ip -> [{pid, process_name, dst_port, protocol, source}] + self._dns_host_to_pid = {} # host -> (pid_str, name, source) + self._host_to_ips = {} # host -> set(ip) + self._ip_via_dns = {} # ip -> [(pid_str, host)] + self._http_by_uri = {} # (host, uri) -> (pid_str, name) + self._http_by_host = {} # host -> (pid_str, name) + # Counters surfaced via .stats() for logging + self.stats_counters = {"dns_etw": 0, "sysmon_eid22": 0, + "sigma_eid22": 0, "direct": 0, + "resolutions": 0} + + # ------------------------------------------------------------------ seed + def add_pid_name(self, pid, image_or_name): + if not pid or not image_or_name: + return + pid = str(pid) + name = os.path.basename(image_or_name) + self._pid_to_name.setdefault(pid, name) + for entries in self._by_ip.values(): + for entry in entries: + if entry["pid"] == pid and not entry["process_name"]: + entry["process_name"] = self._pid_to_name[pid] + + def name_of(self, pid): + return self._pid_to_name.get(str(pid), "") if pid else "" + + def pid_names(self): + """Read-only view of the pid->name 
map. Callers that need to seed + another helper with the current names should use this rather than + the underscored attribute.""" + return dict(self._pid_to_name) + + def add_connection(self, pid, dst_ip, dst_port=None, src_ip="", + src_port=None, protocol="", process_name="", source=""): + """Direct connection observed (kernel-ETW, sysmon EID 3, sigma EID 3). + src_port is the disambiguator when multiple processes share a (dst_ip, + dst_port) — every TCP flow has a unique client-side ephemeral port.""" + if not pid or not dst_ip: + return + pid = str(pid) + dst_ip = _clean_ip(dst_ip) + if not dst_ip or dst_ip in ("127.0.0.1", "::1", "0.0.0.0"): + return + if process_name: + self.add_pid_name(pid, process_name) + name = self._pid_to_name.get(pid, process_name or "") + entry = { + "pid": pid, + "process_name": name, + "src_ip": _clean_ip(src_ip), + "src_port": str(src_port) if src_port is not None and src_port != "" else "", + "dst_port": str(dst_port) if dst_port is not None and dst_port != "" else "", + "protocol": (protocol or "").upper(), + "source": source, + } + bucket = self._by_ip.setdefault(dst_ip, []) + # Dedupe on (pid, src_port, dst_port, protocol) so the same connection + # observed by multiple sources doesn't multiply, but distinct flows + # from the same process (different src_ports) stay separate. 
+ dedup_key = (pid, entry["src_port"], entry["dst_port"], entry["protocol"]) + for existing in bucket: + if (existing["pid"], existing["src_port"], existing["dst_port"], + existing["protocol"]) == dedup_key: + return + bucket.append(entry) + self.stats_counters["direct"] += 1 + + def add_dns_query(self, pid, hostname, image_or_name="", source=""): + """pid asked for hostname (DNS-Client ETW / sysmon EID 22 / sigma EID 22).""" + if not pid or not hostname: + return + pid = str(pid) + h = _clean_host(hostname) + if not h: + return + if image_or_name: + self.add_pid_name(pid, image_or_name) + name = self._pid_to_name.get(pid, "") + # Skip positively-identified svchost (dnscache doing a delegated lookup + # isn't the real owner). Unknown names are kept — PID alone is still + # useful attribution. + if name and "svchost" in name.lower(): + return + self._dns_host_to_pid.setdefault(h, (pid, name, source)) + if source in self.stats_counters: + self.stats_counters[source] += 1 + + def add_resolution(self, hostname, ip): + """hostname resolves to ip (suricata.dns, network.dns, network.hosts, sigma EID 22).""" + h = _clean_host(hostname) + ip = _clean_ip(ip) + if not h or not ip: + return + # Basic garbage filter + if ":" in ip and ip.count(":") < 2: + return + self._host_to_ips.setdefault(h, set()).add(ip) + self.stats_counters["resolutions"] += 1 + + # --------------------------------------------------------------- finalize + def finalize(self): + """Cross-reference DNS queries × resolutions into ip_via_dns.""" + for host, (pid, name, source) in self._dns_host_to_pid.items(): + for ip in self._host_to_ips.get(host, ()): + self._ip_via_dns.setdefault(ip, []).append((pid, host)) + + # --------------------------------------------------------------- queries + def for_ip(self, ip, dst_port=None, src_port=None): + """Best process for a connection to `ip`. Match priority: + 1. 
exact 5-tuple match by src_port (each TCP flow has unique + client ephemeral port — disambiguates multi-process cases) + 2. dst_port match (when src_port unknown or not in index) + 3. first known process (when no port info available) + 4. DNS-resolved IP fallback (process asked for a hostname that + resolved to this IP, but we never saw the connect) + Returns {pid, process_name, source, ...} or None.""" + ip = _clean_ip(ip) + if not ip: + return None + procs = self._by_ip.get(ip) + if procs: + # 1. src_port match — most specific + if src_port is not None and src_port != "": + for p in procs: + if p["src_port"] == str(src_port): + return dict(p) + # 2. dst_port match + if dst_port is not None and dst_port != "": + for p in procs: + if p["dst_port"] == str(dst_port): + return dict(p) + # 3. first known + return dict(procs[0]) + # 4. DNS fallback + dns_hits = self._ip_via_dns.get(ip) + if dns_hits: + pid, host = dns_hits[0] + name = self._pid_to_name.get(pid, "") + if name and "svchost" in name.lower(): + return None + return { + "pid": pid, + "process_name": name, + "src_port": "", + "dst_port": str(dst_port) if dst_port is not None else "", + "protocol": "TCP", + "source": "dns-fallback", + "resolved_hostname": host, + } + return None + + def for_flow(self, dstip="", dstport=None, srcip="", srcport=None): + """Bidirectional flow attribution with full 5-tuple matching when + available. Tries dst-side first (outbound-favored), then src-side + for ingress alerts where dst is the local VM.""" + # On the outbound interpretation, srcport is the local ephemeral + # port — that's the disambiguator. On the ingress interpretation + # (alert dst=VM), dstport is the local ephemeral port. + return (self.for_ip(dstip, dst_port=dstport, src_port=srcport) + or self.for_ip(srcip, dst_port=srcport, src_port=dstport)) + + def for_host(self, hostname): + """(pid, name) that queried this hostname, or None. 
Used for files + and network.dns records.""" + h = _clean_host(hostname) + if not h: + return None + rec = self._dns_host_to_pid.get(h) + if not rec: + return None + pid, name, _src = rec + return (pid, name) + + def for_http(self, host, uri): + """(pid, name) from an already-enriched HTTP transaction. Prefer an + exact (host, uri) match; fall back to host alone; finally DNS.""" + if host and uri: + hit = self._http_by_uri.get((host, uri)) + if hit: + return hit + if host: + hit = self._http_by_host.get(host) + if hit: + return hit + return self.for_host(host) + + def set_http_owner(self, host, uri, pid, name): + """Register an attributed HTTP transaction for subsequent files lookup.""" + if not pid: + return + pid = str(pid) + if host and uri: + self._http_by_uri.setdefault((host, uri), (pid, name)) + if host: + self._http_by_host.setdefault(host, (pid, name)) + + def all_processes_for(self, ip): + """Return every distinct (pid, name) seen for an IP — direct + DNS. + Used by network.hosts where multiple processes may share a dst.""" + ip = _clean_ip(ip) + if not ip: + return [] + seen = set() + out = [] + for p in self._by_ip.get(ip, ()): + key = (p["pid"], p["process_name"]) + if key in seen: + continue + seen.add(key) + out.append({ + "pid": p["pid"], + "process_name": p["process_name"], + "dst_port": p["dst_port"], + "protocol": p["protocol"], + "source": p["source"], + }) + for pid, host in self._ip_via_dns.get(ip, ()): + key = (pid, self._pid_to_name.get(pid, "")) + if key in seen: + continue + seen.add(key) + out.append({ + "pid": pid, + "process_name": self._pid_to_name.get(pid, ""), + "dst_port": "", + "protocol": "", + "source": "dns-fallback", + "resolved_hostname": host, + }) + return out + + +class NetworkETW(Processing): + """Parse network connection events and correlate with process info.""" + + key = "network_etw" + order = 99 # Run after suricata but before dnsgeeo (101) + + # ------------------------------------------------------------------ parse 
+ @staticmethod + def _safe_extract(zf, member, dest_dir): + """Extract `member` from `zf` into `dest_dir` only if the resolved + target stays inside `dest_dir` (zip-slip guard). Returns the on-disk + path on success, None if the entry would escape.""" + target = os.path.realpath(os.path.join(dest_dir, member)) + if not target.startswith(os.path.realpath(dest_dir) + os.sep): + log.warning("Skipping evtx zip entry that would escape tmpdir: %s", member) + return None + zf.extract(member, dest_dir) + return target + + @staticmethod + def _read_evt_data(event_elem): + """Parse EventData <Data Name="...">value</Data> elements into a + dict with XML entities properly decoded. Returns {} when no EventData + present (some events don't carry one).""" + out = {} + ed = event_elem.find(EVT_NS + "EventData") + if ed is None: + return out + for d in ed.findall(EVT_NS + "Data"): + name = d.get("Name") + if not name: + continue + # ElementTree returns text=None for self-closing/empty elements; + # normalise to "" so callers can distinguish "missing" via .get() + # default vs "present but empty" via "" — same as before, but now + # entity-decoded (&amp; → &, &lt; → <, &#xNN; → unicode char). + out[name] = d.text or "" + return out + + def _parse_sysmon_evtx(self): + """Extract EID 1 / EID 3 / EID 22 from sysmon EVTX snapshots. 
+ Returns (connections, pid_to_image, dns_queries).""" + connections = [] + pid_to_image = {} + dns_queries = [] + evtx_path = os.path.join(self.analysis_path, "evtx", "evtx.zip") + if not HAVE_EVTX or not os.path.exists(evtx_path): + return connections, pid_to_image, dns_queries + + tmpdir = tempfile.mkdtemp() + try: + with zipfile.ZipFile(evtx_path) as z: + sysmon_files = sorted([f for f in z.namelist() if "Sysmon" in f]) + for fname in sysmon_files: + path = self._safe_extract(z, fname, tmpdir) + if path is None: + continue + try: + with EvtxParser.Evtx(path) as ef: + for rec in ef.records(): + try: + root = ET.fromstring(rec.xml()) + except ET.ParseError as parse_err: + log.debug("Skipping malformed evtx record in %s: %s", + fname, parse_err) + continue + sys_elem = root.find(EVT_NS + "System") + if sys_elem is None: + continue + eid_elem = sys_elem.find(EVT_NS + "EventID") + if eid_elem is None or eid_elem.text not in ("1", "3", "22"): + continue + eid = eid_elem.text + fields = self._read_evt_data(root) + + if eid == "1": + pid = fields.get("ProcessId", "") + image = fields.get("Image", "") + if pid and image: + pid_to_image[str(pid)] = os.path.basename(image) + + elif eid == "22": + pid = fields.get("ProcessId", "") + qname = _clean_host(fields.get("QueryName", "")) + image = fields.get("Image", "") + if pid and qname: + dns_queries.append((str(pid), qname, image)) + if pid and image: + pid_to_image.setdefault(str(pid), os.path.basename(image)) + + else: # "3" + connections.append({ + "pid": fields.get("ProcessId", ""), + "process_name": os.path.basename(fields.get("Image", "")), + "process_path": fields.get("Image", ""), + "protocol": fields.get("Protocol", "").upper(), + "direction": "outbound" if fields.get("Initiated") == "true" else "inbound", + "src_ip": fields.get("SourceIp", ""), + "src_port": fields.get("SourcePort", ""), + "dst_ip": fields.get("DestinationIp", ""), + "dst_port": fields.get("DestinationPort", ""), + "dst_hostname": 
fields.get("DestinationHostname", ""), + "source": "sysmon", + }) + except Exception: + log.debug("Failed to parse sysmon EVTX %s", fname, exc_info=True) + except Exception: + log.warning("Failed to read EVTX zip", exc_info=True) + finally: + shutil.rmtree(tmpdir, ignore_errors=True) + + return connections, pid_to_image, dns_queries + + def _parse_kernel_network_etw(self, pid_to_name): + """Parse aux/network_etw.json from the Microsoft-Windows-Kernel-Network + ETW provider (captured by the dns_etw auxiliary at analysis time).""" + connections = [] + etw_path = os.path.join(self.analysis_path, "aux", "network_etw.json") + if not os.path.exists(etw_path): + return connections + + try: + with open(etw_path, "r") as f: + for line in f: + line = line.strip() + if not line: + continue + try: + event = json.loads(line) + except json.JSONDecodeError: + continue + pid = str(event.get("pid", "")) + connections.append({ + "pid": pid, + "process_name": pid_to_name.get(pid, ""), + "process_path": "", + "protocol": event.get("protocol", "").upper(), + "direction": event.get("direction", ""), + "src_ip": event.get("src_ip", ""), + "src_port": str(event.get("src_port", "")), + "dst_ip": event.get("dst_ip", ""), + "dst_port": str(event.get("dst_port", "")), + "dst_hostname": "", + "source": "kernel_etw", + }) + except Exception: + log.warning("Failed to parse network ETW data", exc_info=True) + + return connections + + def _parse_dns_etw(self): + """Parse aux/dns_etw.json (DNS-Client ETW; originating-process DNS). 
+ Returns: [(pid_str, hostname_lower), ...].""" + out = [] + path = os.path.join(self.analysis_path, "aux", "dns_etw.json") + if not os.path.exists(path): + return out + try: + with open(path, "r") as f: + for line in f: + line = line.strip() + if not line: + continue + try: + e = json.loads(line) + except json.JSONDecodeError: + continue + if e.get("QueryType") != "Query": + continue + pid = e.get("ProcessId") + qname = _clean_host(e.get("QueryName", "")) + if pid is None or not qname: + continue + out.append((str(pid), qname)) + except Exception: + log.warning("Failed to parse dns_etw.json", exc_info=True) + return out + + # --------------------------------------------------------------------- run + def run(self): + results = { + "process_connections": [], + "connections_by_pid": {}, + "connections_by_dst": {}, + } + + idx = AttributionIndex() + + # pid->image seeds --------------------------------------------------- + behavior_processes = self.results.get("behavior", {}).get("processes", []) or [] + for proc in behavior_processes: + idx.add_pid_name(proc.get("process_id"), proc.get("process_name", "")) + + sysmon_conns, sysmon_pid_to_image, sysmon_dns_queries = self._parse_sysmon_evtx() + for pid, image in sysmon_pid_to_image.items(): + idx.add_pid_name(pid, image) + + # Direct connections ------------------------------------------------- + for c in sysmon_conns: + idx.add_connection( + pid=c["pid"], dst_ip=c["dst_ip"], dst_port=c["dst_port"], + src_ip=c.get("src_ip", ""), src_port=c.get("src_port", ""), + protocol=c["protocol"], process_name=c["process_name"], + source="sysmon", + ) + + etw_conns = self._parse_kernel_network_etw(idx.pid_names()) + for c in etw_conns: + idx.add_connection( + pid=c["pid"], dst_ip=c["dst_ip"], dst_port=c["dst_port"], + src_ip=c.get("src_ip", ""), src_port=c.get("src_port", ""), + protocol=c["protocol"], process_name=c["process_name"], + source="kernel_etw", + ) + + sigma = self.results.get("sigma", {}) or {} + for det in 
sigma.get("detections", []) or []: + for ev in det.get("matched_events", []) or []: + if ev.get("EventID") == 3 and ev.get("ProcessID") is not None: + image = ev.get("Image", "") + idx.add_pid_name(ev.get("ProcessID"), image) + idx.add_connection( + pid=ev.get("ProcessID"), + dst_ip=ev.get("DestinationIp", ""), + dst_port=ev.get("DestinationPort"), + src_ip=ev.get("SourceIp", ""), + src_port=ev.get("SourcePort"), + protocol=ev.get("Protocol", ""), + process_name=os.path.basename(image) if image else "", + source="sigma", + ) + + # DNS queries (pid -> hostname) -------------------------------------- + for pid, host in self._parse_dns_etw(): + idx.add_dns_query(pid, host, source="dns_etw") + for pid, host, image in sysmon_dns_queries: + idx.add_dns_query(pid, host, image, source="sysmon_eid22") + for det in sigma.get("detections", []) or []: + for ev in det.get("matched_events", []) or []: + if ev.get("EventID") != 22: + continue + pid = ev.get("ProcessID") + if pid is None: + continue + idx.add_dns_query(pid, ev.get("QueryName", ""), + ev.get("Image", ""), source="sigma_eid22") + + # Resolutions (hostname -> IPs) -------------------------------------- + suricata = self.results.get("suricata", {}) or {} + network = self.results.get("network", {}) or {} + + for rec in suricata.get("dns", []) or []: + q = rec.get("rrname") or rec.get("query") or "" + idx.add_resolution(q, rec.get("rdata") or rec.get("answer") or "") + for a in rec.get("answers", []) or []: + idx.add_resolution(q, a.get("rdata") or a.get("data") or "") + for rec in network.get("dns", []) or []: + q = rec.get("request") or "" + for a in rec.get("answers", []) or []: + if a.get("type") in ("A", "AAAA"): + idx.add_resolution(q, a.get("data", "")) + for rec in network.get("hosts", []) or []: + idx.add_resolution(rec.get("hostname", ""), rec.get("ip", "")) + for det in sigma.get("detections", []) or []: + for ev in det.get("matched_events", []) or []: + if ev.get("EventID") != 22: + continue + q = 
ev.get("QueryName", "") + raw = ev.get("QueryResults", "") or "" + for part in raw.split(";"): + part = part.strip() + if part and not part.startswith("type:"): + idx.add_resolution(q, part) + + idx.finalize() + + # Build the result structure (process_connections + by_pid + by_dst). + # Dedupe key here is intentionally coarser than AttributionIndex's + # (which uses src_port to keep distinct flows separate): this view + # is for human consumption — multiple ephemeral connections from + # the same process to the same dst_ip:dst_port should fold to one + # row in process_connections / connections_by_dst. AttributionIndex + # still has the per-flow detail for query-time matching. + merged = [] + seen = set() + for pool in (sysmon_conns, etw_conns): + for c in pool: + key = (c["pid"], c["dst_ip"], c["dst_port"]) + if c["pid"] and c["dst_ip"] and key not in seen: + seen.add(key) + merged.append(c) + + by_pid = {} + by_dst = {} + for c in merged: + pid = c["pid"] + dst = c["dst_ip"] + if not dst or dst in ("127.0.0.1", "::1", "0.0.0.0"): + continue + by_pid.setdefault(pid, { + "pid": pid, + "process_name": c["process_name"], + "process_path": c.get("process_path", ""), + "connections": [], + })["connections"].append({ + "dst_ip": dst, + "dst_port": c["dst_port"], + "protocol": c["protocol"], + }) + by_dst.setdefault(dst, []).append({ + "pid": pid, + "process_name": c["process_name"], + "dst_port": c["dst_port"], + "protocol": c["protocol"], + "source": c.get("source", ""), + }) + + results["process_connections"] = merged + results["connections_by_pid"] = by_pid + results["connections_by_dst"] = by_dst + + log.info( + "network_etw: sources — %d sysmon conns, %d kernel-ETW conns, " + "%d pid->image, %d sysmon DNS, %d DNS-ETW pairs, %d resolutions", + len(sysmon_conns), len(etw_conns), len(sysmon_pid_to_image), + len(sysmon_dns_queries), idx.stats_counters.get("dns_etw", 0), + idx.stats_counters.get("resolutions", 0), + ) + + # Enrichment loops — all go through the single 
index ---------------- + enriched = {k: 0 for k in ("alerts", "tls", "http", "files", + "tcp", "udp", "hosts", "dns", "sigma")} + + def apply(rec, hit): + if not hit: + return False + rec["process_name"] = hit.get("process_name", "") + rec["process_id"] = hit.get("pid", "") + return True + + # suricata.alerts — bidirectional (ingress-direction rules dst=VM) + for rec in suricata.get("alerts", []) or []: + hit = idx.for_flow(rec.get("dstip", ""), rec.get("dstport"), + rec.get("srcip", ""), rec.get("srcport")) + if apply(rec, hit): + enriched["alerts"] += 1 + + # suricata.tls + http — dst-based (with src fallback too, for safety) + for kind in ("tls", "http"): + for rec in suricata.get(kind, []) or []: + hit = idx.for_flow(rec.get("dstip", ""), rec.get("dstport"), + rec.get("srcip", ""), rec.get("srcport")) + if apply(rec, hit): + enriched[kind] += 1 + if kind == "http": + idx.set_http_owner(rec.get("hostname", ""), + rec.get("uri", ""), + hit["pid"], hit.get("process_name", "")) + + # suricata.files — via HTTP transaction (uri/host) or DNS hostname + for rec in suricata.get("files", []) or []: + host = rec.get("http_host", "") + hit = idx.for_http(host, rec.get("http_uri", "")) + if hit: + pid, name = hit + rec["process_name"] = name + rec["process_id"] = pid + enriched["files"] += 1 + + # network.tcp / udp — CAPE's pcap-parsed connections (sport disambiguates) + for proto in ("tcp", "udp"): + for rec in network.get(proto, []) or []: + hit = idx.for_ip(rec.get("dst", ""), + dst_port=rec.get("dport"), + src_port=rec.get("sport")) + if apply(rec, hit): + enriched[proto] += 1 + + # network.dns — via DNS-query hostname (never by UDP 53 flow owner) + for rec in network.get("dns", []) or []: + hit = idx.for_host(rec.get("request", "")) + if hit: + pid, name = hit + rec["process_name"] = name + rec["process_id"] = pid + enriched["dns"] += 1 + + # network.hosts — may have multiple owners; list all + for rec in network.get("hosts", []) or []: + owners = 
idx.all_processes_for(rec.get("ip", "")) + if owners: + rec["processes"] = owners + enriched["hosts"] += 1 + + # sigma.detections — hoist (pid, image, command_line, parent) from + # matched_events so the UI doesn't have to dig + for det in sigma.get("detections", []) or []: + seen_procs = set() + procs = [] + for ev in det.get("matched_events", []) or []: + pid = ev.get("ProcessID") + image = ev.get("Image", "") + if pid is None and not image: + continue + key = (pid, image) + if key in seen_procs: + continue + seen_procs.add(key) + procs.append({ + "pid": pid, + "process_name": image, + "command_line": ev.get("CommandLine", ""), + "parent_pid": ev.get("ParentProcessId"), + "parent_image": ev.get("ParentImage", ""), + }) + if procs: + det["processes"] = procs + enriched["sigma"] += 1 + + log.info( + "network_etw: enriched — %d alerts, %d tls, %d http, %d files, " + "%d tcp, %d udp, %d dns, %d hosts, %d sigma", + enriched["alerts"], enriched["tls"], enriched["http"], enriched["files"], + enriched["tcp"], enriched["udp"], enriched["dns"], enriched["hosts"], + enriched["sigma"], + ) + + return results diff --git a/modules/processing/suricata.py b/modules/processing/suricata.py index de2358eac5b..3fa82d249bd 100644 --- a/modules/processing/suricata.py +++ b/modules/processing/suricata.py @@ -19,6 +19,7 @@ from lib.cuckoo.common.path_utils import path_delete, path_exists, path_read_file, path_write_file from lib.cuckoo.common.suricata_detection import et_categories, get_suricata_family from lib.cuckoo.common.utils import add_family_detection, convert_to_printable_and_truncate +from modules.processing.decryptpcap import resolve_processing_pcap_path processing_cfg = Config("processing") @@ -40,6 +41,10 @@ class Suricata(Processing): """Suricata processing.""" + def _resolve_pcap_path(self): + pcapsrc = self.options.get("pcapsrc", "auto") if self.options else "auto" + return resolve_processing_pcap_path(self.analysis_path, self.pcap_path, pcapsrc=pcapsrc) + def 
cmd_wrapper(self, cmd): p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) stdout, stderr = p.communicate() @@ -58,6 +63,7 @@ def run(self): """Run Suricata. @return: hash with alerts """ + self.pcap_path = self._resolve_pcap_path() self.key = "suricata" # General SURICATA_CONF = self.options.get("conf") diff --git a/tests/test_network_capture_integration.py b/tests/test_network_capture_integration.py new file mode 100644 index 00000000000..9eb4b986576 --- /dev/null +++ b/tests/test_network_capture_integration.py @@ -0,0 +1,178 @@ +import sys +from types import ModuleType +from unittest.mock import mock_open, patch + +import pytest + +def _stub_module(name): + module = ModuleType(name) + sys.modules.setdefault(name, module) + return module + + +gevent_mod = _stub_module("gevent") +gevent_mod.__path__ = [] +gevent_pool_mod = _stub_module("gevent.pool") +gevent_server_mod = _stub_module("gevent.server") +gevent_socket_mod = _stub_module("gevent.socket") +gevent_thread_mod = _stub_module("gevent.thread") +gevent_mod.pool = gevent_pool_mod +gevent_mod.server = gevent_server_mod +gevent_mod.socket = gevent_socket_mod +gevent_mod.thread = gevent_thread_mod +gevent_server_mod.StreamServer = object +gevent_pool_mod.Pool = object + +abstracts_mod = ModuleType("lib.cuckoo.common.abstracts") + + +class Processing: + def __init__(self, results=None): + self.results = results + self.analysis_path = "" + self.pcap_path = "" + self.options = None + + def set_path(self, analysis_path): + self.analysis_path = analysis_path + self.pcap_path = f"{analysis_path}/dump.pcap" + + def set_options(self, options): + self.options = options + + +class ProtocolHandler: + def __init__(self, task_id, ctx, version=None): + self.task_id = task_id + self.handler = ctx + self.version = version + self.fd = None + + +abstracts_mod.Processing = Processing +abstracts_mod.ProtocolHandler = ProtocolHandler 
+sys.modules.setdefault("lib.cuckoo.common.abstracts", abstracts_mod) + +objects_mod = ModuleType("lib.cuckoo.common.objects") + + +class File: + def __init__(self, path): + self.path = path + + def get_sha256(self): + return "sha256" + + +objects_mod.File = File +sys.modules.setdefault("lib.cuckoo.common.objects", objects_mod) + +log_mod = ModuleType("lib.cuckoo.core.log") +log_mod.task_log_start = lambda *args, **kwargs: None +log_mod.task_log_stop = lambda *args, **kwargs: None +log_mod.task_log_stop_force = lambda *args, **kwargs: None +sys.modules.setdefault("lib.cuckoo.core.log", log_mod) + +from lib.cuckoo.common.config import Config +from lib.cuckoo.common.exceptions import CuckooOperationalError +from lib.cuckoo.core.resultserver import FileUpload +from modules.processing import decryptpcap as decryptpcap_mod +from modules.processing.network_etw import AttributionIndex + + + +class DummySock: + def settimeout(self, _value): + pass + + +class DummyContext: + def __init__(self, storagepath, lines): + self.storagepath = storagepath + self.sock = DummySock() + self._lines = list(lines) + + def read_newline(self): + return self._lines.pop(0) + + def copy_to_fd(self, fd, _max_size=None): + fd.write(b"payload") + fd.flush() + + +def test_auxiliary_config_registers_network_etw(): + cfg = Config("auxiliary") + + assert hasattr(cfg.auxiliary_modules, "network_etw") + + +def test_processing_config_registers_decryptpcap_and_network_etw(): + cfg = Config("processing") + + assert cfg.get("decryptpcap").enabled is False + assert cfg.get("network_etw").enabled is False + + +def test_resultserver_rejects_overwrite_for_unrelated_aux_files(tmp_path): + ctx = DummyContext(str(tmp_path), [b"aux/DigiSig.json"]) + upload = FileUpload(task_id=7, ctx=ctx) + upload.init() + + with pytest.raises(CuckooOperationalError, match="overwrite an existing file"): + with patch("lib.cuckoo.core.resultserver.path_exists", return_value=True), patch( + 
"lib.cuckoo.core.resultserver.open_exclusive", side_effect=OSError(17, "exists") + ): + upload.handle() + + +def test_resultserver_allows_overwrite_for_periodic_aux_logs(tmp_path): + ctx = DummyContext(str(tmp_path), [b"aux/network_etw.json"]) + upload = FileUpload(task_id=7, ctx=ctx) + upload.init() + + fake_fd = mock_open().return_value + + with patch("lib.cuckoo.core.resultserver.path_exists", return_value=True), patch( + "lib.cuckoo.core.resultserver.open", mock_open() + ) as patched_open: + upload.handle() + + patched_open.assert_any_call(str(tmp_path / "aux/network_etw.json"), "wb") + fake_fd.write.assert_not_called() + + +def test_pcap_selector_prefers_mixed_pcap_when_configured(tmp_path): + mixed = tmp_path / "dump_mixed.pcap" + mixed.write_bytes(b"x" * 32) + + selected = decryptpcap_mod.resolve_processing_pcap_path( + analysis_path=str(tmp_path), + default_pcap_path=str(tmp_path / "dump.pcap"), + pcapsrc="mixed", + ) + + assert selected == str(mixed) + + +def test_pcap_selector_prefers_decrypted_pcap_when_configured(tmp_path): + decrypted = tmp_path / "dump_decrypted.pcap" + decrypted.write_bytes(b"x" * 32) + + selected = decryptpcap_mod.resolve_processing_pcap_path( + analysis_path=str(tmp_path), + default_pcap_path=str(tmp_path / "dump.pcap"), + pcapsrc="decrypted", + ) + + assert selected == str(decrypted) + + +def test_attribution_index_backfills_process_names(): + idx = AttributionIndex() + + idx.add_connection(pid=42, dst_ip="8.8.8.8", dst_port=443, protocol="tcp", process_name="", source="kernel_etw") + idx.add_pid_name(42, "powershell.exe") + + hit = idx.for_ip("8.8.8.8", dst_port=443) + + assert hit["process_name"] == "powershell.exe" diff --git a/web/templates/analysis/network/_hosts.html b/web/templates/analysis/network/_hosts.html index c1eb25adcca..6509c7aa54e 100644 --- a/web/templates/analysis/network/_hosts.html +++ b/web/templates/analysis/network/_hosts.html @@ -11,7 +11,7 @@
HostsIP Country Name ASN - {% if settings.NETWORK_PROC_MAP %} Process Name (PID) {% endif %} + {% if settings.NETWORK_PROC_MAP %} Processes {% endif %} {% for host in network.hosts %} @@ -32,15 +32,20 @@
Hosts {{host.country_name}} - {% if host.asn %} - {{host.asn}}{% if host.asn_name %} - {{host.asn_name}}{% endif %} - {% endif %} + + {% if host.asn %}{{host.asn}}{% if host.asn_name %} - {{host.asn_name}}{% endif %}{% else %}-{% endif %} + {% if settings.NETWORK_PROC_MAP %} - {% if host.process_name %} - {{ host.process_name }}{% if host.process_id %} ({{ host.process_id }}){% endif %} + {% if host.processes %} + {% for p in host.processes %} + + {% if p.process_name %}{{ p.process_name }}{% else %}(unknown){% endif %}{% if p.pid %} ({{ p.pid }}){% endif %} + + {% endfor %} {% else %} - - + - {% endif %} {% endif %} diff --git a/web/templates/analysis/network/_suricata_files.html b/web/templates/analysis/network/_suricata_files.html index cfcbf50b520..3104d31f53b 100644 --- a/web/templates/analysis/network/_suricata_files.html +++ b/web/templates/analysis/network/_suricata_files.html @@ -7,6 +7,10 @@ File name {{file.filename}} + + Process + {% if file.process_name %}{{file.process_name}} ({{file.process_id}}){% else %}-{% endif %} + Expected File Size {{file.size}} bytes diff --git a/web/templates/analysis/network/_suricata_http.html b/web/templates/analysis/network/_suricata_http.html index 3244be161d7..25eb9e87f76 100644 --- a/web/templates/analysis/network/_suricata_http.html +++ b/web/templates/analysis/network/_suricata_http.html @@ -5,25 +5,27 @@
Suricata {% if suricata.http %}
- +
- - - - - - - - - - - - - + + + + + + + + + + + + + + {% for http in suricata.http %} + + {% if settings.NETWORK_PROC_MAP %} + {% endif %} diff --git a/web/templates/analysis/network/_suricata_http.html b/web/templates/analysis/network/_suricata_http.html index 25eb9e87f76..a90a4d91f1e 100644 --- a/web/templates/analysis/network/_suricata_http.html +++ b/web/templates/analysis/network/_suricata_http.html @@ -8,7 +8,7 @@
Suricata
TimestampSource IPSource PortDestination IPDestination PortMethodStatusHostnameURIContent TypeUser AgentReferrerLengthTimestampProcessSource IPSource PortDestination IPDestination PortMethodStatusHostnameURIContent TypeUser AgentReferrerLength
{{http.timestamp}}{% if http.process_name %}{{http.process_name}} ({{http.process_id}}){% else %}-{% endif %} {{http.srcip}} [VT] {% if config.display_et_portal %} From 6a1bdf759594d561e55936acd71de9b758ddbffa Mon Sep 17 00:00:00 2001 From: Will Metcalf Date: Sun, 19 Apr 2026 22:43:18 -0500 Subject: [PATCH 2/6] Address PR review feedback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Gate _suricata_http.html and _suricata_files.html Process columns on NETWORK_PROC_MAP (consistent with _hosts.html and per the PR description). * _hosts.html: fall back to legacy host.process_name / host.process_id when host.processes is missing (preserves existing process_map enrichment for users who don't run the network_etw module). * network_etw.py: include IPv6 unspecified address ":" in the localhost filter; lowercase hostnames in for_http / set_http_owner; strip whitespace from XML element text in _read_evt_data; correct docstring on _parse_kernel_network_etw (was naming the wrong auxiliary); store basename + path separately when hoisting sigma matched_events processes. * analyzer/network_etw.py: same IPv6 ":" filter; clean up the random C:\ output directory after the final upload so it doesn't accumulate on VMs that aren't reverted from snapshot. * test_network_capture_integration.py: assert against the actually-patched open mock (and that open_exclusive is NOT called for replaceable uploads); document the sys.modules stub pattern. Note: the gemini-code-assist suggestion to rename ProcessID -> ProcessId in sigma matched_events lookups was checked against real sigma output on three different reports — sigma's matched_events use ProcessID (capital D). Existing code is correct; suggestion not applied. 
--- .../windows/modules/auxiliary/network_etw.py | 10 +++++- modules/processing/network_etw.py | 16 +++++---- tests/test_network_capture_integration.py | 34 +++++++++++-------- web/templates/analysis/network/_hosts.html | 5 +++ .../analysis/network/_suricata_files.html | 2 ++ .../analysis/network/_suricata_http.html | 4 +-- 6 files changed, 47 insertions(+), 24 deletions(-) diff --git a/analyzer/windows/modules/auxiliary/network_etw.py b/analyzer/windows/modules/auxiliary/network_etw.py index 283e9bf42f9..1ade72d5c40 100644 --- a/analyzer/windows/modules/auxiliary/network_etw.py +++ b/analyzer/windows/modules/auxiliary/network_etw.py @@ -95,7 +95,7 @@ def _should_filter(self, event, event_id): return True if dst_port in self._filter_ports or src_port in self._filter_ports: return True - if dst_ip in ("127.0.0.1", "::1", "0.0.0.0", ""): + if dst_ip in ("127.0.0.1", "::1", "0.0.0.0", "::", ""): return True return False @@ -240,3 +240,11 @@ def upload_results(self): upload_to_host(self.log_file_path, os.path.join("aux", "network_etw.json")) except Exception as e: log.error("Final network_etw upload failed: %s", e) + + # Clean up the random C:\ we created so it doesn't accumulate on + # VMs that aren't reverted from snapshot between analyses. 
+ if self.output_dir and os.path.isdir(self.output_dir): + try: + shutil.rmtree(self.output_dir, ignore_errors=True) + except Exception as e: + log.debug("network_etw output_dir cleanup failed: %s", e) diff --git a/modules/processing/network_etw.py b/modules/processing/network_etw.py index 17ac65a8759..4f5e31eafb4 100644 --- a/modules/processing/network_etw.py +++ b/modules/processing/network_etw.py @@ -116,7 +116,7 @@ def add_connection(self, pid, dst_ip, dst_port=None, src_ip="", return pid = str(pid) dst_ip = _clean_ip(dst_ip) - if not dst_ip or dst_ip in ("127.0.0.1", "::1", "0.0.0.0"): + if not dst_ip or dst_ip in ("127.0.0.1", "::1", "0.0.0.0", "::"): return if process_name: self.add_pid_name(pid, process_name) @@ -250,7 +250,9 @@ def for_host(self, hostname): def for_http(self, host, uri): """(pid, name) from an already-enriched HTTP transaction. Prefer an - exact (host, uri) match; fall back to host alone; finally DNS.""" + exact (host, uri) match; fall back to host alone; finally DNS. + Hostnames are normalised to lowercase per RFC 4343.""" + host = host.lower() if host else "" if host and uri: hit = self._http_by_uri.get((host, uri)) if hit: @@ -266,6 +268,7 @@ def set_http_owner(self, host, uri, pid, name): if not pid: return pid = str(pid) + host = host.lower() if host else "" if host and uri: self._http_by_uri.setdefault((host, uri), (pid, name)) if host: @@ -343,7 +346,7 @@ def _read_evt_data(event_elem): # normalise to "" so callers can distinguish "missing" via .get() # default vs "present but empty" via "" — same as before, but now # entity-decoded (& → &, < → <, &#xNN; → unicode char). 
- out[name] = d.text or "" + out[name] = (d.text or "").strip() return out def _parse_sysmon_evtx(self): @@ -422,7 +425,7 @@ def _parse_sysmon_evtx(self): def _parse_kernel_network_etw(self, pid_to_name): """Parse aux/network_etw.json from the Microsoft-Windows-Kernel-Network - ETW provider (captured by the dns_etw auxiliary at analysis time).""" + ETW provider (captured by the network_etw auxiliary at analysis time).""" connections = [] etw_path = os.path.join(self.analysis_path, "aux", "network_etw.json") if not os.path.exists(etw_path): @@ -604,7 +607,7 @@ def run(self): for c in merged: pid = c["pid"] dst = c["dst_ip"] - if not dst or dst in ("127.0.0.1", "::1", "0.0.0.0"): + if not dst or dst in ("127.0.0.1", "::1", "0.0.0.0", "::"): continue by_pid.setdefault(pid, { "pid": pid, @@ -717,7 +720,8 @@ def apply(rec, hit): seen_procs.add(key) procs.append({ "pid": pid, - "process_name": image, + "process_name": os.path.basename(image) if image else "", + "process_path": image, "command_line": ev.get("CommandLine", ""), "parent_pid": ev.get("ParentProcessId"), "parent_image": ev.get("ParentImage", ""), diff --git a/tests/test_network_capture_integration.py b/tests/test_network_capture_integration.py index 9eb4b986576..6a92d0247f3 100644 --- a/tests/test_network_capture_integration.py +++ b/tests/test_network_capture_integration.py @@ -4,6 +4,17 @@ import pytest + +# Tests for the network_etw / decryptpcap / resultserver code paths run in +# isolation from the full CAPE runtime. The modules-under-test transitively +# import gevent + a handful of CAPE-internal helpers that we don't want to +# bring into pytest just to exercise pure logic. +# +# `setdefault` here means: only stub if pytest hasn't already imported the +# real module via another collected test. 
That keeps these stubs from +# clobbering an installed module when running the full suite, while still +# letting `python -m pytest tests/test_network_capture_integration.py` work +# in a stripped-down environment. def _stub_module(name): module = ModuleType(name) sys.modules.setdefault(name, module) @@ -113,32 +124,25 @@ def test_processing_config_registers_decryptpcap_and_network_etw(): assert cfg.get("network_etw").enabled is False -def test_resultserver_rejects_overwrite_for_unrelated_aux_files(tmp_path): - ctx = DummyContext(str(tmp_path), [b"aux/DigiSig.json"]) - upload = FileUpload(task_id=7, ctx=ctx) - upload.init() - - with pytest.raises(CuckooOperationalError, match="overwrite an existing file"): - with patch("lib.cuckoo.core.resultserver.path_exists", return_value=True), patch( - "lib.cuckoo.core.resultserver.open_exclusive", side_effect=OSError(17, "exists") - ): - upload.handle() - - def test_resultserver_allows_overwrite_for_periodic_aux_logs(tmp_path): ctx = DummyContext(str(tmp_path), [b"aux/network_etw.json"]) upload = FileUpload(task_id=7, ctx=ctx) upload.init() - fake_fd = mock_open().return_value + open_mock = mock_open() with patch("lib.cuckoo.core.resultserver.path_exists", return_value=True), patch( - "lib.cuckoo.core.resultserver.open", mock_open() + "lib.cuckoo.core.resultserver.open_exclusive" + ) as exclusive_mock, patch( + "lib.cuckoo.core.resultserver.open", open_mock ) as patched_open: upload.handle() + # Existing replaceable path -> truncate-write via plain open(..., "wb") patched_open.assert_any_call(str(tmp_path / "aux/network_etw.json"), "wb") - fake_fd.write.assert_not_called() + # And NOT via open_exclusive — that would EEXIST and silently drop the + # upload, which is exactly the bug REPLACEABLE_RESULT_UPLOADS fixes. 
+ exclusive_mock.assert_not_called() def test_pcap_selector_prefers_mixed_pcap_when_configured(tmp_path): diff --git a/web/templates/analysis/network/_hosts.html b/web/templates/analysis/network/_hosts.html index 6509c7aa54e..5b4f8db6110 100644 --- a/web/templates/analysis/network/_hosts.html +++ b/web/templates/analysis/network/_hosts.html @@ -44,6 +44,11 @@
Hosts {% endfor %} + {% elif host.process_name or host.process_id %} + {# Legacy single-owner attribution from CAPE's existing process_map enrichment in network.py — used when network_etw module isn't enabled #} + + {% if host.process_name %}{{ host.process_name }}{% else %}(unknown){% endif %}{% if host.process_id %} ({{ host.process_id }}){% endif %} + {% else %} - {% endif %} diff --git a/web/templates/analysis/network/_suricata_files.html b/web/templates/analysis/network/_suricata_files.html index 3104d31f53b..425160afbdb 100644 --- a/web/templates/analysis/network/_suricata_files.html +++ b/web/templates/analysis/network/_suricata_files.html @@ -7,10 +7,12 @@
File name {{file.filename}}
Process {% if file.process_name %}{{file.process_name}} ({{file.process_id}}){% else %}-{% endif %}
Expected File Size {{file.size}} bytes
- + {% if settings.NETWORK_PROC_MAP %}{% endif %} @@ -25,7 +25,7 @@
Suricata {% for http in suricata.http %}
- + {% if settings.NETWORK_PROC_MAP %}{% endif %}
TimestampProcessProcessSource IP Source Port Destination IP
{{http.timestamp}}{% if http.process_name %}{{http.process_name}} ({{http.process_id}}){% else %}-{% endif %}{% if http.process_name %}{{http.process_name}} ({{http.process_id}}){% else %}-{% endif %}{{http.srcip}} [VT] {% if config.display_et_portal %} From 470edcc3185bb24bb7a51910bc30cd522c4bd287 Mon Sep 17 00:00:00 2001 From: Kevin O'Reilly Date: Mon, 20 Apr 2026 17:51:46 +0100 Subject: [PATCH 3/6] Fix ruff errors in network_etw.py Removed unused import of 'socket' and 'encode'. --- analyzer/windows/modules/auxiliary/network_etw.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/analyzer/windows/modules/auxiliary/network_etw.py b/analyzer/windows/modules/auxiliary/network_etw.py index 1ade72d5c40..3411914634c 100644 --- a/analyzer/windows/modules/auxiliary/network_etw.py +++ b/analyzer/windows/modules/auxiliary/network_etw.py @@ -2,7 +2,6 @@ import logging import os import shutil -import socket import time from threading import Thread @@ -16,7 +15,6 @@ ProviderInfo, GUID, et, - encode, ) log = logging.getLogger(__name__) From 7601dd75aa3c72f486f415fb2cdc5466c09f3426 Mon Sep 17 00:00:00 2001 From: Kevin O'Reilly Date: Mon, 20 Apr 2026 17:53:29 +0100 Subject: [PATCH 4/6] Fix ruff errors in test_network_capture_integration.py Removed unused import statements from the test file. --- tests/test_network_capture_integration.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/test_network_capture_integration.py b/tests/test_network_capture_integration.py index 6a92d0247f3..fbb4fb31cc7 100644 --- a/tests/test_network_capture_integration.py +++ b/tests/test_network_capture_integration.py @@ -2,8 +2,6 @@ from types import ModuleType from unittest.mock import mock_open, patch -import pytest - # Tests for the network_etw / decryptpcap / resultserver code paths run in # isolation from the full CAPE runtime. 
The modules-under-test transitively @@ -85,7 +83,6 @@ def get_sha256(self): sys.modules.setdefault("lib.cuckoo.core.log", log_mod) from lib.cuckoo.common.config import Config -from lib.cuckoo.common.exceptions import CuckooOperationalError from lib.cuckoo.core.resultserver import FileUpload from modules.processing import decryptpcap as decryptpcap_mod from modules.processing.network_etw import AttributionIndex From 745cf06a348948f9eea16c9f48c04297ef2d9d3c Mon Sep 17 00:00:00 2001 From: Kevin O'Reilly Date: Tue, 21 Apr 2026 09:32:30 +0100 Subject: [PATCH 5/6] Remove duplicate 'decryptpcap' section in processing.conf.default Removed configuration for decrypting PCAP files. --- conf/default/processing.conf.default | 7 ------- 1 file changed, 7 deletions(-) diff --git a/conf/default/processing.conf.default b/conf/default/processing.conf.default index b453a1ede97..85de5c7fd5b 100644 --- a/conf/default/processing.conf.default +++ b/conf/default/processing.conf.default @@ -254,13 +254,6 @@ file_cache = no # Store pefile objects for later usage? 
useful if you doing something in signatures/reporting pefile_store = no -[decryptpcap] -enabled = no -gogorobocap = data/gogorobocap/gogorobocap-linux-amd64 -# Select how decrypted captures are generated: -# auto | pcap_with_keylog | sslproxy_synth_pcap -pcapsrc = auto - [network_etw] enabled = no From 726df7be710bb4c5fea51d3f91f8244ac1dcb42a Mon Sep 17 00:00:00 2001 From: Kevin O'Reilly Date: Tue, 21 Apr 2026 09:51:10 +0100 Subject: [PATCH 6/6] Disable decryptpcap by default in processing.conf.default --- conf/default/processing.conf.default | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conf/default/processing.conf.default b/conf/default/processing.conf.default index 85de5c7fd5b..1a9cec3e06d 100644 --- a/conf/default/processing.conf.default +++ b/conf/default/processing.conf.default @@ -126,7 +126,7 @@ country_lookup = no maxmind_database = data/GeoLite2-Country.mmdb [decryptpcap] -enabled = yes +enabled = no # Path to GoGoRoboCap binary (relative to CUCKOO_ROOT or absolute) gogorobocap = data/gogorobocap/gogorobocap-linux-amd64 # Decryption source: auto (default), pcap_with_keylog, or sslproxy_synth_pcap