diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 627c9c4..82e84e5 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -47,7 +47,9 @@ jobs: with: submodules: true - name: Install OS dependencies - run: sudo add-apt-repository universe + run: | + sudo add-apt-repository universe + sudo apt-get install qemu-system-arm - uses: actions/setup-python@v5 with: python-version: '3.10' @@ -71,6 +73,12 @@ jobs: run: make dist ARCH=armv7m V=1 - name: Build module armv7emsp run: make dist ARCH=armv7emsp V=1 + - name: Build module armv7emdp + run: make dist ARCH=armv7emdp V=1 + - name: Build QEMU MicroPython image + run: make qemu_build V=1 + - name: Run tests under QEMU + run: make check_qemu V=1 - name: Archive dist artifacts uses: actions/upload-artifact@v4 with: diff --git a/Makefile b/Makefile index f3d4599..637bc43 100644 --- a/Makefile +++ b/Makefile @@ -80,15 +80,27 @@ $(MODULES_PATH)/emlearn_cnn_int8.mpy: $(MODULES_PATH)/emlearn_cnn_fp32.mpy # Generate list of .mpy files MODULE_MPYS = $(addprefix $(MODULES_PATH)/,$(addsuffix .mpy,$(MODULES))) -# Build dynamic native module (without forced clean) -$(MODULES_PATH)/%.mpy: +# Build dynamic native module +$(MODULES_PATH)/%.mpy: $(ARCH_SENTINEL) $(MAKE) -C $(or $($(*)_SRC),src/$*) \ ARCH=$(ARCH) MPY_DIR=$(MPY_DIR_ABS) CFLAGS_EXTRA=$(CFLAGS_EXTRA) \ V=1 $($(*)_CONFIG) dist +# A file to track ARCH changes etc, to nuke old artifact +ARCH_SENTINEL := .sentinel_arch_$(ARCH) +$(ARCH_SENTINEL): + @echo "ARCH changed or not set — cleaning old build artifacts" + @rm -f .sentinel_arch_* + find src -name ".mpy_ld_cache" -type d -exec rm -rf {} + ; \ + find src -name "build" -type d -exec rm -rf {} + ; \ + find src -name "*.mpy" -delete ; \ + find src -name "*.o" -delete + @mkdir -p $(MODULES_PATH) + @touch $@ + # CNN modules need clean build due to shared build directory # They must also build sequentially (fp32 first, then int8) -$(MODULES_PATH)/emlearn_cnn_fp32.mpy: +$(MODULES_PATH)/emlearn_cnn_fp32.mpy: $(ARCH_SENTINEL) $(MAKE) -C src/tinymaix_cnn \ ARCH=$(ARCH) MPY_DIR=$(MPY_DIR_ABS) CFLAGS_EXTRA=$(CFLAGS_EXTRA) \ V=1 CONFIG=fp32 clean @@ -96,7 +108,7 @@ $(MODULES_PATH)/emlearn_cnn_fp32.mpy: ARCH=$(ARCH) MPY_DIR=$(MPY_DIR_ABS) CFLAGS_EXTRA=$(CFLAGS_EXTRA) \ V=1 CONFIG=fp32 dist -$(MODULES_PATH)/emlearn_cnn_int8.mpy: $(MODULES_PATH)/emlearn_cnn_fp32.mpy +$(MODULES_PATH)/emlearn_cnn_int8.mpy: $(MODULES_PATH)/emlearn_cnn_fp32.mpy $(ARCH_SENTINEL) $(MAKE) -C src/tinymaix_cnn \ ARCH=$(ARCH) MPY_DIR=$(MPY_DIR_ABS) CFLAGS_EXTRA=$(CFLAGS_EXTRA) \ V=1 CONFIG=int8 clean @@ -159,10 +171,7 @@ codesize: python3 tools/code_size.py $(MPY_DIR_ABS)/ports/unix/build-standard clean: - make -C src/emlearn_trees/ ARCH=$(ARCH) MPY_DIR=$(MPY_DIR_ABS) V=1 clean - make -C src/emlearn_neighbors/ ARCH=$(ARCH) MPY_DIR=$(MPY_DIR_ABS) V=1 clean - make -C src/emlearn_iir/ ARCH=$(ARCH) MPY_DIR=$(MPY_DIR_ABS) V=1 clean - make -C src/emlearn_logreg/ ARCH=$(ARCH) MPY_DIR=$(MPY_DIR_ABS) V=1 clean + git clean -dfx src/ rm -rf ./dist RELEASE_NAME = emlearn-micropython-$(VERSION) @@ -177,4 +186,24 @@ check: check_unix_natmod dist: $(MODULE_MPYS) +# ============================================================================= +# QEMU MicroPython targets +# ============================================================================= + +QEMU_PORT_DIR = $(MPY_DIR)/ports/qemu +QEMU_BOARD ?= MPS2_AN500 +QEMU_ARCH ?= armv7emdp +QEMU_FIRMWARE = $(QEMU_PORT_DIR)/build-$(QEMU_BOARD)/firmware.elf + +# Build firmware for QEMU +.PHONY: qemu_build +qemu_build: + $(MAKE) -C $(QEMU_PORT_DIR) BOARD=$(QEMU_BOARD) MICROPY_HEAP_SIZE=1024000 + +# Run tests/test_all.py on QEMU using mpremote mount +# Usage: make check_qemu QEMU_BOARD=MPS2_AN500 QEMU_ARCH=armv7emdp +.PHONY: check_qemu +check_qemu: $(QEMU_FIRMWARE) + python3 $(abspath tools/run_qemu_tests.py) --board $(QEMU_BOARD) --arch $(QEMU_ARCH) --abi-version $(MPY_ABI_VERSION) --mount $(abspath .) + diff --git a/spectrofood_download.py b/spectrofood_download.py deleted file mode 100644 index 9b69a5a..0000000 --- a/spectrofood_download.py +++ /dev/null @@ -1,133 +0,0 @@ -import os -import pandas as pd -import numpy as np -import urllib.request -from io import StringIO - -import pandas as pd -import numpy as np -from sklearn.model_selection import train_test_split -from sklearn.preprocessing import StandardScaler -from sklearn.cross_decomposition import PLSRegression -from sklearn.metrics import mean_squared_error, r2_score - - -DATA_URL = "https://zenodo.org/records/8362947/files/SpectroFood_dataset.csv?download=1" - -def download_dataset(data_dir): - os.makedirs(data_dir, exist_ok=True) - csv_file = os.path.join(data_dir, "SpectroFood_dataset.csv") - if not os.path.exists(csv_file): - print("Downloading SpectroFood CSV...") - urllib.request.urlretrieve(DATA_URL, csv_file) - return csv_file - - -def load_spectrofood_chunks(csv_file, target_col="dry_matter", food_col="food"): - """ - Splits CSV into chunks using empty lines (newlines) as separators. - Each chunk is loaded with pandas.read_csv separately. - Returns list of tuples: (food_name, DataFrame) - """ - chunks = [] - with open(csv_file, 'r') as f: - content = f.read() - - # Split into raw text blocks on empty lines - raw_chunks = [c.strip() for c in content.split("\n\n") if c.strip()] - # FIXME: only returns 1 chunk right now - print(len(raw_chunks)) - - for chunk_text in raw_chunks: - # Use StringIO to read the chunk as CSV - chunk_io = StringIO(chunk_text) - try: - df_chunk = pd.read_csv(chunk_io, dtype=str, keep_default_na=False) - except pd.errors.EmptyDataError: - continue # skip empty chunks - - # Determine food name: use the first column of the first row - if food_col in df_chunk.columns: - food_name = df_chunk[food_col].iloc[0].strip().replace(" ", "_") - else: - food_name = str(df_chunk.iloc[0, 0]).strip().replace(" ", "_") - - # Convert numeric columns to float, ignore errors - df_chunk = df_chunk.apply(pd.to_numeric, errors='coerce') - chunks.append((food_name, df_chunk)) - - return chunks - -def preprocess_chunk(df_chunk, target_col="DRY MATTER"): - """ - Converts DataFrame to C-contiguous X and y numpy arrays - """ - - #print(df_chunk.columns) - - # Keep only rows where the target column is numeric - df_chunk = df_chunk[pd.to_numeric(df_chunk[target_col], errors='coerce').notna()].copy() - - # Drop columns that are entirely NaN - df_chunk = df_chunk.dropna(axis=1, how='all') - - # Drop rows that are entirely NaN - df_chunk = df_chunk.dropna(axis=0, how='any') - - exclude_cols = [c for c in df_chunk.columns if c == target_col or df_chunk[c].dtype == object] - X = df_chunk.drop(columns=exclude_cols).values.astype(np.float32) - y = df_chunk[target_col].values.astype(np.float32).reshape(-1, 1) - - # Standardize - scaler_X = StandardScaler() - #scaler_y = StandardScaler() - X = scaler_X.fit_transform(X) - #y = scaler_y.fit_transform(y) - - X = np.ascontiguousarray(X) - y = np.ascontiguousarray(y) - return X, y - -def save_all_chunks(chunks, data_dir): - """ - Saves all chunks as numpy files - """ - for food_name, df in chunks: - X, y = preprocess_chunk(df) - dataset_dir = data_dir+f'_{food_name}' - os.makedirs(dataset_dir, exist_ok=True) - np.save(os.path.join(dataset_dir, f"X.npy"), X) - np.save(os.path.join(dataset_dir, f"y.npy"), y) - print(f"Saved chunk for {food_name}: {dataset_dir}") - -def train_pls_for_chunks(chunks, n_components=10): - """ - Trains a scikit-learn PLSRegression model for each chunk - and prints MSE and R2 - """ - for food_name, df in chunks: - X, y = preprocess_chunk(df) - # Split 80/20 - X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=0) - - # Train PLS - pls = PLSRegression(n_components=n_components) - pls.fit(X_train, y_train) - - # Predict and inverse scale - y_pred = pls.predict(X_test) - - mse = mean_squared_error(y_test, y_pred) - r2 = r2_score(y_test, y_pred) - print(f"{food_name}: PLSRegression n_components={n_components} | MSE={np.sqrt(mse):.4f} | R2={r2:.4f}") - -def main(data_dir="spectrofood_data"): - csv_file = download_dataset(data_dir) - chunks = load_spectrofood_chunks(csv_file) - print(f"Found {len(chunks)} chunks (food types)") - save_all_chunks(chunks, data_dir) - train_pls_for_chunks(chunks, n_components=5) - -if __name__ == "__main__": - main(data_dir="my_spectrofood_data") - diff --git a/tests/npyfile.py b/tests/npyfile.py index 9bb0660..d246040 100644 --- a/tests/npyfile.py +++ b/tests/npyfile.py @@ -271,23 +271,21 @@ def write_values(self, arr, typecode=None): #print('write', self.written_bytes) -def load(filelike) -> tuple[tuple, array.array]: +def load(filelike, chunk_size=64) -> tuple[tuple, array.array]: """ Load array from .npy file - Convenience function for doing it in one shot. For streaming, use npyfile.Reader instead """ - - chunks = [] with Reader(filelike) as reader: - # Just read everything in one chunk total_items = compute_items(reader.shape) - for c in reader.read_data_chunks(total_items): - chunks.append(c) + data = array.array(reader.typecode, (0 for _ in range(total_items))) + idx = 0 + for chunk in reader.read_data_chunks(chunk_size): + data[idx:idx + len(chunk)] = chunk + idx += len(chunk) + return reader.shape, data - assert len(chunks) == 1 - return reader.shape, chunks[0] def save(filelike, arr : array.array, shape=None, typecode=None): """ diff --git a/tests/test_all.py b/tests/test_all.py index 147a872..d6b257a 100644 --- a/tests/test_all.py +++ b/tests/test_all.py @@ -1,5 +1,6 @@ import sys +import gc # Find the module path (architecture+version specific) sys_mpy = sys.implementation._mpy @@ -63,6 +64,16 @@ def main(): passed = 0 failed = 0 + # Test markers for external test runners (mpremote, etc.) + # These allow the runner to know when tests are complete without relying on timeouts + print('\n=== TEST START ===') + + free = gc.mem_free() + alloc = gc.mem_alloc() + print(f"RAM free={free} used={alloc} total={free+alloc}") + + print('sys.path', sys.path) + for module_name in modules: mod = None print(f'{module_name}:') @@ -75,6 +86,9 @@ def main(): failed += 1 continue + # Try to free space + gc.collect() + module_attributes = dir(mod) tests = [ o for o in module_attributes if o.startswith('test_') ] for test_name in tests: @@ -92,8 +106,12 @@ def main(): print(f'\t PASS') passed += 1 + # Try to free space + gc.collect() + print(f'Passed: {passed}') print(f'Failed: {failed}') + print('\n=== TEST END ===') # Let status code reflect number of failures return failed diff --git a/tests/test_extratrees_cancer.py b/tests/test_extratrees_cancer.py index ef43784..f425357 100644 --- a/tests/test_extratrees_cancer.py +++ b/tests/test_extratrees_cancer.py @@ -27,14 +27,11 @@ def load_npy_labels_int16(filename): return array.array('h', labels) def test_real_dataset(): - print("=== REAL DATASET TEST ===") - X_train_flat = load_npy_features_int16(DATA_FILES['X_train']) y_train = load_npy_labels_int16(DATA_FILES['y_train']) X_test_flat = load_npy_features_int16(DATA_FILES['X_test']) y_test = load_npy_labels_int16(DATA_FILES['y_test']) - n_features = 30 n_train = len(y_train) n_test = len(y_test) diff --git a/tests/test_extratrees_wine.py b/tests/test_extratrees_wine.py index b4c13db..d36213e 100644 --- a/tests/test_extratrees_wine.py +++ b/tests/test_extratrees_wine.py @@ -19,28 +19,22 @@ def load_npy_features_int16(filename): """Load .npy file and convert to int16 array (scaled from float32)""" shape, data = npyfile.load(filename) # Scale float32 data to int16 range (multiply by 1000 and convert) - scaled = [int(v * 1000) for v in data] - return array.array('h', scaled) + return array.array('h', (int(v * 1000) for v in data)) def load_npy_labels_int16(filename): """Load .npy file and convert to int16 array (labels, no scaling)""" shape, data = npyfile.load(filename) # Labels are already integers (0.0, 1.0, 2.0), just convert directly - labels = [int(v) for v in data] - return array.array('h', labels) + return array.array('h', (int(v) for v in data)) def test_wine(): print("=== WINE DATASET TEST (3-class) ===") # Load preprocessed data - try: - X_train_flat = load_npy_features_int16(DATA_FILES['X_train']) - y_train = load_npy_labels_int16(DATA_FILES['y_train']) - X_test_flat = load_npy_features_int16(DATA_FILES['X_test']) - y_test = load_npy_labels_int16(DATA_FILES['y_test']) - except: - print("Error: Run wine/prepare.py first") - return + X_train_flat = load_npy_features_int16(DATA_FILES['X_train']) + y_train = load_npy_labels_int16(DATA_FILES['y_train']) + X_test_flat = load_npy_features_int16(DATA_FILES['X_test']) + y_test = load_npy_labels_int16(DATA_FILES['y_test']) n_features = 13 # 13 wine features (alcohol, malic_acid, ash, etc.) n_train = len(y_train) @@ -54,12 +48,13 @@ def test_wine(): print(f"Classes: {n_classes} (wine cultivars 0, 1, 2)") print("Task: Classify wine cultivar") + max_samples = n_train # Create model model = emlearn_extratrees.new( n_features, n_classes, - n_trees=20, max_depth=12, min_samples_leaf=2, - n_thresholds=15, subsample_ratio=0.8, feature_subsample_ratio=1.0, - max_nodes=3000, max_samples=500, rng_seed=42 + n_trees=5, max_depth=5, min_samples_leaf=2, + n_thresholds=8, subsample_ratio=0.8, feature_subsample_ratio=1.0, + max_nodes=1000, max_samples=max_samples, rng_seed=42 ) train_start = time.ticks_ms() diff --git a/tests/test_linreg_california.py b/tests/test_linreg_california.py index eb1c33b..24d1e43 100644 --- a/tests/test_linreg_california.py +++ b/tests/test_linreg_california.py @@ -13,8 +13,6 @@ def load_npy(filename): def test_elasticnet_full(): """Test with full dataset.""" - print("\n=== Full Dataset Test ===") - # Load full datasets X_train_shape, X_train_data = load_npy(data_dir+'X_train.npy') y_train_shape, y_train_data = load_npy(data_dir+'y_train.npy') @@ -38,8 +36,8 @@ def test_elasticnet_full(): print("Training on full dataset...") stop_iter, stop_mse = emlearn_linreg.train(model, X_train_data, y_train_data, - max_iterations=1000, check_interval=50, - verbose=2, tolerance=0.0001, score_limit=0.60, + max_iterations=100, check_interval=10, + verbose=2, tolerance=0.0001, score_limit=0.55, ) train_duration = time.ticks_diff(time.ticks_ms(), train_start) print('Train time (ms)', train_duration, 'per iter', train_duration/stop_iter) @@ -52,22 +50,3 @@ def test_elasticnet_full(): assert test_mse <= 0.65, test_mse -def main(): - """Main test function.""" - print("ElasticNet MicroPython Module Test") - print("==================================") - - try: - # Compare regularization approaches - gc.collect() - - test_elasticnet_full() - print("\n=== All Tests Completed Successfully! ===") - - except Exception as e: - print(f"Error during testing: {e}") - import sys - sys.print_exception(e) - -if __name__ == "__main__": - main() diff --git a/tests/test_plsr_spectrofood.py b/tests/test_plsr_spectrofood.py index a37503d..c158067 100644 --- a/tests/test_plsr_spectrofood.py +++ b/tests/test_plsr_spectrofood.py @@ -4,6 +4,7 @@ import array import emlearn_plsr import npyfile +import gc DATA_DIR = 'examples/datasets/spectrofood/' @@ -26,6 +27,11 @@ def test_plsr_spectrofood(): """Test PLSR on SpectroFood dataset (regression with 421 spectral features).""" print("\n=== SpectroFood PLSR Test ===") + total_ram = gc.mem_alloc() + gc.mem_free() + if total_ram < 2000_000: + print("SKIP: insufficient RAM") + return + # Load data shape_X_train, X_train = npyfile.load(DATA_DIR + 'X_train.npy') shape_y_train, y_train = npyfile.load(DATA_DIR + 'y_train.npy') diff --git a/tools/run_qemu_tests.py b/tools/run_qemu_tests.py new file mode 100644 index 0000000..9f4532b --- /dev/null +++ b/tools/run_qemu_tests.py @@ -0,0 +1,268 @@ +#!/usr/bin/env python3 +""" +Helper script to run tests on QEMU MicroPython using mpremote. +Starts QEMU with a PTY, mounts local directory, runs test script, and cleans up. + +Usage: + python3 run_qemu_tests.py [options] + +Options: + --test TEST_SCRIPT Test script to run (default: tests/test_all.py) + --mount DIR Local directory to mount on device + --board BOARD QEMU board to use (default: MPS2_AN500) + --arch ARCH MicroPython architecture (default: armv7emdp) + --abi-version VERSION MicroPython ABI version (default: 6.3) + --modules DIR MicroPython modules directory + --timeout SECONDS Timeout for tests (default: 120) + --debug Enable debug output + +Examples: + python3 run_qemu_tests.py --board MPS2_AN500 --mount . +""" + +import argparse +import os +import re +import signal +import subprocess +import sys +import time +from pathlib import Path + + +def debug_print(msg): + """Print debug message and flush.""" + print(f"DEBUG: {msg}") + sys.stdout.flush() + + +def run_tests(pty_path, test_script, mount_path, modules_path, remote_modules): + """Run the test script on QEMU using mpremote mount. + + Reads output line by line until TEST END marker is found. + Returns 0 only if tests completed with failed == 0. + """ + test_script_path = Path(test_script) + + debug_print(f"test_script: {test_script}") + debug_print(f"mount_path: {mount_path}") + + debug_print(f"Mounting {mount_path} on device...") + debug_print(f"Running {test_script}...") + print(f"Mounting {mount_path} on device...") + print(f"Running {test_script_path}...") + sys.stdout.flush() + + cmd = [ + 'mpremote', 'connect', pty_path, + 'mount', str(mount_path), + 'run', str(test_script_path), + ] + + debug_print(f"Running command: {' '.join(cmd)}") + + proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + + output_lines = [] + test_complete = False + passed = None + failed = None + + while True: + line = proc.stdout.readline() + if not line and proc.poll() is not None: + debug_print(f"Process ended, poll={proc.poll()}") + break + + if line: + output_lines.append(line) + print(line, end='') + sys.stdout.flush() + + # Parse test results + if line.startswith('Passed: '): + passed = int(line.split(': ')[1].strip()) + elif line.startswith('Failed: '): + failed = int(line.split(': ')[1].strip()) + + if '=== TEST END ===' in line: + test_complete = True + stderr = proc.stderr.read() + if stderr: + print("STDERR:", stderr, file=sys.stderr) + proc.wait() + if passed is not None and failed is not None and failed == 0: + return 0 + else: + print(f"\nError: Tests failed (passed={passed}, failed={failed})", file=sys.stderr) + return 1 + + if proc.poll() is not None: + debug_print(f"Process poll check: {proc.poll()}") + break + + stderr = proc.stderr.read() + if stderr: + print("STDERR:", stderr, file=sys.stderr) + sys.stderr.flush() + + if not test_complete: + print("\nError: Tests did not complete (no TEST END marker)", file=sys.stderr) + print(f"Output so far: {len(output_lines)} lines", file=sys.stderr) + for line in output_lines[-20:]: + print(f" {line.rstrip()}", file=sys.stderr) + + return 1 + + +def main(): + parser = argparse.ArgumentParser( + description='Run tests on QEMU MicroPython via mpremote', + formatter_class=argparse.RawDescriptionHelpFormatter + ) + parser.add_argument('--test', '-t', default='tests/test_all.py', + help='Test script to run (default: tests/test_all.py)') + parser.add_argument('--mount', '-m', + help='Local directory to mount (default: cwd)') + parser.add_argument('--board', '-b', default='MPS2_AN500', + help='QEMU board name (default: MPS2_AN500)') + parser.add_argument('--arch', '-a', default='armv7emdp', + help='MicroPython architecture (default: armv7emdp)') + parser.add_argument('--abi-version', '-v', default='6.3', + help='MicroPython ABI version (default: 6.3)') + parser.add_argument('--modules', + help='MicroPython modules directory (default: auto-detect from dist/)') + parser.add_argument('--timeout', type=int, default=120, + help='Timeout in seconds (default: 120)') + parser.add_argument('--debug', '-d', action='store_true', + help='Enable debug output') + + args = parser.parse_args() + + # Force unbuffered output + sys.stdout.reconfigure(line_buffering=True) + + script_dir = Path(__file__).parent.parent + mount_path = Path(args.mount) if args.mount else script_dir + + debug_print(f"script_dir: {script_dir}") + debug_print(f"mount_path: {mount_path}") + + # Find MicroPython modules directory + module_dir = f'{args.arch}_{args.abi_version}' + + if args.modules: + modules_path = str(Path(args.modules).absolute()) + else: + candidates = list((script_dir / 'dist').glob(module_dir)) + if candidates: + modules_path = str(candidates[0]) + print(f"Auto-detected modules: {modules_path}") + else: + modules_path = '' + print(f"Warning: Could not find {module_dir} in dist/", file=sys.stderr) + + debug_print(f"module_dir: {module_dir}") + debug_print(f"modules_path: {modules_path}") + + remote_modules = '' + if modules_path: + try: + modules_rel = Path(modules_path).relative_to(mount_path) + except ValueError: + modules_rel = Path(modules_path).name + remote_modules = f'/remote/{modules_rel}' + + debug_print(f"remote_modules: {remote_modules}") + + # Find QEMU port directory and firmware + qemu_port_dir = script_dir / 'dependencies' / 'micropython' / 'ports' / 'qemu' + if not qemu_port_dir.exists(): + print(f"Error: QEMU port directory not found: {qemu_port_dir}", file=sys.stderr) + return 1 + + firmware_path = qemu_port_dir / f'build-{args.board}' / 'firmware.elf' + if not firmware_path.exists(): + print(f"Error: Firmware not found: {firmware_path}", file=sys.stderr) + print(f"Run 'make qemu_build QEMU_BOARD={args.board}' first", file=sys.stderr) + return 1 + + qemu_machine = { + 'MPS2_AN500': 'mps2-an500', + 'MPS2_AN385': 'mps2-an385', + }.get(args.board, 'mps2-an500') + + qemu_cmd = [ + 'qemu-system-arm', + '-machine', qemu_machine, + '-nographic', + '-monitor', 'null', + '-semihosting', + '-serial', 'pty', + '-kernel', str(firmware_path) + ] + + print(f"Starting QEMU with firmware: {firmware_path}") + print(f"Board: {args.board}, Arch: {args.arch}, ABI: {args.abi_version}") + sys.stdout.flush() + + debug_print(f"QEMU command: {' '.join(qemu_cmd)}") + + qemu_proc = subprocess.Popen( + qemu_cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + cwd=str(qemu_port_dir) + ) + + output_lines = [] + pty_path = None + start_time = time.time() + + print("Waiting for QEMU to start...") + sys.stdout.flush() + + while time.time() - start_time < 30: + line = qemu_proc.stdout.readline() + if line: + output_lines.append(line) + print(line, end='') + sys.stdout.flush() + match = re.search(r'char device redirected to (/dev/pts/\d+)', line) + if match: + pty_path = match.group(1) + break + else: + time.sleep(0.1) + + if not pty_path: + print("Error: Could not find PTY device from QEMU output", file=sys.stderr) + print("Output:", ''.join(output_lines), file=sys.stderr) + qemu_proc.terminate() + return 1 + + print(f"PTY device: {pty_path}") + sys.stdout.flush() + + print("Waiting for MicroPython to boot...") + sys.stdout.flush() + time.sleep(2) + + result = run_tests(pty_path, args.test, mount_path, modules_path, remote_modules) + + print("Stopping QEMU...") + sys.stdout.flush() + qemu_proc.terminate() + try: + qemu_proc.wait(timeout=5) + except subprocess.TimeoutExpired: + print("Warning: QEMU did not terminate gracefully, killing...", file=sys.stderr) + qemu_proc.kill() + qemu_proc.wait() + + return result + + +if __name__ == '__main__': + sys.exit(main())