diff --git a/admin/admin.py b/admin/admin.py index 61209688b84..905a1bb0891 100644 --- a/admin/admin.py +++ b/admin/admin.py @@ -30,7 +30,6 @@ from lib.cuckoo.common.admin_utils import ( CAPE_PATH, - POSTPROCESS, AutoAddPolicy, bulk_deploy, compare_hashed_files, @@ -61,6 +60,7 @@ JUMPBOX_USED = False jumpbox = False +RETRY = 3 logging.getLogger("paramiko").setLevel(logging.WARNING) logging.getLogger("paramiko.transport").setLevel(logging.WARNING) @@ -226,6 +226,13 @@ required=False, default=False, ) + compare_opt.add_argument( + "--remove-ssh-keys", + help="Remove servers ssh key from known keys on localhost", + action="store_true", + required=False, + default=False, + ) args = parser.parse_args() @@ -235,12 +242,15 @@ logging.getLogger("paramiko.transport").setLevel(logging.DEBUG) if args.username: + from lib.cuckoo.common import admin_utils + admin_utils.JUMP_BOX_USERNAME = args.username JUMP_BOX_USERNAME = args.username - # if args.debug: - # log.setLevel(logging.DEBUG) + if args.debug: + logging.getLogger().setLevel(logging.DEBUG) if args.jump_box_second and not args.dry_run: + ssh.connect( JUMP_BOX_SECOND, username=JUMP_BOX_SECOND_USERNAME, @@ -286,7 +296,12 @@ print(parameters) sys.exit(0) queue.put([servers, file] + list(parameters)) - _ = deploy_file(queue, jumpbox) + for i in range(RETRY): + try: + _ = deploy_file(queue, jumpbox) + break + except Exception as eee: + print(f"Error {eee}, retry {i + 1}/{RETRY}") elif args.delete_file: queue = Queue() @@ -342,7 +357,7 @@ sys.exit() elif args.enum_all_servers: - enumerate_files_on_all_servers() + enumerate_files_on_all_servers(servers, jumpbox, "/opt/CAPEv2", args.filename) elif args.generate_files_listing and not args.enum_all_servers: gen_hashfile(args.generate_files_listing, args.filename) elif args.check_files_difference: @@ -355,8 +370,12 @@ bulk_deploy(files, args.yara_category, args.dry_run, servers, jumpbox) - if args.restart_service and POSTPROCESS: - execute_command_on_all(POSTPROCESS, servers, jumpbox) + if args.restart_service: + execute_command_on_all("systemctl restart cape-processor; systemctl status cape-processor", servers, jumpbox) if args.restart_uwsgi: execute_command_on_all("touch /tmp/capeuwsgireload", servers, jumpbox) + + if args.remove_ssh_keys: + for node in SERVERS_STATIC_LIST: + subprocess.run(["ssh-keygen", "-R", node]) diff --git a/analyzer/linux/analyzer.py b/analyzer/linux/analyzer.py index 1bc335de7c4..ca5beca1f2b 100644 --- a/analyzer/linux/analyzer.py +++ b/analyzer/linux/analyzer.py @@ -8,6 +8,7 @@ import os import pkgutil import re +import subprocess import sys import tempfile import time @@ -100,33 +101,29 @@ def dump_memory(pid): if pid in DUMPED_LIST: return # Skip if already dumped try: - maps_file = open(f"/proc/{pid}/maps", "r") - mem_file = open(f"/proc/{pid}/mem", "rb", 0) - output_file = open(f"{MEM_PATH}/{pid}.dmp", "wb") - - for line in maps_file.readlines(): - # Reference: https://man7.org/linux/man-pages/man5/proc_pid_maps.5.html - m = re.match(r"^([0-9a-f]+)-([0-9a-f]+) ([-rwxsp]{4}) ([0-9a-f]+) (\d\d:\d\d) (\d+) *(.*)$", line) - if not m: - log.error("Could not parse memory map line for pid %s: %s", pid, line) - continue - perms = m.group(3) - pathname = m.group(7) - if "r" in perms: - # Testing: Uncomment to skip memory regions associated with dynamic libraries - # if pathname and (pathname.endswith('.so') or 'lib' in pathname or '[' in pathname): - # continue - start = int(m.group(1), 16) - end = int(m.group(2), 16) - try: - mem_file.seek(start) - chunk = mem_file.read(end - start) 
- output_file.write(chunk) - except (OSError, ValueError) as e: - log.error("Could not read memory range %x-%x (%s) (%s): %s", start, end, perms, pathname, e) - maps_file.close() - mem_file.close() - output_file.close() + with open(f"/proc/{pid}/maps", "r") as maps_file, open(f"/proc/{pid}/mem", "rb", 0) as mem_file, open( + f"{MEM_PATH}/{pid}.dmp", "wb" + ) as output_file: + for line in maps_file: + # Reference: https://man7.org/linux/man-pages/man5/proc_pid_maps.5.html + m = re.match(r"^([0-9a-f]+)-([0-9a-f]+) ([-rwxsp]{4}) ([0-9a-f]+) (\d\d:\d\d) (\d+) *(.*)$", line) + if not m: + log.error("Could not parse memory map line for pid %s: %s", pid, line) + continue + perms = m.group(3) + pathname = m.group(7) + if "r" in perms: + # Testing: Uncomment to skip memory regions associated with dynamic libraries + # if pathname and (pathname.endswith('.so') or 'lib' in pathname or '[' in pathname): + # continue + start = int(m.group(1), 16) + end = int(m.group(2), 16) + try: + mem_file.seek(start) + chunk = mem_file.read(end - start) + output_file.write(chunk) + except (OSError, ValueError) as e: + log.error("Could not read memory range %x-%x (%s) (%s): %s", start, end, perms, pathname, e) except FileNotFoundError: log.error("Process with PID %s not found.", str(pid)) except PermissionError: @@ -166,7 +163,7 @@ def prepare(self): # Set virtual machine clock. clock = datetime.datetime.strptime(self.config.clock, "%Y%m%dT%H:%M:%S") # Setting date and time. - os.system(f'date -s "{clock.strftime("%y-%m-%d %H:%M:%S")}"') + subprocess.run(["date", "-s", clock.strftime("%y-%m-%d %H:%M:%S")], check=True) # We update the target according to its category. If it's a file, then # we store the path. diff --git a/analyzer/linux/lib/api/process.py b/analyzer/linux/lib/api/process.py index 6fc922bc504..61edc6d0fab 100644 --- a/analyzer/linux/lib/api/process.py +++ b/analyzer/linux/lib/api/process.py @@ -39,7 +39,11 @@ def get_proc_status(self): try: with open(f"/proc/{self.pid}/status") as f: status = f.readlines() - status_values = dict([tuple(map(str.strip, j.split(':',1))) for j in status]) + status_values = {} + for line in status: + if ":" in line: + key, value = line.split(":", 1) + status_values[key.strip()] = value.strip() return status_values except Exception: log.critical("Could not get process status for pid %s", self.pid) diff --git a/analyzer/linux/lib/core/packages.py b/analyzer/linux/lib/core/packages.py index 695719491a4..c955a95ff29 100644 --- a/analyzer/linux/lib/core/packages.py +++ b/analyzer/linux/lib/core/packages.py @@ -4,6 +4,7 @@ # of the MIT license. See the LICENSE file for details. 
import inspect +import importlib import logging import shutil import subprocess @@ -36,7 +37,7 @@ def choose_package_class(file_type=None, file_name="", suggestion=None): sys.path.append(path.abspath(path.join(path.dirname(__file__), "..", ".."))) # Since we don't know the package class yet, we'll just import everything # from this module and then try to figure out the required member class - module = __import__(full_name, globals(), locals(), ["*"]) + module = importlib.import_module(full_name) except ImportError: raise Exception(f'Unable to import package "{name}": it does not exist') try: diff --git a/conf/default/web.conf.default b/conf/default/web.conf.default index 0966b8f943f..f5f29af8595 100644 --- a/conf/default/web.conf.default +++ b/conf/default/web.conf.default @@ -70,6 +70,7 @@ reprocess_failed_processing = no url_splitter = , # Limit number of files extracted from archive in demux.py demux_files_limit = 10 +public_searches = yes # ratelimit for anon users [ratelimit] @@ -123,6 +124,8 @@ package = edge # TLP markings on submission and webgui [tlp] enabled = no +# Should TLP: RED tasks be searchable by other users? +public_red = yes #AMSI dump submission checkbox: can be useful to disable if no Win10+ instances #(amsidump is enabled by default in the monitor for Win10+) diff --git a/dev_utils/mongo_hooks.py b/dev_utils/mongo_hooks.py index 4f280ab820d..5dd8faede50 100644 --- a/dev_utils/mongo_hooks.py +++ b/dev_utils/mongo_hooks.py @@ -182,7 +182,7 @@ def remove_task_references_from_files(task_ids): """ mongo_update_many( FILES_COLL, - {TASK_IDS_KEY: {"$elemMatch": {"$in": task_ids}}}, + {TASK_IDS_KEY: {"$in": task_ids}}, {"$pullAll": {TASK_IDS_KEY: task_ids}}, ) @@ -210,7 +210,8 @@ def delete_unused_file_docs(): referenced by any analysis tasks. This should typically be invoked via utils/cleaners.py in a cron job. 
""" - return mongo_delete_many(FILES_COLL, {TASK_IDS_KEY: {"$size": 0}}) + # Using exact empty array match is much faster than $size: 0 + return mongo_delete_many(FILES_COLL, {TASK_IDS_KEY: []}) NORMALIZED_FILE_FIELDS = ("target.file", "dropped", "CAPE.payloads", "procdump", "procmemory") diff --git a/dev_utils/mongodb.py b/dev_utils/mongodb.py index cb32967087e..e156dc6ef4d 100644 --- a/dev_utils/mongodb.py +++ b/dev_utils/mongodb.py @@ -1,11 +1,8 @@ import collections import functools -import itertools import logging import time -from typing import Any, Callable, Sequence - -from bson import ObjectId +from typing import Callable, Sequence from lib.cuckoo.common.config import Config @@ -161,11 +158,11 @@ def mongo_update_many(collection: str, query, update): @graceful_auto_reconnect -def mongo_update_one(collection: str, query, projection, bypass_document_validation: bool = False): - if query.get("$set", None): - for hook in hooks[mongo_find_one][collection]: - query["$set"] = hook(query["$set"]) - return getattr(results_db, collection).update_one(query, projection, bypass_document_validation=bypass_document_validation) +def mongo_update_one(collection: str, query, update, bypass_document_validation: bool = False): + if isinstance(update, dict) and update.get("$set"): + for hook in hooks[mongo_update_one][collection]: + update["$set"] = hook(update["$set"]) + return getattr(results_db, collection).update_one(query, update, bypass_document_validation=bypass_document_validation) @graceful_auto_reconnect @@ -224,43 +221,14 @@ def mongo_delete_data_range(*, range_start: int = 0, range_end: int = 0) -> None def mongo_delete_calls(task_ids: Sequence[int] | None) -> None: - """Delete calls by primary key. - - This obtains the call IDs from the analysis collection, which are then used - to delete calls in batches.""" - log.info("attempting to delete calls for %d tasks", len(task_ids)) - - query = {"info.id": {"$in": task_ids}} - projection = {"behavior.processes.calls": 1} - tasks: list[dict[str, Any]] = mongo_find("analysis", query, projection) - - if not tasks: - return - - delete_target_ids: list[ObjectId] = [] - - def get_call_ids_from_task(task: dict[str, Any]) -> list[ObjectId]: - """Get the call IDs from an analysis document.""" - processes = task.get("behavior", {}).get("processes", []) - calls = [proc.get("calls", []) for proc in processes] - return list(itertools.chain.from_iterable(calls)) - - for task in tasks: - delete_target_ids.extend(get_call_ids_from_task(task)) - - delete_target_ids = list(set(delete_target_ids)) - chunk_size = 1000 - for idx in range(0, len(delete_target_ids), chunk_size): - mongo_delete_many("calls", {"_id": {"$in": delete_target_ids[idx : idx + chunk_size]}}) - - -def mongo_delete_calls_by_task_id(task_ids: Sequence[int]) -> None: """Delete calls by querying the calls collection by the task_id field. Note, the task_id field was added to the calls collection in 9999881. - Objects added to the collection prior to this will not be deleted. Use - mongo_delete_calls for backwards compatibility. + Objects added to the collection prior to this will be deleted. 
""" + if not task_ids: + return + log.info("attempting to delete calls for %d tasks", len(task_ids)) mongo_delete_many("calls", {"task_id": {"$in": task_ids}}) diff --git a/lib/cuckoo/common/admin_utils.py b/lib/cuckoo/common/admin_utils.py index 9808bf72e80..900d80856e7 100644 --- a/lib/cuckoo/common/admin_utils.py +++ b/lib/cuckoo/common/admin_utils.py @@ -2,6 +2,7 @@ import logging import os import re +import shlex # from glob import glob import shutil @@ -11,7 +12,7 @@ from pathlib import Path from queue import Queue from socket import if_nameindex -from threading import Thread +from threading import Lock, Thread import urllib3 @@ -34,7 +35,6 @@ ) from scp import SCPClient, SCPException - conf = SSHConfig() conf.parse(open(os.path.expanduser("~/.ssh/config"))) @@ -197,7 +197,7 @@ def compare_hashed_files(files: list, servers: list, ssh_proxy: SSHClient, priva def enumerate_files_on_all_servers(servers: list, ssh_proxy: SSHClient, dir_folder: str, filename: str): - cmd = f"python3 {CAPE_PATH}/admin/admin.py -gfl {dir_folder} -f /tmp/{filename} -s" + cmd = f"python3 {CAPE_PATH}/admin/admin.py -gfl {shlex.quote(dir_folder)} -f /tmp/{shlex.quote(filename)} -s" execute_command_on_all(cmd, servers, ssh_proxy) get_file(f"/tmp/{filename}.json", servers, ssh_proxy) @@ -308,43 +308,52 @@ def file_recon(file, yara_category="CAPE"): return False # build command to be executed remotely - REMOTE_COMMAND = f"chown {OWNER} {TARGET}; chmod 644 {TARGET};" + quoted_target = shlex.quote(TARGET) + REMOTE_COMMAND = f"chown {OWNER} {quoted_target}; chmod 644 {quoted_target};" if filename.endswith(".py") and TARGET: - REMOTE_COMMAND += "rm -f {0}.pyc; ls -la {0}.*".format(TARGET.replace(".py", "")) + REMOTE_COMMAND += "rm -f {0}.pyc; ls -la {0}.*".format(shlex.quote(TARGET.replace(".py", ""))) return TARGET, REMOTE_COMMAND, LOCAL_SHA256 # For session reuse sockets = {} +sockets_lock = Lock() def _connect_via_jump_box(server: str, ssh_proxy: SSHClient): session_checker() - host = conf.lookup(server) + try: + host = conf.lookup(server) + except NameError: + log.error("Missed dependencies/activation of venv") + return None + + with sockets_lock: + if server in sockets: + ssh = sockets[server] + if ssh.get_transport() and ssh.get_transport().is_active(): + return ssh + else: + del sockets[server] + try: """ This is SSH pivoting it ssh to host Y via host X, can be used due to different networks We doing direct-tcpip channel and pasing it as socket to be used """ if ssh_proxy and JUMP_BOX_USERNAME: - if server not in sockets: - ssh = SSHJumpClient(jump_session=ssh_proxy if ssh_proxy else None) - ssh.set_missing_host_key_policy(AutoAddPolicy()) - # ssh_port = 22 if ":" not in server else int(server.split(":")[1]) - ssh.connect( - server, - username=JUMP_BOX_USERNAME, - key_filename=host.get("identityfile"), - banner_timeout=200, - look_for_keys=False, - allow_agent=True, - # disabled_algorithms=dict(pubkeys=["rsa-sha2-512", "rsa-sha2-256"]), - ) - sockets[server] = ssh - else: - # ToDo check if alive and reconnect - ssh = sockets[server] - + ssh = SSHJumpClient(jump_session=ssh_proxy if ssh_proxy else None) + ssh.set_missing_host_key_policy(AutoAddPolicy()) + # ssh_port = 22 if ":" not in server else int(server.split(":")[1]) + ssh.connect( + server, + username=JUMP_BOX_USERNAME, + key_filename=host.get("identityfile"), + banner_timeout=200, + look_for_keys=False, + allow_agent=True, + # disabled_algorithms=dict(pubkeys=["rsa-sha2-512", "rsa-sha2-256"]), + ) else: ssh = SSHJumpClient() ssh.load_system_host_keys() @@ 
-357,35 +366,53 @@ def _connect_via_jump_box(server: str, ssh_proxy: SSHClient):
                 banner_timeout=200,
                 look_for_keys=False,
                 allow_agent=True,
-                sock=ProxyCommand(host.get("proxycommand")),
+                sock=ProxyCommand(host.get("proxycommand")) if host.get("proxycommand") else None,
             )
+
+        with sockets_lock:
+            sockets[server] = ssh
+
     except (BadHostKeyException, AuthenticationException, PasswordRequiredException) as e:
-        sys.exit(
-            f"Connect error: {str(e)}. Also pay attention to this log for more details /var/log/auth.log and paramiko might need update.\nAlso ensure that you have added your public ssh key to /root/.ssh/authorized_keys"
+        log.error(
+            "Connect error to %s: %s. Check /var/log/auth.log for more details; paramiko might need an update.\nAlso ensure that you have added your public ssh key to /root/.ssh/authorized_keys", server, str(e)
         )
+        return None
     except ProxyCommandFailure as e:
-        # Todo reconnect
-        log.error("Can't connect to server: %s", str(e))
+        log.error("Can't connect to server %s: %s", server, str(e))
+        return None
+    except Exception as e:
+        log.error("Unexpected error connecting to %s: %s", server, str(e))
+        return None
+
     return ssh


 def execute_command_on_all(remote_command, servers: list, ssh_proxy: SSHClient):
     for server in servers:
+        srv = server.split(".")[1] if "." in server else server
+        log.info("[*] Connecting to %s...", server)
         try:
             ssh = _connect_via_jump_box(server, ssh_proxy)
-            _, ssh_stdout, _ = ssh.exec_command(remote_command)
+            if not ssh:
+                continue
+
+            _, ssh_stdout, ssh_stderr = ssh.exec_command(remote_command, get_pty=True)
             ssh_out = ssh_stdout.read().decode("utf-8").strip()
+            ssh_err = ssh_stderr.read().decode("utf-8").strip()
+
             if "Active: active (running)" in ssh_out and "systemctl status" not in remote_command:
-                log.info("[+] Service %s", green("restarted successfully and is UP"))
-            else:
-                srv = str(server.split(".")[1])
-                if ssh_out:
-                    log.info(green(f"[+] {srv} - {ssh_out}"))
-                else:
-                    log.info(green(f"[+] {srv}"))
-            ssh.close()
+                log.info("[+] %s - Service %s", srv, green("restarted successfully and is UP"))
+            elif ssh_out:
+                log.info(green("[+] %s - %s"), srv, ssh_out)
+
+            if ssh_err:
+                log.error(red("[-] %s ERROR - %s"), srv, ssh_err)
+
+            if not ssh_out and not ssh_err:
+                log.info(green("[+] %s"), srv)
+
         except TimeoutError as e:
-            sys.exit(f"Did you forget to use jump box? {str(e)}")
{str(e)}") + log.error("Timeout connecting to %s: %s", server, str(e)) except SSHException as e: log.error("Can't read remote bufffer: %s", str(e)) except Exception as e: @@ -431,8 +458,10 @@ def bulk_deploy(files, yara_category, dry_run=False, servers: list = [], ssh_pro def get_file(path, servers: list, ssh_proxy: SSHClient, yara_category: str = "CAPE", dry_run: bool = False): for server in servers: try: - print(server) + print(f"[*] Connecting to {server}...") ssh = _connect_via_jump_box(server, ssh_proxy) + if not ssh: + continue with SCPClient(ssh.get_transport()) as scp: try: scp.get(path, f"{server}_{os.path.basename(path)}") @@ -454,6 +483,8 @@ def deploy_file(queue, ssh_proxy: SSHClient): for server in servers: try: ssh = _connect_via_jump_box(server, ssh_proxy) + if not ssh: + continue with SCPClient(ssh.get_transport()) as scp: try: scp.put(local_file, remote_file) @@ -461,15 +492,24 @@ def deploy_file(queue, ssh_proxy: SSHClient): print(e) if remote_command: - _, ssh_stdout, _ = ssh.exec_command(remote_command) + _, ssh_stdout, ssh_stderr = ssh.exec_command(remote_command, get_pty=True) - ssh_out = ssh_stdout.read().decode("utf-8") - log.info(ssh_out) + ssh_out = ssh_stdout.read().decode("utf-8").strip() + ssh_err = ssh_stderr.read().decode("utf-8").strip() + if ssh_out: + log.info(ssh_out) + if ssh_err: + log.error(red("ERROR: %s", ssh_err)) - _, ssh_stdout, _ = ssh.exec_command(f"sha256sum {remote_file} | cut -d' ' -f1") + _, ssh_stdout, ssh_stderr = ssh.exec_command(f"sha256sum {shlex.quote(remote_file)} | cut -d' ' -f1", get_pty=True) remote_sha256 = ssh_stdout.read().strip().decode("utf-8") + remote_sha256_err = ssh_stderr.read().strip().decode("utf-8") + if remote_sha256_err: + log.error(red("sha256sum error: %s", remote_sha256_err)) + + srv = server.split(".")[1] if "." in server else server if local_sha256 == remote_sha256: - log.info("[+] %s - Hashes are %s: %s - %s", server.split(".")[1], green("correct"), local_sha256, remote_file) + log.info("[+] %s - Hashes are %s: %s - %s", srv, green("correct"), local_sha256, remote_file) else: log.info( "[-] %s - Hashes are %s: \n\tLocal: %s\n\tRemote: %s - %s", @@ -481,14 +521,13 @@ def deploy_file(queue, ssh_proxy: SSHClient): ) error = 1 error_list.append(remote_file) - ssh.close() except TimeoutError as e: log.error(e) if not error: - log.info(green(f"Completed! {remote_file}\n")) + log.info(green("Completed! %s\n", remote_file)) else: - log.info(red(f"Completed with errors. {remote_file}\n")) + log.info(red("Completed with errors. 
%s\n", remote_file)) queue.task_done() return error_list @@ -504,11 +543,15 @@ def delete_file(queue, ssh_proxy: SSHClient): for server in servers: try: ssh = _connect_via_jump_box(server, ssh_proxy) - _, ssh_stdout, _ = ssh.exec_command(f"rm {remote_file}") - ssh_out = ssh_stdout.read().decode("utf-8") + if not ssh: + continue + _, ssh_stdout, ssh_stderr = ssh.exec_command(f"rm {shlex.quote(remote_file)}", get_pty=True) + ssh_out = ssh_stdout.read().decode("utf-8").strip() + ssh_err = ssh_stderr.read().decode("utf-8").strip() if ssh_out: log.info(ssh_out) - ssh.close() + if ssh_err: + log.error(red("ERROR: %s", ssh_err)) except TimeoutError as e: log.error(e) error = 1 diff --git a/lib/cuckoo/common/cleaners_utils.py b/lib/cuckoo/common/cleaners_utils.py index ee36657f3f3..74bfd20267e 100644 --- a/lib/cuckoo/common/cleaners_utils.py +++ b/lib/cuckoo/common/cleaners_utils.py @@ -747,7 +747,7 @@ def cleanup_files_collection_by_id(task_id: int): { bytesFreed: Long('107198922752'), ok: 1 } """ - mongo_update_many({}, {"$pull": {"_task_ids": {"$lt": task_id}}}) + mongo_update_many({"_task_ids": {"$lt": task_id}}, {"$pull": {"_task_ids": {"$lt": task_id}}}) def execute_cleanup(args: dict, init_log=True): diff --git a/lib/cuckoo/common/network_utils.py b/lib/cuckoo/common/network_utils.py index 9a14d77ec72..03e1d1f45c8 100644 --- a/lib/cuckoo/common/network_utils.py +++ b/lib/cuckoo/common/network_utils.py @@ -548,10 +548,11 @@ def winhttp_finalize_sessions(state): sessions_by_domain_keys[dom].add(key) if sessions_by_domain: + sessions_list = [{"host": dom, "events": evts} for dom, evts in sessions_by_domain.items()] out.append({ "process_id": p.get("process_id"), "process_name": p.get("process_name", ""), - "sessions": sessions_by_domain, + "sessions": sessions_list, }) return out diff --git a/lib/cuckoo/common/web_utils.py b/lib/cuckoo/common/web_utils.py index b5faa3d1e48..e577068bea9 100644 --- a/lib/cuckoo/common/web_utils.py +++ b/lib/cuckoo/common/web_utils.py @@ -1310,6 +1310,30 @@ def validate_task_by_path(tid): ) +def _build_es_user_filter(privs: bool, user_id: int): + user_filter = None + if not privs: + if force_bool(web_cfg.general.get("public_searches", True)): + if not force_bool(web_cfg.tlp.get("public_red", False)): + shoulds = [{"bool": {"must_not": [{"terms": {"info.tlp": ["red", "Red", "RED"]}}]}}] + if user_id: + shoulds.append({"term": {"info.user_id": user_id}}) + else: + shoulds.append({"bool": {"must_not": {"exists": {"field": "info.user_id"}}}}) + user_filter = { + "bool": { + "should": shoulds, + "minimum_should_match": 1 + } + } + else: + if user_id: + user_filter = {"term": {"info.user_id": user_id}} + else: + user_filter = {"bool": {"must_not": {"exists": {"field": "info.user_id"}}}} + return user_filter + + def perform_search( term: str, value: str, search_limit: int = 0, user_id: int = 0, privs: bool = False, web: bool = True, projection: dict = None ): @@ -1330,6 +1354,10 @@ def perform_search( """ if repconf.mongodb.enabled and repconf.elasticsearchdb.enabled and essearch and not term: multi_match_search = {"query": {"multi_match": {"query": value, "fields": ["*"]}}} + if not privs: + user_filter = _build_es_user_filter(privs, user_id) + if user_filter: + multi_match_search = {"query": {"bool": {"must": [{"multi_match": {"query": value, "fields": ["*"]}}], "filter": [user_filter]}}} numhits = es.search(index=get_analysis_index(), body=multi_match_search, size=0)["hits"]["total"] return [ d["_source"] @@ -1423,9 +1451,18 @@ def perform_search( {"$unwind": 
"$task_doc"}, # Stage 8: Make the task doc the new root {"$replaceRoot": {"newRoot": "$task_doc"}}, - # Stage 9: Add your custom projection - {"$project": projection or perform_search_filters}, ] + + if not privs: + if force_bool(web_cfg.general.get("public_searches", True)): + if not force_bool(web_cfg.tlp.get("public_red", False)): + pipeline.append({"$match": {"$or": [{"info.tlp": {"$nin": ["red", "Red", "RED"]}}, {"info.user_id": user_id}]}}) + else: + pipeline.append({"$match": {"info.user_id": user_id}}) + + # Stage 9: Add your custom projection + pipeline.append({"$project": projection or perform_search_filters}) + retval = list(mongo_aggregate(FILES_COLL, pipeline)) if not retval: return [] @@ -1446,6 +1483,19 @@ def perform_search( projection[f"target.file.{FILE_REF_KEY}"] = 1 if term in search_term_map_repetetive_blocks: mongo_search_query = {"$or": [{path: condition} for path, condition in mongo_search_query.items()]} + + if not privs: + if force_bool(web_cfg.general.get("public_searches", True)): + if not force_bool(web_cfg.tlp.get("public_red", False)): + mongo_search_query = { + "$and": [ + mongo_search_query, + {"$or": [{"info.tlp": {"$nin": ["red", "Red", "RED"]}}, {"info.user_id": user_id}]} + ] + } + else: + mongo_search_query["info.user_id"] = user_id + retval = list(mongo_find("analysis", mongo_search_query, projection, limit=search_limit)) for doc in retval: @@ -1455,13 +1505,20 @@ def perform_search( return retval if es_as_db: - _source_fields = list(perform_search_filters.keys())[:-1] + _source_fields = list((projection or perform_search_filters).keys())[:-1] + + user_filter = _build_es_user_filter(privs, user_id) + if isinstance(search_term_map[term], str): q = {"query": {"match": {search_term_map[term]: value}}} + if user_filter: + q = {"query": {"bool": {"must": [q["query"]], "filter": [user_filter]}}} return [d["_source"] for d in es.search(index=get_analysis_index(), body=q, _source=_source_fields)["hits"]["hits"]] else: queries = [{"match": {search_term: value}} for search_term in search_term_map[term]] q = {"query": {"bool": {"should": queries, "minimum_should_match": 1}}} + if user_filter: + q["query"]["bool"]["filter"] = [user_filter] return [d["_source"] for d in es.search(index=get_analysis_index(), body=q, _source=_source_fields)["hits"]["hits"]] diff --git a/modules/processing/CAPE.py b/modules/processing/CAPE.py index 5a1455f791e..1ae1ba8b302 100644 --- a/modules/processing/CAPE.py +++ b/modules/processing/CAPE.py @@ -256,6 +256,14 @@ def process_file(self, file_path, append_file, metadata: dict, *, category: str, file_info["options_hash"] = options_hash + # GravityRAT is infector so it will produce a lot of files. we don't need them + if category == "dropped" and any("GravityRAT" in i.get("name", "") for i in file_info.get("cape_yara", [])): + # delete file and continue + log.info("GravityRAT detected, removing file: %s", file_path) + with suppress(OSError): + os.remove(file_path) + return + if category in ("static", "file"): file_info["name"] = Path(self.task["target"]).name @@ -291,6 +299,10 @@ def process_file(self, file_path, append_file, metadata: dict, *, category: str, "category": category, "file": file_info, } + + if not os.path.exists(self.task["target"]): + log.error("Target file doesn't exist anymore. 
That will prevent data to be shown on webgui") + elif processing_conf.CAPE.dropped and category in ("dropped", "package"): if category == "dropped": file_info.update(metadata.get(file_info["path"][0], {})) @@ -440,6 +452,28 @@ def run(self): self.process_file( self.file_path, False, meta.get(self.file_path, {}), category=self.task["category"], duplicated=duplicated ) + if "target" not in self.results: + target_restored = False + try: + db_analysis = mongo_find_one("analysis", {"info.id": int(self.task["id"])}, {"target": 1, "_id": 0}) + if db_analysis and "target" in db_analysis: + self.results["target"] = db_analysis["target"] + target_restored = True + log.info("Restored missing target info from MongoDB analysis collection") + except Exception as e: + log.error("Failed to restore target info from MongoDB: %s", e) + + if not target_restored: + json_path = os.environ.get("CAPE_REPORT") or os.path.join(self.reports_path, "report.json") + if path_exists(json_path): + try: + with open(json_path, "r", encoding="utf-8") as f: + report_data = json.load(f) + if "target" in report_data: + self.results["target"] = report_data["target"] + log.info("Restored missing target info from existing report.json") + except Exception as e: + log.error("Failed to restore target info from existing report: %s", e) for folder in ("CAPE_path", "procdump_path", "dropped_path", "package_files"): category = folder.replace("_path", "").replace("_files", "") diff --git a/modules/processing/behavior.py b/modules/processing/behavior.py index d306293bda5..e04558c5d0b 100644 --- a/modules/processing/behavior.py +++ b/modules/processing/behavior.py @@ -1350,14 +1350,15 @@ def run(self): # BSON/JSON keys must be strings. # Let's convert tuple keys to string representation "ip:port" - endpoint_map_str = {} - for (ip, port), entries in self.endpoint_map.items(): - endpoint_map_str[f"{ip}:{port}"] = entries + endpoint_map_list = [{"ip_port": f"{ip}:{port}", "pinfo": entries} for (ip, port), entries in self.endpoint_map.items()] + + http_host_map_list = [{"host": k, "pinfo": v} for k, v in self.http_host_map.items()] + dns_intents_list = [{"domain": k, "intents": v} for k, v in self.dns_intents.items()] return { - "endpoint_map": endpoint_map_str, - "http_host_map": self.http_host_map, - "dns_intents": self.dns_intents, + "endpoint_map": endpoint_map_list, + "http_host_map": http_host_map_list, + "dns_intents": dns_intents_list, "http_requests": self.http_requests, "winhttp_sessions": winhttp_finalize_sessions(self._winhttp_state), } @@ -1468,6 +1469,9 @@ def run(self): instance.event_apicall(call, process) except Exception: log.exception('Failure in partial behavior "%s"', instance.key) + # Reset the iterator so reporting modules can read the calls again + with suppress(AttributeError): + process["calls"].reset() for instance in instances: try: diff --git a/modules/processing/network.py b/modules/processing/network.py index 891b98749ed..bb0b814a32a 100644 --- a/modules/processing/network.py +++ b/modules/processing/network.py @@ -46,8 +46,6 @@ log = logging.getLogger(__name__) - - try: import re2 as re except ImportError: @@ -1116,21 +1114,55 @@ def _import_ja3_fprints(self): def _load_network_map(self) -> Dict: with suppress(Exception): - return self.results.get("behavior", {}).get("network_map") or {} + behavior_net_map = self.results.get("behavior", {}).get("network_map") or {} + if not behavior_net_map: + return {} + + # Create a separate dictionary to avoid modifying self.results in place + net_map = behavior_net_map.copy() 
+ + raw_http_host_map = net_map.get("http_host_map", {}) + if isinstance(raw_http_host_map, list): + net_map["http_host_map"] = {item["host"]: item["pinfo"] for item in raw_http_host_map} + + raw_dns_intents = net_map.get("dns_intents", {}) + if isinstance(raw_dns_intents, list): + net_map["dns_intents"] = {item["domain"]: item["intents"] for item in raw_dns_intents} + + # We need to deep copy winhttp_sessions if we are modifying its internal dicts + raw_winhttp = net_map.get("winhttp_sessions", []) + new_winhttp = [] + for p in raw_winhttp: + new_p = dict(p) + raw_sessions = p.get("sessions", {}) + if isinstance(raw_sessions, list): + new_p["sessions"] = {item["host"]: item["events"] for item in raw_sessions} + new_winhttp.append(new_p) + net_map["winhttp_sessions"] = new_winhttp + + return net_map return {} - def _reconstruct_endpoint_map(self, raw_map: Dict[str, List[Dict]]) -> Dict[tuple, List[Dict]]: + def _reconstruct_endpoint_map(self, raw_map) -> Dict[tuple, List[Dict]]: """ Convert JSON-friendly "ip:port" keys back to (ip, int(port)) tuples. """ endpoint_map = {} - for key, val in raw_map.items(): - try: - ip, port_str = key.rsplit(":", 1) - port = int(port_str) - endpoint_map[(ip, port)] = val - except (ValueError, IndexError): - continue + if isinstance(raw_map, list): + for item in raw_map: + try: + ip, port_str = item["ip_port"].rsplit(":", 1) + endpoint_map[(ip, int(port_str))] = item["pinfo"] + except (ValueError, IndexError, KeyError): + continue + elif isinstance(raw_map, dict): + for key, val in raw_map.items(): + try: + ip, port_str = key.rsplit(":", 1) + port = int(port_str) + endpoint_map[(ip, port)] = val + except (ValueError, IndexError): + continue return endpoint_map def _pick_best(self, candidates: List[Dict]) -> Optional[Dict]: diff --git a/modules/reporting/mongodb.py b/modules/reporting/mongodb.py index dfe39c1a299..b85f522d843 100644 --- a/modules/reporting/mongodb.py +++ b/modules/reporting/mongodb.py @@ -4,10 +4,12 @@ import gc import logging - +from contextlib import suppress +from lib.cuckoo.common.iocs import dump_iocs from lib.cuckoo.common.abstracts import Report from lib.cuckoo.common.exceptions import CuckooDependencyError, CuckooReportError from modules.reporting.report_doc import ensure_valid_utf8, get_json_document, insert_calls +from lib.cuckoo.common.config import Config try: from pymongo.errors import InvalidDocument, OperationFailure @@ -22,6 +24,7 @@ MEGABYTE = 0x100000 log = logging.getLogger(__name__) +reporting_conf = Config("reporting") class MongoDB(Report): @@ -76,14 +79,18 @@ def loop_saver(self, report): if "_id" in keys: keys.remove("_id") + # We insert the info section first to get an _id obj_id = mongo_insert_one("analysis", {"info": report["info"]}).inserted_id keys.remove("info") for key in keys: try: - mongo_update_one("analysis", {"_id": obj_id}, {"$set": {key: report[key]}}, bypass_document_validation=True) + # We include info here so that mongo hooks (like normalize_files) can get the task_id + mongo_update_one("analysis", {"_id": obj_id}, {"$set": {key: report[key], "info": report["info"]}}, bypass_document_validation=True) except InvalidDocument: log.warning("Investigate your key: %s", key) + except Exception as e: + log.error("Failed to update key %s in loop_saver: %s", key, e) def run(self, results): """Writes report. @@ -108,28 +115,49 @@ def run(self, results): # the original dictionary and possibly compromise the following # reporting modules. 
report = get_json_document(results, self.analysis_path) + if not report or "info" not in report: + log.error("Failed to get JSON document or 'info' key is missing for Task") + return - mongo_delete_data(int(report["info"]["id"])) - log.debug("Deleted previous MongoDB data for Task %s", report["info"]["id"]) + local_task_id = int(report["info"].get("id", 0)) + if not local_task_id: + log.error("Task ID is missing in report['info']") + return # trick for distributed api - if results.get("info", {}).get("options", {}).get("main_task_id", ""): - report["info"]["id"] = int(results["info"]["options"]["main_task_id"]) + main_task_id = results.get("info", {}).get("options", {}).get("main_task_id") + if main_task_id: + with suppress(ValueError, TypeError): + report["info"]["id"] = int(main_task_id) if "network" not in report: report["network"] = {} + if "behavior" not in report or not isinstance(report["behavior"], dict): + report["behavior"] = {"processes": [], "processtree": [], "summary": {}} + + # Delete old data just before inserting new one to avoid "missing report" window + # or data loss if insertion fails during preparation (e.g. OOM) + ids_to_delete = {local_task_id, int(report["info"]["id"])} + log.debug("Deleting previous MongoDB data for Task IDs: %s", ids_to_delete) + mongo_delete_data(list(ids_to_delete)) + new_processes = insert_calls(report, mongodb=True) # Store the results in the report. - report["behavior"] = dict(report["behavior"]) report["behavior"]["processes"] = new_processes + # Store iocs as file + if reporting_conf.mongodb.dump_iocs: + dump_iocs(report, local_task_id) + ensure_valid_utf8(report) gc.collect() # Store the report and retrieve its object id. try: + log.debug("Inserting new MongoDB report for Task %s", report["info"]["id"]) mongo_insert_one("analysis", report) + except OperationFailure as e: # Check for error codes indicating the BSON object was too large # (10334 BSONObjectTooLarge) or the maximum nested object depth was @@ -145,8 +173,6 @@ def run(self, results): log.error("Deleting behavior process tree parent from results: %s", str(e)) del report["behavior"]["processtree"][0] mongo_insert_one("analysis", report) - else: - raise CuckooReportError("Failed inserting report in Mongo") from e except InvalidDocument as e: if str(e).startswith("cannot encode object") or "must not contain" in str(e): self.loop_saver(report) @@ -193,3 +219,8 @@ def run(self, results): except Exception as e: log.error("Failed to delete child key: %s", e) error_saved = False + + if error_saved: + log.error("Failed to insert report into MongoDB even after attempting to fix large documents for Task %s", report["info"]["id"]) + except Exception as e: + log.exception("Failed to store report in MongoDB for Task %s: %s", report["info"]["id"], e) diff --git a/utils/dist.py b/utils/dist.py index bf2f36708fa..c71b9e373b6 100644 --- a/utils/dist.py +++ b/utils/dist.py @@ -53,6 +53,7 @@ ) from lib.cuckoo.core.database import ( Database, + Guest, _Database, init_database, ) @@ -863,6 +864,46 @@ def delete_target_file(self, task_id: int, sample_sha256: str, target: str): if not sample_still_used: path_delete(copy_path) + def inject_guest_info(self, main_task_id: int, report_path: str): + """ + Inject guest information from report.json into the main database. + + Args: + main_task_id (int): The ID of the main task. + report_path (str): The path to the analysis folder. 
+ """ + report_json_path = os.path.join(report_path, "reports", "report.json") + if not path_exists(report_json_path): + return + + try: + with open(report_json_path, "r") as f: + report_data = json.load(f) + machine = report_data.get("info", {}).get("machine", {}) + if machine and isinstance(machine, dict): + with main_db.session.begin(): + # Check if guest already exists + stmt = select(Guest).where(Guest.task_id == main_task_id) + if not main_db.session.scalar(stmt): + guest = Guest( + name=machine.get("name"), + label=machine.get("label"), + platform=machine.get("platform"), + manager=machine.get("manager"), + task_id=main_task_id, + ) + # Set optional fields if they exist + if "started_on" in machine: + with suppress(Exception): + guest.started_on = datetime.strptime(machine["started_on"], "%Y-%m-%d %H:%M:%S") + if "shutdown_on" in machine: + with suppress(Exception): + guest.shutdown_on = datetime.strptime(machine["shutdown_on"], "%Y-%m-%d %H:%M:%S") + + main_db.session.add(guest) + except Exception as e: + log.error("Failed to inject guest info for task %d: %s", main_task_id, e) + # This should be executed as external thread as it generates bottle neck def fetch_latest_reports_nfs(self): """ @@ -961,6 +1002,8 @@ def fetch_latest_reports_nfs(self): t.main_task_id, ) + self.inject_guest_info(t.main_task_id, report_path) + # this doesn't exist for some reason if path_exists(t.path): sample_sha256 = None @@ -1141,6 +1184,7 @@ def fetch_latest_reports(self): with zipfile.ZipFile(BytesIO(report.content)) as zf: try: zf.extractall(report_path) + self.inject_guest_info(t.main_task_id, report_path) if (node_id, task.get("id")) not in self.cleaner_queue.queue: self.cleaner_queue.put((node_id, task.get("id"))) except OSError: diff --git a/utils/process.py b/utils/process.py index e41a82d5bd3..695c39888ad 100644 --- a/utils/process.py +++ b/utils/process.py @@ -195,32 +195,31 @@ def init_worker(): # See https://docs.sqlalchemy.org/en/14/core/pooling.html#using-connection-pools-with-multiprocessing-or-os-fork db.engine.dispose(close=False) - # Fix for open file handles on rotated logs in workers - for h in log.handlers[:]: - if isinstance(h, logging.FileHandler): - h.close() - log.removeHandler(h) + # Avoid fork deadlock: use direct list ops instead of + # handler.close()/removeHandler()/addHandler() which acquire locks. + # Inherited FDs are intentionally leaked: closing them via os.close() + # frees the fd number, but the old Python stream still references it; + # when GC finalizes that stream it may close a new handler's fd. + # Workers are short-lived (max_tasks) so the leak is harmless. 
+ log.handlers.clear() - # Restore Console Handler ch = ConsoleHandler() ch.setFormatter(FORMATTER) - log.addHandler(ch) + log.handlers.append(ch) - # Restore Syslog Handler if enabled if logconf.logger.syslog_process: try: slh = logging.handlers.SysLogHandler(address=logconf.logger.syslog_dev) slh.setFormatter(FORMATTER) - log.addHandler(slh) + log.handlers.append(slh) except Exception as e: log.warning("Failed to restore Syslog handler in worker: %s", e) - # Restore File Handler using WatchedFileHandler to support rotation try: path = os.path.join(CUCKOO_ROOT, "log", "process.log") fh = logging.handlers.WatchedFileHandler(path) fh.setFormatter(FORMATTER) - log.addHandler(fh) + log.handlers.append(fh) except PermissionError as e: log.warning("Failed to restore File handler in worker due to permissions: %s", e) diff --git a/uv.lock b/uv.lock index 7de39b95dad..77f521d020a 100644 --- a/uv.lock +++ b/uv.lock @@ -1460,7 +1460,7 @@ wheels = [ [[package]] name = "fastmcp" -version = "3.1.0" +version = "3.2.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "authlib" }, @@ -1485,9 +1485,9 @@ dependencies = [ { name = "watchfiles" }, { name = "websockets" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/0a/70/862026c4589441f86ad3108f05bfb2f781c6b322ad60a982f40b303b47d7/fastmcp-3.1.0.tar.gz", hash = "sha256:e25264794c734b9977502a51466961eeecff92a0c2f3b49c40c070993628d6d0", size = 17347083, upload-time = "2026-03-03T02:43:11.283Z" } +sdist = { url = "https://files.pythonhosted.org/packages/d0/32/4f1b2cfd7b50db89114949f90158b1dcc2c92a1917b9f57c0ff24e47a2f4/fastmcp-3.2.0.tar.gz", hash = "sha256:d4830b8ffc3592d3d9c76dc0f398904cf41f04910e41a0de38cc1004e0903bef", size = 26318581, upload-time = "2026-03-30T20:25:37.692Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/17/07/516f5b20d88932e5a466c2216b628e5358a71b3a9f522215607c3281de05/fastmcp-3.1.0-py3-none-any.whl", hash = "sha256:b1f73b56fd3b0cb2bd9e2a144fc650d5cc31587ed129d996db7710e464ae8010", size = 633749, upload-time = "2026-03-03T02:43:09.06Z" }, + { url = "https://files.pythonhosted.org/packages/4f/67/684fa2d2de1e7504549d4ca457b4f854ccec3cd3be03bd86b33b599fbf58/fastmcp-3.2.0-py3-none-any.whl", hash = "sha256:e71aba3df16f86f546a4a9e513261d3233bcc92bef0dfa647bac3fa33623f681", size = 705550, upload-time = "2026-03-30T20:25:35.499Z" }, ] [[package]] diff --git a/web/apiv2/views.py b/web/apiv2/views.py index 2f64bd8b16f..9f99197d31e 100644 --- a/web/apiv2/views.py +++ b/web/apiv2/views.py @@ -2602,6 +2602,10 @@ def _stream_iterator(fp, guest_name, chunk_size=1024): resp = {"error": True, "error_value": "Filepath mustn't start with /"} return Response(resp) filepath = os.path.join(CUCKOO_ROOT, "storage", "analyses", f"{task_id}", filepath) + task_dir = os.path.join(ANALYSIS_BASE_PATH, "analyses", f"{task_id}") + if not os.path.normpath(filepath).startswith(task_dir + os.sep): + resp = {"error": True, "error_value": "Path traversal detected"} + return Response(resp) if not os.path.isfile(filepath): resp = {"error": True, "error_value": "file does not exist"} return Response(resp) diff --git a/web/submission/views.py b/web/submission/views.py index 7a7b49e7c89..6fa22886e29 100644 --- a/web/submission/views.py +++ b/web/submission/views.py @@ -56,6 +56,12 @@ logger = logging.getLogger(__name__) +allowed_functions = { + "sorted": sorted, + "set": set, + "os.path.join": os.path.join, +} + def parse_expr(expr, context): """Return the value from a python AST expression. 
@@ -88,11 +94,10 @@ def parse_expr(expr, context): # Figure out what function is being called, with what arguments. func = parse_expr(expr.func, context) args = tuple([parse_expr(item, context) for item in expr.args]) - # We deem these functions safe to use with "eval". - allowed_functions = ("sorted", "set", "os.path.join") + if func in allowed_functions: # Actually call the function, passing the args, and return the result. - return eval(f"{func}{args}") + return allowed_functions[func](*args) # Don't execute the call, but instead, give back a string representation. return f"<{func}{args}>" if isinstance(expr, ast.BinOp) and isinstance(expr.op, ast.Add): diff --git a/web/templates/analysis/overview/_info.html b/web/templates/analysis/overview/_info.html index e96c2dc2719..9b4e4cf1299 100644 --- a/web/templates/analysis/overview/_info.html +++ b/web/templates/analysis/overview/_info.html @@ -196,7 +196,7 @@
{{analysis.info.route}}