diff --git a/analyzer/windows/modules/auxiliary/evtx.py b/analyzer/windows/modules/auxiliary/evtx.py
index ccd1fd6f847..e9cfc3469e6 100644
--- a/analyzer/windows/modules/auxiliary/evtx.py
+++ b/analyzer/windows/modules/auxiliary/evtx.py
@@ -12,6 +12,15 @@
class Evtx(Thread, Auxiliary):
+ # Stop AFTER capemon-related auxiliaries so the final EVTX snapshot
+ # captures sysmon events from late-fire callbacks that fire between
+ # the analysis-stopping signal and the VM teardown (e.g. C2 callbacks
+ # the malware schedules after a delay). Without this priority bump,
+ # those events happen after the last EVTX snapshot and never reach
+ # the host-side processing modules.
+ start_priority = 0
+ stop_priority = -20
+
evtx_dump = "evtx.zip"
# Event log channels to collect
diff --git a/analyzer/windows/modules/auxiliary/network_etw.py b/analyzer/windows/modules/auxiliary/network_etw.py
new file mode 100644
index 00000000000..3411914634c
--- /dev/null
+++ b/analyzer/windows/modules/auxiliary/network_etw.py
@@ -0,0 +1,248 @@
+import json
+import logging
+import os
+import shutil
+import time
+from threading import Thread
+
+from lib.common.results import upload_to_host
+from lib.common.rand import random_string
+from lib.core.config import Config
+from lib.common.etw_utils import (
+ ETWAuxiliaryWrapper,
+ ETWProviderWrapper,
+ HAVE_ETW,
+ ProviderInfo,
+ GUID,
+ et,
+)
+
+log = logging.getLogger(__name__)
+
+__author__ = "DNS-GEE-O (@wmetcalf)"
+
+KERNEL_NETWORK_GUID = "{7DD42A49-5329-4832-8DFD-43D979153A88}"
+
+CONNECT_EVENT_IDS = [12, 15, 28, 31, 42, 58]
+
+EVENT_NAMES = {
+ 12: "tcp_connect_v4",
+ 15: "tcp_accept_v4",
+ 28: "tcp_connect_v6",
+ 31: "tcp_accept_v6",
+ 42: "udp_send_v4",
+ 58: "udp_send_v6",
+}
+
+# Periodic upload interval in seconds
+UPLOAD_INTERVAL = 15
+
+
+if HAVE_ETW:
+
+ class NetworkETWProvider(ETWProviderWrapper):
+ def __init__(
+ self,
+ level=et.TRACE_LEVEL_INFORMATION,
+ logfile=None,
+ no_conout=False,
+ any_keywords=None,
+ all_keywords=None,
+ filter_ips=None,
+ filter_ports=None,
+ ):
+ self._filter_ips = filter_ips or set()
+ self._filter_ports = filter_ports or set()
+
+ providers = [
+ ProviderInfo(
+ "Microsoft-Windows-Kernel-Network",
+ GUID(KERNEL_NETWORK_GUID),
+ level,
+ any_keywords or 0x30,
+ all_keywords,
+ )
+ ]
+ super().__init__(
+ session_name="ETW_KernelNetwork",
+ providers=providers,
+ event_id_filters=CONNECT_EVENT_IDS,
+ logfile=logfile,
+ no_conout=no_conout,
+ )
+
+ def _should_filter(self, event, event_id):
+ src_ip = str(event.get("saddr", ""))
+ dst_ip = str(event.get("daddr", ""))
+ src_port = event.get("sport", 0)
+ dst_port = event.get("dport", 0)
+
+ # Try int conversion for port comparison
+ try:
+ src_port = int(src_port)
+ except (ValueError, TypeError):
+ pass
+ try:
+ dst_port = int(dst_port)
+ except (ValueError, TypeError):
+ pass
+
+ if dst_ip in self._filter_ips:
+ return True
+ if event_id in (15, 31) and src_ip in self._filter_ips:
+ return True
+ if dst_port in self._filter_ports or src_port in self._filter_ports:
+ return True
+ if dst_ip in ("127.0.0.1", "::1", "0.0.0.0", "::", ""):
+ return True
+ return False
+
+ def on_event(self, event_tufo):
+ event_id, event = event_tufo
+ if event_id not in self.event_id_filters:
+ return
+ if self._should_filter(event, event_id):
+ return
+ if self.logfile:
+ self.write_to_log(self.logfile, event_id, event)
+
+ def write_to_log(self, file_handle, event_id, event):
+ header = event.get("EventHeader", {})
+ pid = event.get("PID") or header.get("ProcessId", 0)
+ proto = "TCP" if event_id in (12, 15, 28, 31) else "UDP"
+ direction = "outbound" if event_id in (12, 28, 42, 58) else "inbound"
+
+ entry = {
+ "event_type": EVENT_NAMES.get(event_id, "unknown"),
+ "event_id": event_id,
+ "pid": pid,
+ "protocol": proto,
+ "direction": direction,
+ "src_ip": str(event.get("saddr", "")),
+ "src_port": event.get("sport", 0),
+ "dst_ip": str(event.get("daddr", "")),
+ "dst_port": event.get("dport", 0),
+ "timestamp": str(header.get("TimeStamp", "")),
+ }
+ connid = event.get("connid")
+ if connid:
+ entry["connid"] = connid
+
+ json.dump(entry, file_handle)
+ file_handle.write("\n")
+
+
+class Network_ETW(ETWAuxiliaryWrapper):
+ """Captures TCP/UDP connection events via Microsoft-Windows-Kernel-Network ETW.
+
+ Provides process-to-network 5-tuple mapping.
+ Periodically uploads captured data to ensure availability if analysis
+ terminates unexpectedly.
+
+ Output: aux/network_etw.json (NDJSON)
+ """
+
+ # Stop AFTER capemon-related modules so late-firing network calls get attributed
+ start_priority = 0
+ stop_priority = -20
+
+ def __init__(self, options, config):
+ super().__init__(options, config, "network_etw")
+
+ self.output_dir = os.path.join("C:\\", random_string(5, 10))
+ try:
+ os.mkdir(self.output_dir)
+ except FileExistsError:
+ pass
+
+ self.log_file_path = os.path.join(self.output_dir, "%s.log" % random_string(5, 10))
+ self.log_file = None
+ self._do_periodic = False
+ self._periodic_thread = None
+
+ if HAVE_ETW and self.enabled:
+ filter_ips = set()
+ filter_ports = set()
+
+ try:
+ analysis_cfg = Config(cfg="analysis.conf")
+ host_ip = getattr(analysis_cfg, "ip", "")
+ if host_ip:
+ filter_ips.add(host_ip)
+ rs_port = getattr(analysis_cfg, "port", 0)
+ if rs_port:
+ filter_ports.add(int(rs_port))
+ except Exception as e:
+ log.debug("Could not read analysis config for filters: %s", e)
+
+ filter_ports.add(8000)
+ filter_ports.add(53)
+
+ log.info("NetworkETW filters: ips=%s ports=%s", filter_ips, filter_ports)
+
+ try:
+ self.log_file = open(self.log_file_path, "w", encoding="utf-8")
+ self.capture = NetworkETWProvider(
+ logfile=self.log_file,
+ level=255,
+ no_conout=True,
+ filter_ips=filter_ips,
+ filter_ports=filter_ports,
+ )
+ except Exception as e:
+ log.error("Failed to open Network ETW log file: %s", e)
+
+ def start(self):
+ result = super().start()
+ # Start periodic upload thread
+ if self.enabled and self.log_file:
+ self._do_periodic = True
+ self._periodic_thread = Thread(target=self._periodic_upload, daemon=True)
+ self._periodic_thread.start()
+ return result
+
+ def _periodic_upload(self):
+ """Periodically flush and upload current data."""
+ while self._do_periodic:
+ for _ in range(UPLOAD_INTERVAL):
+ if not self._do_periodic:
+ break
+ time.sleep(1)
+ if self._do_periodic and self.log_file:
+ try:
+ self.log_file.flush()
+ # Copy the file so we don't interfere with ongoing writes
+ snap_path = self.log_file_path + ".snap"
+ shutil.copy2(self.log_file_path, snap_path)
+ upload_to_host(snap_path, os.path.join("aux", "network_etw.json"))
+ log.debug("Periodic network_etw upload: %d bytes", os.path.getsize(snap_path))
+ os.remove(snap_path)
+ except Exception as e:
+ log.debug("Periodic network_etw upload failed: %s", e)
+
+ def upload_results(self):
+ """Final upload on stop."""
+ self._do_periodic = False
+ if self._periodic_thread:
+ self._periodic_thread.join(timeout=5)
+
+ if self.log_file:
+ try:
+ self.log_file.close()
+ except Exception:
+ pass
+ self.log_file = None
+
+ if os.path.isfile(self.log_file_path) and os.path.getsize(self.log_file_path) > 0:
+ try:
+ upload_to_host(self.log_file_path, os.path.join("aux", "network_etw.json"))
+ except Exception as e:
+ log.error("Final network_etw upload failed: %s", e)
+
+        # Clean up the random C:\ we created so it doesn't accumulate on
+ # VMs that aren't reverted from snapshot between analyses.
+ if self.output_dir and os.path.isdir(self.output_dir):
+ try:
+ shutil.rmtree(self.output_dir, ignore_errors=True)
+ except Exception as e:
+ log.debug("network_etw output_dir cleanup failed: %s", e)
diff --git a/conf/default/auxiliary.conf.default b/conf/default/auxiliary.conf.default
index 3e7b137f452..21018cdb1db 100644
--- a/conf/default/auxiliary.conf.default
+++ b/conf/default/auxiliary.conf.default
@@ -32,6 +32,7 @@ browsermonitor = no
wmi_etw = no
dns_etw = no
amsi_etw = no
+network_etw = no
watchdownloads = no
[AzSniffer]
diff --git a/conf/default/processing.conf.default b/conf/default/processing.conf.default
index 8fe384c0eaa..1a9cec3e06d 100644
--- a/conf/default/processing.conf.default
+++ b/conf/default/processing.conf.default
@@ -101,6 +101,9 @@ enabled = no
[network]
enabled = yes
sort_pcap = no
+# Which capture to analyze when decryptpcap produced additional outputs:
+# auto | original | mixed | decrypted
+pcapsrc = auto
# Enable mapping of network events to specific processes using behavioral analysis data
process_map = no
# Adds network connections seen in behavior but not in PCAP. Requires process_map = yes
@@ -123,7 +126,7 @@ country_lookup = no
maxmind_database = data/GeoLite2-Country.mmdb
[decryptpcap]
-enabled = yes
+enabled = no
# Path to GoGoRoboCap binary (relative to CUCKOO_ROOT or absolute)
gogorobocap = data/gogorobocap/gogorobocap-linux-amd64
# Decryption source: auto (default), pcap_with_keylog, or sslproxy_synth_pcap
@@ -181,6 +184,9 @@ urlscrub = (^http:\/\/serw\.clicksor\.com\/redir\.php\?url=|&InjectedParam=.+$)
[suricata]
enabled = no
runmode = cli
+# Which capture to analyze when decryptpcap produced additional outputs:
+# auto | original | mixed | decrypted
+pcapsrc = auto
# Outputfiles
# if evelog is specified, it will be used instead of the per-protocol log files
evelog = eve.json
@@ -248,6 +254,9 @@ file_cache = no
# Store pefile objects for later usage? useful if you doing something in signatures/reporting
pefile_store = no
+[network_etw]
+enabled = no
+
# Deduplicate screenshots - You need to install dependency ImageHash>=4.3.1
[deduplication]
#
@@ -348,4 +357,3 @@ enabled = no
# Enable when using the PolarProxy option during analysis. This will merge the tls.pcap containing
# plain-text TLS streams into the task PCAP.
enabled = no
-
diff --git a/lib/cuckoo/core/resultserver.py b/lib/cuckoo/core/resultserver.py
index 7e675d335d2..fa98dc259e3 100644
--- a/lib/cuckoo/core/resultserver.py
+++ b/lib/cuckoo/core/resultserver.py
@@ -81,6 +81,14 @@
RESULT_DIRECTORIES = RESULT_UPLOADABLE + (b"reports", b"logs")
+REPLACEABLE_RESULT_UPLOADS = (
+ b"tlsdump/",
+ b"aux/dns_etw.json",
+ b"aux/network_etw.json",
+ b"aux/wmi_etw.json",
+ b"aux/sslkeylogfile/sslkeys.log",
+)
+
def netlog_sanitize_fname(path):
"""Validate agent-provided path for result files"""
@@ -90,11 +98,17 @@ def netlog_sanitize_fname(path):
raise CuckooOperationalError(f"Netlog client requested banned path: {path}")
if any(c in BANNED_PATH_CHARS for c in name):
for c in BANNED_PATH_CHARS:
- path.replace(bytes([c]), b"X")
+ path = path.replace(bytes([c]), b"X")
return path
+def is_replaceable_result_upload(path):
+ """Return True for result uploads that are expected to overwrite prior
+ content with a full snapshot rather than append a distinct artifact."""
+ return path.startswith(REPLACEABLE_RESULT_UPLOADS)
+
+
class Disconnect(Exception):
pass
@@ -254,7 +268,15 @@ def handle(self):
try:
if file_path.endswith("_script.log"):
self.fd = open_inclusive(file_path)
- elif not path_exists(file_path):
+ elif is_replaceable_result_upload(dump_path) and path_exists(file_path):
+ # Auxiliary modules (tlsdump, network_etw, sslkeylogfile…)
+ # upload the SAME dump_path periodically so accumulated
+ # key / connection data survives an unexpected analysis
+ # termination. Each upload is a full replacement of the
+ # prior content — truncate and rewrite rather than failing
+ # silently with EEXIST.
+ self.fd = open(file_path, "wb")
+ else:
# open_exclusive will fail if file_path already exists
self.fd = open_exclusive(file_path)
except OSError as e:
diff --git a/modules/processing/decryptpcap.py b/modules/processing/decryptpcap.py
index ae1adf71cd1..72e4bc9d3c5 100644
--- a/modules/processing/decryptpcap.py
+++ b/modules/processing/decryptpcap.py
@@ -14,6 +14,56 @@
PCAP_HEADER_SIZE = 24
+def _get_option(options, key, default=None):
+ """Read `key` from `options` whether it's a dict-like or an attribute bag.
+
+ CAPE's processing modules receive `self.options` from different callers
+ in slightly different shapes (dict from task options, namespace from
+ config parsing). This helper handles both without TypeErrors."""
+ if options is None:
+ return default
+ getter = getattr(options, "get", None)
+ if callable(getter):
+ try:
+ return getter(key, default)
+ except TypeError:
+ pass
+ return getattr(options, key, default)
+
+
+def _is_usable_pcap(path):
+ return bool(path and os.path.exists(path) and os.path.getsize(path) > PCAP_HEADER_SIZE)
+
+
+def resolve_processing_pcap_path(analysis_path, default_pcap_path, pcapsrc="auto"):
+ """Pick the best PCAP for downstream processing modules.
+
+ `pcapsrc` may explicitly request `mixed`, `decrypted`, or `original`.
+ Any other value falls back to auto-selection: prefer `dump_mixed.pcap`,
+ then `dump_decrypted.pcap`, then the original capture.
+ """
+ mixed_path = os.path.join(analysis_path, "dump_mixed.pcap")
+ decrypted_path = os.path.join(analysis_path, "dump_decrypted.pcap")
+ requested = (pcapsrc or "auto").lower()
+
+ explicit = {
+ "mixed": mixed_path,
+ "decrypted": decrypted_path,
+ "original": default_pcap_path,
+ "default": default_pcap_path,
+ "dump": default_pcap_path,
+ }
+ if requested in explicit:
+ candidate = explicit[requested]
+ return candidate if candidate == default_pcap_path or _is_usable_pcap(candidate) else default_pcap_path
+
+ for candidate in (mixed_path, decrypted_path, default_pcap_path):
+ if candidate == default_pcap_path or _is_usable_pcap(candidate):
+ return candidate
+
+ return default_pcap_path
+
+
class DecryptPcap(Processing):
"""Generate decrypted pcaps from TLS traffic using GoGoRoboCap.
diff --git a/modules/processing/network.py b/modules/processing/network.py
index bb0b814a32a..372efd8605b 100644
--- a/modules/processing/network.py
+++ b/modules/processing/network.py
@@ -41,6 +41,7 @@
from lib.cuckoo.common.path_utils import path_delete, path_exists, path_mkdir, path_read_file, path_write_file
from lib.cuckoo.common.safelist import is_safelisted_domain
from lib.cuckoo.common.utils import convert_to_printable
+from modules.processing.decryptpcap import resolve_processing_pcap_path
# from lib.cuckoo.common.safelist import is_safelisted_ip
log = logging.getLogger(__name__)
@@ -1561,7 +1562,12 @@ def _merge_behavior_network(self, network):
target_list = "udp" if port == 53 else "tcp"
network.setdefault(target_list, []).append(entry)
+ def _resolve_pcap_path(self):
+ pcapsrc = self.options.get("pcapsrc", "auto") if self.options else "auto"
+ return resolve_processing_pcap_path(self.analysis_path, self.pcap_path, pcapsrc=pcapsrc)
+
def run(self):
+ self.pcap_path = self._resolve_pcap_path()
if not path_exists(self.pcap_path):
log.debug('The PCAP file does not exist at path "%s"', self.pcap_path)
return {}
diff --git a/modules/processing/network_etw.py b/modules/processing/network_etw.py
new file mode 100644
index 00000000000..4f5e31eafb4
--- /dev/null
+++ b/modules/processing/network_etw.py
@@ -0,0 +1,741 @@
+# Process→Network attribution for CAPE.
+#
+# Consumes every process-to-network signal captured during analysis and feeds
+# it into a single AttributionIndex. Each enrichment target (suricata alerts,
+# tls, http, files; network.tcp/udp/dns/hosts; sigma detections) queries the
+# index through one of four methods — no target-specific lookup tables.
+#
+# Signal sources, highest-confidence first:
+# 1. Sysmon EID 3 (NetworkConnect) — from evtx.zip. Full image path.
+# 2. Kernel-Network ETW (aux/network_etw.json) — periodic uploads.
+# 3. Sigma EID 3 matched_events — tertiary (catches late-fire flows).
+# 4. DNS-Client ETW (aux/dns_etw.json) — originating-process DNS. Used with
+# resolution data (#5) to attribute by resolved IP. Avoids svchost bias.
+# 5. Sysmon EID 22 (DnsQuery) — parallel to #4, covers queries that happened
+# before DNS-Client ETW subscribed (common miss for early CDN resolutions).
+# 6. Sigma EID 22 matched_events — subset of #5, has Image in the record.
+# 7. Resolution data — suricata.dns, network.dns, network.hosts, sigma EID 22
+# QueryResults. Used to turn hostnames into IPs for the DNS cross-reference.
+# 8. Sysmon EID 1 (ProcessCreate) — pid→image map for processes that made
+# queries but aren't in behavior (capemon not attached).
+
+import json
+import logging
+import os
+import shutil
+import tempfile
+import xml.etree.ElementTree as ET
+import zipfile
+
+from lib.cuckoo.common.abstracts import Processing
+
+# Sysmon Event XML lives in this namespace; ElementTree returns tag names
+# already qualified, so we strip the prefix when reading element names.
+EVT_NS = "{http://schemas.microsoft.com/win/2004/08/events/event}"
+
+log = logging.getLogger(__name__)
+
+try:
+ import Evtx.Evtx as EvtxParser
+ HAVE_EVTX = True
+except ImportError:
+ HAVE_EVTX = False
+
+
+def _clean_ip(s):
+ if not s:
+ return ""
+ s = s.strip()
+ if s.startswith("::ffff:"):
+ s = s[7:]
+ return s
+
+
+def _clean_host(s):
+ if not s:
+ return ""
+ return s.strip().rstrip(".").lower()
+
+
+class AttributionIndex:
+ """Centralized process-to-network attribution index.
+
+ Call order:
+ idx = AttributionIndex()
+ idx.add_pid_name(...) # any number of times
+ idx.add_connection(...) # any number of times
+ idx.add_dns_query(...)
+ idx.add_resolution(...)
+ idx.finalize()
+ idx.for_flow(...) # query API
+ ...
+ idx.set_http_owner(...) # after suricata.http enrichment
+ idx.for_http(...) # for files lookup
+ """
+
+ def __init__(self):
+ self._pid_to_name = {} # pid_str -> basename
+ self._by_ip = {} # ip -> [{pid, process_name, dst_port, protocol, source}]
+ self._dns_host_to_pid = {} # host -> (pid_str, name, source)
+ self._host_to_ips = {} # host -> set(ip)
+ self._ip_via_dns = {} # ip -> [(pid_str, host)]
+ self._http_by_uri = {} # (host, uri) -> (pid_str, name)
+ self._http_by_host = {} # host -> (pid_str, name)
+ # Counters surfaced via .stats() for logging
+ self.stats_counters = {"dns_etw": 0, "sysmon_eid22": 0,
+ "sigma_eid22": 0, "direct": 0,
+ "resolutions": 0}
+
+ # ------------------------------------------------------------------ seed
+ def add_pid_name(self, pid, image_or_name):
+ if not pid or not image_or_name:
+ return
+ pid = str(pid)
+ name = os.path.basename(image_or_name)
+ self._pid_to_name.setdefault(pid, name)
+ for entries in self._by_ip.values():
+ for entry in entries:
+ if entry["pid"] == pid and not entry["process_name"]:
+ entry["process_name"] = self._pid_to_name[pid]
+
+ def name_of(self, pid):
+ return self._pid_to_name.get(str(pid), "") if pid else ""
+
+ def pid_names(self):
+ """Read-only view of the pid->name map. Callers that need to seed
+ another helper with the current names should use this rather than
+ the underscored attribute."""
+ return dict(self._pid_to_name)
+
+ def add_connection(self, pid, dst_ip, dst_port=None, src_ip="",
+ src_port=None, protocol="", process_name="", source=""):
+ """Direct connection observed (kernel-ETW, sysmon EID 3, sigma EID 3).
+ src_port is the disambiguator when multiple processes share a (dst_ip,
+ dst_port) — every TCP flow has a unique client-side ephemeral port."""
+ if not pid or not dst_ip:
+ return
+ pid = str(pid)
+ dst_ip = _clean_ip(dst_ip)
+ if not dst_ip or dst_ip in ("127.0.0.1", "::1", "0.0.0.0", "::"):
+ return
+ if process_name:
+ self.add_pid_name(pid, process_name)
+ name = self._pid_to_name.get(pid, process_name or "")
+ entry = {
+ "pid": pid,
+ "process_name": name,
+ "src_ip": _clean_ip(src_ip),
+ "src_port": str(src_port) if src_port is not None and src_port != "" else "",
+ "dst_port": str(dst_port) if dst_port is not None and dst_port != "" else "",
+ "protocol": (protocol or "").upper(),
+ "source": source,
+ }
+ bucket = self._by_ip.setdefault(dst_ip, [])
+ # Dedupe on (pid, src_port, dst_port, protocol) so the same connection
+ # observed by multiple sources doesn't multiply, but distinct flows
+ # from the same process (different src_ports) stay separate.
+ dedup_key = (pid, entry["src_port"], entry["dst_port"], entry["protocol"])
+ for existing in bucket:
+ if (existing["pid"], existing["src_port"], existing["dst_port"],
+ existing["protocol"]) == dedup_key:
+ return
+ bucket.append(entry)
+ self.stats_counters["direct"] += 1
+
+ def add_dns_query(self, pid, hostname, image_or_name="", source=""):
+ """pid asked for hostname (DNS-Client ETW / sysmon EID 22 / sigma EID 22)."""
+ if not pid or not hostname:
+ return
+ pid = str(pid)
+ h = _clean_host(hostname)
+ if not h:
+ return
+ if image_or_name:
+ self.add_pid_name(pid, image_or_name)
+ name = self._pid_to_name.get(pid, "")
+ # Skip positively-identified svchost (dnscache doing a delegated lookup
+ # isn't the real owner). Unknown names are kept — PID alone is still
+ # useful attribution.
+ if name and "svchost" in name.lower():
+ return
+ self._dns_host_to_pid.setdefault(h, (pid, name, source))
+ if source in self.stats_counters:
+ self.stats_counters[source] += 1
+
+ def add_resolution(self, hostname, ip):
+ """hostname resolves to ip (suricata.dns, network.dns, network.hosts, sigma EID 22)."""
+ h = _clean_host(hostname)
+ ip = _clean_ip(ip)
+ if not h or not ip:
+ return
+ # Basic garbage filter
+ if ":" in ip and ip.count(":") < 2:
+ return
+ self._host_to_ips.setdefault(h, set()).add(ip)
+ self.stats_counters["resolutions"] += 1
+
+ # --------------------------------------------------------------- finalize
+ def finalize(self):
+ """Cross-reference DNS queries × resolutions into ip_via_dns."""
+ for host, (pid, name, source) in self._dns_host_to_pid.items():
+ for ip in self._host_to_ips.get(host, ()):
+ self._ip_via_dns.setdefault(ip, []).append((pid, host))
+
+ # --------------------------------------------------------------- queries
+ def for_ip(self, ip, dst_port=None, src_port=None):
+ """Best process for a connection to `ip`. Match priority:
+ 1. exact 5-tuple match by src_port (each TCP flow has unique
+ client ephemeral port — disambiguates multi-process cases)
+ 2. dst_port match (when src_port unknown or not in index)
+ 3. first known process (when no port info available)
+ 4. DNS-resolved IP fallback (process asked for a hostname that
+ resolved to this IP, but we never saw the connect)
+ Returns {pid, process_name, source, ...} or None."""
+ ip = _clean_ip(ip)
+ if not ip:
+ return None
+ procs = self._by_ip.get(ip)
+ if procs:
+ # 1. src_port match — most specific
+ if src_port is not None and src_port != "":
+ for p in procs:
+ if p["src_port"] == str(src_port):
+ return dict(p)
+ # 2. dst_port match
+ if dst_port is not None and dst_port != "":
+ for p in procs:
+ if p["dst_port"] == str(dst_port):
+ return dict(p)
+ # 3. first known
+ return dict(procs[0])
+ # 4. DNS fallback
+ dns_hits = self._ip_via_dns.get(ip)
+ if dns_hits:
+ pid, host = dns_hits[0]
+ name = self._pid_to_name.get(pid, "")
+ if name and "svchost" in name.lower():
+ return None
+ return {
+ "pid": pid,
+ "process_name": name,
+ "src_port": "",
+ "dst_port": str(dst_port) if dst_port is not None else "",
+ "protocol": "TCP",
+ "source": "dns-fallback",
+ "resolved_hostname": host,
+ }
+ return None
+
+ def for_flow(self, dstip="", dstport=None, srcip="", srcport=None):
+ """Bidirectional flow attribution with full 5-tuple matching when
+ available. Tries dst-side first (outbound-favored), then src-side
+ for ingress alerts where dst is the local VM."""
+ # On the outbound interpretation, srcport is the local ephemeral
+ # port — that's the disambiguator. On the ingress interpretation
+ # (alert dst=VM), dstport is the local ephemeral port.
+ return (self.for_ip(dstip, dst_port=dstport, src_port=srcport)
+ or self.for_ip(srcip, dst_port=srcport, src_port=dstport))
+
+ def for_host(self, hostname):
+ """(pid, name) that queried this hostname, or None. Used for files
+ and network.dns records."""
+ h = _clean_host(hostname)
+ if not h:
+ return None
+ rec = self._dns_host_to_pid.get(h)
+ if not rec:
+ return None
+ pid, name, _src = rec
+ return (pid, name)
+
+ def for_http(self, host, uri):
+ """(pid, name) from an already-enriched HTTP transaction. Prefer an
+ exact (host, uri) match; fall back to host alone; finally DNS.
+ Hostnames are normalised to lowercase per RFC 4343."""
+ host = host.lower() if host else ""
+ if host and uri:
+ hit = self._http_by_uri.get((host, uri))
+ if hit:
+ return hit
+ if host:
+ hit = self._http_by_host.get(host)
+ if hit:
+ return hit
+ return self.for_host(host)
+
+ def set_http_owner(self, host, uri, pid, name):
+ """Register an attributed HTTP transaction for subsequent files lookup."""
+ if not pid:
+ return
+ pid = str(pid)
+ host = host.lower() if host else ""
+ if host and uri:
+ self._http_by_uri.setdefault((host, uri), (pid, name))
+ if host:
+ self._http_by_host.setdefault(host, (pid, name))
+
+ def all_processes_for(self, ip):
+ """Return every distinct (pid, name) seen for an IP — direct + DNS.
+ Used by network.hosts where multiple processes may share a dst."""
+ ip = _clean_ip(ip)
+ if not ip:
+ return []
+ seen = set()
+ out = []
+ for p in self._by_ip.get(ip, ()):
+ key = (p["pid"], p["process_name"])
+ if key in seen:
+ continue
+ seen.add(key)
+ out.append({
+ "pid": p["pid"],
+ "process_name": p["process_name"],
+ "dst_port": p["dst_port"],
+ "protocol": p["protocol"],
+ "source": p["source"],
+ })
+ for pid, host in self._ip_via_dns.get(ip, ()):
+ key = (pid, self._pid_to_name.get(pid, ""))
+ if key in seen:
+ continue
+ seen.add(key)
+ out.append({
+ "pid": pid,
+ "process_name": self._pid_to_name.get(pid, ""),
+ "dst_port": "",
+ "protocol": "",
+ "source": "dns-fallback",
+ "resolved_hostname": host,
+ })
+ return out
+
+
+class NetworkETW(Processing):
+ """Parse network connection events and correlate with process info."""
+
+ key = "network_etw"
+ order = 99 # Run after suricata but before dnsgeeo (101)
+
+ # ------------------------------------------------------------------ parse
+ @staticmethod
+ def _safe_extract(zf, member, dest_dir):
+ """Extract `member` from `zf` into `dest_dir` only if the resolved
+ target stays inside `dest_dir` (zip-slip guard). Returns the on-disk
+ path on success, None if the entry would escape."""
+ target = os.path.realpath(os.path.join(dest_dir, member))
+ if not target.startswith(os.path.realpath(dest_dir) + os.sep):
+ log.warning("Skipping evtx zip entry that would escape tmpdir: %s", member)
+ return None
+ zf.extract(member, dest_dir)
+ return target
+
+ @staticmethod
+ def _read_evt_data(event_elem):
+        """Parse EventData <Data Name="...">value</Data> elements into a
+ dict with XML entities properly decoded. Returns {} when no EventData
+ present (some events don't carry one)."""
+ out = {}
+ ed = event_elem.find(EVT_NS + "EventData")
+ if ed is None:
+ return out
+ for d in ed.findall(EVT_NS + "Data"):
+ name = d.get("Name")
+ if not name:
+ continue
+ # ElementTree returns text=None for self-closing/empty elements;
+ # normalise to "" so callers can distinguish "missing" via .get()
+ # default vs "present but empty" via "" — same as before, but now
+        # entity-decoded (&amp; → &, &lt; → <, &#NN; → unicode char).
+ out[name] = (d.text or "").strip()
+ return out
+
+ def _parse_sysmon_evtx(self):
+ """Extract EID 1 / EID 3 / EID 22 from sysmon EVTX snapshots.
+ Returns (connections, pid_to_image, dns_queries)."""
+ connections = []
+ pid_to_image = {}
+ dns_queries = []
+ evtx_path = os.path.join(self.analysis_path, "evtx", "evtx.zip")
+ if not HAVE_EVTX or not os.path.exists(evtx_path):
+ return connections, pid_to_image, dns_queries
+
+ tmpdir = tempfile.mkdtemp()
+ try:
+ with zipfile.ZipFile(evtx_path) as z:
+ sysmon_files = sorted([f for f in z.namelist() if "Sysmon" in f])
+ for fname in sysmon_files:
+ path = self._safe_extract(z, fname, tmpdir)
+ if path is None:
+ continue
+ try:
+ with EvtxParser.Evtx(path) as ef:
+ for rec in ef.records():
+ try:
+ root = ET.fromstring(rec.xml())
+ except ET.ParseError as parse_err:
+ log.debug("Skipping malformed evtx record in %s: %s",
+ fname, parse_err)
+ continue
+ sys_elem = root.find(EVT_NS + "System")
+ if sys_elem is None:
+ continue
+ eid_elem = sys_elem.find(EVT_NS + "EventID")
+ if eid_elem is None or eid_elem.text not in ("1", "3", "22"):
+ continue
+ eid = eid_elem.text
+ fields = self._read_evt_data(root)
+
+ if eid == "1":
+ pid = fields.get("ProcessId", "")
+ image = fields.get("Image", "")
+ if pid and image:
+ pid_to_image[str(pid)] = os.path.basename(image)
+
+ elif eid == "22":
+ pid = fields.get("ProcessId", "")
+ qname = _clean_host(fields.get("QueryName", ""))
+ image = fields.get("Image", "")
+ if pid and qname:
+ dns_queries.append((str(pid), qname, image))
+ if pid and image:
+ pid_to_image.setdefault(str(pid), os.path.basename(image))
+
+ else: # "3"
+ connections.append({
+ "pid": fields.get("ProcessId", ""),
+ "process_name": os.path.basename(fields.get("Image", "")),
+ "process_path": fields.get("Image", ""),
+ "protocol": fields.get("Protocol", "").upper(),
+ "direction": "outbound" if fields.get("Initiated") == "true" else "inbound",
+ "src_ip": fields.get("SourceIp", ""),
+ "src_port": fields.get("SourcePort", ""),
+ "dst_ip": fields.get("DestinationIp", ""),
+ "dst_port": fields.get("DestinationPort", ""),
+ "dst_hostname": fields.get("DestinationHostname", ""),
+ "source": "sysmon",
+ })
+ except Exception:
+ log.debug("Failed to parse sysmon EVTX %s", fname, exc_info=True)
+ except Exception:
+ log.warning("Failed to read EVTX zip", exc_info=True)
+ finally:
+ shutil.rmtree(tmpdir, ignore_errors=True)
+
+ return connections, pid_to_image, dns_queries
+
+ def _parse_kernel_network_etw(self, pid_to_name):
+ """Parse aux/network_etw.json from the Microsoft-Windows-Kernel-Network
+ ETW provider (captured by the network_etw auxiliary at analysis time)."""
+ connections = []
+ etw_path = os.path.join(self.analysis_path, "aux", "network_etw.json")
+ if not os.path.exists(etw_path):
+ return connections
+
+ try:
+ with open(etw_path, "r") as f:
+ for line in f:
+ line = line.strip()
+ if not line:
+ continue
+ try:
+ event = json.loads(line)
+ except json.JSONDecodeError:
+ continue
+ pid = str(event.get("pid", ""))
+ connections.append({
+ "pid": pid,
+ "process_name": pid_to_name.get(pid, ""),
+ "process_path": "",
+ "protocol": event.get("protocol", "").upper(),
+ "direction": event.get("direction", ""),
+ "src_ip": event.get("src_ip", ""),
+ "src_port": str(event.get("src_port", "")),
+ "dst_ip": event.get("dst_ip", ""),
+ "dst_port": str(event.get("dst_port", "")),
+ "dst_hostname": "",
+ "source": "kernel_etw",
+ })
+ except Exception:
+ log.warning("Failed to parse network ETW data", exc_info=True)
+
+ return connections
+
+ def _parse_dns_etw(self):
+ """Parse aux/dns_etw.json (DNS-Client ETW; originating-process DNS).
+ Returns: [(pid_str, hostname_lower), ...]."""
+ out = []
+ path = os.path.join(self.analysis_path, "aux", "dns_etw.json")
+ if not os.path.exists(path):
+ return out
+ try:
+ with open(path, "r") as f:
+ for line in f:
+ line = line.strip()
+ if not line:
+ continue
+ try:
+ e = json.loads(line)
+ except json.JSONDecodeError:
+ continue
+ if e.get("QueryType") != "Query":
+ continue
+ pid = e.get("ProcessId")
+ qname = _clean_host(e.get("QueryName", ""))
+ if pid is None or not qname:
+ continue
+ out.append((str(pid), qname))
+ except Exception:
+ log.warning("Failed to parse dns_etw.json", exc_info=True)
+ return out
+
+ # --------------------------------------------------------------------- run
    def run(self):
        """Build per-process network attribution and enrich sibling results.

        Correlates connection and DNS telemetry from several sources
        (sysmon EVTX, kernel-network ETW, sigma EID 3/22 matched events,
        DNS-Client ETW, suricata/network DNS answers) into a single
        AttributionIndex, then:

          * returns a dict with "process_connections",
            "connections_by_pid" and "connections_by_dst" built from the
            sysmon + kernel-ETW connection pools;
          * mutates self.results in place, stamping process_name /
            process_id onto suricata alerts/tls/http/files, network
            tcp/udp/dns records, an owner list onto network.hosts, and a
            hoisted "processes" list onto sigma detections.
        """
        results = {
            "process_connections": [],
            "connections_by_pid": {},
            "connections_by_dst": {},
        }

        idx = AttributionIndex()

        # pid->image seeds ---------------------------------------------------
        behavior_processes = self.results.get("behavior", {}).get("processes", []) or []
        for proc in behavior_processes:
            idx.add_pid_name(proc.get("process_id"), proc.get("process_name", ""))

        sysmon_conns, sysmon_pid_to_image, sysmon_dns_queries = self._parse_sysmon_evtx()
        for pid, image in sysmon_pid_to_image.items():
            idx.add_pid_name(pid, image)

        # Direct connections -------------------------------------------------
        for c in sysmon_conns:
            idx.add_connection(
                pid=c["pid"], dst_ip=c["dst_ip"], dst_port=c["dst_port"],
                src_ip=c.get("src_ip", ""), src_port=c.get("src_port", ""),
                protocol=c["protocol"], process_name=c["process_name"],
                source="sysmon",
            )

        # Pass the pid->name map so kernel-ETW rows get names up front.
        etw_conns = self._parse_kernel_network_etw(idx.pid_names())
        for c in etw_conns:
            idx.add_connection(
                pid=c["pid"], dst_ip=c["dst_ip"], dst_port=c["dst_port"],
                src_ip=c.get("src_ip", ""), src_port=c.get("src_port", ""),
                protocol=c["protocol"], process_name=c["process_name"],
                source="kernel_etw",
            )

        # Sigma EID 3 (network connection) matches are a third connection pool.
        sigma = self.results.get("sigma", {}) or {}
        for det in sigma.get("detections", []) or []:
            for ev in det.get("matched_events", []) or []:
                if ev.get("EventID") == 3 and ev.get("ProcessID") is not None:
                    image = ev.get("Image", "")
                    idx.add_pid_name(ev.get("ProcessID"), image)
                    idx.add_connection(
                        pid=ev.get("ProcessID"),
                        dst_ip=ev.get("DestinationIp", ""),
                        dst_port=ev.get("DestinationPort"),
                        src_ip=ev.get("SourceIp", ""),
                        src_port=ev.get("SourcePort"),
                        protocol=ev.get("Protocol", ""),
                        process_name=os.path.basename(image) if image else "",
                        source="sigma",
                    )

        # DNS queries (pid -> hostname) --------------------------------------
        for pid, host in self._parse_dns_etw():
            idx.add_dns_query(pid, host, source="dns_etw")
        for pid, host, image in sysmon_dns_queries:
            idx.add_dns_query(pid, host, image, source="sysmon_eid22")
        for det in sigma.get("detections", []) or []:
            for ev in det.get("matched_events", []) or []:
                if ev.get("EventID") != 22:
                    continue
                pid = ev.get("ProcessID")
                if pid is None:
                    continue
                idx.add_dns_query(pid, ev.get("QueryName", ""),
                                  ev.get("Image", ""), source="sigma_eid22")

        # Resolutions (hostname -> IPs) --------------------------------------
        suricata = self.results.get("suricata", {}) or {}
        network = self.results.get("network", {}) or {}

        # NOTE(review): suricata DNS records vary by EVE schema version,
        # hence the rrname/query and rdata/answer fallbacks — verify against
        # the deployed suricata version.
        for rec in suricata.get("dns", []) or []:
            q = rec.get("rrname") or rec.get("query") or ""
            idx.add_resolution(q, rec.get("rdata") or rec.get("answer") or "")
            for a in rec.get("answers", []) or []:
                idx.add_resolution(q, a.get("rdata") or a.get("data") or "")
        for rec in network.get("dns", []) or []:
            q = rec.get("request") or ""
            for a in rec.get("answers", []) or []:
                if a.get("type") in ("A", "AAAA"):
                    idx.add_resolution(q, a.get("data", ""))
        for rec in network.get("hosts", []) or []:
            idx.add_resolution(rec.get("hostname", ""), rec.get("ip", ""))
        for det in sigma.get("detections", []) or []:
            for ev in det.get("matched_events", []) or []:
                if ev.get("EventID") != 22:
                    continue
                q = ev.get("QueryName", "")
                # Sysmon QueryResults is ";"-separated with "type:" markers
                # interleaved; keep only the bare answer tokens.
                raw = ev.get("QueryResults", "") or ""
                for part in raw.split(";"):
                    part = part.strip()
                    if part and not part.startswith("type:"):
                        idx.add_resolution(q, part)

        idx.finalize()

        # Build the result structure (process_connections + by_pid + by_dst).
        # Dedupe key here is intentionally coarser than AttributionIndex's
        # (which uses src_port to keep distinct flows separate): this view
        # is for human consumption — multiple ephemeral connections from
        # the same process to the same dst_ip:dst_port should fold to one
        # row in process_connections / connections_by_dst. AttributionIndex
        # still has the per-flow detail for query-time matching.
        merged = []
        seen = set()
        for pool in (sysmon_conns, etw_conns):
            for c in pool:
                key = (c["pid"], c["dst_ip"], c["dst_port"])
                if c["pid"] and c["dst_ip"] and key not in seen:
                    seen.add(key)
                    merged.append(c)

        by_pid = {}
        by_dst = {}
        for c in merged:
            pid = c["pid"]
            dst = c["dst_ip"]
            # Loopback / unspecified destinations carry no attribution value.
            if not dst or dst in ("127.0.0.1", "::1", "0.0.0.0", "::"):
                continue
            by_pid.setdefault(pid, {
                "pid": pid,
                "process_name": c["process_name"],
                "process_path": c.get("process_path", ""),
                "connections": [],
            })["connections"].append({
                "dst_ip": dst,
                "dst_port": c["dst_port"],
                "protocol": c["protocol"],
            })
            by_dst.setdefault(dst, []).append({
                "pid": pid,
                "process_name": c["process_name"],
                "dst_port": c["dst_port"],
                "protocol": c["protocol"],
                "source": c.get("source", ""),
            })

        results["process_connections"] = merged
        results["connections_by_pid"] = by_pid
        results["connections_by_dst"] = by_dst

        log.info(
            "network_etw: sources — %d sysmon conns, %d kernel-ETW conns, "
            "%d pid->image, %d sysmon DNS, %d DNS-ETW pairs, %d resolutions",
            len(sysmon_conns), len(etw_conns), len(sysmon_pid_to_image),
            len(sysmon_dns_queries), idx.stats_counters.get("dns_etw", 0),
            idx.stats_counters.get("resolutions", 0),
        )

        # Enrichment loops — all go through the single index ----------------
        enriched = {k: 0 for k in ("alerts", "tls", "http", "files",
                                   "tcp", "udp", "hosts", "dns", "sigma")}

        def apply(rec, hit):
            # Stamp a flow-lookup hit (dict) onto a result record in place.
            if not hit:
                return False
            rec["process_name"] = hit.get("process_name", "")
            rec["process_id"] = hit.get("pid", "")
            return True

        # suricata.alerts — bidirectional (ingress-direction rules dst=VM)
        for rec in suricata.get("alerts", []) or []:
            hit = idx.for_flow(rec.get("dstip", ""), rec.get("dstport"),
                               rec.get("srcip", ""), rec.get("srcport"))
            if apply(rec, hit):
                enriched["alerts"] += 1

        # suricata.tls + http — dst-based (with src fallback too, for safety)
        for kind in ("tls", "http"):
            for rec in suricata.get(kind, []) or []:
                hit = idx.for_flow(rec.get("dstip", ""), rec.get("dstport"),
                                   rec.get("srcip", ""), rec.get("srcport"))
                if apply(rec, hit):
                    enriched[kind] += 1
                    if kind == "http":
                        idx.set_http_owner(rec.get("hostname", ""),
                                           rec.get("uri", ""),
                                           hit["pid"], hit.get("process_name", ""))

        # suricata.files — via HTTP transaction (uri/host) or DNS hostname.
        # Note: for_http yields a (pid, name) tuple, unlike the dicts that
        # for_flow/for_ip return — hence the unpacking instead of apply().
        for rec in suricata.get("files", []) or []:
            host = rec.get("http_host", "")
            hit = idx.for_http(host, rec.get("http_uri", ""))
            if hit:
                pid, name = hit
                rec["process_name"] = name
                rec["process_id"] = pid
                enriched["files"] += 1

        # network.tcp / udp — CAPE's pcap-parsed connections (sport disambiguates)
        for proto in ("tcp", "udp"):
            for rec in network.get(proto, []) or []:
                hit = idx.for_ip(rec.get("dst", ""),
                                 dst_port=rec.get("dport"),
                                 src_port=rec.get("sport"))
                if apply(rec, hit):
                    enriched[proto] += 1

        # network.dns — via DNS-query hostname (never by UDP 53 flow owner)
        for rec in network.get("dns", []) or []:
            hit = idx.for_host(rec.get("request", ""))
            if hit:
                pid, name = hit
                rec["process_name"] = name
                rec["process_id"] = pid
                enriched["dns"] += 1

        # network.hosts — may have multiple owners; list all
        for rec in network.get("hosts", []) or []:
            owners = idx.all_processes_for(rec.get("ip", ""))
            if owners:
                rec["processes"] = owners
                enriched["hosts"] += 1

        # sigma.detections — hoist (pid, image, command_line, parent) from
        # matched_events so the UI doesn't have to dig
        for det in sigma.get("detections", []) or []:
            seen_procs = set()
            procs = []
            for ev in det.get("matched_events", []) or []:
                pid = ev.get("ProcessID")
                image = ev.get("Image", "")
                if pid is None and not image:
                    continue
                key = (pid, image)
                if key in seen_procs:
                    continue
                seen_procs.add(key)
                procs.append({
                    "pid": pid,
                    "process_name": os.path.basename(image) if image else "",
                    "process_path": image,
                    "command_line": ev.get("CommandLine", ""),
                    "parent_pid": ev.get("ParentProcessId"),
                    "parent_image": ev.get("ParentImage", ""),
                })
            if procs:
                det["processes"] = procs
                enriched["sigma"] += 1

        log.info(
            "network_etw: enriched — %d alerts, %d tls, %d http, %d files, "
            "%d tcp, %d udp, %d dns, %d hosts, %d sigma",
            enriched["alerts"], enriched["tls"], enriched["http"], enriched["files"],
            enriched["tcp"], enriched["udp"], enriched["dns"], enriched["hosts"],
            enriched["sigma"],
        )

        return results
diff --git a/modules/processing/suricata.py b/modules/processing/suricata.py
index de2358eac5b..3fa82d249bd 100644
--- a/modules/processing/suricata.py
+++ b/modules/processing/suricata.py
@@ -19,6 +19,7 @@
from lib.cuckoo.common.path_utils import path_delete, path_exists, path_read_file, path_write_file
from lib.cuckoo.common.suricata_detection import et_categories, get_suricata_family
from lib.cuckoo.common.utils import add_family_detection, convert_to_printable_and_truncate
+from modules.processing.decryptpcap import resolve_processing_pcap_path
processing_cfg = Config("processing")
@@ -40,6 +41,10 @@
class Suricata(Processing):
"""Suricata processing."""
+ def _resolve_pcap_path(self):
+ pcapsrc = self.options.get("pcapsrc", "auto") if self.options else "auto"
+ return resolve_processing_pcap_path(self.analysis_path, self.pcap_path, pcapsrc=pcapsrc)
+
def cmd_wrapper(self, cmd):
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
stdout, stderr = p.communicate()
@@ -58,6 +63,7 @@ def run(self):
"""Run Suricata.
@return: hash with alerts
"""
+ self.pcap_path = self._resolve_pcap_path()
self.key = "suricata"
# General
SURICATA_CONF = self.options.get("conf")
diff --git a/tests/test_network_capture_integration.py b/tests/test_network_capture_integration.py
new file mode 100644
index 00000000000..fbb4fb31cc7
--- /dev/null
+++ b/tests/test_network_capture_integration.py
@@ -0,0 +1,179 @@
+import sys
+from types import ModuleType
+from unittest.mock import mock_open, patch
+
+
+# Tests for the network_etw / decryptpcap / resultserver code paths run in
+# isolation from the full CAPE runtime. The modules-under-test transitively
+# import gevent + a handful of CAPE-internal helpers that we don't want to
+# bring into pytest just to exercise pure logic.
+#
+# `setdefault` here means: only stub if pytest hasn't already imported the
+# real module via another collected test. That keeps these stubs from
+# clobbering an installed module when running the full suite, while still
+# letting `python -m pytest tests/test_network_capture_integration.py` work
+# in a stripped-down environment.
def _stub_module(name):
    """Build an empty module named *name* and install it with setdefault.

    Always returns the freshly built module object — even when a
    previously imported real module keeps its sys.modules slot — so
    attribute assignments on the return value never clobber a genuine
    module that another test already imported.
    """
    stub = ModuleType(name)
    sys.modules.setdefault(name, stub)
    return stub
+
+
# Fake the gevent package tree; __path__ marks the parent stub as a
# package so "import gevent.pool" (etc.) can resolve against the stubs.
gevent_mod = _stub_module("gevent")
gevent_mod.__path__ = []
gevent_pool_mod = _stub_module("gevent.pool")
gevent_server_mod = _stub_module("gevent.server")
gevent_socket_mod = _stub_module("gevent.socket")
gevent_thread_mod = _stub_module("gevent.thread")
# Hang the submodules off the parent the way a real package import would.
# NOTE(review): if a real gevent is installed, _stub_module returns orphan
# stubs that are NOT in sys.modules, so these assignments are inert and
# the genuine package is left untouched — harmless by design.
gevent_mod.pool = gevent_pool_mod
gevent_mod.server = gevent_server_mod
gevent_mod.socket = gevent_socket_mod
gevent_mod.thread = gevent_thread_mod
# Bare classes are enough: the tests never start a real gevent server.
gevent_server_mod.StreamServer = object
gevent_pool_mod.Pool = object

# Container module for the Processing/ProtocolHandler stubs defined below.
abstracts_mod = ModuleType("lib.cuckoo.common.abstracts")
+
+
class Processing:
    """Minimal stand-in for lib.cuckoo.common.abstracts.Processing.

    Carries only the attributes the modules under test read or write:
    results, analysis_path, pcap_path and options.
    """

    def __init__(self, results=None):
        self.options = None
        self.results = results
        # Paths stay empty until set_path() supplies the analysis dir.
        self.analysis_path = ""
        self.pcap_path = ""

    def set_path(self, analysis_path):
        """Remember the analysis directory and derive the default pcap path."""
        self.analysis_path = analysis_path
        self.pcap_path = f"{analysis_path}/dump.pcap"

    def set_options(self, options):
        """Attach the task options mapping."""
        self.options = options
+
+
class ProtocolHandler:
    """Minimal stand-in for lib.cuckoo.common.abstracts.ProtocolHandler."""

    def __init__(self, task_id, ctx, version=None):
        # Mirror the attribute names the resultserver code touches.
        self.fd = None
        self.task_id = task_id
        self.handler = ctx
        self.version = version
+
+
# Publish the stub classes under the real import path. setdefault means
# this is a no-op when another collected test already imported the
# genuine module — the stubs then simply go unused.
abstracts_mod.Processing = Processing
abstracts_mod.ProtocolHandler = ProtocolHandler
sys.modules.setdefault("lib.cuckoo.common.abstracts", abstracts_mod)

# Container module for the File stub defined below.
objects_mod = ModuleType("lib.cuckoo.common.objects")
+
+
class File:
    """Stub for lib.cuckoo.common.objects.File with a canned digest."""

    def __init__(self, path):
        self.path = path

    def get_sha256(self):
        """Return a fixed placeholder digest; tests never hash real bytes."""
        return "sha256"
+
+
objects_mod.File = File
sys.modules.setdefault("lib.cuckoo.common.objects", objects_mod)

# Task-logging hooks. NOTE(review): presumably invoked by resultserver
# around task handling — TODO confirm; the tests only need them to exist
# and swallow any arguments.
log_mod = ModuleType("lib.cuckoo.core.log")
log_mod.task_log_start = lambda *args, **kwargs: None
log_mod.task_log_stop = lambda *args, **kwargs: None
log_mod.task_log_stop_force = lambda *args, **kwargs: None
sys.modules.setdefault("lib.cuckoo.core.log", log_mod)
+
+from lib.cuckoo.common.config import Config
+from lib.cuckoo.core.resultserver import FileUpload
+from modules.processing import decryptpcap as decryptpcap_mod
+from modules.processing.network_etw import AttributionIndex
+
+
+
class DummySock:
    """Socket stand-in: the code under test only ever calls settimeout()."""

    def settimeout(self, _value):
        """Accept and discard the timeout — there is no real socket here."""
+
+
class DummyContext:
    """Fake resultserver connection context feeding canned header lines."""

    def __init__(self, storagepath, lines):
        self.storagepath = storagepath
        self.sock = DummySock()
        # Copy so read_newline() never mutates the caller's list.
        self._lines = list(lines)

    def read_newline(self):
        """Hand out the next queued header line (FIFO order)."""
        return self._lines.pop(0)

    def copy_to_fd(self, fd, _max_size=None):
        """Write a fixed payload into *fd*, mimicking a streamed upload."""
        fd.write(b"payload")
        fd.flush()
+
+
def test_auxiliary_config_registers_network_etw():
    """auxiliary.conf must ship a network_etw entry so the guest module can be toggled."""
    cfg = Config("auxiliary")

    assert hasattr(cfg.auxiliary_modules, "network_etw")
+
+
def test_processing_config_registers_decryptpcap_and_network_etw():
    """processing.conf must ship both new sections, disabled by default."""
    cfg = Config("processing")

    assert cfg.get("decryptpcap").enabled is False
    assert cfg.get("network_etw").enabled is False
+
+
def test_resultserver_allows_overwrite_for_periodic_aux_logs(tmp_path):
    """An upload path listed as replaceable must be truncate-overwritten,
    not opened exclusively (periodic aux logs re-upload the same name)."""
    ctx = DummyContext(str(tmp_path), [b"aux/network_etw.json"])
    upload = FileUpload(task_id=7, ctx=ctx)
    upload.init()

    open_mock = mock_open()

    with patch("lib.cuckoo.core.resultserver.path_exists", return_value=True), patch(
        "lib.cuckoo.core.resultserver.open_exclusive"
    ) as exclusive_mock, patch(
        "lib.cuckoo.core.resultserver.open", open_mock
    ) as patched_open:
        upload.handle()

    # Existing replaceable path -> truncate-write via plain open(..., "wb")
    patched_open.assert_any_call(str(tmp_path / "aux/network_etw.json"), "wb")
    # And NOT via open_exclusive — that would EEXIST and silently drop the
    # upload, which is exactly the bug REPLACEABLE_RESULT_UPLOADS fixes.
    exclusive_mock.assert_not_called()
+
+
def test_pcap_selector_prefers_mixed_pcap_when_configured(tmp_path):
    """pcapsrc="mixed" must select dump_mixed.pcap when it exists on disk."""
    mixed = tmp_path / "dump_mixed.pcap"
    mixed.write_bytes(b"x" * 32)

    selected = decryptpcap_mod.resolve_processing_pcap_path(
        analysis_path=str(tmp_path),
        default_pcap_path=str(tmp_path / "dump.pcap"),
        pcapsrc="mixed",
    )

    assert selected == str(mixed)
+
+
def test_pcap_selector_prefers_decrypted_pcap_when_configured(tmp_path):
    """pcapsrc="decrypted" must select dump_decrypted.pcap when it exists on disk."""
    decrypted = tmp_path / "dump_decrypted.pcap"
    decrypted.write_bytes(b"x" * 32)

    selected = decryptpcap_mod.resolve_processing_pcap_path(
        analysis_path=str(tmp_path),
        default_pcap_path=str(tmp_path / "dump.pcap"),
        pcapsrc="decrypted",
    )

    assert selected == str(decrypted)
+
+
def test_attribution_index_backfills_process_names():
    """A connection recorded before its pid->name mapping arrives must still
    report the process name when queried later (backfill happens without an
    explicit finalize() call here)."""
    idx = AttributionIndex()

    idx.add_connection(pid=42, dst_ip="8.8.8.8", dst_port=443, protocol="tcp", process_name="", source="kernel_etw")
    idx.add_pid_name(42, "powershell.exe")

    hit = idx.for_ip("8.8.8.8", dst_port=443)

    assert hit["process_name"] == "powershell.exe"
diff --git a/web/templates/analysis/network/_hosts.html b/web/templates/analysis/network/_hosts.html
index c1eb25adcca..5b4f8db6110 100644
--- a/web/templates/analysis/network/_hosts.html
+++ b/web/templates/analysis/network/_hosts.html
@@ -11,7 +11,7 @@ HostsIP
| Country Name |
ASN |
- {% if settings.NETWORK_PROC_MAP %} Process Name (PID) | {% endif %}
+ {% if settings.NETWORK_PROC_MAP %} Processes | {% endif %}
{% for host in network.hosts %}
@@ -32,15 +32,25 @@ Hosts
| {{host.country_name}} |
- {% if host.asn %}
- {{host.asn}}{% if host.asn_name %} - {{host.asn_name}}{% endif %} |
- {% endif %}
+
+ {% if host.asn %}{{host.asn}}{% if host.asn_name %} - {{host.asn_name}}{% endif %}{% else %}-{% endif %}
+ |
{% if settings.NETWORK_PROC_MAP %}
- {% if host.process_name %}
- {{ host.process_name }}{% if host.process_id %} ({{ host.process_id }}){% endif %}
+ {% if host.processes %}
+ {% for p in host.processes %}
+
+ {% if p.process_name %}{{ p.process_name }}{% else %}(unknown){% endif %}{% if p.pid %} ({{ p.pid }}){% endif %}
+
+ {% endfor %}
+ {% elif host.process_name or host.process_id %}
+ {# Legacy single-owner attribution from CAPE's existing process_map enrichment in network.py — used when network_etw module isn't enabled #}
+
+ {% if host.process_name %}{{ host.process_name }}{% else %}(unknown){% endif %}{% if host.process_id %} ({{ host.process_id }}){% endif %}
+
{% else %}
- -
+ -
{% endif %}
|
{% endif %}
diff --git a/web/templates/analysis/network/_suricata_files.html b/web/templates/analysis/network/_suricata_files.html
index cfcbf50b520..425160afbdb 100644
--- a/web/templates/analysis/network/_suricata_files.html
+++ b/web/templates/analysis/network/_suricata_files.html
@@ -7,6 +7,12 @@
File name |
{{file.filename}} |
+ {% if settings.NETWORK_PROC_MAP %}
+
+ | Process |
+ {% if file.process_name %}{{file.process_name}} ({{file.process_id}}){% else %}-{% endif %} |
+
+ {% endif %}
| Expected File Size |
{{file.size}} bytes |
diff --git a/web/templates/analysis/network/_suricata_http.html b/web/templates/analysis/network/_suricata_http.html
index 3244be161d7..a90a4d91f1e 100644
--- a/web/templates/analysis/network/_suricata_http.html
+++ b/web/templates/analysis/network/_suricata_http.html
@@ -5,25 +5,27 @@ Suricata
{% if suricata.http %}
-
+
- | Timestamp |
- Source IP |
- Source Port |
- Destination IP |
- Destination Port |
- Method |
- Status |
- Hostname |
- URI |
- Content Type |
- User Agent |
- Referrer |
- Length |
+ Timestamp |
+ {% if settings.NETWORK_PROC_MAP %}Process | {% endif %}
+ Source IP |
+ Source Port |
+ Destination IP |
+ Destination Port |
+ Method |
+ Status |
+ Hostname |
+ URI |
+ Content Type |
+ User Agent |
+ Referrer |
+ Length |
{% for http in suricata.http %}
| {{http.timestamp}} |
+ {% if settings.NETWORK_PROC_MAP %}{% if http.process_name %}{{http.process_name}} ({{http.process_id}}){% else %}-{% endif %} | {% endif %}
{{http.srcip}}
[VT]
{% if config.display_et_portal %}
|