-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdev_engine.py
More file actions
276 lines (250 loc) · 10.2 KB
/
dev_engine.py
File metadata and controls
276 lines (250 loc) · 10.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
import copy
import importlib.util
import json
import os
import re
import subprocess
import sys
import tempfile
import uuid
from datetime import datetime
# All persistent state lives in a "data" directory that sits next to this module.
_MODULE_DIR = os.path.dirname(os.path.abspath(__file__))
_DATA_DIR = os.path.join(_MODULE_DIR, "data")

# JSON files holding developer settings, MCP server definitions and custom profiles.
DEV_SETTINGS_FILE = os.path.join(_DATA_DIR, "dev_settings.json")
MCP_SERVERS_FILE = os.path.join(_DATA_DIR, "mcp_servers.json")
CUSTOM_PROFILES_FILE = os.path.join(_DATA_DIR, "custom_profiles.json")

# Directory in which plugin source files (*.py) are stored.
PLUGINS_DIR = os.path.join(_DATA_DIR, "plugins")
# Baseline developer settings.
# load_dev_settings() returns these when the settings file is missing or
# unreadable, and otherwise overlays the saved values on top of them.
DEV_SETTINGS_DEFAULT = {
    # Switches for the 3D model.
    "3d_model": {
        "enabled": True,
        "animations": True,
        "idle_cycle": True
    },
    # Avatar presentation.
    "avatar": {
        "type": "3d",  # '3d', 'image', 'video'
        "path": "/static/june_idle.png"  # Used if type is image or video
    },
    # Visibility flags for individual UI elements.
    # NOTE(review): presumably consumed by the front end - confirm.
    "ui": {
        "show_workbench": True,
        "show_radial_menu": True,
        "show_agent_lab": True,
        "show_tts_button": True,
        "show_mic_button": True,
        "show_attach_button": True
    },
    # NOTE(review): custom profiles are also persisted separately in
    # CUSTOM_PROFILES_FILE; confirm which of the two is authoritative.
    "custom_profiles": [],
    # Radial-menu tool sets, one list per set name. Every entry carries a
    # "label", an "icon" and an "action" string.
    # NOTE(review): the action strings look like front-end JavaScript calls
    # (qp(...), toggleAgentLab()) - confirm where they are evaluated.
    "radial_tools": {
        "standard": [
            { "label": "Open Path", "icon": "📂", "action": "qp('Open ')" },
            { "label": "Run Command", "icon": "🖥", "action": "qp('Run command: ')" },
            { "label": "Generate Image", "icon": "🎨", "action": "qp('Draw ')" },
            { "label": "Run Code", "icon": "💻", "action": "qp('Write me a Python function that ')" },
            { "label": "Summarize Day", "icon": "📝", "action": "qp('Summarize my day for me.')" },
            { "label": "Get News", "icon": "📡", "action": "qp('Get news about ')" },
            { "label": "Organize Files", "icon": "🗂", "action": "qp('Organize files in: ')" },
            { "label": "Diagnose", "icon": "🩺", "action": "qp('Diagnose system performance')" },
            { "label": "Agent Lab", "icon": "⚗️", "action": "toggleAgentLab()" }
        ],
        "network": [
            { "label": "Nmap Scan", "icon": "🔍", "action": "qp('Run a full nmap scan on ')" },
            { "label": "Hydra Brute", "icon": "🗝", "action": "qp('Bruteforce ssh on 127.0.0.1 using users.txt and pass.txt')" },
            { "label": "Nikto Web", "icon": "🕸", "action": "qp('Scan this website with nikto: ')" },
            { "label": "Wifite", "icon": "📶", "action": "qp('start wifite on wlan0')" },
            { "label": "Metasploit", "icon": "💣", "action": "qp('run metasploit on ')" },
            { "label": "Wireshark", "icon": "🦈", "action": "qp('capture packets on eth0 with wireshark')" },
            { "label": "BurpSuite", "icon": "🕷", "action": "qp('run burpsuite spider on ')" },
            { "label": "SQLMap", "icon": "💉", "action": "qp('run sqlmap on ')" },
            { "label": "Red Team", "icon": "🩸", "action": "qp('start red team mode on ')" },
            { "label": "Agent Lab", "icon": "⚗️", "action": "toggleAgentLab()" }
        ],
        "ctf": [
            { "label": "Masscan", "icon": "🚀", "action": "qp('masscan -p1-65535 ')" },
            { "label": "Web Fuzz", "icon": "🧬", "action": "qp('gobuster dir -u ')" },
            { "label": "SQLi", "icon": "💉", "action": "qp('sqlmap -u ')" },
            { "label": "Hash Cracker", "icon": "🔐", "action": "qp('hashcat -m 0 ')" },
            { "label": "Stego Extractor", "icon": "🖼️", "action": "qp('steghide extract -sf ')" },
            { "label": "Forensics", "icon": "🔎", "action": "qp('binwalk -e ')" },
            { "label": "Binary Analyzer", "icon": "⚙️", "action": "qp('checksec --file=')" },
            { "label": "Reverse Shell", "icon": "📡", "action": "qp('nc -nlvp 4444')" },
            { "label": "Flag Hunter", "icon": "🏴☠️", "action": "qp('grep -r \"HTB{\" /')" },
            { "label": "Agent Lab", "icon": "⚗️", "action": "toggleAgentLab()" }
        ]
    }
}
def _ensure_dirs():
    """Create the "data" directory next to this module and the plugins directory.

    Directories that already exist are left untouched.
    """
    data_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data")
    for directory in (data_dir, PLUGINS_DIR):
        os.makedirs(directory, exist_ok=True)
def load_dev_settings():
    """Return the developer settings: the defaults overlaid with saved values.

    The result is always an independent deep copy, so callers may mutate it
    freely without altering ``DEV_SETTINGS_DEFAULT``.

    Merge rules, applied per top-level key of the saved file:
      * saved dict over default dict -> the saved entries update the default
        section (one level deep), so keys missing from the file keep their
        default value;
      * anything else -> the saved value replaces the default.

    Returns:
        dict: the merged settings. Plain defaults are returned when the file
        is missing, unreadable, not valid JSON, or does not hold a JSON object.
    """
    _ensure_dirs()
    # Deep copy: a shallow dict() copy would share the nested section dicts
    # with DEV_SETTINGS_DEFAULT, and the merge below would then rewrite the
    # module-level defaults in place.
    merged = copy.deepcopy(DEV_SETTINGS_DEFAULT)
    try:
        with open(DEV_SETTINGS_FILE, "r", encoding="utf-8") as f:
            saved = json.load(f)
    except (OSError, ValueError):
        # Missing/unreadable file, bad encoding or malformed JSON
        # (json.JSONDecodeError and UnicodeDecodeError are ValueErrors).
        return merged
    if not isinstance(saved, dict):
        return merged
    for key, value in saved.items():
        current = merged.get(key)
        if isinstance(value, dict) and isinstance(current, dict):
            current.update(value)
        else:
            merged[key] = value
    return merged
def save_dev_settings(settings):
    """Merge ``settings`` into the current developer settings and persist them.

    Args:
        settings: dict of top-level settings to change. A dict value given for
            a key whose current value is also a dict is merged into that
            section (one level deep); any other value replaces the current one.

    Returns:
        dict: the full merged settings that were written to DEV_SETTINGS_FILE.

    Raises:
        TypeError / ValueError: if the merged settings cannot be serialized to
            JSON. The existing file is left untouched in that case.
        OSError: if the settings file cannot be written.
    """
    _ensure_dirs()
    # Merge on save as well
    merged = load_dev_settings()
    for key, value in settings.items():
        current = merged.get(key)
        if isinstance(value, dict) and isinstance(current, dict):
            # Build a new section instead of updating in place, so a section
            # object that may be shared with the module defaults is never mutated.
            merged[key] = {**current, **value}
        else:
            merged[key] = value
    # Serialize before opening: opening with "w" truncates the file, so a
    # serialization error afterwards would destroy the previous settings.
    payload = json.dumps(merged, indent=2)
    with open(DEV_SETTINGS_FILE, "w", encoding="utf-8") as f:
        f.write(payload)
    return merged
def load_mcp_servers():
    """Load the MCP server definitions from MCP_SERVERS_FILE.

    Returns:
        The decoded JSON content of the file, or the built-in defaults from
        get_default_mcp_servers() when the file is missing, unreadable or not
        valid JSON. The decoded content is returned as-is, without validation.
    """
    _ensure_dirs()
    try:
        with open(MCP_SERVERS_FILE, "r", encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError):
        # OSError covers a missing or unreadable file; ValueError covers
        # malformed JSON and undecodable bytes. Anything else (for example
        # KeyboardInterrupt) is deliberately allowed to propagate.
        return get_default_mcp_servers()
def save_mcp_servers(servers):
    """Persist the MCP server definitions to MCP_SERVERS_FILE as indented JSON.

    Args:
        servers: JSON-serializable server definitions.

    Returns:
        The ``servers`` argument, unchanged.

    Raises:
        TypeError / ValueError: if ``servers`` is not JSON-serializable. The
            existing file is left untouched in that case.
        OSError: if the file cannot be written.
    """
    _ensure_dirs()
    # Serialize first: opening with "w" truncates the file, so failing midway
    # through json.dump would wipe the previously saved configuration.
    payload = json.dumps(servers, indent=2)
    with open(MCP_SERVERS_FILE, "w", encoding="utf-8") as f:
        f.write(payload)
    return servers
def get_default_mcp_servers():
    """Return the built-in MCP server definitions.

    A fresh list of fresh dicts is built on every call, so callers may mutate
    the result. Each definition has the keys "id", "name", "command", "args",
    "enabled" and "description".
    """
    github_server = dict(
        id="mcp_github",
        name="GitHub",
        command="npx",
        args=["-y", "@modelcontextprotocol/server-github"],
        enabled=True,
        description="GitHub API - repos, issues, PRs",
    )
    kali_server = dict(
        id="mcp_kali",
        name="Kali Tools",
        command="docker",
        args=["run", "-i", "--rm", "june-kali-runner", "python3", "/opt/kali_mcp_server.py"],
        enabled=True,
        description="Kali Linux network tools (nmap, gobuster, hydra)",
    )
    return [github_server, kali_server]
def load_custom_profiles():
    """Load the custom profiles from CUSTOM_PROFILES_FILE.

    Returns:
        The decoded JSON content of the file, or an empty list when the file
        is missing, unreadable or not valid JSON. The decoded content is
        returned as-is, without validation.
    """
    _ensure_dirs()
    try:
        with open(CUSTOM_PROFILES_FILE, "r", encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError):
        # OSError covers a missing or unreadable file; ValueError covers
        # malformed JSON and undecodable bytes. Anything else (for example
        # KeyboardInterrupt) is deliberately allowed to propagate.
        return []
def save_custom_profiles(profiles):
    """Persist the custom profiles to CUSTOM_PROFILES_FILE as indented JSON.

    Args:
        profiles: JSON-serializable profile data.

    Returns:
        The ``profiles`` argument, unchanged.

    Raises:
        TypeError / ValueError: if ``profiles`` is not JSON-serializable. The
            existing file is left untouched in that case.
        OSError: if the file cannot be written.
    """
    _ensure_dirs()
    # Serialize first: opening with "w" truncates the file, so failing midway
    # through json.dump would wipe the previously saved profiles.
    payload = json.dumps(profiles, indent=2)
    with open(CUSTOM_PROFILES_FILE, "w", encoding="utf-8") as f:
        f.write(payload)
    return profiles
def execute_user_script(code, context=None):
    """Run ``code`` as a Python script inside a throwaway Docker container.

    The script is written to a shared temp directory that is bind-mounted at
    /scripts, then executed with the ``python:3.10-slim`` image under a 256 MB
    memory limit, without network access and with a 30 second timeout.

    Args:
        code: Python source text to execute.
        context: Currently unused; accepted for interface compatibility.

    Returns:
        dict with:
          * "success" (bool): True when the script exited with status 0.
          * "output" (str): combined stdout and stderr, a timeout notice, or
            the error message when the container could not be run.
          * "error" and "traceback" (str): only present when launching or
            reading from the container failed unexpectedly.

    Raises:
        OSError: if the script file cannot be written to the temp directory.
    """
    _ensure_dirs()
    # We will run this code inside a temporary docker container
    script_id = uuid.uuid4().hex[:8]
    script_filename = f"script_{script_id}.py"
    # Naming the container lets us stop it explicitly if the run times out.
    container_name = f"june_script_{script_id}"
    # Directory that gets mounted into the container.
    # NOTE(review): the directory is shared by all runs and mounted read-write,
    # so a running script can see other in-flight scripts - consider a
    # per-run directory.
    temp_dir = os.path.join(tempfile.gettempdir(), "june_dev_scripts")
    os.makedirs(temp_dir, exist_ok=True)
    script_path = os.path.join(temp_dir, script_filename)
    with open(script_path, "w", encoding="utf-8") as f:
        f.write(code)

    # The image is not pulled explicitly; docker fetches it on first use.
    cmd = [
        "docker", "run", "--rm",
        "--name", container_name,
        "-v", f"{temp_dir}:/scripts",
        "-w", "/scripts",
        # memory limit for safety
        "--memory=256m",
        "--network=none",  # No network access for extra security during script execution
        "python:3.10-slim",
        "python", script_filename
    ]
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=30
        )
    except subprocess.TimeoutExpired:
        # subprocess.run only kills the docker CLI process on timeout; the
        # container itself would keep running, so stop it explicitly.
        try:
            subprocess.run(
                ["docker", "kill", container_name],
                capture_output=True,
                timeout=10
            )
        except (OSError, subprocess.SubprocessError):
            # Best effort: the container may already be gone.
            pass
        return {"success": False, "output": "Execution timed out after 30 seconds."}
    except Exception as e:
        # Boundary handler: e.g. docker is not installed or its output could
        # not be decoded. Report the failure instead of raising.
        import traceback
        return {
            "success": False,
            # "output" is included so every result has the same basic shape.
            "output": str(e),
            "error": str(e),
            "traceback": traceback.format_exc()
        }
    else:
        output = result.stdout
        if result.stderr:
            output += "\n" + result.stderr
        return {"success": result.returncode == 0, "output": output.strip()}
    finally:
        # Cleanup runs on every path; a failed removal must not mask the result.
        try:
            os.remove(script_path)
        except OSError:
            pass
def list_plugins():
    """List the plugins stored in PLUGINS_DIR together with their metadata.

    Every ``*.py`` entry in the directory is read and passed to
    parse_plugin_metadata(); entries that cannot be read are skipped.

    Returns:
        list[dict]: one dict per plugin, sorted by filename, holding the parsed
        metadata plus "filename" and "path". Empty when there are no plugins.
    """
    _ensure_dirs()
    if not os.path.isdir(PLUGINS_DIR):
        return []
    plugins = []
    # os.listdir order is arbitrary; sort for a stable listing.
    for fname in sorted(os.listdir(PLUGINS_DIR)):
        if not fname.endswith(".py"):
            continue
        fpath = os.path.join(PLUGINS_DIR, fname)
        try:
            # Python sources are UTF-8 by default. errors="replace" keeps a
            # stray undecodable byte from hiding the whole plugin, since only
            # the metadata header is needed here.
            with open(fpath, "r", encoding="utf-8", errors="replace") as f:
                content = f.read()
        except OSError:
            # Unreadable entry (permissions, a directory named *.py, ...): skip it.
            continue
        meta = parse_plugin_metadata(content)
        meta["filename"] = fname
        meta["path"] = fpath
        plugins.append(meta)
    return plugins
def parse_plugin_metadata(content):
    """Extract plugin metadata from ``#key: value`` header lines.

    Recognized keys are "name", "version", "description" and "author". A line
    counts only if, after stripping surrounding whitespace, it starts with
    ``#<key>:`` (no space between ``#`` and the key). The value is the text
    after the first colon, stripped. When a key occurs more than once the
    last occurrence wins.

    Returns:
        dict with all four keys; keys not found keep their defaults
        ("Unnamed", "0.1", "", "").
    """
    meta = {"name": "Unnamed", "version": "0.1", "description": "", "author": ""}
    for raw_line in content.split("\n"):
        # Split at the first colon: "#name: Demo" -> ("#name", ":", " Demo").
        tag, colon, value = raw_line.strip().partition(":")
        if not colon:
            continue
        if tag[:1] == "#" and tag[1:] in meta:
            meta[tag[1:]] = value.strip()
    return meta
def register_plugin(code, filename=None):
    """Store ``code`` as a plugin file inside PLUGINS_DIR.

    Args:
        code: Python source text of the plugin.
        filename: Optional file name. Only its final path component is used,
            so the plugin can never be written outside PLUGINS_DIR. A ".py"
            suffix is appended when missing. When omitted (or when nothing
            usable remains) a random ``plugin_<hex>.py`` name is generated.

    Returns:
        dict: {"filename": <name actually used>, "path": <full path written>}.

    Raises:
        OSError: if the plugin file cannot be written.
    """
    _ensure_dirs()
    if filename:
        # Drop any directory part ("../x.py", "C:\\x.py", ...) so a caller
        # cannot escape the plugins directory.
        filename = os.path.basename(filename.replace("\\", "/"))
    if not filename or filename in (".", ".."):
        filename = f"plugin_{uuid.uuid4().hex[:8]}.py"
    if not filename.endswith(".py"):
        filename += ".py"
    fpath = os.path.join(PLUGINS_DIR, filename)
    # Python sources are UTF-8 by default; the platform's locale encoding could
    # corrupt non-ASCII code or fail to encode it at all.
    with open(fpath, "w", encoding="utf-8") as f:
        f.write(code)
    return {"filename": filename, "path": fpath}
def delete_plugin(filename):
    """Delete the plugin file ``filename`` from PLUGINS_DIR.

    Args:
        filename: Bare file name of the plugin; it must end in ".py" and must
            not contain any directory component.

    Returns:
        bool: True if the file existed and was removed; False if the name is
        empty, is not a plain ``*.py`` file name, or no such file exists.

    Raises:
        OSError: if the file exists but cannot be removed.
    """
    _ensure_dirs()
    if not filename or not filename.endswith(".py"):
        return False
    # Reject anything carrying a directory part ("../x.py", absolute paths,
    # drive prefixes) so only files directly inside PLUGINS_DIR can be deleted.
    if os.path.basename(filename.replace("\\", "/")) != filename:
        return False
    fpath = os.path.join(PLUGINS_DIR, filename)
    # isfile rather than exists: os.remove would raise on a directory.
    if not os.path.isfile(fpath):
        return False
    os.remove(fpath)
    return True