diff --git a/src/HwCodecDetect/bitdepth_chroma_detect.py b/src/HwCodecDetect/bitdepth_chroma_detect.py index 94b724e..50ed487 100644 --- a/src/HwCodecDetect/bitdepth_chroma_detect.py +++ b/src/HwCodecDetect/bitdepth_chroma_detect.py @@ -12,7 +12,7 @@ from colorama import init, Fore, Style from concurrent.futures import ThreadPoolExecutor, as_completed from tqdm import tqdm -from .utils import check_codec_support +from .utils import check_codec_support, get_stty_cfg, set_stty_cfg init(autoreset=True) @@ -301,15 +301,19 @@ def _run_encoder_bitdepth_tests(test_dir, max_workers, verbose, unsupported_enco for pix_fmt, bit_depth, chroma, desc in PIXEL_FORMATS: tasks.append((codec, encoder, pix_fmt, bit_depth, chroma, test_dir, verbose, unsupported_encoders)) - with ThreadPoolExecutor(max_workers=max_workers) as executor: - futures = [executor.submit(_run_encoder_bitdepth_test, task) for task in tasks] - - for future in tqdm(as_completed(futures), total=len(tasks), desc="Running encoder bit-depth tests"): - title, pix_fmt, bit_depth, chroma, status = future.result() - key = f"{bit_depth}-bit {chroma}" - if title not in results: - results[title] = {} - results[title][key] = status + stty_cfg = get_stty_cfg() + try: + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = [executor.submit(_run_encoder_bitdepth_test, task) for task in tasks] + + for future in tqdm(as_completed(futures), total=len(tasks), desc="Running encoder bit-depth tests"): + title, pix_fmt, bit_depth, chroma, status = future.result() + key = f"{bit_depth}-bit {chroma}" + if title not in results: + results[title] = {} + results[title][key] = status + finally: + set_stty_cfg(stty_cfg) return results @@ -423,14 +427,19 @@ def _run_decoder_bitdepth_tests(test_dir, max_workers, verbose, unsupported_deco for pix_fmt, bit_depth, chroma, desc in PIXEL_FORMATS: tasks.append((codec, hw_decoder, pix_fmt, bit_depth, chroma, test_dir, verbose, unsupported_decoders)) - with ThreadPoolExecutor(max_workers=max_workers) as executor: - futures = [executor.submit(_run_decoder_bitdepth_test, task) for task in tasks] - for future in tqdm(as_completed(futures), total=len(tasks), desc="Running decoder bit-depth tests"): - title, pix_fmt, bit_depth, chroma, status = future.result() - key = f"{bit_depth}-bit {chroma}" - if title not in results: - results[title] = {} - results[title][key] = status + stty_cfg = get_stty_cfg() + try: + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = [executor.submit(_run_decoder_bitdepth_test, task) for task in tasks] + for future in tqdm(as_completed(futures), total=len(tasks), desc="Running decoder bit-depth tests"): + title, pix_fmt, bit_depth, chroma, status = future.result() + key = f"{bit_depth}-bit {chroma}" + if title not in results: + results[title] = {} + results[title][key] = status + finally: + set_stty_cfg(stty_cfg) + return results @@ -464,15 +473,17 @@ def _print_bitdepth_chroma_table(results, table_type="Encoder"): col_width = max(len(col) for col in format_columns) row_header_width = max([_get_display_width(t) for t in titles] + [20, _get_display_width(table_type)]) - print("\n" + "=" * (row_header_width + 3 + (col_width + 3) * len(format_columns))) + print("\n" + "=" * (row_header_width + 4 + (col_width + 3) * len(format_columns))) header_text = f"Bit-depth/Chroma {table_type} Support" padding_left = (row_header_width - _get_display_width(header_text)) // 2 padding_right = row_header_width - _get_display_width(header_text) - padding_left header_row = f"| {' ' * padding_left}{header_text}{' ' * padding_right} |" + line_row = f"|-{'-' * row_header_width}-|" for col in format_columns: header_row += f" {col.center(col_width)} |" + line_row += f"-{'-' * col_width}-|" print(header_row) - print("-" * (row_header_width + 3 + (col_width + 3) * len(format_columns))) + print(line_row) for title in titles: padding_needed = row_header_width - _get_display_width(title) @@ -485,7 +496,7 @@ def _print_bitdepth_chroma_table(results, table_type="Encoder"): padding_right = col_width - symbol_width - padding_left row_string += f" {' ' * padding_left}{symbol}{' ' * padding_right} |" print(row_string) - print("=" * (row_header_width + 3 + (col_width + 3) * len(format_columns))) + print("=" * (row_header_width + 4 + (col_width + 3) * len(format_columns))) def run_bitdepth_chroma_tests(encoder_count, decoder_count, verbose): diff --git a/src/HwCodecDetect/run_tests.py b/src/HwCodecDetect/run_tests.py index 5b7eec9..40d3b65 100644 --- a/src/HwCodecDetect/run_tests.py +++ b/src/HwCodecDetect/run_tests.py @@ -10,7 +10,7 @@ from collections import defaultdict from .install_ffmpeg_if_needed import install_ffmpeg_if_needed from .bitdepth_chroma_detect import run_bitdepth_chroma_tests, print_bitdepth_chroma_results -from .utils import check_codec_support +from .utils import check_codec_support, get_stty_cfg, set_stty_cfg from colorama import init, Fore, Style from concurrent.futures import ThreadPoolExecutor, as_completed from tqdm import tqdm @@ -390,12 +390,16 @@ def _run_encoder_tests(test_dir, max_workers, verbose, unsupported_encoders=None for res_name, res_size in RESOLUTIONS.items(): tasks.append((codec, encoder, res_name, res_size, test_dir, verbose, unsupported_encoders)) - with ThreadPoolExecutor(max_workers=max_workers) as executor: - futures = [executor.submit(_run_encoder_test_single, task) for task in tasks] - - for future in tqdm(as_completed(futures), total=len(tasks), desc="Running encoder tests"): - title, res_name, status = future.result() - results[title][res_name] = status + stty_cfg = get_stty_cfg() + try: + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = [executor.submit(_run_encoder_test_single, task) for task in tasks] + + for future in tqdm(as_completed(futures), total=len(tasks), desc="Running encoder tests"): + title, res_name, status = future.result() + results[title][res_name] = status + finally: + set_stty_cfg(stty_cfg) return results @@ -517,12 +521,16 @@ def _run_decoder_tests(test_dir, max_workers, verbose, unsupported_decoders=None for res_name, res_size in RESOLUTIONS.items(): tasks.append((codec, hw_decoder, res_name, res_size, test_dir, verbose, unsupported_decoders)) - with ThreadPoolExecutor(max_workers=max_workers) as executor: - futures = [executor.submit(_run_decoder_test_single, task) for task in tasks] + stty_cfg = get_stty_cfg() + try: + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = [executor.submit(_run_decoder_test_single, task) for task in tasks] - for future in tqdm(as_completed(futures), total=len(tasks), desc="Running decoder tests"): - title, res_name, status = future.result() - results[title][res_name] = status + for future in tqdm(as_completed(futures), total=len(tasks), desc="Running decoder tests"): + title, res_name, status = future.result() + results[title][res_name] = status + finally: + set_stty_cfg(stty_cfg) return results @@ -548,15 +556,17 @@ def _print_summary_table(results): row_header_width = max([_get_display_width(t) for t in results.keys()] + [20, _get_display_width("Decoder"), _get_display_width("Encoder")]) if decoder_titles: - print("\n" + "-" * (row_header_width + 3 + (res_width + 3) * len(resolutions))) + print("\n" + "=" * (row_header_width + 4 + (res_width + 3) * len(resolutions))) header_text = "Decoder" padding_left = (row_header_width - _get_display_width(header_text)) // 2 padding_right = row_header_width - _get_display_width(header_text) - padding_left header_row = f"| {' ' * padding_left}{header_text}{' ' * padding_right} |" + line_row = f"|-{'-' * row_header_width}-|" for res in resolutions: header_row += f" {res.center(res_width)} |" + line_row += f"-{'-' * res_width}-|" print(header_row) - print("-" * (row_header_width + 3 + (res_width + 3) * len(resolutions))) + print(line_row) for title in decoder_titles: padding_needed = row_header_width - _get_display_width(title) @@ -569,18 +579,20 @@ def _print_summary_table(results): padding_right = res_width - symbol_width - padding_left row_string += f" {' ' * padding_left}{symbol}{' ' * padding_right} |" print(row_string) - print("-" * (row_header_width + 3 + (res_width + 3) * len(resolutions))) + print("=" * (row_header_width + 4 + (res_width + 3) * len(resolutions))) if encoder_titles: - print("\n" + "-" * (row_header_width + 3 + (res_width + 3) * len(resolutions))) + print("\n" + "=" * (row_header_width + 4 + (res_width + 3) * len(resolutions))) header_text = "Encoder" padding_left = (row_header_width - _get_display_width(header_text)) // 2 padding_right = row_header_width - _get_display_width(header_text) - padding_left header_row = f"| {' ' * padding_left}{header_text}{' ' * padding_right} |" + line_row = f"|-{'-' * row_header_width}-|" for res in resolutions: header_row += f" {res.center(res_width)} |" + line_row += f"-{'-' * res_width}-|" print(header_row) - print("-" * (row_header_width + 3 + (res_width + 3) * len(resolutions))) + print(line_row) for title in encoder_titles: padding_needed = row_header_width - _get_display_width(title) @@ -593,7 +605,7 @@ def _print_summary_table(results): padding_right = res_width - symbol_width - padding_left row_string += f" {' ' * padding_left}{symbol}{' ' * padding_right} |" print(row_string) - print("-" * (row_header_width + 3 + (res_width + 3) * len(resolutions))) + print("=" * (row_header_width + 4 + (res_width + 3) * len(resolutions))) def run_all_tests(args): diff --git a/src/HwCodecDetect/utils.py b/src/HwCodecDetect/utils.py index 9a8569a..bb686ca 100644 --- a/src/HwCodecDetect/utils.py +++ b/src/HwCodecDetect/utils.py @@ -169,4 +169,17 @@ def check_codec_support(encoders_dict, decoders_dict): unsupported_decoders.add(decoder) print(f"{Fore.YELLOW}Warning: Decoder '{decoder}' is not supported by current FFmpeg version{Style.RESET_ALL}") - return unsupported_encoders, unsupported_decoders \ No newline at end of file + return unsupported_encoders, unsupported_decoders + +def get_stty_cfg(): + try: + return subprocess.check_output(["stty", "--save"], text=True).strip() + except: + return None + +def set_stty_cfg(cfg): + if not cfg: return + try: + subprocess.run(["stty", cfg]) + except: + pass