-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathflowintel_erroneous_tests.py
More file actions
171 lines (140 loc) · 7.25 KB
/
flowintel_erroneous_tests.py
File metadata and controls
171 lines (140 loc) · 7.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
"""Unit tests for CaseEndpoint class.
The cases tested in this file obtain an erroneous response
from the Flowintel REST API or do not behave as expected.
We compile them here to make it easy to track them and follow up
on updates to the Flowintel API that may fix these issues.
Note: The tests in this file are not meant to be run as part of the regular test suite,
but rather when fixing the issues with the API or when testing changes to the API that
may affect these cases. Therefore, the file's name does not follow the test file naming
convention on purpose, to avoid automatic execution.
"""
import unittest
from pyflowintel.api_wrapper import PyFlowintel
from pyflowintel.commons.utils import read_yaml
from pyflowintel.settings import DEFAULT_CONFIG_FILE
# Load configuration for integration tests from default configuration file 'config.yaml'.
# The settings live under the top-level "testing" key; when the file or the key is
# missing, every TEST_* value falls back to its default and the tests are skipped.
_test_config = {}
if DEFAULT_CONFIG_FILE.exists():
    # NOTE(review): read_yaml presumably returns None for an empty file - confirm.
    # Both levels fall back to {} so an empty file or a null "testing" section
    # cannot raise AttributeError while this module is being imported.
    _test_config = (read_yaml(DEFAULT_CONFIG_FILE) or {}).get("testing") or {}
TEST_BASE_URL = _test_config.get("base_url")
TEST_API_KEY = _test_config.get("api_key")
TEST_TIMEOUT = _test_config.get("timeout", 5)
def skip_if_no_credentials(test_item):
    """Skip the decorated test function or class when test credentials are missing.

    The item is skipped when either TEST_API_KEY or TEST_BASE_URL is empty or
    unset; otherwise it is returned by ``unittest.skipIf`` unchanged.
    """
    credentials_missing = not (TEST_API_KEY and TEST_BASE_URL)
    skip_decorator = unittest.skipIf(
        credentials_missing,
        "Skipping integration test: api_key or base_url for testing not set in the configuration file",
    )
    return skip_decorator(test_item)
@skip_if_no_credentials
class TestCaseEndpointScenarios(unittest.TestCase):
    """Integration tests for case endpoint with errors from Flowintel.

    One client is shared by the whole class. Each test creates its own case
    and stores its ID in ``self.case_id`` so that ``tearDown`` can delete it.
    """

    @classmethod
    def setUpClass(cls):
        """Create the API client shared by every test in this class."""
        cls.client = PyFlowintel.from_args(TEST_BASE_URL, TEST_API_KEY, TEST_TIMEOUT)

    @classmethod
    def tearDownClass(cls):
        """Close the shared client; a failure here is reported but not raised."""
        try:
            cls.client.close()
        except Exception as e:
            # Best-effort cleanup: a failing close() must not turn the run red.
            print(f"Cleanup failed (ignoring): {e}")

    def setUp(self):
        """Reset the tracked case ID and the base payload before each test."""
        # 0 means "no case created yet"; tearDown only deletes positive IDs.
        self.case_id = 0
        self.case_payload = {
            "title": "Integration tests case",
            "description": "Created by automated integration tests",
        }

    def tearDown(self):
        """Delete the case created by the test, if any."""
        # Clean up the created case after each test if it exists
        print(f"Tearing down test case with ID: {self.case_id}")
        if self.case_id > 0:
            try:
                self.client.cases.delete(self.case_id)
            except Exception as e:
                # Best-effort cleanup: report the leftover case and carry on.
                print(f"Failed to delete case {self.case_id} during cleanup: {e}")

    def _create_case(self):
        """Create a case from ``self.case_payload`` and require that it succeeded.

        The new ID is stored in ``self.case_id`` before asserting, so tearDown
        can still clean up. Returns the response of the create call.
        """
        created = self.client.cases.create(**self.case_payload)
        # "or 0" keeps case_id an int even if the response carries a null ID,
        # so the comparisons here and in tearDown cannot raise TypeError.
        self.case_id = created.get("case_id") or 0
        self.assertGreater(self.case_id, 0)
        return created

    def test_create_case_with_deadline_date(self):
        """Create a case with a deadline date"""
        self.case_payload["deadline_date"] = "2026-12-31"
        self._create_case()
        # Verify that the case was created with the given deadline date
        case_info = self.client.cases.search_by_id(self.case_id)
        self.assertIsNotNone(case_info.get("deadline"))
        self.assertIn(self.case_payload.get("deadline_date"), case_info.get("deadline"))

    def test_create_case_with_deadline_time(self):
        """Create a case with a deadline time (without date)

        Note: currently the time is ignored by the API and no error is returned
        """
        self.case_payload["deadline_time"] = "11-30"
        self._create_case()
        # Only checks that some deadline is reported for the new case
        case_info = self.client.cases.search_by_id(self.case_id)
        self.assertIsNotNone(case_info.get("deadline"))  # Note: currently the time is ignored by the API and no error is returned

    def test_create_case_with_deadline(self):
        """Create a case with both deadline date and time

        Note: the API returns a server error message
        """
        self.case_payload["deadline_date"] = "2026-12-31"
        self.case_payload["deadline_time"] = "11-30"
        self._create_case()
        # validations
        case_info = self.client.cases.search_by_id(self.case_id)
        self.assertIsNotNone(case_info.get("deadline"))

    def test_update_case_deadline(self):
        """Update the case deadline and verify that changes took place."""
        self._create_case()
        new_fields = {
            "deadline_date": "2026-05-15",
            "deadline_time": "14-30",
        }
        result = self.client.cases.update(self.case_id, new_fields)
        self.assertIn("message", result)
        self.assertIn("edited", result.get("message"))
        # Verify that the updates took effect
        updated = self.client.cases.search_by_id(self.case_id)
        self._check_fields_equality(new_fields, updated)

    def test_update_case_tags(self):
        """Update the case tags and verify that changes took place."""
        self._create_case()
        new_fields = {
            "tags": ["priority-level:low"],
        }
        result = self.client.cases.update(self.case_id, new_fields)
        self.assertIn("edited", result.get("message", ""))
        # Verify that the updates took effect
        updated = self.client.cases.search_by_id(self.case_id)
        self._check_fields_equality(new_fields, updated)

    def test_update_case_clusters(self):
        """Update the case clusters and verify that changes took place."""
        self._create_case()
        new_fields = {
            "clusters": [
                {
                    "id": 9174,
                    "name": "applovin",
                    "uuid": "e212433e-6dac-40ab-8793-8dcfe4a1538f",
                    "version": 23,
                    "description": "AppLovin is an advertisement library that is bundled with certain Android applications.",
                    "exclude": False,
                    "galaxy_id": 30,
                    "tag": 'misp-galaxy:android="applovin"',
                    "icon": "android",
                }
            ],
        }
        result = self.client.cases.update(self.case_id, new_fields)
        self.assertIn("edited", result.get("message", ""))
        # Verify that the updates took effect
        updated = self.client.cases.search_by_id(self.case_id)
        self._check_fields_equality(new_fields, updated)

    def test_append_note_to_case(self):
        """Append a note to the case and verify success response."""
        self._create_case()
        result = self.client.cases.append_note(self.case_id, "Note from integration test")
        self.assertIsInstance(result, dict)

    def _check_fields_equality(self, dict1, dict2):
        """Verify that all the key:value pairs in dict1 are present in dict2 with the same values.

        ``deadline_date`` and ``deadline_time`` are the exception: the API
        response collapses them into a single "deadline" field in the format
        "YYYY-MM-DD HH:MM", so each is checked as a substring of that field.
        """
        # A missing or null "deadline" becomes "" so the checks below fail as
        # ordinary assertions instead of raising TypeError inside assertIn.
        deadline = dict2.get("deadline") or ""
        for key, value in dict1.items():
            if key == "deadline_date":
                self.assertIn(value, deadline)
            elif key == "deadline_time":
                # The tests send the time as "HH-MM" while the response renders
                # it as "HH:MM", so convert before looking for it.
                self.assertIn(value.replace("-", ":"), deadline)
            else:
                self.assertEqual(dict2.get(key), value)