diff --git a/.gitignore b/.gitignore index ddac56ef..71e2721f 100644 --- a/.gitignore +++ b/.gitignore @@ -327,6 +327,9 @@ cython_debug/ # Visual Studio Code .vscode/ +# Weights & Biases +wandb/ + # Project specific cache_dir demos diff --git a/README.md b/README.md index c78543ec..2b0ba968 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,7 @@ Moreover, AIOpsLab provides a built-in benchmark suite with a set of problems to ### Requirements - Python >= 3.11 +- [Helm](https://helm.sh/) Recommended installation: ```bash @@ -64,10 +65,23 @@ kind create cluster --config kind/kind-config-arm.yaml If you're running into issues, consider building a Docker image for your machine by following this [README](kind/README.md). Please also open an issue. +### [Tips] +If you are running AIOpsLab behind a proxy, make sure to export the HTTP proxy as `172.17.0.1`. When creating the kind cluster, all the nodes in the cluster will inherit the proxy settings from the host environment and the Docker container. + +The `172.17.0.1` address is used to communicate with the host machine. For more details, refer to the official guide: [Configure Kind to Use a Proxy](https://kind.sigs.k8s.io/docs/user/quick-start/#configure-kind-to-use-a-proxy). + +Additionally, Docker doesn't support SOCKS5 proxies directly. If you're using a SOCKS5 proxy, you may need to use [Privoxy](https://www.privoxy.org) to forward SOCKS5 to HTTP. + +If you're running vLLM and the LLM agent locally, Privoxy will by default proxy `localhost`, which will cause errors. To avoid this issue, set the following environment variable: + +```bash +export no_proxy=localhost +``` + After finishing cluster creation, proceed to the next "Update `config.yml`" step. ### b) Remote cluster -AIOpsLab supports any remote kubernetes cluster that your `kubectl` context is set to, whether it's a cluster from a cloud provider or one you build yourself. 
We have some Ansible playbooks we have to setup clusters on providers like [CloudLab](https://www.cloudlab.us/) and our own machines. Follow this [README](./scripts/ansible/README.md) to set up your own cluster, and then proceed to the next "Update `config.yml`" step. +AIOpsLab supports any remote Kubernetes cluster that your `kubectl` context is set to, whether it's a cluster from a cloud provider or one you build yourself. We have some Ansible playbooks to set up clusters on providers like [CloudLab](https://www.cloudlab.us/) and on our own machines. Follow this [README](./scripts/ansible/README.md) to set up your own cluster, and then proceed to the next "Update `config.yml`" step. ### Update `config.yml` ```bash @@ -76,7 +90,7 @@ cp config.yml.example config.yml ``` Update your `config.yml` so that `k8s_host` is the host name of the control plane node of your cluster. Update `k8s_user` to be your username on the control plane node. If you are using a kind cluster, your `k8s_host` should be `kind`. If you're running AIOpsLab on cluster, your `k8s_host` should be `localhost`. -### Running agents +### Running agents locally Human as the agent: ```bash @@ -89,12 +103,30 @@ python3 cli.py Run GPT-4 baseline agent: ```bash -export OPENAI_API_KEY= +# Create a .env file in the project root (if not exists) +echo "OPENAI_API_KEY=" > .env +# Add more API keys as needed: +# echo "QWEN_API_KEY=" >> .env +# echo "DEEPSEEK_API_KEY=" >> .env + python3 clients/gpt.py # you can also change the problem to solve in the main() function ``` +The clients will automatically load API keys from your `.env` file. + You can check the running status of the cluster using [k9s](https://k9scli.io/) or other cluster monitoring tools conveniently. +To browse your logged `session_id` values in the W&B app as a table: + +1. Make sure you have W&B installed and configured. +2. Set the `USE_WANDB` environment variable: + ```bash + # Add to your .env file + echo "USE_WANDB=true" >> .env + ``` +3. 
In the W&B web UI, open any run and click Tables → Add Query Panel. +4. In the key field, type `runs.summary` and click `Run`; the results will be displayed as a table. +
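The README above says the clients automatically load API keys from your `.env` file (typically this is done with a library such as `python-dotenv`). As a rough illustration of what that loading amounts to, here is a minimal, dependency-free sketch; the function name `load_dotenv_minimal` is ours, not part of the repo:

```python
import os
from pathlib import Path

def load_dotenv_minimal(path: str = ".env") -> dict:
    """Parse simple KEY=VALUE lines from a .env file.

    Comment lines and blank lines are skipped, surrounding quotes are
    stripped, and existing environment variables are not overwritten.
    """
    loaded = {}
    env_file = Path(path)
    if not env_file.exists():
        return loaded
    for line in env_file.read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        key, value = key.strip(), value.strip().strip('"')
        loaded[key] = value
        os.environ.setdefault(key, value)  # keep any pre-set value
    return loaded
```

A real setup should prefer `python-dotenv`'s `load_dotenv()`; this sketch only shows the behavior described in the text.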

⚙️ Usage

AIOpsLab can be used in the following ways: @@ -102,6 +134,52 @@ AIOpsLab can be used in the following ways: - [Add new applications to AIOpsLab](#how-to-add-new-applications-to-aiopslab) - [Add new problems to AIOpsLab](#how-to-add-new-problems-to-aiopslab) +### Running agents remotely +You can run AIOpsLab on a remote machine with more computational resources. This section walks you through setting up and using AIOpsLab remotely. + +1. **On the remote machine, start the AIOpsLab service**: + + ```bash + SERVICE_HOST= SERVICE_PORT= SERVICE_WORKERS= python service.py + ``` +2. **Test the connection from your local machine**: + From your local machine, test the connection to the remote AIOpsLab service using `curl`: + + ```bash + # Check if the service is running + curl http://:/health + + # List available problems + curl http://:/problems + + # List available agents + curl http://:/agents + ``` + +3. **Run vLLM on the remote machine (if using vLLM agent):** + If you're using the vLLM agent, make sure to launch the vLLM server on the remote machine: + + ```bash + # On the remote machine + chmod +x ./clients/launch_vllm.sh + ./clients/launch_vllm.sh + ``` + You can customize the model by editing `launch_vllm.sh` before running it. + +4. **Run the agent**: + From your local machine, run the agent with the following command: + + ```bash + curl -X POST http://:/simulate \ + -H "Content-Type: application/json" \ + -d '{ + "problem_id": "misconfig_app_hotel_res-mitigation-1", + "agent_name": "vllm", + "max_steps": 10, + "temperature": 0.7, + "top_p": 0.9 + }' + ``` ### How to onboard your agent to AIOpsLab? 
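The `/health`, `/problems`, `/agents`, and `/simulate` endpoints shown in the curl examples above can also be driven from Python. Below is a standard-library-only sketch; the endpoint paths and `/simulate` payload fields are taken from those examples, while the helper names (`get_json`, `build_simulate_request`, `simulate`) and any host/port values are illustrative, not part of the AIOpsLab API:

```python
import json
from urllib.request import Request, urlopen

# Defaults mirror the curl example's /simulate payload.
SIMULATE_DEFAULTS = {"max_steps": 10, "temperature": 0.7, "top_p": 0.9}

def get_json(host: str, port: int, path: str) -> dict:
    """GET a JSON endpoint, e.g. get_json(host, port, "/health")."""
    with urlopen(f"http://{host}:{port}{path}") as resp:
        return json.load(resp)

def build_simulate_request(host: str, port: int, problem_id: str,
                           agent_name: str, **overrides):
    """Build the URL and JSON body for POST /simulate."""
    url = f"http://{host}:{port}/simulate"
    payload = {**SIMULATE_DEFAULTS,
               "problem_id": problem_id,
               "agent_name": agent_name,
               **overrides}
    return url, json.dumps(payload).encode()

def simulate(host: str, port: int, problem_id: str,
             agent_name: str, **overrides) -> dict:
    """POST the simulation request; blocks until the run completes."""
    url, body = build_simulate_request(host, port, problem_id,
                                       agent_name, **overrides)
    req = Request(url, data=body,
                  headers={"Content-Type": "application/json"})
    with urlopen(req) as resp:
        return json.load(resp)
```

For long-running problems you may prefer an HTTP client with explicit timeouts (e.g. `requests` with `timeout=`), but the shape of the calls is the same.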
diff --git a/aiopslab-applications b/aiopslab-applications index 5221ef96..231ccc32 160000 --- a/aiopslab-applications +++ b/aiopslab-applications @@ -1 +1 @@ -Subproject commit 5221ef962879546bb3c977c8a256ce117697b9f2 +Subproject commit 231ccc32d94b2e202cf11ba08be371d372c44b3d diff --git a/aiopslab/generators/fault/base.py b/aiopslab/generators/fault/base.py index 6122148c..c58dc7f3 100644 --- a/aiopslab/generators/fault/base.py +++ b/aiopslab/generators/fault/base.py @@ -59,7 +59,6 @@ def _recover( self._invoke_method("recover", fault_type, microservices) elif fault_type: self._invoke_method("recover", fault_type) - time.sleep(6) def _invoke_method(self, action_prefix, *args): """helper: injects/recovers faults based on name""" diff --git a/aiopslab/generators/fault/inject_otel.py b/aiopslab/generators/fault/inject_otel.py index 5b1bae40..2f1887b1 100644 --- a/aiopslab/generators/fault/inject_otel.py +++ b/aiopslab/generators/fault/inject_otel.py @@ -8,7 +8,7 @@ class OtelFaultInjector(FaultInjector): def __init__(self, namespace: str): self.namespace = namespace self.kubectl = KubeCtl() - self.configmap_name = f"{namespace}-flagd-config" + self.configmap_name = "flagd-config" def inject_fault(self, feature_flag: str): command = ( @@ -39,6 +39,11 @@ def inject_fault(self, feature_flag: str): self.kubectl.create_or_update_configmap( self.configmap_name, self.namespace, updated_data ) + + self.kubectl.exec_command( + f"kubectl rollout restart deployment flagd -n {self.namespace}" + ) + print(f"Fault injected: Feature flag '{feature_flag}' set to 'on'.") def recover_fault(self, feature_flag: str): @@ -70,6 +75,10 @@ def recover_fault(self, feature_flag: str): self.kubectl.create_or_update_configmap( self.configmap_name, self.namespace, updated_data ) + + self.kubectl.exec_command( + f"kubectl rollout restart deployment flagd -n {self.namespace}" + ) print(f"Fault recovered: Feature flag '{feature_flag}' set to 'off'.") diff --git 
a/aiopslab/generators/fault/inject_virtual.py b/aiopslab/generators/fault/inject_virtual.py index 8b6680d0..2ee4089d 100644 --- a/aiopslab/generators/fault/inject_virtual.py +++ b/aiopslab/generators/fault/inject_virtual.py @@ -8,6 +8,7 @@ from aiopslab.service.kubectl import KubeCtl from aiopslab.service.helm import Helm +from aiopslab.service.dock import Docker from aiopslab.generators.fault.base import FaultInjector from aiopslab.service.apps.base import Application from aiopslab.paths import TARGET_MICROSERVICES @@ -18,6 +19,7 @@ def __init__(self, namespace: str): super().__init__(namespace) self.namespace = namespace self.kubectl = KubeCtl() + self.docker = Docker() self.mongo_service_pod_map = { "url-shorten-mongodb": "url-shorten-service", } @@ -248,7 +250,35 @@ def recover_wrong_bin_usage(self, microservices: list[str]): self.kubectl.exec_command(apply_command) print(f"Recovered from wrong binary usage fault for service: {service}") - + + def inject_container_stop(self, microservices: list[str]): + """Inject a fault to stop a container.""" + for service in microservices: + self.docker.get_container(service).stop() + print(f"Stopped container {service}.") + + print("Waiting for faults to propagate...") + time.sleep(15) + print("Faults propagated.") + + def recover_container_stop(self, microservices: list[str]): + for service in microservices: + self.docker.get_container(service).start() + print(f"Started container {service}.") + + def inject_model_misconfig(self, microservices: list[str]): + """Inject a fault to misconfigure the model in the Flower application.""" + for service in microservices: + command = f""" docker exec -it {service} sh -c "sed -i '24s/84/80/' /app/.flwr/apps/*/task.py" """ + self.docker.exec_command(command) + print(f"Changed model configuration for service: {service}") + + def recover_model_misconfig(self, microservices: list[str]): + for service in microservices: + command = f""" docker exec -it {service} sh -c "sed -i '24s/80/84/' 
/app/.flwr/apps/*/task.py" """ + self.docker.exec_command(command) + print(f"Recovered model configuration for service: {service}") + ############# HELPER FUNCTIONS ################ def _wait_for_pods_ready(self, microservices: list[str], timeout: int = 30): for service in microservices: diff --git a/aiopslab/observer/__init__.py b/aiopslab/observer/__init__.py index 93267236..44a63f2e 100644 --- a/aiopslab/observer/__init__.py +++ b/aiopslab/observer/__init__.py @@ -12,7 +12,8 @@ root_path = pathlib.Path(__file__).parent sys.path.append(root_path) # read the configuration file -monitor_config = full_load(open(root_path / "monitor_config.yaml", "r")) +with open(root_path / "monitor_config.yaml", "r") as f: + monitor_config = full_load(f) # root_config = full_load(open(root_path / "config.yaml", "r")) diff --git a/aiopslab/observer/metric_api.py b/aiopslab/observer/metric_api.py index 76bb0abc..587ef55a 100644 --- a/aiopslab/observer/metric_api.py +++ b/aiopslab/observer/metric_api.py @@ -138,7 +138,8 @@ class PrometheusAPI: # disable_ssl – (bool) if True, will skip prometheus server's http requests' SSL certificate def __init__(self, url: str, namespace: str): self.namespace = namespace - self.port = 32000 + self.output_threads = [] + self.port = self.find_free_port() self.port_forward_process = None self.stop_event = threading.Event() self.start_port_forward() @@ -151,6 +152,13 @@ def __init__(self, url: str, namespace: str): def is_port_in_use(self, port): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: return s.connect_ex(("127.0.0.1", port)) == 0 + + def find_free_port(self, start=32000, end=32100): + for port in range(start, end): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + if s.connect_ex(("127.0.0.1", port)) != 0: + return port + raise RuntimeError("No free ports available in the range.") def print_output(self, stream): """Thread function to print output from a subprocess stream non-blockingly.""" @@ -197,6 +205,7 @@ def 
start_port_forward(self): ) thread_out.start() thread_err.start() + self.output_threads.extend([thread_out, thread_err]) time.sleep(3) # Wait a bit for the port-forward to establish @@ -209,13 +218,29 @@ def start_port_forward(self): print("Failed to establish port forwarding after multiple attempts.") def stop_port_forward(self): - """Stops the kubectl port-forward command.""" + """Stops the kubectl port-forward command and cleans up resources.""" if self.port_forward_process: self.port_forward_process.terminate() - self.port_forward_process.wait() + try: + self.port_forward_process.wait(timeout=5) + except subprocess.TimeoutExpired: + print("Port-forward process did not terminate in time, killing...") + self.port_forward_process.kill() + self.stop_event.set() + + if self.port_forward_process.stdout: + self.port_forward_process.stdout.close() + if self.port_forward_process.stderr: + self.port_forward_process.stderr.close() + print("Port forwarding stopped.") + for thread in self.output_threads: + thread.join(timeout=5) + if thread.is_alive(): + print(f"Warning: Thread {thread.name} did not terminate cleanly.") + def cleanup(self): """Cleanup resources like port-forwarding.""" self.stop_port_forward() diff --git a/aiopslab/observer/trace_api.py b/aiopslab/observer/trace_api.py index 77eabb05..e123f83e 100644 --- a/aiopslab/observer/trace_api.py +++ b/aiopslab/observer/trace_api.py @@ -21,14 +21,20 @@ def __init__(self, namespace: str): self.port_forward_process = None self.namespace = namespace self.stop_event = threading.Event() + self.output_threads = [] - # NOTE: it may not be jaeger-out for other apps - node_port = self.get_nodeport("jaeger", namespace) - if node_port: - self.base_url = f"http://localhost:{node_port}" - else: - self.base_url = "http://localhost:16686" + if self.namespace == "astronomy-shop": + # No NodePort in astronomy shop + self.base_url = "http://localhost:16686/jaeger/ui" self.start_port_forward() + else: + # Other namespaces may expose a 
NodePort + node_port = self.get_nodeport("jaeger", namespace) + if node_port: + self.base_url = f"http://localhost:{node_port}" + else: + self.base_url = "http://localhost:16686" + self.start_port_forward() def get_nodeport(self, service_name, namespace): """Fetch the NodePort for the given service.""" @@ -74,19 +80,38 @@ def print_output(self, stream): def is_port_in_use(self, port): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: return s.connect_ex(("127.0.0.1", port)) == 0 + + def get_jaeger_pod_name(self): + try: + result = subprocess.check_output( + ["kubectl", "get", "pods", "-n", self.namespace, + "-l", "app.kubernetes.io/name=jaeger", + "-o", "jsonpath={.items[0].metadata.name}"], + text=True + ) + return result.strip() + except subprocess.CalledProcessError as e: + print("Error getting Jaeger pod name:", e) + raise def start_port_forward(self): - """Starts kubectl port-forward command to access Jaeger service.""" + """Starts kubectl port-forward command to access Jaeger service or pod.""" for attempt in range(3): if self.is_port_in_use(16686): print( - f"Port 16686 is already in use. Attempt {attempt + 1} of {3}. Retrying in {3} seconds..." + f"Port 16686 is already in use. Attempt {attempt + 1} of 3. Retrying in 3 seconds..." 
) time.sleep(3) continue - # command = "kubectl port-forward svc/jaeger 16686:16686 -n hotel-reservation" - command = f"kubectl port-forward svc/jaeger 16686:16686 -n {self.namespace}" + # Use pod port-forwarding for astronomy-shop only + if self.namespace == "astronomy-shop": + pod_name = self.get_jaeger_pod_name() + command = f"kubectl port-forward pod/{pod_name} 16686:16686 -n {self.namespace}" + else: + command = f"kubectl port-forward svc/jaeger 16686:16686 -n {self.namespace}" + + print("Starting port-forward with command:", command) self.port_forward_process = subprocess.Popen( command, shell=True, @@ -103,17 +128,17 @@ def start_port_forward(self): ) thread_out.start() thread_err.start() - time.sleep(3) # Wait a bit for the port-forward to establish - if ( - self.port_forward_process.poll() is None - ): # Check if the process is still running + time.sleep(3) # Let port-forward initialize + + if self.port_forward_process.poll() is None: print("Port forwarding established successfully.") break else: print("Port forwarding failed. 
Retrying...") else: - print("Failed to establish port forwarding after multiple attempts.") + print("Failed to establish port forwarding after 3 attempts.") + # TODO: modify this command for other microservices # command = "kubectl port-forward svc/jaeger 16686:16686 -n hotel-reservation" # self.port_forward_process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) @@ -126,38 +151,44 @@ def start_port_forward(self): # time.sleep(3) # Wait a bit for the port-forward to establish def stop_port_forward(self): - """Stops the kubectl port-forward command.""" if self.port_forward_process: - self.port_forward_process.terminate() # Send SIGTERM - self.port_forward_process.wait() # Wait for the process to terminate - self.stop_event.set() - print("Set the stop event.") - self.port_forward_process.stdout.close() - self.port_forward_process.stderr.close() - print("Port forwarding stopped.") + self.stop_event.set() # Signal threads to exit + try: + self.port_forward_process.terminate() + self.port_forward_process.wait(timeout=5) + except Exception as e: + print("Error terminating port-forward process:", e) + + try: + if self.port_forward_process.stdout: + self.port_forward_process.stdout.close() + if self.port_forward_process.stderr: + self.port_forward_process.stderr.close() + except Exception as e: + print("Error closing process streams:", e) + self.port_forward_process = None + def cleanup(self): - """Clean up resources.""" self.stop_port_forward() - for thread in threading.enumerate(): - if thread != threading.current_thread(): - thread.join(timeout=5) - if thread.is_alive(): - print( - f"Thread {thread.name} could not be joined and may need to be stopped forcefully." 
- ) + for thread in self.output_threads: + thread.join(timeout=5) + if thread.is_alive(): + print(f"Thread {thread.name} could not be joined and may need to be stopped forcefully.") + self.output_threads.clear() print("Cleanup completed.") def get_services(self) -> list: """Fetch a list of services from the tracing API.""" url = f"{self.base_url}/api/services" - response = requests.get(url) - if response.status_code == 200: - data = response.json() - # print(f"data: {response}") - return data.get("data", []) - else: - print(f"Failed to get services: {response.status_code}") + headers = {"Accept": "application/json"} if self.namespace == "astronomy-shop" else {} + + try: + response = requests.get(url, headers=headers) + response.raise_for_status() + return response.json().get("data", []) + except Exception as e: + print(f"Failed to get services: {e}") return [] def get_traces( @@ -171,15 +202,14 @@ def get_traces( Fetch traces for a specific service between start_time and end_time. If limit is not specified, all available traces are fetched. """ - # Calculate the lookback in milliseconds. 
lookback = int((datetime.now() - start_time).total_seconds()) - url = f"{self.base_url}/api/traces?service={service_name}&lookback={lookback}s" if limit is not None: url += f"&limit={limit}" + headers = {"Accept": "application/json"} if self.namespace == "astronomy-shop" else {} try: - response = requests.get(url) + response = requests.get(url, headers=headers) response.raise_for_status() return response.json().get("data", []) except requests.RequestException as e: @@ -211,9 +241,9 @@ def extract_traces( ) for trace in traces: for span in trace["spans"]: - span[ + span["serviceName"] = trace["processes"][span["processID"]][ "serviceName" - ] = service # Directly associate service name with each span + ] all_traces.append(trace) # Collect the trace with service name included self.cleanup() print("Cleanup completed.") @@ -223,22 +253,20 @@ def extract_traces( def process_traces(self, traces) -> pd.DataFrame: """Process raw traces data into a structured DataFrame.""" trace_id_list = [] + span_id_list = [] service_name_list = [] operation_name_list = [] start_time_list = [] duration_list = [] parent_span_list = [] + error_list = [] + response_list = [] for trace in traces: trace_id = trace["traceID"] for span in trace["spans"]: trace_id_list.append(trace_id) - service_name_list.append( - span["serviceName"] - ) # Use the correct service name from the span - operation_name_list.append(span["operationName"]) - start_time_list.append(span["startTime"]) - duration_list.append(span["duration"]) + span_id_list.append(span["spanID"]) parent_span = "ROOT" if "references" in span: for ref in span["references"]: @@ -247,14 +275,34 @@ def process_traces(self, traces) -> pd.DataFrame: break parent_span_list.append(parent_span) + service_name_list.append( + span["serviceName"] + ) # Use the correct service name from the span + operation_name_list.append(span["operationName"]) + start_time_list.append(span["startTime"]) + duration_list.append(span["duration"]) + + has_error = False 
+ response = "Unknown" + for tag in span.get("tags", []): + if tag["key"] == "error" and tag["value"] == True: + has_error = True + if tag["key"] == "http.status_code" or tag["key"] == "response_class": + response = tag["value"] + error_list.append(has_error) + response_list.append(response) + df = pd.DataFrame( { "trace_id": trace_id_list, + "span_id": span_id_list, + "parent_span": parent_span_list, "service_name": service_name_list, "operation_name": operation_name_list, "start_time": start_time_list, "duration": duration_list, - "parent_span": parent_span_list, + "has_error": error_list, + "response": response_list, } ) return df diff --git a/aiopslab/onboarding_evaluator.py b/aiopslab/onboarding_evaluator.py new file mode 100644 index 00000000..6eb051c6 --- /dev/null +++ b/aiopslab/onboarding_evaluator.py @@ -0,0 +1,216 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Orchestrator class that interfaces with the agent and the environment.""" + +from aiopslab.service.helm import Helm +from aiopslab.service.kubectl import KubeCtl +from aiopslab.session import Session +from aiopslab.orchestrator.problems.registry import ProblemRegistry +from aiopslab.orchestrator.onboarding_eval_parser import EvalParser +from aiopslab.utils.status import * +from aiopslab.service.telemetry.prometheus import Prometheus +import time +import inspect +import asyncio + + +class Evaluator: + def __init__(self): + self.agent = None + self.session = None + self.parser = EvalParser() + self.probs = ProblemRegistry() + self.sprint = SessionPrint() + self.execution_start_time = None + self.execution_end_time = None + self.kubectl = KubeCtl() + + def init_problem(self, problem_id: str): + """Initialize a problem instance for the agent to solve. + + Args: + problem_id (str): The problem instance identifier. + + Returns: + tuple: A tuple containing the problem description, task message, and session object. 
+ """ + # Start timer + self.execution_start_time = time.time() + + self.session = Session() + print(f"Session ID: {self.session.session_id}") + prob = self.probs.get_problem_instance(problem_id) + self.session.set_problem(prob, pid=problem_id) + self.session.set_agent(self.agent_name) + + print("Setting up OpenEBS...") + + command = "kubectl get pods -n openebs" + result = self.kubectl.exec_command(command) + if "Running" in result: + print("OpenEBS is already running. Skipping installation.") + else: + self.kubectl.exec_command( + "kubectl apply -f https://openebs.github.io/charts/openebs-operator.yaml" + ) + self.kubectl.exec_command( + "kubectl patch storageclass openebs-hostpath -p '{\"metadata\": {\"annotations\":{\"storageclass.kubernetes.io/is-default-class\":\"true\"}}}'" + ) + self.kubectl.wait_for_ready("openebs") + print("OpenEBS setup completed.") + + # Setup and deploy Prometheus + self.prometheus = Prometheus() + self.prometheus.deploy() + + # deploy service + prob.app.delete() + prob.app.deploy() + + # inject fault + prob.inject_fault() + + # Check if start_workload is async or sync + if inspect.iscoroutinefunction(prob.start_workload): + asyncio.create_task(prob.start_workload()) + else: + prob.start_workload() + + task_desc = prob.get_task_description() + instructions = prob.get_instructions() + actions = prob.get_available_actions() + + return task_desc, instructions, actions + + def register_agent(self, agent, name="agent"): + """Register the agent for the current session. + + Args: + agent: The agent to register. + name: The name of the agent (default: "agent"). 
+ """ + self.agent = agent + self.agent_name = name + + async def ask_agent(self, input): + """Ask the agent for the next action given the current context.""" + assert self.session is not None + assert self.agent is not None + + agent_response = await self.agent.get_action(input) + self.session.add({"role": "assistant", "content": agent_response}) + + return agent_response + + async def ask_env(self, input): + """Ask the environment for the observation given the current action.""" + assert self.session is not None + + try: + resp = self.parser.parse(input) + except ResponseParsingError as e: + self.session.add({"role": "env", "content": str(e)}) + return str(e) + + api, args, kwargs = resp["api_name"], resp["args"], resp["kwargs"] + + # special handling for submit + if api == "submit": + self.session.set_solution(args[0] if len(args) == 1 else args) + + # Use the problem's eval method to check if solution is valid + try: + # Calculate the current duration manually since session isn't ended yet + current_time = time.time() + current_duration = current_time - self.session.start_time + + # Create a temporary dict to store results + temp_results = self.session.problem.eval( + self.session.solution, + self.session.history, + current_duration + ) + + # Check if the solution is successful based on eval results + if temp_results.get("success", False): + env_response = SubmissionStatus.VALID_SUBMISSION + else: + env_response = SubmissionStatus.INVALID_SUBMISSION + + except Exception as e: + print(f"Error validating submission: {e}") + import traceback + traceback.print_exc() + env_response = SubmissionStatus.INVALID_SUBMISSION + else: + # Regular action handling + try: + env_response = self.session.problem.perform_action(api, *args, **kwargs) + except InvalidActionError as e: + env_response = str(e) + + self.session.add({"role": "env", "content": env_response}) + return env_response + + async def start_problem(self): + """Start the task and run until a valid submission is 
received. + + Returns: + dict: The final state of the session. + """ + assert self.session is not None + action_instr = "Please take the next action" + action, env_response, results = "", "", {} + self.session.start() + self.execution_start_time = time.time() + + # Initial environment response + env_response = await self.ask_env(action) + + while env_response != SubmissionStatus.VALID_SUBMISSION: + action = await self.ask_agent(action_instr) + self.sprint.agent(action) + + env_response = await self.ask_env(action) + self.sprint.service(env_response) + + if env_response == SubmissionStatus.VALID_SUBMISSION: + print("Submission is correct!") + break + elif env_response == SubmissionStatus.INVALID_SUBMISSION: + print("Your submission was invalid. Please continue working on the problem.") + else: + action_instr = env_response + + self.session.end() + + # Final evaluation with the valid submission + if env_response == SubmissionStatus.VALID_SUBMISSION: + results = self.session.problem.eval( + self.session.solution, self.session.history, self.session.get_duration() + ) + self.sprint.result(results) + + self.session.set_results(results) + self.session.to_json() + self.session.problem.recover_fault() + + # App cleanup + self.session.problem.app.cleanup() + + self.execution_end_time = time.time() + total_execution_time = self.execution_end_time - self.execution_start_time + time_keys = ["TTD", "TTL", "TTA", "TTM"] + key = next((k for k in time_keys if k in results), None) + framework_overhead = ( + total_execution_time - (results.get(key, 0) or 0) + ) + print(f"Framework overhead: {framework_overhead}") + + return { + "history": self.session.history, + "final_state": env_response, + "results": results, + "framework_overhead": framework_overhead, + } \ No newline at end of file diff --git a/aiopslab/orchestrator/actions/base.py b/aiopslab/orchestrator/actions/base.py index b623cca5..2aa9d49f 100644 --- a/aiopslab/orchestrator/actions/base.py +++ 
b/aiopslab/orchestrator/actions/base.py @@ -8,6 +8,7 @@ from datetime import datetime, timedelta from aiopslab.utils.actions import action, read, write from aiopslab.service.kubectl import KubeCtl +from aiopslab.service.dock import Docker from aiopslab.service.shell import Shell # from aiopslab.observer import initialize_pod_and_service_lists @@ -22,7 +23,7 @@ class TaskActions: @read def get_logs(namespace: str, service: str) -> str: """ - Collects relevant log data from a pod using Kubectl. + Collects relevant log data from a pod using Kubectl or from a container with Docker. Args: namespace (str): The namespace in which the service is running. @@ -31,20 +32,35 @@ def get_logs(namespace: str, service: str) -> str: Returns: str | dict | list[dicts]: Log data as a structured object or a string. """ - kubectl = KubeCtl() - try: - if namespace == "test-social-network": - user_service_pod = kubectl.get_pod_name(namespace, f"app={service}") - elif namespace == "test-hotel-reservation": - user_service_pod = kubectl.get_pod_name( - namespace, f"io.kompose.service={service}" - ) - else: - raise Exception - logs = kubectl.get_pod_logs(user_service_pod, namespace) - except Exception as e: - return "Error: Your service/namespace does not exist. Use kubectl to check." - + if namespace == "docker": + docker = Docker() + try: + logs = docker.get_logs(service) + except Exception as e: + return "Error: Your service does not exist. Use docker to check." 
+ + else: + kubectl = KubeCtl() + try: + if namespace == "test-social-network": + user_service_pod = kubectl.get_pod_name(namespace, f"app={service}") + elif namespace == "test-hotel-reservation": + user_service_pod = kubectl.get_pod_name( + namespace, f"io.kompose.service={service}" + ) + elif namespace == "astronomy-shop": + user_service_pod = kubectl.get_pod_name( + namespace, f"app.kubernetes.io/name={service}" + ) + elif namespace == "default" and "wrk2-job" in service: + user_service_pod = kubectl.get_pod_name(namespace, f"job-name=wrk2-job") + else: + raise Exception + logs = kubectl.get_pod_logs(user_service_pod, namespace) + except Exception as e: + return "Error: Your service/namespace does not exist. Use kubectl to check." + + print(logs) logs = "\n".join(logs.split("\n")) return logs @@ -65,6 +81,9 @@ def exec_shell(command: str) -> str: """ if "kubectl edit" in command or "edit svc" in command: return "Error: Cannot use `kubectl edit`. Use `kubectl patch` instead." + + if "docker logs -f" in command: + return "Error: Cannot use `docker logs -f`. Use `docker logs` instead." return Shell.exec(command) @@ -111,7 +130,7 @@ def read_metrics(file_path: str) -> str: str: The requested metrics or an error message. """ if not os.path.exists(file_path): - return {"error": f"Metrics file '{file_path}' not found."} + return f"error: Metrics file '{file_path}' not found." try: df_metrics = pd.read_csv(file_path) @@ -161,7 +180,7 @@ def read_traces(file_path: str) -> str: str: The requested traces or an error message. """ if not os.path.exists(file_path): - return {"error": f"Traces file '{file_path}' not found."} + return f"error: Traces file '{file_path}' not found." 
     try:
         df_traces = pd.read_csv(file_path)
diff --git a/aiopslab/orchestrator/actions/detection.py b/aiopslab/orchestrator/actions/detection.py
index ed2ffad3..3a3089b2 100644
--- a/aiopslab/orchestrator/actions/detection.py
+++ b/aiopslab/orchestrator/actions/detection.py
@@ -20,7 +20,7 @@ def submit(has_anomaly: str) -> SubmissionStatus:
     Submit if anomalies are detected to the orchestrator for evaluation.
 
     Args:
-        has_anomaly (str): Yes if anomalies are detected, No otherwise.
+        has_anomaly (str): "Yes" if anomalies are detected, "No" otherwise.
 
     Returns:
         SubmissionStatus: The status of the submission.
diff --git a/aiopslab/orchestrator/evaluators/quantitative.py b/aiopslab/orchestrator/evaluators/quantitative.py
index 4f1b08bf..92afd5e0 100644
--- a/aiopslab/orchestrator/evaluators/quantitative.py
+++ b/aiopslab/orchestrator/evaluators/quantitative.py
@@ -22,7 +22,7 @@ def out_tokens(trace: list[SessionItem]) -> int:
     # NOTE: not dollar value, since depends on Agent's model
     agent_steps = "".join([item.content for item in trace if item.role == "assistant"])
 
-    return len(tokenizer.encode(agent_steps))
+    return len(tokenizer.encode(agent_steps, disallowed_special=()))
 
 
 def in_tokens(trace: list[SessionItem]) -> int:
diff --git a/aiopslab/orchestrator/onboarding_eval_parser.py b/aiopslab/orchestrator/onboarding_eval_parser.py
new file mode 100644
index 00000000..3211aaf3
--- /dev/null
+++ b/aiopslab/orchestrator/onboarding_eval_parser.py
@@ -0,0 +1,170 @@
+"""Custom parser for the onboarding task evaluator"""
+
+import re
+import ast
+
+from aiopslab.utils.status import ResponseParsingError
+
+class EvalParser:
+    def __init__(self):
+        # Define list of known API commands that need special handling
+        self.known_apis = ["submit"]
+
+    def parse(self, response: str) -> dict:
+        """Parses the response string to extract the API name and arguments.
+
+        Args:
+            response (str): The response string (typically an agent's response).
+
+        Returns:
+            dict: The parsed API name and arguments.
+        """
+        code_block = self.extract_codeblock(response)
+        context = self.extract_context(response)
+
+        # If there's no code block, check if the response itself is a command
+        if not code_block:
+            code_block = response.strip()
+
+        # Check if the code block is a simple "submit" command without parameters
+        if code_block.strip() == "submit":
+            return {
+                "api_name": "submit",
+                "args": [None],  # Placeholder argument
+                "kwargs": {},
+                "context": context,
+            }
+
+        # Handle other known APIs with function call syntax
+        if any(code_block.strip().startswith(api + "(") for api in self.known_apis):
+            api_name = self.parse_api_name(code_block)
+            args, kwargs = self.parse_args(code_block)
+            return {
+                "api_name": api_name,
+                "args": args,
+                "kwargs": kwargs,
+                "context": context,
+            }
+
+        # Default to exec_shell for unrecognized commands
+        # Strip any leading/trailing backticks if present
+        command = code_block.strip("` \n")
+        return {
+            "api_name": "exec_shell",
+            "args": [command],
+            "kwargs": {},
+            "context": context,
+        }
+
+    def extract_codeblock(self, response: str) -> str:
+        """Extract a markdown code block from a string.
+
+        Args:
+            response (str): The response string.
+
+        Returns:
+            str: The extracted code block.
+        """
+        outputlines = response.split("\n")
+        indexlines = [i for i, line in enumerate(outputlines) if "```" in line]
+        if len(indexlines) < 2:
+            return ""
+        return "\n".join(outputlines[indexlines[0] + 1 : indexlines[1]])
+
+    def extract_context(self, response: str) -> list:
+        """Extract context outside of a code block.
+
+        Args:
+            response (str): The response string.
+
+        Returns:
+            list: The extracted context.
+        """
+        pattern = r"(?:```[\s\S]*?```)|(.*?)(?:(?=```)|$)"
+        matches = re.findall(pattern, response, re.DOTALL)
+        context = [match.strip() for match in matches if match.strip()]
+
+        return context
+
+    def parse_api_name(self, response: str) -> str:
+        """Parses the API name from the response function call.
+
+        Args:
+            response (str): The response string.
+
+        Returns:
+            str: The API name.
+        """
+        first_parenthesis = response.find("(")
+        if first_parenthesis != -1:
+            return response[:first_parenthesis].strip()
+        return ""
+
+    def parse_args(self, response: str) -> tuple:
+        """Parses the arguments of a function call.
+
+        Args:
+            response (str): The response string.
+
+        Returns:
+            tuple: (args, kwargs) - Lists of positional and keyword arguments.
+        """
+        first_parenthesis = response.find("(")
+        last_parenthesis = response.rfind(")")
+
+        if first_parenthesis != -1 and last_parenthesis != -1:
+            args_str = response[first_parenthesis + 1 : last_parenthesis].strip()
+
+            # case: no arguments
+            if not args_str:
+                return [], {}
+
+            # case: positional/kwargs handled w/ ast.parse
+            try:
+                parsed = ast.parse(f"func({args_str})")
+                call = parsed.body[0].value
+                args, kwargs = [], {}
+
+                for arg in call.args:
+                    if isinstance(arg, ast.Constant):
+                        args.append(arg.value)
+                    elif isinstance(arg, (ast.List, ast.Tuple)):
+                        args.append([self.eval_ast_node(elt) for elt in arg.elts])
+                    elif isinstance(arg, ast.Dict):
+                        args.append(
+                            {
+                                self.eval_ast_node(key): self.eval_ast_node(value)
+                                for key, value in zip(arg.keys, arg.values)
+                            }
+                        )
+                    else:
+                        args.append(self.eval_ast_node(arg))
+
+                for kwarg in call.keywords:
+                    kwargs[kwarg.arg] = self.eval_ast_node(kwarg.value)
+
+                return args, kwargs
+            except Exception as e:
+                raise ResponseParsingError(f"Error parsing arguments: {str(e)}")
+
+        return [], {}
+
+    def eval_ast_node(self, node):
+        """Evaluates an AST node to its Python value."""
+        if isinstance(node, ast.Constant):
+            return node.value
+        elif isinstance(node, ast.List):
+            return [self.eval_ast_node(elt) for elt in node.elts]
+        elif isinstance(node, ast.Dict):
+            return {
+                self.eval_ast_node(key): self.eval_ast_node(value)
+                for key, value in zip(node.keys, node.values)
+            }
+        elif isinstance(node, ast.Name):
+            if node.id == "True":
+                return True
+            elif node.id == "False":
+                return False
+            elif node.id == "None":
+                return None
+        raise ValueError(f"Unsupported AST node type: {type(node)}")
\ No newline at end of file
diff --git a/aiopslab/orchestrator/orchestrator.py b/aiopslab/orchestrator/orchestrator.py
index a1938f1d..c29b68b1 100644
--- a/aiopslab/orchestrator/orchestrator.py
+++ b/aiopslab/orchestrator/orchestrator.py
@@ -9,10 +9,13 @@ from aiopslab.orchestrator.problems.registry import ProblemRegistry
 from aiopslab.orchestrator.parser import ResponseParser
 from aiopslab.utils.status import *
+from aiopslab.utils.critical_section import CriticalSection
 from aiopslab.service.telemetry.prometheus import Prometheus
 import time
 import inspect
 import asyncio
+import atexit
+import os
 
 
 class Orchestrator:
@@ -25,6 +28,7 @@ def __init__(self):
         self.execution_start_time = None
         self.execution_end_time = None
         self.kubectl = KubeCtl()
+        self.use_wandb = os.getenv("USE_WANDB", "false").lower() == "true"
 
     def init_problem(self, problem_id: str):
         """Initialize a problem instance for the agent to solve.
@@ -44,13 +48,10 @@ def init_problem(self, problem_id: str):
         self.session.set_problem(prob, pid=problem_id)
         self.session.set_agent(self.agent_name)
 
-        print("Setting up OpenEBS...")
+        if "flower" not in problem_id:  # temporary fix for testing, will edit later
+            print("Setting up OpenEBS...")
 
-        command = "kubectl get pods -n openebs"
-        result = self.kubectl.exec_command(command)
-        if "Running" in result:
-            print("OpenEBS is already running. Skipping installation.")
-        else:
+            # Install OpenEBS
             self.kubectl.exec_command(
                 "kubectl apply -f https://openebs.github.io/charts/openebs-operator.yaml"
             )
@@ -60,16 +61,20 @@ def init_problem(self, problem_id: str):
             self.kubectl.wait_for_ready("openebs")
             print("OpenEBS setup completed.")
 
-        # Setup and deploy Prometheus
-        self.prometheus = Prometheus()
-        self.prometheus.deploy()
+            # Setup and deploy Prometheus
+            self.prometheus = Prometheus()
+            self.prometheus.deploy()
 
         # deploy service
         prob.app.delete()
         prob.app.deploy()
 
-        # inject fault
-        prob.inject_fault()
+        # make sure is_fault_injected is set consistently, so the
+        # atexit-registered cleanup function can recover the fault correctly
+        with CriticalSection():
+            # inject fault
+            prob.inject_fault()
+            atexit.register(exit_cleanup_fault, prob=prob)
 
         # Check if start_workload is async or sync
         if inspect.iscoroutinefunction(prob.start_workload):
@@ -121,8 +126,15 @@ async def ask_env(self, input):
         try:
             env_response = self.session.problem.perform_action(api, *args, **kwargs)
+
+            if hasattr(env_response, "error"):
+                env_response = str(env_response)
+                print("An error occurred:", env_response)
         except InvalidActionError as e:
             env_response = str(e)
+        except Exception as e:
+            env_response = str(e)
+            print("Unhandled exception:", e)
 
         self.session.add({"role": "env", "content": env_response})
@@ -142,19 +154,29 @@ async def start_problem(self, max_steps: int):
         action, env_response, results = "", "", {}
         self.session.start()
 
-        for step in range(max_steps):
-            action = await self.ask_agent(action_instr)
-            self.sprint.agent(action)
-
-            env_response = await self.ask_env(action)
-            self.sprint.service(env_response)
-
-            if env_response == SubmissionStatus.VALID_SUBMISSION:
-                break
-            elif env_response == SubmissionStatus.INVALID_SUBMISSION:
-                raise ValueError("Invalid submission!")  # TODO (@manish): ask to retry?
-
-            action_instr = env_response + "\n" + "Please take the next action"
+        # catch any exception and recover the injected fault before
+        # re-raising it to the caller
+        try:
+            for step in range(max_steps):
+                action = await self.ask_agent(action_instr)
+                self.sprint.agent(action)
+
+                env_response = await self.ask_env(action)
+                self.sprint.service(env_response)
+
+                if env_response == SubmissionStatus.VALID_SUBMISSION:
+                    break
+                elif env_response == SubmissionStatus.INVALID_SUBMISSION:
+                    raise ValueError("Invalid submission!")  # TODO (@manish): ask to retry?
+
+                action_instr = env_response + "\n" + "Please take the next action"
+        except Exception as e:
+            # the fault is recovered early here because of the exception, so
+            # unregister the atexit cleanup function to avoid a second recovery
+            with CriticalSection():
+                print("Some exception happened. Recovering the injected fault...")
+                self.session.problem.recover_fault()
+                atexit.unregister(exit_cleanup_fault)
+            raise e
 
         self.session.end()
@@ -167,13 +189,27 @@ async def start_problem(self, max_steps: int):
         self.session.set_results(results)
         self.session.to_json()
 
-        self.session.problem.recover_fault()
+        if self.use_wandb:
+            self.session.to_wandb()
+
+        with CriticalSection():
+            self.session.problem.recover_fault()
+            atexit.unregister(exit_cleanup_fault)
+
         # Beyond recovering from fault,
         # I feel sometimes it is safer to delete the whole namespace.
         # But this will take more time.
         # if not self.session.problem.sys_status_after_recovery():
 
         self.session.problem.app.cleanup()
+
+        if self.session.problem.namespace != "docker":
+            self.prometheus.teardown()
+            print("Uninstalling OpenEBS...")
+            self.kubectl.exec_command("kubectl delete sc openebs-hostpath openebs-device --ignore-not-found")
+            self.kubectl.exec_command(
+                "kubectl delete -f https://openebs.github.io/charts/openebs-operator.yaml"
+            )
+            self.kubectl.wait_for_namespace_deletion("openebs")
 
         self.execution_end_time = time.time()
         total_execution_time = self.execution_end_time - self.execution_start_time
@@ -190,3 +226,8 @@ async def start_problem(self, max_steps: int):
             "results": results,
             "framework_overhead": framework_overhead,
         }
+
+
+def exit_cleanup_fault(prob):
+    print("Recovering fault before exit...")
+    prob.recover_fault()
diff --git a/aiopslab/orchestrator/problems/ad_service_failure/ad_service_failure.py b/aiopslab/orchestrator/problems/ad_service_failure/ad_service_failure.py
index da07329d..18a1ebf0 100644
--- a/aiopslab/orchestrator/problems/ad_service_failure/ad_service_failure.py
+++ b/aiopslab/orchestrator/problems/ad_service_failure/ad_service_failure.py
@@ -16,7 +16,7 @@ def __init__(self):
         self.kubectl = KubeCtl()
         self.namespace = self.app.namespace
         self.injector = OtelFaultInjector(namespace=self.namespace)
-        self.faulty_service = "adservice"
+        self.faulty_service = "ad"
 
     def start_workload(self):
         print("== Start Workload ==")
@@ -24,12 +24,12 @@ def start_workload(self):
     def inject_fault(self):
         print("== Fault Injection ==")
-        self.injector.inject_fault("adServiceFailure")
+        self.injector.inject_fault("adFailure")
         print(f"Fault: adServiceFailure | Namespace: {self.namespace}\n")
 
     def recover_fault(self):
         print("== Fault Recovery ==")
-        self.injector.recover_fault("adServiceFailure")
+        self.injector.recover_fault("adFailure")
 
 
 ################## Detection Problem ##################
@@ -61,7 +61,6 @@ class AdServiceFailureLocalization(AdServiceFailureBaseTask, LocalizationTask):
     def __init__(self):
         AdServiceFailureBaseTask.__init__(self)
         LocalizationTask.__init__(self, self.app)
-        self.task_desc += "Start by investigating the ad service."
 
     def eval(self, soln: Any, trace: list[SessionItem], duration: float):
         print("== Evaluation ==")
diff --git a/aiopslab/orchestrator/problems/ad_service_high_cpu/ad_service_high_cpu.py b/aiopslab/orchestrator/problems/ad_service_high_cpu/ad_service_high_cpu.py
index bf8d75e4..8cd8c4fd 100644
--- a/aiopslab/orchestrator/problems/ad_service_high_cpu/ad_service_high_cpu.py
+++ b/aiopslab/orchestrator/problems/ad_service_high_cpu/ad_service_high_cpu.py
@@ -16,7 +16,7 @@ def __init__(self):
         self.kubectl = KubeCtl()
         self.namespace = self.app.namespace
         self.injector = OtelFaultInjector(namespace=self.namespace)
-        self.faulty_service = "adservice"
+        self.faulty_service = "ad"
 
     def start_workload(self):
         print("== Start Workload ==")
@@ -24,12 +24,12 @@ def start_workload(self):
     def inject_fault(self):
         print("== Fault Injection ==")
-        self.injector.inject_fault("adServiceHighCpu")
+        self.injector.inject_fault("adHighCpu")
         print(f"Fault: AdServiceHighCpu | Namespace: {self.namespace}\n")
 
     def recover_fault(self):
         print("== Fault Recovery ==")
-        self.injector.recover_fault("adServiceHighCpu")
+        self.injector.recover_fault("adHighCpu")
 
 
 ################## Detection Problem ##################
@@ -61,7 +61,6 @@ class AdServiceHighCpuLocalization(AdServiceHighCpuBaseTask, LocalizationTask):
     def __init__(self):
         AdServiceHighCpuBaseTask.__init__(self)
         LocalizationTask.__init__(self, self.app)
-        self.task_desc += "Start by investigating the ad service."
 
     def eval(self, soln: Any, trace: list[SessionItem], duration: float):
         print("== Evaluation ==")
diff --git a/aiopslab/orchestrator/problems/ad_service_manual_gc/ad_service_manual_gc.py b/aiopslab/orchestrator/problems/ad_service_manual_gc/ad_service_manual_gc.py
index da3a5ce2..f6502450 100644
--- a/aiopslab/orchestrator/problems/ad_service_manual_gc/ad_service_manual_gc.py
+++ b/aiopslab/orchestrator/problems/ad_service_manual_gc/ad_service_manual_gc.py
@@ -16,7 +16,7 @@ def __init__(self):
         self.kubectl = KubeCtl()
         self.namespace = self.app.namespace
         self.injector = OtelFaultInjector(namespace=self.namespace)
-        self.faulty_service = "adservice"
+        self.faulty_service = "ad"
 
     def start_workload(self):
         print("== Start Workload ==")
@@ -24,12 +24,12 @@ def start_workload(self):
     def inject_fault(self):
         print("== Fault Injection ==")
-        self.injector.inject_fault("adServiceManualGc")
+        self.injector.inject_fault("adManualGc")
         print(f"Fault: adServiceManualGc | Namespace: {self.namespace}\n")
 
     def recover_fault(self):
         print("== Fault Recovery ==")
-        self.injector.recover_fault("adServiceManualGc")
+        self.injector.recover_fault("adManualGc")
 
 
 ################## Detection Problem ##################
@@ -61,7 +61,6 @@ class AdServiceManualGcLocalization(AdServiceManualGcBaseTask, LocalizationTask):
     def __init__(self):
         AdServiceManualGcBaseTask.__init__(self)
         LocalizationTask.__init__(self, self.app)
-        self.task_desc += "Start by investigating the ad service."
 
     def eval(self, soln: Any, trace: list[SessionItem], duration: float):
         print("== Evaluation ==")
diff --git a/aiopslab/orchestrator/problems/assign_non_existent_node/assign_non_existent_node_social_net.py b/aiopslab/orchestrator/problems/assign_non_existent_node/assign_non_existent_node_social_net.py
index 2f442ea1..58c69d76 100644
--- a/aiopslab/orchestrator/problems/assign_non_existent_node/assign_non_existent_node_social_net.py
+++ b/aiopslab/orchestrator/problems/assign_non_existent_node/assign_non_existent_node_social_net.py
@@ -93,7 +93,6 @@ class AssignNonExistentNodeSocialNetLocalization(
     def __init__(self):
         AssignNonExistentNodeSocialNetBaseTask.__init__(self)
         LocalizationTask.__init__(self, self.app)
-        self.task_desc += "Start by investigating the `compost-post-service` pod"
 
     def eval(self, soln: Any, trace: list[SessionItem], duration: float):
         print("== Evaluation ==")
@@ -137,7 +136,6 @@ class AssignNonExistentNodeSocialNetAnalysis(
     def __init__(self):
         AssignNonExistentNodeSocialNetBaseTask.__init__(self)
         AnalysisTask.__init__(self, self.app)
-        self.task_desc += "Start by investigating the `compost-post-service` pod"
 
     def eval(self, soln: Any, trace: list[SessionItem], duration: float):
         print("== Evaluation ==")
@@ -179,7 +177,6 @@ class AssignNonExistentNodeSocialNetMitigation(
     def __init__(self):
         AssignNonExistentNodeSocialNetBaseTask.__init__(self)
         MitigationTask.__init__(self, self.app)
-        self.task_desc += "Start by investigating the `compost-post-service` pod"
 
     def eval(self, soln: Any, trace: list[SessionItem], duration: float) -> dict:
         print("== Evaluation ==")
@@ -191,7 +188,7 @@ def eval(self, soln: Any, trace: list[SessionItem], duration: float) -> dict:
 
         # Check if the faulty service exists
         faulty_service_exists = any(
-            pod.metadata.name == self.faulty_service for pod in pod_list.items
+            self.faulty_service in pod.metadata.name for pod in pod_list.items
         )
         if not faulty_service_exists:
             print(f"Pod named {self.faulty_service} does not exist.")
diff --git a/aiopslab/orchestrator/problems/auth_miss_mongodb/auth_miss_mongodb.py b/aiopslab/orchestrator/problems/auth_miss_mongodb/auth_miss_mongodb.py
index 6fe3bc52..a07f2c2c 100644
--- a/aiopslab/orchestrator/problems/auth_miss_mongodb/auth_miss_mongodb.py
+++ b/aiopslab/orchestrator/problems/auth_miss_mongodb/auth_miss_mongodb.py
@@ -86,7 +86,6 @@ class MongoDBAuthMissingLocalization(MongoDBAuthMissingBaseTask, LocalizationTask):
     def __init__(self):
         MongoDBAuthMissingBaseTask.__init__(self)
         LocalizationTask.__init__(self, self.app)
-        self.task_desc += "Start by investigating the `compose-post-service` pod"
 
     def eval(self, soln: Any, trace: list[SessionItem], duration: float):
         print("== Evaluation ==")
@@ -128,7 +127,6 @@ class MongoDBAuthMissingAnalysis(MongoDBAuthMissingBaseTask, AnalysisTask):
     def __init__(self):
         MongoDBAuthMissingBaseTask.__init__(self)
         AnalysisTask.__init__(self, self.app)
-        self.task_desc += "Start by investigating the `compose-post-service` pod"
 
     def eval(self, soln: Any, trace: list[SessionItem], duration: float):
         print("== Evaluation ==")
@@ -169,7 +167,6 @@ class MongoDBAuthMissingMitigation(MongoDBAuthMissingBaseTask, MitigationTask):
     def __init__(self):
         MongoDBAuthMissingBaseTask.__init__(self)
         MitigationTask.__init__(self, self.app)
-        self.task_desc += "Start by investigating the `compose-post-service` pod"
 
     # TODO: this migigate eval should be a bit different.
     # The error will not be on the container/pod level but the app level,
diff --git a/aiopslab/orchestrator/problems/cart_service_failure/cart_service_failure.py b/aiopslab/orchestrator/problems/cart_service_failure/cart_service_failure.py
index e65a9c11..50c75e1e 100644
--- a/aiopslab/orchestrator/problems/cart_service_failure/cart_service_failure.py
+++ b/aiopslab/orchestrator/problems/cart_service_failure/cart_service_failure.py
@@ -16,7 +16,7 @@ def __init__(self):
         self.kubectl = KubeCtl()
         self.namespace = self.app.namespace
         self.injector = OtelFaultInjector(namespace=self.namespace)
-        self.faulty_service = "cartservice"
+        self.faulty_service = "cart"
 
     def start_workload(self):
         print("== Start Workload ==")
@@ -24,12 +24,12 @@ def start_workload(self):
     def inject_fault(self):
         print("== Fault Injection ==")
-        self.injector.inject_fault("cartServiceFailure")
+        self.injector.inject_fault("cartFailure")
         print(f"Fault: cartServiceFailure | Namespace: {self.namespace}\n")
 
     def recover_fault(self):
         print("== Fault Recovery ==")
-        self.injector.recover_fault("cartServiceFailure")
+        self.injector.recover_fault("cartFailure")
 
 
 ################## Detection Problem ##################
@@ -61,7 +61,6 @@ class CartServiceFailureLocalization(CartServiceFailureBaseTask, LocalizationTask):
     def __init__(self):
         CartServiceFailureBaseTask.__init__(self)
         LocalizationTask.__init__(self, self.app)
-        self.task_desc += "Start by investigating the cart service."
 
     def eval(self, soln: Any, trace: list[SessionItem], duration: float):
         print("== Evaluation ==")
diff --git a/aiopslab/orchestrator/problems/flower_model_misconfig/__init__.py b/aiopslab/orchestrator/problems/flower_model_misconfig/__init__.py
new file mode 100644
index 00000000..1db81e08
--- /dev/null
+++ b/aiopslab/orchestrator/problems/flower_model_misconfig/__init__.py
@@ -0,0 +1,6 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+from .model_misconfig import (
+    FlowerModelMisconfigDetection
+)
\ No newline at end of file
diff --git a/aiopslab/orchestrator/problems/flower_model_misconfig/model_misconfig.py b/aiopslab/orchestrator/problems/flower_model_misconfig/model_misconfig.py
new file mode 100644
index 00000000..62e0d8a9
--- /dev/null
+++ b/aiopslab/orchestrator/problems/flower_model_misconfig/model_misconfig.py
@@ -0,0 +1,80 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+"""Model misconfiguration fault in the Flower application."""
+
+import time
+from typing import Any
+
+from aiopslab.orchestrator.tasks import *
+from aiopslab.service.dock import Docker
+from aiopslab.service.apps.flower import Flower
+from aiopslab.paths import TARGET_MICROSERVICES
+from aiopslab.session import SessionItem
+from aiopslab.generators.fault.inject_virtual import VirtualizationFaultInjector
+
+
+class FlowerModelMisconfigBaseTask:
+    def __init__(self, faulty_service: str = "user-service"):
+        self.app = Flower()
+        self.docker = Docker()
+        self.namespace = self.app.namespace
+        self.faulty_service = faulty_service
+        self.train_dir = TARGET_MICROSERVICES / "flower"
+
+    def start_workload(self):
+        print("== Start Workload ==")
+        command = "flwr run train local-deployment"
+        self.docker.exec_command(command, cwd=self.train_dir)
+
+        print("Waiting for workload to start...")
+        time.sleep(10)
+
+        print("Injecting fault again...")
+        self.inject_fault()
+
+        print("Waiting for faults to propagate...")
+        time.sleep(60)
+        print("Faults propagated.")
+
+    def inject_fault(self):
+        print("== Fault Injection ==")
+        injector = VirtualizationFaultInjector(namespace=self.namespace)
+        injector._inject(
+            fault_type="model_misconfig",
+            microservices=[self.faulty_service],
+        )
+        print(f"Service: {self.faulty_service} | Namespace: {self.namespace}\n")
+
+    def recover_fault(self):
+        print("== Fault Recovery ==")
+        injector = VirtualizationFaultInjector(namespace=self.namespace)
+        injector._recover(
+            fault_type="model_misconfig",
+            microservices=[self.faulty_service],
+        )
+        print(f"Service: {self.faulty_service} | Namespace: {self.namespace}\n")
+
+
+################## Detection Problem ##################
+class FlowerModelMisconfigDetection(FlowerModelMisconfigBaseTask, DetectionTask):
+    def __init__(self, faulty_service: str = "clientapp-1"):
+        FlowerModelMisconfigBaseTask.__init__(self, faulty_service=faulty_service)
+        DetectionTask.__init__(self, self.app)
+
+    def eval(self, soln: Any, trace: list[SessionItem], duration: float):
+        print("== Evaluation ==")
+        expected_solution = "Yes"
+
+        if isinstance(soln, str):
+            if soln.strip().lower() == expected_solution.lower():
+                print(f"Correct detection: {soln}")
+                self.add_result("Detection Accuracy", "Correct")
+            else:
+                print(f"Incorrect detection: {soln}")
+                self.add_result("Detection Accuracy", "Incorrect")
+        else:
+            print("Invalid solution format")
+            self.add_result("Detection Accuracy", "Invalid Format")
+
+        return super().eval(soln, trace, duration)
diff --git a/aiopslab/orchestrator/problems/flower_node_stop/__init__.py b/aiopslab/orchestrator/problems/flower_node_stop/__init__.py
new file mode 100644
index 00000000..fb33063c
--- /dev/null
+++ b/aiopslab/orchestrator/problems/flower_node_stop/__init__.py
@@ -0,0 +1,6 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+from .node_stop import (
+    FlowerNodeStopDetection
+)
\ No newline at end of file
diff --git a/aiopslab/orchestrator/problems/flower_node_stop/node_stop.py b/aiopslab/orchestrator/problems/flower_node_stop/node_stop.py
new file mode 100644
index 00000000..79596976
--- /dev/null
+++ b/aiopslab/orchestrator/problems/flower_node_stop/node_stop.py
@@ -0,0 +1,69 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+"""Docker node stop fault problem in the Flower application."""
+
+from typing import Any
+
+from aiopslab.orchestrator.tasks import *
+from aiopslab.service.dock import Docker
+from aiopslab.service.apps.flower import Flower
+from aiopslab.paths import TARGET_MICROSERVICES
+from aiopslab.session import SessionItem
+from aiopslab.generators.fault.inject_virtual import VirtualizationFaultInjector
+
+
+class FlowerNodeStopBaseTask:
+    def __init__(self, faulty_service: str = "user-service"):
+        self.app = Flower()
+        self.docker = Docker()
+        self.namespace = self.app.namespace
+        self.faulty_service = faulty_service
+        self.train_dir = TARGET_MICROSERVICES / "flower"
+
+    def start_workload(self):
+        print("== Start Workload ==")
+        command = "flwr run train local-deployment"
+        self.docker.exec_command(command, cwd=self.train_dir)
+
+    def inject_fault(self):
+        print("== Fault Injection ==")
+        injector = VirtualizationFaultInjector(namespace=self.namespace)
+        injector._inject(
+            fault_type="container_stop",
+            microservices=[self.faulty_service],
+        )
+        print(f"Service: {self.faulty_service} | Namespace: {self.namespace}\n")
+
+    def recover_fault(self):
+        print("== Fault Recovery ==")
+        injector = VirtualizationFaultInjector(namespace=self.namespace)
+        injector._recover(
+            fault_type="container_stop",
+            microservices=[self.faulty_service],
+        )
+        print(f"Service: {self.faulty_service} | Namespace: {self.namespace}\n")
+
+
+################## Detection Problem ##################
+class FlowerNodeStopDetection(FlowerNodeStopBaseTask, DetectionTask):
+    def __init__(self, faulty_service: str = "supernode-1"):
+        FlowerNodeStopBaseTask.__init__(self, faulty_service=faulty_service)
+        DetectionTask.__init__(self, self.app)
+
+    def eval(self, soln: Any, trace: list[SessionItem], duration: float):
+        print("== Evaluation ==")
+        expected_solution = "Yes"
+
+        if isinstance(soln, str):
+            if soln.strip().lower() == expected_solution.lower():
+                print(f"Correct detection: {soln}")
+                self.add_result("Detection Accuracy", "Correct")
+            else:
+                print(f"Incorrect detection: {soln}")
+                self.add_result("Detection Accuracy", "Incorrect")
+        else:
+            print("Invalid solution format")
+            self.add_result("Detection Accuracy", "Invalid Format")
+
+        return super().eval(soln, trace, duration)
diff --git a/aiopslab/orchestrator/problems/image_slow_load/image_slow_load.py b/aiopslab/orchestrator/problems/image_slow_load/image_slow_load.py
index df36db3c..7d075be1 100644
--- a/aiopslab/orchestrator/problems/image_slow_load/image_slow_load.py
+++ b/aiopslab/orchestrator/problems/image_slow_load/image_slow_load.py
@@ -61,7 +61,6 @@ class ImageSlowLoadLocalization(ImageSlowLoadBaseTask, LocalizationTask):
     def __init__(self):
         ImageSlowLoadBaseTask.__init__(self)
         LocalizationTask.__init__(self, self.app)
-        self.task_desc += "Start by investigating the frontend service."
 
     def eval(self, soln: Any, trace: list[SessionItem], duration: float):
         print("== Evaluation ==")
diff --git a/aiopslab/orchestrator/problems/k8s_target_port_misconfig/target_port.py b/aiopslab/orchestrator/problems/k8s_target_port_misconfig/target_port.py
index 16467783..59d0d9cc 100644
--- a/aiopslab/orchestrator/problems/k8s_target_port_misconfig/target_port.py
+++ b/aiopslab/orchestrator/problems/k8s_target_port_misconfig/target_port.py
@@ -88,7 +88,6 @@ class K8STargetPortMisconfigLocalization(
     def __init__(self, faulty_service: str = "user-service"):
         K8STargetPortMisconfigBaseTask.__init__(self, faulty_service=faulty_service)
         LocalizationTask.__init__(self, self.app)
-        self.task_desc += "Start by investigating the `compose-post-service` pod"
 
     def eval(self, soln: Any, trace: list[SessionItem], duration: float):
         print("== Evaluation ==")
@@ -133,7 +132,6 @@ class K8STargetPortMisconfigAnalysis(K8STargetPortMisconfigBaseTask, AnalysisTask):
     def __init__(self, faulty_service: str = "user-service"):
         K8STargetPortMisconfigBaseTask.__init__(self, faulty_service=faulty_service)
         AnalysisTask.__init__(self, self.app)
-        self.task_desc += "Start by investigating the `compose-post-service` pod"
 
     def eval(self, soln: Any, trace: list[SessionItem], duration: float):
         print("== Evaluation ==")
@@ -167,7 +165,6 @@ class K8STargetPortMisconfigMitigation(K8STargetPortMisconfigBaseTask, MitigationTask):
     def __init__(self, faulty_service: str = "user-service"):
         K8STargetPortMisconfigBaseTask.__init__(self, faulty_service=faulty_service)
         MitigationTask.__init__(self, self.app)
-        self.task_desc += "Start by investigating the `compose-post-service` pod"
 
     def eval(self, soln: Any, trace: list[SessionItem], duration: float) -> dict:
         print("== Evaluation ==")
diff --git a/aiopslab/orchestrator/problems/kafka_queue_problems/kafka_queue_problems.py b/aiopslab/orchestrator/problems/kafka_queue_problems/kafka_queue_problems.py
index 74bebf28..13e137dc 100644
--- a/aiopslab/orchestrator/problems/kafka_queue_problems/kafka_queue_problems.py
+++ b/aiopslab/orchestrator/problems/kafka_queue_problems/kafka_queue_problems.py
@@ -61,7 +61,6 @@ class KafkaQueueProblemsLocalization(KafkaQueueProblemsBaseTask, LocalizationTask):
     def __init__(self):
         KafkaQueueProblemsBaseTask.__init__(self)
         LocalizationTask.__init__(self, self.app)
-        self.task_desc += "Start by investigating the kafka service."
 
     def eval(self, soln: Any, trace: list[SessionItem], duration: float):
         print("== Evaluation ==")
diff --git a/aiopslab/orchestrator/problems/loadgenerator_flood_homepage/loadgenerator_flood_homepage.py b/aiopslab/orchestrator/problems/loadgenerator_flood_homepage/loadgenerator_flood_homepage.py
index 3c7e79ef..2c051acf 100644
--- a/aiopslab/orchestrator/problems/loadgenerator_flood_homepage/loadgenerator_flood_homepage.py
+++ b/aiopslab/orchestrator/problems/loadgenerator_flood_homepage/loadgenerator_flood_homepage.py
@@ -25,12 +25,12 @@ def start_workload(self):
     def inject_fault(self):
         print("== Fault Injection ==")
-        self.injector.inject_fault("loadgeneratorFloodHomepage")
+        self.injector.inject_fault("loadGeneratorFloodHomepage")
         print(f"Fault: loadgeneratorFloodHomepage | Namespace: {self.namespace}\n")
 
     def recover_fault(self):
         print("== Fault Recovery ==")
-        self.injector.recover_fault("loadgeneratorFloodHomepage")
+        self.injector.recover_fault("loadGeneratorFloodHomepage")
 
 
 ################## Detection Problem ##################
@@ -66,7 +66,6 @@ class LoadGeneratorFloodHomepageLocalization(
     def __init__(self):
         LoadGeneratorFloodHomepageBaseTask.__init__(self)
         LocalizationTask.__init__(self, self.app)
-        self.task_desc += "Start by investigating the frontend service."
 
     def eval(self, soln: Any, trace: list[SessionItem], duration: float):
         print("== Evaluation ==")
diff --git a/aiopslab/orchestrator/problems/misconfig_app/misconfig_app_hotel_res.py b/aiopslab/orchestrator/problems/misconfig_app/misconfig_app_hotel_res.py
index 861da588..822d777d 100644
--- a/aiopslab/orchestrator/problems/misconfig_app/misconfig_app_hotel_res.py
+++ b/aiopslab/orchestrator/problems/misconfig_app/misconfig_app_hotel_res.py
@@ -3,6 +3,7 @@
 
 """MongoDB storage user unregistered problem in the HotelReservation application."""
 
+from time import sleep
 from typing import Any
 
 from aiopslab.orchestrator.tasks import *
@@ -166,32 +167,32 @@ def eval(self, soln: Any, trace: list[SessionItem], duration: float) -> dict:
         super().eval(soln, trace, duration)
 
         # Check if all services (not only faulty service) is back to normal (Running)
-        pod_list = self.kubectl.list_pods(self.namespace)
         all_normal = True
-
-        for pod in pod_list.items:
-            # Check container statuses
-            for container_status in pod.status.container_statuses:
-                if (
-                    container_status.state.waiting
-                    and container_status.state.waiting.reason == "CrashLoopBackOff"
-                ):
-                    print(f"Container {container_status.name} is in CrashLoopBackOff")
-                    all_normal = False
-                elif (
-                    container_status.state.terminated
-                    and container_status.state.terminated.reason != "Completed"
-                ):
-                    print(
-                        f"Container {container_status.name} is terminated with reason: {container_status.state.terminated.reason}"
-                    )
-                    all_normal = False
-                elif not container_status.ready:
-                    print(f"Container {container_status.name} is not ready")
-                    all_normal = False
+        # Polling for 1 minute to check if all services are back to normal
+        for _ in range(12):  # 5 seconds interval, 12 times, total 1 minute
+            pod_list = self.kubectl.list_pods(self.namespace)
+            for pod in pod_list.items:
+                # Check container statuses
+                for container_status in pod.status.container_statuses:
+                    if container_status.state.waiting:
+                        reason = container_status.state.waiting.reason
+                        if reason in ["CrashLoopBackOff", "Error", "ImagePullBackOff", "ErrImagePull"]:
+                            print(f"Container {container_status.name} is in error state: {reason}")
+                            all_normal = False
+                    elif container_status.state.terminated and container_status.state.terminated.reason != "Completed":
+                        print(f"Container {container_status.name} is terminated with reason: {container_status.state.terminated.reason}")
+                        all_normal = False
+                    elif not container_status.ready:
+                        print(f"Container {container_status.name} is not ready")
+                        all_normal = False
+
+                if not all_normal:
+                    break
 
             if not all_normal:
                 break
+            # Wait for 5 seconds before checking again
+            sleep(5)
 
         self.results["success"] = all_normal
         return self.results
diff --git a/aiopslab/orchestrator/problems/no_op/no_op.py b/aiopslab/orchestrator/problems/no_op/no_op.py
index 29cec74d..9ddac7fa 100644
--- a/aiopslab/orchestrator/problems/no_op/no_op.py
+++ b/aiopslab/orchestrator/problems/no_op/no_op.py
@@ -7,6 +7,7 @@
 from aiopslab.service.kubectl import KubeCtl
 from aiopslab.service.apps.hotelres import HotelReservation
 from aiopslab.service.apps.socialnet import SocialNetwork
+from aiopslab.service.apps.astronomy_shop import AstronomyShop
 from aiopslab.generators.workload.wrk import Wrk
 from aiopslab.generators.fault.inject_noop import NoopFaultInjector
 from aiopslab.session import SessionItem
@@ -17,20 +18,25 @@ class NoOpBaseTask:
     def __init__(self, app_name: str = "hotel"):
-        if app_name == "hotel":
+        self.app_name = app_name
+
+        if self.app_name == "hotel":
             self.app = HotelReservation()
             self.payload_script = (
                 TARGET_MICROSERVICES
                 / "hotelReservation/wrk2/scripts/hotel-reservation/mixed-workload_type_1.lua"
             )
             self.faulty_service = "PLACEHOLDER"
-        elif app_name == "social":
+        elif self.app_name == "social":
             self.app = SocialNetwork()
             self.payload_script = (
                 TARGET_MICROSERVICES
                 / "socialNetwork/wrk2/scripts/social-network/compose-post.lua"
             )
             self.faulty_service = "PLACEHOLDER"
+        elif self.app_name == "astronomy_shop":
+            self.app = AstronomyShop()
+            self.faulty_service = "PLACEHOLDER"
         else:
             raise ValueError(f"Unsupported app_name: {app_name}")
@@ -39,14 +45,18 @@ def __init__(self, app_name: str = "hotel"):
         self.injector = NoopFaultInjector(namespace=self.namespace)
 
     def start_workload(self):
-        print("== Start Workload ==")
-        frontend_url = get_frontend_url(self.app)
+        if self.app_name != "astronomy_shop":
+            print("== Start Workload ==")
+            frontend_url = get_frontend_url(self.app)
 
-        wrk = Wrk(rate=10, dist="exp", connections=2, duration=10, threads=2)
-        wrk.start_workload(
-            payload_script=self.payload_script,
-            url=f"{frontend_url}",
-        )
+            wrk = Wrk(rate=10, dist="exp", connections=2, duration=10, threads=2)
+            wrk.start_workload(
+                payload_script=self.payload_script,
+                url=f"{frontend_url}",
+            )
+        else:
+            # Skip workload since astronomy shop has its own workload generator
+            print("== Workload Skipped ==")
 
     def inject_fault(self):
         print("== Fault Injection ==")
diff --git a/aiopslab/orchestrator/problems/payment_service_failure/payment_service_failure.py b/aiopslab/orchestrator/problems/payment_service_failure/payment_service_failure.py
index 8352068b..90fd3f0e 100644
--- a/aiopslab/orchestrator/problems/payment_service_failure/payment_service_failure.py
+++ b/aiopslab/orchestrator/problems/payment_service_failure/payment_service_failure.py
@@ -16,7 +16,7 @@ def __init__(self):
         self.kubectl = KubeCtl()
         self.namespace = self.app.namespace
         self.injector = OtelFaultInjector(namespace=self.namespace)
-        self.faulty_service = "paymentservice"
+        self.faulty_service = "payment"
 
     def start_workload(self):
         print("== Start Workload ==")
@@ -24,12 +24,12 @@ def start_workload(self):
     def inject_fault(self):
         print("== Fault Injection ==")
-        self.injector.inject_fault("paymentServiceFailure")
+        self.injector.inject_fault("paymentFailure")
         print(f"Fault: paymentServiceFailure | Namespace: {self.namespace}\n")
 
     def recover_fault(self):
         print("== Fault Recovery ==")
-        self.injector.recover_fault("paymentServiceFailure")
+        self.injector.recover_fault("paymentFailure")
 
 
 ################## Detection Problem ##################
@@ -63,7 +63,6 @@ class PaymentServiceFailureLocalization(
     def __init__(self):
         PaymentServiceFailureBaseTask.__init__(self)
         LocalizationTask.__init__(self, self.app)
-        self.task_desc += "Start by investigating the payment service."
 
     def eval(self, soln: Any, trace: list[SessionItem], duration: float):
         print("== Evaluation ==")
diff --git a/aiopslab/orchestrator/problems/payment_service_unreachable/payment_service_unreachable.py b/aiopslab/orchestrator/problems/payment_service_unreachable/payment_service_unreachable.py
index 529bc1c8..4027a1e6 100644
--- a/aiopslab/orchestrator/problems/payment_service_unreachable/payment_service_unreachable.py
+++ b/aiopslab/orchestrator/problems/payment_service_unreachable/payment_service_unreachable.py
@@ -16,7 +16,7 @@ def __init__(self):
         self.kubectl = KubeCtl()
         self.namespace = self.app.namespace
         self.injector = OtelFaultInjector(namespace=self.namespace)
-        self.faulty_service = "paymentservice"
+        self.faulty_service = "checkout"
 
     def start_workload(self):
         print("== Start Workload ==")
@@ -24,12 +24,12 @@ def start_workload(self):
     def inject_fault(self):
         print("== Fault Injection ==")
-        self.injector.inject_fault("paymentServiceUnreachable")
+        self.injector.inject_fault("paymentUnreachable")
         print(f"Fault: paymentServiceUnreachable | Namespace: {self.namespace}\n")
 
     def recover_fault(self):
         print("== Fault Recovery ==")
-        self.injector.recover_fault("paymentServiceUnreachable")
+        self.injector.recover_fault("paymentUnreachable")
 
 
 ################## Detection Problem ##################
@@ -65,7 +65,6 @@ class PaymentServiceUnreachableLocalization(
     def __init__(self):
         PaymentServiceUnreachableBaseTask.__init__(self)
         LocalizationTask.__init__(self, self.app)
-        self.task_desc += "Start by investigating the payment service."
def eval(self, soln: Any, trace: list[SessionItem], duration: float): print("== Evaluation ==") diff --git a/aiopslab/orchestrator/problems/product_catalog_failure/product_catalog_failure.py b/aiopslab/orchestrator/problems/product_catalog_failure/product_catalog_failure.py index a9099218..f09d837a 100644 --- a/aiopslab/orchestrator/problems/product_catalog_failure/product_catalog_failure.py +++ b/aiopslab/orchestrator/problems/product_catalog_failure/product_catalog_failure.py @@ -16,7 +16,7 @@ def __init__(self): self.kubectl = KubeCtl() self.namespace = self.app.namespace self.injector = OtelFaultInjector(namespace=self.namespace) - self.faulty_service = "productcatalogservice" + self.faulty_service = "product-catalog" def start_workload(self): print("== Start Workload ==") @@ -65,7 +65,6 @@ class ProductCatalogServiceFailureLocalization( def __init__(self): ProductCatalogServiceFailureBaseTask.__init__(self) LocalizationTask.__init__(self, self.app) - self.task_desc += "Start by investigating the product catalog service." 
def eval(self, soln: Any, trace: list[SessionItem], duration: float): print("== Evaluation ==") diff --git a/aiopslab/orchestrator/problems/recommendation_service_cache_failure/recommendation_service_cache_failure.py b/aiopslab/orchestrator/problems/recommendation_service_cache_failure/recommendation_service_cache_failure.py index 4827bb83..06b5e982 100644 --- a/aiopslab/orchestrator/problems/recommendation_service_cache_failure/recommendation_service_cache_failure.py +++ b/aiopslab/orchestrator/problems/recommendation_service_cache_failure/recommendation_service_cache_failure.py @@ -16,7 +16,7 @@ def __init__(self): self.kubectl = KubeCtl() self.namespace = self.app.namespace self.injector = OtelFaultInjector(namespace=self.namespace) - self.faulty_service = "recommendationservice" + self.faulty_service = "recommendation" def start_workload(self): print("== Start Workload ==") @@ -24,14 +24,14 @@ def start_workload(self): def inject_fault(self): print("== Fault Injection ==") - self.injector.inject_fault("recommendationServiceCacheFailure") + self.injector.inject_fault("recommendationCacheFailure") print( f"Fault: recommendationServiceCacheFailure | Namespace: {self.namespace}\n" ) def recover_fault(self): print("== Fault Recovery ==") - self.injector.recover_fault("recommendationServiceCacheFailure") + self.injector.recover_fault("recommendationCacheFailure") ################## Detection Problem ################## @@ -67,7 +67,6 @@ class RecommendationServiceCacheFailureLocalization( def __init__(self): RecommendationServiceCacheFailureBaseTask.__init__(self) LocalizationTask.__init__(self, self.app) - self.task_desc += "Start by investigating the reccomendation service." 
def eval(self, soln: Any, trace: list[SessionItem], duration: float): print("== Evaluation ==") diff --git a/aiopslab/orchestrator/problems/registry.py b/aiopslab/orchestrator/problems/registry.py index 2563b27d..9cf36e5e 100644 --- a/aiopslab/orchestrator/problems/registry.py +++ b/aiopslab/orchestrator/problems/registry.py @@ -27,6 +27,8 @@ from aiopslab.orchestrator.problems.redeploy_without_pv import * from aiopslab.orchestrator.problems.wrong_bin_usage import * from aiopslab.orchestrator.problems.operator_misoperation import * +from aiopslab.orchestrator.problems.flower_node_stop import * +from aiopslab.orchestrator.problems.flower_model_misconfig import * class ProblemRegistry: @@ -159,6 +161,7 @@ def __init__(self): app_name="hotel" ), "noop_detection_social_network-1": lambda: NoOpDetection(app_name="social"), + "noop_detection_astronomy_shop-1": lambda: NoOpDetection(app_name="astronomy_shop"), # NOTE: This should be getting fixed by the great powers of @jinghao-jia # Kernel fault -> https://github.com/xlab-uiuc/agent-ops/pull/10#issuecomment-2468992285 # There's a bug in chaos mesh regarding this fault, wait for resolution and retest kernel fault @@ -200,16 +203,19 @@ def __init__(self): "wrong_bin_usage-analysis-1": WrongBinUsageAnalysis, "wrong_bin_usage-mitigation-1": WrongBinUsageMitigation, # K8S operator misoperation - "operator_overload_replicas-detection-1": K8SOperatorOverloadReplicasDetection, - "operator_overload_replicas-localization-1": K8SOperatorOverloadReplicasLocalization, - "operator_non_existent_storage-detection-1": K8SOperatorNonExistentStorageDetection, - "operator_non_existent_storage-localization-1": K8SOperatorNonExistentStorageLocalization, - "operator_invalid_affinity_toleration-detection-1": K8SOperatorInvalidAffinityTolerationDetection, - "operator_invalid_affinity_toleration-localization-1": K8SOperatorInvalidAffinityTolerationLocalization, - "operator_security_context_fault-detection-1": 
K8SOperatorSecurityContextFaultDetection, - "operator_security_context_fault-localization-1": K8SOperatorSecurityContextFaultLocalization, - "operator_wrong_update_strategy-detection-1": K8SOperatorWrongUpdateStrategyDetection, - "operator_wrong_update_strategy-localization-1": K8SOperatorWrongUpdateStrategyLocalization, + # "operator_overload_replicas-detection-1": K8SOperatorOverloadReplicasDetection, + # "operator_overload_replicas-localization-1": K8SOperatorOverloadReplicasLocalization, + # "operator_non_existent_storage-detection-1": K8SOperatorNonExistentStorageDetection, + # "operator_non_existent_storage-localization-1": K8SOperatorNonExistentStorageLocalization, + # "operator_invalid_affinity_toleration-detection-1": K8SOperatorInvalidAffinityTolerationDetection, + # "operator_invalid_affinity_toleration-localization-1": K8SOperatorInvalidAffinityTolerationLocalization, + # "operator_security_context_fault-detection-1": K8SOperatorSecurityContextFaultDetection, + # "operator_security_context_fault-localization-1": K8SOperatorSecurityContextFaultLocalization, + # "operator_wrong_update_strategy-detection-1": K8SOperatorWrongUpdateStrategyDetection, + # "operator_wrong_update_strategy-localization-1": K8SOperatorWrongUpdateStrategyLocalization, + # Flower + "flower_node_stop-detection": FlowerNodeStopDetection, + "flower_model_misconfig-detection": FlowerModelMisconfigDetection, } def get_problem_instance(self, problem_id: str): diff --git a/aiopslab/orchestrator/problems/revoke_auth/revoke_auth.py b/aiopslab/orchestrator/problems/revoke_auth/revoke_auth.py index 28d4d12c..8151d0bf 100644 --- a/aiopslab/orchestrator/problems/revoke_auth/revoke_auth.py +++ b/aiopslab/orchestrator/problems/revoke_auth/revoke_auth.py @@ -98,7 +98,8 @@ def eval(self, soln: Any, trace: list[SessionItem], duration: float): return self.results # Calculate exact match and subset - is_exact = is_exact_match(soln, self.faulty_service) + is_exact = is_exact_match(soln, 
self.faulty_service) or is_exact_match(soln, self.faulty_service.removeprefix("mongodb-")) # Given that mongodb-geo and geo are closely coupled + # (likewise with rate), either pod should be an answer is_sub = is_subset([self.faulty_service], soln) # Determine accuracy diff --git a/aiopslab/orchestrator/problems/scale_pod/scale_pod_social_net.py b/aiopslab/orchestrator/problems/scale_pod/scale_pod_social_net.py index 3077dc53..e9672e43 100644 --- a/aiopslab/orchestrator/problems/scale_pod/scale_pod_social_net.py +++ b/aiopslab/orchestrator/problems/scale_pod/scale_pod_social_net.py @@ -92,7 +92,6 @@ class ScalePodSocialNetLocalization(ScalePodSocialNetBaseTask, LocalizationTask) def __init__(self): ScalePodSocialNetBaseTask.__init__(self) LocalizationTask.__init__(self, self.app) - self.task_desc += "Start by investigating the `compost-post-service` pod" def eval(self, soln: Any, trace: list[SessionItem], duration: float): print("== Evaluation ==") @@ -134,7 +133,6 @@ class ScalePodSocialNetAnalysis(ScalePodSocialNetBaseTask, AnalysisTask): def __init__(self): ScalePodSocialNetBaseTask.__init__(self) AnalysisTask.__init__(self, self.app) - self.task_desc += "Start by investigating the `compost-post-service` pod" def eval(self, soln: Any, trace: list[SessionItem], duration: float): print("== Evaluation ==") @@ -174,7 +172,6 @@ class ScalePodSocialNetMitigation(ScalePodSocialNetBaseTask, MitigationTask): def __init__(self): ScalePodSocialNetBaseTask.__init__(self) MitigationTask.__init__(self, self.app) - self.task_desc += "Start by investigating the `compost-post-service` pod" def eval(self, soln: Any, trace: list[SessionItem], duration: float) -> dict: print("== Evaluation ==") diff --git a/aiopslab/orchestrator/problems/storage_user_unregistered/storage_user_unregistered.py b/aiopslab/orchestrator/problems/storage_user_unregistered/storage_user_unregistered.py index 51abdef1..27e75295 100644 ---
a/aiopslab/orchestrator/problems/storage_user_unregistered/storage_user_unregistered.py +++ b/aiopslab/orchestrator/problems/storage_user_unregistered/storage_user_unregistered.py @@ -103,7 +103,9 @@ def eval(self, soln: Any, trace: list[SessionItem], duration: float): return self.results # Calculate exact match and subset - is_exact = is_exact_match(soln, self.faulty_service) + # Given that mongodb-geo and geo are closely coupled + # (likewise with rate), either pod should be an answer + is_exact = is_exact_match(soln, self.faulty_service) or is_exact_match(soln, self.faulty_service.removeprefix("mongodb-")) is_sub = is_subset([self.faulty_service], soln) # Determine accuracy diff --git a/aiopslab/orchestrator/problems/wrong_bin_usage/wrong_bin_usage.py b/aiopslab/orchestrator/problems/wrong_bin_usage/wrong_bin_usage.py index 4c9f55f6..16084d57 100644 --- a/aiopslab/orchestrator/problems/wrong_bin_usage/wrong_bin_usage.py +++ b/aiopslab/orchestrator/problems/wrong_bin_usage/wrong_bin_usage.py @@ -154,7 +154,6 @@ def eval(self, soln: Any, trace: list[SessionItem], duration: float): ################## Mitigation Problem ################## -# TODO: May need to check the application log cause the error will not be illustrated in the pod level.
class WrongBinUsageMitigation(WrongBinUsageBaseTask, MitigationTask): def __init__(self, faulty_service: str = "profile"): WrongBinUsageBaseTask.__init__(self, faulty_service=faulty_service) @@ -171,26 +170,44 @@ def eval(self, soln: Any, trace: list[SessionItem], duration: float) -> dict: for pod in pod_list.items: # Check container statuses for container_status in pod.status.container_statuses: - if ( - container_status.state.waiting - and container_status.state.waiting.reason == "CrashLoopBackOff" - ): - print(f"Container {container_status.name} is in CrashLoopBackOff") - all_normal = False - elif ( - container_status.state.terminated - and container_status.state.terminated.reason != "Completed" - ): - print( - f"Container {container_status.name} is terminated with reason: {container_status.state.terminated.reason}" - ) + if container_status.state.waiting: + reason = container_status.state.waiting.reason + if reason in ["CrashLoopBackOff", "Error", "ImagePullBackOff", "ErrImagePull"]: + print(f"Container {container_status.name} is in error state: {reason}") + all_normal = False + elif container_status.state.terminated and container_status.state.terminated.reason != "Completed": + print(f"Container {container_status.name} is terminated with reason: {container_status.state.terminated.reason}") all_normal = False elif not container_status.ready: print(f"Container {container_status.name} is not ready") all_normal = False if not all_normal: - break + print("Pods are not all in a good state.") + self.results["success"] = False + return self.results - self.results["success"] = all_normal - return self.results + # Check if the deployment was updated to use the right binary + expected_command = "profile" # The command dictates which binary will be run; we want to run /go/bin/profile and not /go/bin/geo + + try: + deployment = self.kubectl.get_deployment(self.faulty_service, self.namespace) + containers = deployment.spec.template.spec.containers + + for container in
containers: + command = container.command or [] + if expected_command not in command: + print( + f"[FAIL] Deployment for container '{container.name}' is using wrong binary: {command}" + ) + self.results["success"] = False + return self.results + + print("[PASS] Deployment is using the correct binary.") + self.results["success"] = True + return self.results + + except Exception as e: + print(f"[ERROR] Exception during evaluation: {e}") + self.results["success"] = False + return self.results diff --git a/aiopslab/orchestrator/tasks/analysis.py b/aiopslab/orchestrator/tasks/analysis.py index 018deecc..1186a61b 100644 --- a/aiopslab/orchestrator/tasks/analysis.py +++ b/aiopslab/orchestrator/tasks/analysis.py @@ -25,9 +25,9 @@ def __init__(self, app: Application): self.actions = AnalysisActions() self.task_desc = """\ - You are an expert DevOps engineer who has been tasked with doing root cause analysis in a deployed service. + You are an expert DevOps engineer assigned to do root cause analysis in a deployed service. - The service you are working with today is described below: + Service Details: {app_summary} You will begin by analyzing the service's state and telemetry, and then submit one of two possible solutions: @@ -51,16 +51,23 @@ def __init__(self, app: Application): self.instructions = """\ You will respond with one of the above APIs as your next action. Please respond in the following format in a markdown code block: - ``` - (, ...) 
- ``` + ```\n(, ...)\n``` - For example: - ``` - exec_shell("ls -l") # will list files in current directory - ``` + For instance, if you want to list files in the current directory, your response must be exactly: + + ```\nexec_shell("ls -l")\n``` + + When submitting your analysis, use the following format: + + ```\nsubmit({"system_level": "your_system_level_analysis", "fault_type": "your_fault_type_analysis"})\n``` + + Replace "your_system_level_analysis" and "your_fault_type_analysis" with the actual analysis of the system level and fault type. + + Or, if no fault is detected, you should respond with: + + ```\nsubmit()\n``` - Please respond with only a single action per turn. + Please respond with only a single API call (a.k.a. an action) per turn without any additional words, labels, or prefixes. """ def get_task_description(self): @@ -81,6 +88,6 @@ def perform_action(self, action_name, *args, **kwargs): raise InvalidActionError(action_name) def eval(self, soln: Any, trace: list[SessionItem], duration: float): - self.add_result("TTR", duration) + self.add_result("TTA", duration) self.common_eval(trace) return self.results diff --git a/aiopslab/orchestrator/tasks/detection.py b/aiopslab/orchestrator/tasks/detection.py index 31a2d060..449a86cb 100644 --- a/aiopslab/orchestrator/tasks/detection.py +++ b/aiopslab/orchestrator/tasks/detection.py @@ -26,28 +26,33 @@ def __init__(self, app: Application): self.actions = DetectionActions() self.task_desc = """\ - You are an expert DevOps engineer who has been tasked with detecting anomalies in a deployed service. - - The service you are working with today is described below: + You are an expert DevOps engineer assigned to detect anomalies in a deployed service. + + Service Details: {app_summary} You will begin by analyzing the service's state and telemetry, and then submit your solution: - 1. str: `Yes` or `No` to indicate whether there are anomalies detected + 1.
str: "Yes" or "No" to indicate whether there are anomalies detected """ self.instructions = """\ You will respond with one of the above APIs as your next action. Please respond in the following format in a markdown code block: - ``` - (, ...) - ``` + ```\n(, ...)\n``` + + For instance, if you want to list files in the current directory, your response must be exactly: + + ```\nexec_shell("ls -l")\n``` + + If you decide that there are no anomalies: + + ```\nsubmit(\"No\")\n``` + + Or, if anomalies are found: - For example: - ``` - exec_shell("ls -l") # will list files in current directory - ``` + ```\nsubmit(\"Yes\")\n``` - Please respond with only a single action per turn. + Please respond with only a single API call (a.k.a. an action) per turn without any additional words, labels, or prefixes. """ def get_task_description(self): diff --git a/aiopslab/orchestrator/tasks/localization.py b/aiopslab/orchestrator/tasks/localization.py index d02a7c85..6b3af40d 100644 --- a/aiopslab/orchestrator/tasks/localization.py +++ b/aiopslab/orchestrator/tasks/localization.py @@ -26,29 +26,37 @@ def __init__(self, app: Application): self.actions = LocalizationActions() self.task_desc = """\ - You are an expert DevOps engineer who has been tasked with localizing faults in a deployed service. + You are an expert DevOps engineer assigned to localize faults in a deployed service. - The service you are working with today is described below: + Service Details: {app_summary} + You are requested to identify the service(s) where the root cause of the fault lies. You will begin by analyzing the service's state and telemetry, and then submit one of two possible solutions: 1. list[str]: list of faulty components (i.e., service names) - 2. str: `None` if no faults were detected + 2. list: an empty list if no faults were detected """ self.instructions = """\ You will respond with one of the above APIs as your next action.
Please respond in the following format in a markdown code block: - ``` - (, ...) - ``` + ```\n(, ...)\n``` - For example: - ``` - exec_shell("ls -l") # will list files in current directory - ``` + For instance, if you want to list files in the current directory, your response must be exactly: + + ```\nexec_shell("ls -l")\n``` - Please respond with only a single action per turn. + If there are faulty components to submit: + + ```\nsubmit([\"placeholderA\", \"placeholderB\"])\n``` + + (where "placeholderA" and "placeholderB" are placeholders; replace them with the faulty components in your environment) + + Or, if no faults are found: + + ```\nsubmit([])\n``` + + Please respond with only a single API call (a.k.a. an action) per turn without any additional words, labels, or prefixes. """ def get_task_description(self): diff --git a/aiopslab/orchestrator/tasks/mitigation.py b/aiopslab/orchestrator/tasks/mitigation.py index 2f45e139..d15b62dd 100644 --- a/aiopslab/orchestrator/tasks/mitigation.py +++ b/aiopslab/orchestrator/tasks/mitigation.py @@ -25,9 +25,9 @@ def __init__(self, app: Application): self.actions = MitigationActions() self.task_desc = """\ - You are an expert DevOps engineer who has been tasked with mitigating anomalies in a deployed service. + You are an expert DevOps engineer assigned to mitigate anomalies in a deployed service. - The service you are working with today is described below: + Service Details: {app_summary} You will begin by analyzing the service's state and telemetry, and then submit a solution that mitigates any detected anomalies. @@ -37,16 +37,21 @@ def __init__(self, app: Application): self.instructions = """\ You will respond with one of the above APIs as your next action. Please respond in the following format in a markdown code block: - ``` - (, ...)
- ``` + ```\n(, ...)\n``` - For example: - ``` - exec_shell("ls -l") # will list files in current directory - ``` + For instance, if you want to list files in the current directory, your response must be exactly: + + ```\nexec_shell("ls -l")\n``` - Please respond with only a single action per turn. + Once your solution is complete and ready for evaluation, you must call: + + ```\nsubmit()\n``` + + Note: + - The submit() call for the mitigation task does not take any parameters. + - A submission via submit() is considered valid once it is made, though this does not necessarily mean that your solution is correct. + + Please respond with only a single API call (a.k.a. an action) per turn without any additional words, labels, or prefixes. """ def get_task_description(self): diff --git a/aiopslab/paths.py b/aiopslab/paths.py index 6aad9b2a..2391bc41 100644 --- a/aiopslab/paths.py +++ b/aiopslab/paths.py @@ -34,3 +34,4 @@ ASTRONOMY_SHOP_METADATA = BASE_DIR / "service" / "metadata" / "astronomy-shop.json" TIDB_METADATA = BASE_DIR / "service" / "metadata" / "tidb-with-operator.json" FLIGHT_TICKET_METADATA = BASE_DIR / "service" / "metadata" / "flight-ticket.json" +FLOWER_METADATA = BASE_DIR / "service" / "metadata" / "flower.json" \ No newline at end of file diff --git a/aiopslab/service/apps/astronomy_shop.py b/aiopslab/service/apps/astronomy_shop.py index 68b81e09..c1caeb0e 100644 --- a/aiopslab/service/apps/astronomy_shop.py +++ b/aiopslab/service/apps/astronomy_shop.py @@ -5,7 +5,6 @@ from aiopslab.service.helm import Helm from aiopslab.service.kubectl import KubeCtl from aiopslab.service.apps.base import Application -from aiopslab.paths import TARGET_MICROSERVICES from aiopslab.paths import ASTRONOMY_SHOP_METADATA @@ -19,7 +18,7 @@ def __init__(self): def load_app_json(self): super().load_app_json() metadata = self.get_app_json() - self.frontend_service = "astronomy-shop-frontendproxy" + self.frontend_service = "frontend-proxy" self.frontend_port = 8080 def deploy(self):
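The instruction templates updated above all require the agent to reply with exactly one API call inside a fenced code block. As a rough illustration of how such a reply could be validated, here is a minimal sketch; `parse_action` is a hypothetical helper for illustration only, not part of AIOpsLab:

```python
import re

# Minimal sketch of validating an agent reply against the required response
# format: a single API call of the form name(args...) in a fenced code block.
def parse_action(response: str) -> str:
    """Return the single API call inside a markdown code block, or raise."""
    match = re.search(r"```(?:\w+)?\s*\n?(.*?)\n?```", response, re.DOTALL)
    if match is None:
        raise ValueError("response does not contain a fenced code block")
    action = match.group(1).strip()
    # The instructions allow exactly one call of the form name(args...)
    if not re.fullmatch(r"\w+\(.*\)", action, re.DOTALL):
        raise ValueError(f"not a single API call: {action!r}")
    return action

reply = '```\nsubmit("Yes")\n```'
print(parse_action(reply))  # -> submit("Yes")
```

A real parser would additionally check the call name against the task's registered actions before dispatching it.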
diff --git a/aiopslab/service/apps/base.py b/aiopslab/service/apps/base.py index 214a8e38..b88a0ee6 100644 --- a/aiopslab/service/apps/base.py +++ b/aiopslab/service/apps/base.py @@ -15,6 +15,7 @@ def __init__(self, config_file: str): self.helm_deploy = True self.helm_configs = {} self.k8s_deploy_path = None + self.docker_deploy_path = None def load_app_json(self): """Load (basic) application metadata into attributes. @@ -28,12 +29,16 @@ def load_app_json(self): self.namespace = metadata["Namespace"] if "Helm Config" in metadata: self.helm_configs = metadata["Helm Config"] - if "chart_path" in self.helm_configs: - chart_path = self.helm_configs["chart_path"] + chart_path = self.helm_configs.get("chart_path") + + if chart_path and not self.helm_configs.get("remote_chart", False): self.helm_configs["chart_path"] = str(TARGET_MICROSERVICES / chart_path) if "K8S Deploy Path" in metadata: self.k8s_deploy_path = TARGET_MICROSERVICES / metadata["K8S Deploy Path"] + + if "Docker Deploy Path" in metadata: + self.docker_deploy_path = TARGET_MICROSERVICES / metadata["Docker Deploy Path"] def get_app_json(self) -> dict: """Get application metadata in JSON format. diff --git a/aiopslab/service/apps/flower.py b/aiopslab/service/apps/flower.py new file mode 100644 index 00000000..22834d3e --- /dev/null +++ b/aiopslab/service/apps/flower.py @@ -0,0 +1,29 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. 
+ +from aiopslab.service.dock import Docker +from aiopslab.service.apps.base import Application +from aiopslab.paths import FLOWER_METADATA + + +class Flower(Application): + def __init__(self): + super().__init__(FLOWER_METADATA) + self.docker = Docker() + + self.load_app_json() + + def deploy(self): + """Deploy the docker compose file.""" + print("Deploying docker compose files") + self.docker.compose_up(self.docker_deploy_path) + + def delete(self): + """Stop the docker containers.""" + print("Stopping the docker containers") + self.docker.compose_down(self.docker_deploy_path) + + def cleanup(self): + """Delete all stopped docker containers.""" + print("Deleting stopped containers") + self.docker.cleanup() diff --git a/aiopslab/service/apps/hotelres.py b/aiopslab/service/apps/hotelres.py index f5e8e355..6ac3e21d 100644 --- a/aiopslab/service/apps/hotelres.py +++ b/aiopslab/service/apps/hotelres.py @@ -75,7 +75,7 @@ def deploy_without_wait(self): print(f"Deploying Kubernetes configurations in namespace: {self.namespace}") self.kubectl.apply_configs(self.namespace, self.k8s_deploy_path) - print(f"Waiting for being stable...") + print("Waiting for stability...") time.sleep(30) def delete(self): diff --git a/aiopslab/service/apps/socialnet.py b/aiopslab/service/apps/socialnet.py index 98eb765c..a6570344 100644 --- a/aiopslab/service/apps/socialnet.py +++ b/aiopslab/service/apps/socialnet.py @@ -3,8 +3,6 @@ """Interface to the social network application from DeathStarBench""" -import platform - from aiopslab.service.helm import Helm from aiopslab.service.kubectl import KubeCtl from aiopslab.service.apps.base import Application @@ -22,8 +20,6 @@ def __init__(self): ) self.create_namespace() self.create_tls_secret() - # Detect CPU architecture, we need to deploy media-frontend differently on arm - self.is_arm = platform.machine().lower() in ["arm64", "aarch64"] def load_app_json(self): super().load_app_json() @@ -47,15 +43,22 @@ def create_tls_secret(self): print("TLS
secret already exists. Skipping creation.") def deploy(self): - """Deploy the Helm configurations.""" - if self.is_arm: - # Update image to use arm build image + """Deploy the Helm configurations with architecture-aware image selection.""" + node_architectures = self.kubectl.get_node_architectures() + is_arm = any(arch in ["arm64", "aarch64"] for arch in node_architectures) + + if is_arm: + # Use the ARM-compatible image for media-frontend if "extra_args" not in self.helm_configs: self.helm_configs["extra_args"] = [] - - self.helm_configs["extra_args"].append("--set media-frontend.container.image=jacksonarthurclark/media-frontend") - self.helm_configs["extra_args"].append("--set media-frontend.container.imageVersion=latest") - + + self.helm_configs["extra_args"].append( + "--set media-frontend.container.image=jacksonarthurclark/media-frontend" + ) + self.helm_configs["extra_args"].append( + "--set media-frontend.container.imageVersion=latest" + ) + Helm.install(**self.helm_configs) Helm.assert_if_deployed(self.helm_configs["namespace"]) diff --git a/aiopslab/service/dock.py b/aiopslab/service/dock.py new file mode 100644 index 00000000..149ff1cc --- /dev/null +++ b/aiopslab/service/dock.py @@ -0,0 +1,57 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. 
+ +"""Interface to Docker controller service.""" + +import docker +import subprocess + + +class Docker: + def __init__(self): + self.client = docker.from_env() + + def list_containers(self): + """Get all containers.""" + return self.client.containers.list() + + def get_container(self, container_id): + """Get a container by ID.""" + return self.client.containers.get(container_id) + + def get_logs(self, container_id): + """Get logs for a container.""" + return self.get_container(container_id).logs().decode("utf-8") + + def compose_up(self, cwd): + """Run docker-compose up.""" + command = "docker compose up -d" + return self.exec_command(command, cwd=cwd) + + def compose_down(self, cwd): + """Run docker-compose down.""" + command = "docker compose down" + return self.exec_command(command, cwd=cwd) + + def cleanup(self): + """Remove the stopped docker containers.""" + command = "docker container prune -f" + return self.exec_command(command) + + def exec_command(self, command: str, input_data=None, cwd=None): + """Execute an arbitrary command.""" + if input_data is not None: + input_data = input_data.encode("utf-8") + try: + out = subprocess.run( + command, + input=input_data, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + shell=True, + cwd=cwd, + ) + out.check_returncode() # raise CalledProcessError on non-zero exit + return out.stdout.decode("utf-8") + except subprocess.CalledProcessError as e: + return e.stderr.decode("utf-8") diff --git a/aiopslab/service/helm.py b/aiopslab/service/helm.py index c3efb2ca..2cc2c8f6 100644 --- a/aiopslab/service/helm.py +++ b/aiopslab/service/helm.py @@ -20,6 +20,7 @@ def install(**args): namespace (str): Namespace to install the chart version (str): Version of the chart extra_args (List[str)]: Extra arguments for the helm install command + remote_chart (bool): Whether the chart is remote (from a Helm repo) """ print("== Helm Install ==") release_name = args.get("release_name") chart_path = args.get("chart_path") namespace = args.get("namespace") version =
args.get("version") extra_args = args.get("extra_args") - - # Install dependencies for chart before installation - dependency_command = f"helm dependency update {chart_path}" - dependency_process = subprocess.Popen( - dependency_command, - shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - dependency_output, dependency_error = dependency_process.communicate() + remote_chart = args.get("remote_chart", False) + + if not remote_chart: + # Install dependencies for chart before installation + dependency_command = f"helm dependency update {chart_path}" + dependency_process = subprocess.Popen( + dependency_command, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + dependency_output, dependency_error = dependency_process.communicate() command = f"helm install {release_name} {chart_path} -n {namespace} --create-namespace" diff --git a/aiopslab/service/kubectl.py b/aiopslab/service/kubectl.py index 93fcc829..d06f73b2 100644 --- a/aiopslab/service/kubectl.py +++ b/aiopslab/service/kubectl.py @@ -80,9 +80,7 @@ def wait_for_ready(self, namespace, sleep=2, max_wait=300): try: pod_list = self.list_pods(namespace) - if not pod_list.items: - console.log(f"[yellow]No pods found in namespace '{namespace}', waiting...") - else: + if pod_list.items: ready_pods = [ pod for pod in pod_list.items if pod.status.container_statuses and @@ -271,6 +269,17 @@ def exec_command(self, command: str, input_data=None): # else: # return out.stdout.decode("utf-8") + def get_node_architectures(self): + """Return a set of CPU architectures from all nodes in the cluster.""" + architectures = set() + try: + nodes = self.core_v1_api.list_node() + for node in nodes.items: + arch = node.status.node_info.architecture + architectures.add(arch) + except ApiException as e: + print(f"Exception when retrieving node architectures: {e}\n") + return architectures # Example usage: if __name__ == "__main__": diff --git a/aiopslab/service/metadata/astronomy-shop.json 
b/aiopslab/service/metadata/astronomy-shop.json index a7881a20..617ac969 100644 --- a/aiopslab/service/metadata/astronomy-shop.json +++ b/aiopslab/service/metadata/astronomy-shop.json @@ -15,7 +15,8 @@ ], "Helm Config": { "release_name": "astronomy-shop", - "chart_path": "opentelemetry-helm-charts/charts/opentelemetry-demo", - "namespace": "astronomy-shop" + "chart_path": "open-telemetry/opentelemetry-demo", + "namespace": "astronomy-shop", + "remote_chart": true } } \ No newline at end of file diff --git a/aiopslab/service/metadata/flower.json b/aiopslab/service/metadata/flower.json new file mode 100644 index 00000000..87653228 --- /dev/null +++ b/aiopslab/service/metadata/flower.json @@ -0,0 +1,10 @@ +{ + "Name": "Flower", + "Namespace": "docker", + "Desc": "A federated learning application that trains models on edge devices and aggregates them on a central server. It consists of four main components: the serverapp (the central server), the superlink (the communication link for the serverapp), the clientapp (the edge-device application), and the supernode (the communication link for the clientapp). The serverapp and superlink aggregate models from the clientapp and supernode, while the clientapp and supernode train models on edge devices.
The current deployment consists of one server and two clients.", + "Supported Operations": [ + "Train models on edge devices", + "Aggregate models on a central server" + ], + "Docker Deploy Path": "flower" +} \ No newline at end of file diff --git a/aiopslab/service/shell.py b/aiopslab/service/shell.py index 2b894caa..af482bac 100644 --- a/aiopslab/service/shell.py +++ b/aiopslab/service/shell.py @@ -20,17 +20,16 @@ class Shell: def exec(command: str, input_data=None, cwd=None): """Execute a shell command on localhost, via SSH, or inside kind's control-plane container.""" k8s_host = config.get("k8s_host", "localhost") # Default to localhost - + if k8s_host == "kind": - print("[INFO] Running command inside kind-control-plane Docker container.") return Shell.docker_exec("kind-control-plane", command) elif k8s_host == "localhost": - print( - "[WARNING] Running commands on localhost is not recommended. " - "This may pose safety and security risks when using an AI agent locally. " - "I hope you know what you're doing!!!" - ) + # print( + # "[WARNING] Running commands on localhost is not recommended. " + # "This may pose safety and security risks when using an AI agent locally. " + # "I hope you know what you're doing!!!" 
+ # ) return Shell.local_exec(command, input_data, cwd) else: @@ -51,14 +50,15 @@ def local_exec(command: str, input_data=None, cwd=None): stderr=subprocess.PIPE, shell=True, cwd=cwd, + timeout=10, # TODO: catch subprocess.TimeoutExpired and surface the timeout to the caller ) - if out.stderr or out.returncode != 0: + if out.returncode != 0: error_message = out.stderr.decode("utf-8") print(f"[ERROR] Command execution failed: {error_message}") return error_message else: - output_message = out.stdout.decode("utf-8") + output_message = out.stdout.decode("utf-8") + out.stderr.decode("utf-8") print(output_message) return output_message @@ -79,7 +79,6 @@ def ssh_exec(host: str, user: str, ssh_key_path: str, command: str): if exit_status != 0: error_message = stderr.read().decode("utf-8") - print(f"[ERROR] SSH Command execution failed: {error_message}") return error_message else: output_message = stdout.read().decode("utf-8") @@ -95,7 +94,9 @@ def ssh_exec(host: str, user: str, ssh_key_path: str, command: str): @staticmethod def docker_exec(container_name: str, command: str): """Execute a command inside a running Docker container.""" - docker_command = f"docker exec {container_name} sh -c '{command}'" + escaped_command = command.replace('"', '\\"') + + docker_command = f'docker exec {container_name} sh -c "{escaped_command}"' try: out = subprocess.run( diff --git a/aiopslab/service/telemetry/prometheus.py b/aiopslab/service/telemetry/prometheus.py index 6bced8e9..721e659a 100644 --- a/aiopslab/service/telemetry/prometheus.py +++ b/aiopslab/service/telemetry/prometheus.py @@ -70,13 +70,13 @@ def deploy(self): print("Prometheus is already running.
Skipping redeployment.") return - self._delete_pv() + self._delete_pvc() Helm.uninstall(**self.helm_configs) if self.pvc_config_file: - pv_name = self._get_pv_name_from_file(self.pvc_config_file) - if not self._pv_exists(pv_name): - self._apply_pv() + pvc_name = self._get_pvc_name_from_file(self.pvc_config_file) + if not self._pvc_exists(pvc_name): + self._apply_pvc() Helm.install(**self.helm_configs) Helm.assert_if_deployed(self.namespace) @@ -86,36 +86,36 @@ def teardown(self): Helm.uninstall(**self.helm_configs) if self.pvc_config_file: - self._delete_pv() + self._delete_pvc() - def _apply_pv(self): - """Apply the PersistentVolume configuration.""" - print(f"Applying PersistentVolume from {self.pvc_config_file}") + def _apply_pvc(self): + """Apply the PersistentVolumeClaim configuration.""" + print(f"Applying PersistentVolumeClaim from {self.pvc_config_file}") KubeCtl().exec_command( f"kubectl apply -f {self.pvc_config_file} -n {self.namespace}" ) - def _delete_pv(self): + def _delete_pvc(self): """Delete the PersistentVolume and associated PersistentVolumeClaim.""" - pv_name = self._get_pv_name_from_file(self.pvc_config_file) - result = KubeCtl().exec_command(f"kubectl get pv {pv_name} --ignore-not-found") + pvc_name = self._get_pvc_name_from_file(self.pvc_config_file) + result = KubeCtl().exec_command(f"kubectl get pvc {pvc_name} --ignore-not-found") if result: - print(f"Deleting PersistentVolume {pv_name}") - KubeCtl().exec_command(f"kubectl delete pv {pv_name}") - print(f"Successfully deleted PersistentVolume from {pv_name}") + print(f"Deleting PersistentVolumeClaim {pvc_name}") + KubeCtl().exec_command(f"kubectl delete pvc {pvc_name}") + print(f"Successfully deleted PersistentVolumeClaim from {pvc_name}") else: - print(f"PersistentVolume {pv_name} not found. Skipping deletion.") + print(f"PersistentVolumeClaim {pvc_name} not found. 
Skipping deletion.") - def _get_pv_name_from_file(self, pv_config_file): - """Extract PV name from the configuration file.""" + def _get_pvc_name_from_file(self, pv_config_file): + """Extract PVC name from the configuration file.""" with open(pv_config_file, "r") as file: pv_config = yaml.safe_load(file) return pv_config["metadata"]["name"] - def _pv_exists(self, pv_name: str) -> bool: - """Check if the PersistentVolume exists.""" - command = f"kubectl get pv {pv_name}" + def _pvc_exists(self, pvc_name: str) -> bool: + """Check if the PersistentVolumeClaim exists.""" + command = f"kubectl get pvc {pvc_name}" try: result = KubeCtl().exec_command(command) if "No resources found" in result or "Error" in result: diff --git a/aiopslab/session.py b/aiopslab/session.py index e1862ddc..dcdc09e4 100644 --- a/aiopslab/session.py +++ b/aiopslab/session.py @@ -6,6 +6,7 @@ import time import uuid import json +import wandb from pydantic import BaseModel from aiopslab.paths import RESULTS_DIR @@ -118,6 +119,10 @@ def to_json(self): with open(RESULTS_DIR / f"{self.session_id}_{self.start_time}.json", "w") as f: json.dump(self.to_dict(), f, indent=4) + + def to_wandb(self): + """Log the session to Weights & Biases.""" + wandb.log(self.to_dict()) def from_json(self, filename: str): """Load a session from a JSON file.""" diff --git a/aiopslab/utils/critical_section.py b/aiopslab/utils/critical_section.py new file mode 100644 index 00000000..9b84ea49 --- /dev/null +++ b/aiopslab/utils/critical_section.py @@ -0,0 +1,28 @@ +import signal +import threading + + +class CriticalSection: + def __enter__(self): + # Only install in the main thread, avoid `ValueError` in worker threads + self.signaled = False + if threading.current_thread() is threading.main_thread(): + # Save the original signal handler + self.original_handler = signal.signal(signal.SIGINT, self.signal_handler) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if hasattr(self, "original_handler"): + # Restore 
the original signal handler + signal.signal(signal.SIGINT, self.original_handler) + + # If SIGINT was raised during the critical section, handle it after the block + if self.signaled: + raise KeyboardInterrupt # Re-raise KeyboardInterrupt to exit + + return False # Do not suppress exceptions + + def signal_handler(self, signum, frame): + """Handle SIGINT by just setting a flag to delay it.""" + self.signaled = True # Flag that SIGINT occurred + print("\nCtrl+C detected! But deferring the effect for consistency...") diff --git a/assessment.py b/assessment.py new file mode 100644 index 00000000..a39635ac --- /dev/null +++ b/assessment.py @@ -0,0 +1,139 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + + +"""AIOpsLab CLI client.""" + +import asyncio +import json +from prompt_toolkit import PromptSession +from prompt_toolkit.styles import Style +from prompt_toolkit.patch_stdout import patch_stdout +from rich.console import Console +from rich.markdown import Markdown +from rich.panel import Panel +from prompt_toolkit.completion import WordCompleter + +from aiopslab.onboarding_evaluator import Evaluator + + +WELCOME = """ +# AIOpsLab Onboarding Assessment +""" + +TASK_MESSAGE = """\n\n\n\n +There's a problem in the Kubernetes cluster in the test-hotel-reservation namespace. + +The issue is that there are unmet PersistentVolumeClaims (PVCs) because of unbound persistent volumes. + +You need to fix the issue and get all the pods into a ready state. + +You have access to a shell; take whatever action you deem necessary to resolve the issue. + +Once you believe the incident is resolved, run the `submit` command. If your solution is incorrect, it will tell you. + +You can use any resources you want to complete the assessment except for another person. However, please run all shell commands inside the interface.
+ +Your results will be saved in a file called yourFirstName_results.json; please email it to jclark58@illinois.edu + +If you encounter a bug, report it to jclark58@illinois.edu +""" + + +class HumanAgent: + def __init__(self, orchestrator): + self.session = PromptSession() + self.console = Console(force_terminal=True, color_system="auto") + self.orchestrator = orchestrator + self.pids = self.orchestrator.probs.get_problem_ids() + self.completer = WordCompleter(self.pids, ignore_case=True, match_middle=True) + + def display_welcome_message(self): + self.console.print(Markdown(WELCOME), justify="center") + self.console.print() + + def display_context(self, problem_desc, apis): + self.shell_api = self._filter_dict(apis, lambda k, _: "exec_shell" in k) + self.submit_api = self._filter_dict(apis, lambda k, _: "submit" in k) + self.telemetry_apis = self._filter_dict( + apis, lambda k, _: "exec_shell" not in k and "submit" not in k ) + + stringify_apis = lambda apis: "\n\n".join( + [f"{k}\n{v}" for k, v in apis.items()] + ) + + self.task_message = TASK_MESSAGE.format( + prob_desc=problem_desc, + telemetry_apis=stringify_apis(self.telemetry_apis), + shell_api=stringify_apis(self.shell_api), + submit_api=stringify_apis(self.submit_api), + ) + + self.console.print(Markdown(self.task_message)) + + def display_env_message(self, env_input): + self.console.print(Panel(env_input, title="Environment", style="white on blue")) + self.console.print() + + async def set_problem(self): + self.init_problem("redeploy_without_PV-mitigation-1") + + async def get_action(self, env_input): + user_input = await self.get_user_input() + template = "Action:```\n{}\n```" + return template.format(user_input) + + def init_problem(self, problem_id="misconfig-mitigation-1"): + problem_desc, _, apis = self.orchestrator.init_problem(problem_id) + self.display_context(problem_desc, apis) + + async def get_user_input(self, completer=None): + loop = asyncio.get_running_loop() + style =
Style.from_dict({"prompt": "ansigreen bold"}) + prompt_text = [("class:prompt", "shell> ")] + + with patch_stdout(): + try: + input = await loop.run_in_executor( + None, + lambda: self.session.prompt( + prompt_text, style=style, completer=completer + ), + ) + + if input.lower() == "exit": + raise SystemExit + + return input + except (SystemExit, KeyboardInterrupt, EOFError): + raise SystemExit from None + + def _filter_dict(self, dictionary, filter_func): + return {k: v for k, v in dictionary.items() if filter_func(k, v)} + + +async def main(): + orchestrator = Evaluator() + agent = HumanAgent(orchestrator) + orchestrator.register_agent(agent, name="human") + + first_name = input("What is your first name?: ") + + agent.display_welcome_message() + await agent.set_problem() + + results = await orchestrator.start_problem() + + session_data = orchestrator.session.to_dict() + + with open(f"{first_name}_results.json", "w") as f: + json.dump(session_data, f, indent=2) + + print(f"Results saved to {first_name}_results.json") + + return results + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/clients/README.md b/clients/README.md index 29082d2f..65cf52c5 100644 --- a/clients/README.md +++ b/clients/README.md @@ -5,22 +5,75 @@ These clients are some baselines that we have implemented and evaluated to help ## Clients -- [GPT](/clients/gpt.py): A naive GPT4-based LLM agent with only shell access. +- [GPT](/clients/gpt.py): A naive GPT series LLM agent with only shell access. +- [DeepSeek](/clients/deepseek.py): A naive DeepSeek series LLM agent with only shell access. +- [Qwen](/clients/qwen.py): A naive Qwen series LLM agent with only shell access. +- [vLLM](/clients/vllm.py): A naive vLLM agent with any open source LLM deployed locally and only shell access. - [ReAct](/clients/react.py): A naive LLM agent that uses the ReAct framework. 
- [FLASH](/clients/flash.py): A naive LLM agent that uses status supervision and hindsight integration components to ensure highly reliable workflow execution. +### Using the vLLM Client + +The vLLM client allows you to run local open-source models as an agent for AIOpsLab tasks. This approach is particularly useful when you want to: +- Use your own hardware for inference +- Experiment with different open-source models +- Work in environments without internet access to cloud LLM providers + +### Quick Setup Guide + +1. **Launch the vLLM server**: + ```bash + # Make the script executable + chmod +x ./clients/launch_vllm.sh + + # Run the script + ./clients/launch_vllm.sh + ``` + This will launch vLLM in the background using the default model (Qwen/Qwen2.5-3B-Instruct). + +2. **Check server status**: + ```bash + # View the log file to confirm the server is running + cat vllm_Qwen_Qwen2.5-3B-Instruct.log + ``` + +3. **Customize the model** (optional): + Edit `launch_vllm.sh` to change the model: + ```bash + # Open the file + nano ./clients/launch_vllm.sh + + # Change the MODEL variable to your preferred model + # Example: MODEL="mistralai/Mistral-7B-Instruct-v0.1" + ``` + +4. **Run the vLLM agent**: + ```bash + python clients/vllm.py + ``` + +### Requirements + +- Poetry for dependency management +- Sufficient GPU resources for your chosen model +- The model must support the OpenAI chat completion API format + +### Advanced Configuration + +The vLLM client connects to `http://localhost:8000/v1` by default. If you've configured vLLM to use a different port or host, update the `base_url` in `clients/utils/llm.py` in the `vLLMClient` class.
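Because the vLLM server speaks the OpenAI chat completion format, any HTTP client can drive it. The sketch below is a minimal, hypothetical illustration of the request payload such a server expects; `BASE_URL`, the model name, and the `build_chat_request` helper are assumptions matching the defaults above, not part of the AIOpsLab codebase.

```python
import json

# Assumption: default endpoint and model from launch_vllm.sh; adjust if you changed them.
BASE_URL = "http://localhost:8000/v1"
MODEL = "Qwen/Qwen2.5-3B-Instruct"


def build_chat_request(user_prompt: str, system_prompt: str = "You are an AIOps agent with shell access.") -> dict:
    """Build an OpenAI-style chat completion payload (hypothetical helper)."""
    return {
        "model": MODEL,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        "temperature": 0.0,  # deterministic responses are easier to debug
    }


if __name__ == "__main__":
    payload = build_chat_request("List the pods in the default namespace.")
    # POST this body to f"{BASE_URL}/chat/completions" with the HTTP client of your choice.
    print(json.dumps(payload, indent=2))
```

If a request in this shape fails, the log file from step 2 is the first place to look, since vLLM reports unsupported models and malformed payloads there.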