Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 32 additions & 21 deletions src/HwCodecDetect/bitdepth_chroma_detect.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from colorama import init, Fore, Style
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from .utils import check_codec_support
from .utils import check_codec_support, get_stty_cfg, set_stty_cfg

init(autoreset=True)

Expand Down Expand Up @@ -301,15 +301,19 @@ def _run_encoder_bitdepth_tests(test_dir, max_workers, verbose, unsupported_enco
for pix_fmt, bit_depth, chroma, desc in PIXEL_FORMATS:
tasks.append((codec, encoder, pix_fmt, bit_depth, chroma, test_dir, verbose, unsupported_encoders))

with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(_run_encoder_bitdepth_test, task) for task in tasks]

for future in tqdm(as_completed(futures), total=len(tasks), desc="Running encoder bit-depth tests"):
title, pix_fmt, bit_depth, chroma, status = future.result()
key = f"{bit_depth}-bit {chroma}"
if title not in results:
results[title] = {}
results[title][key] = status
stty_cfg = get_stty_cfg()
try:
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(_run_encoder_bitdepth_test, task) for task in tasks]

for future in tqdm(as_completed(futures), total=len(tasks), desc="Running encoder bit-depth tests"):
title, pix_fmt, bit_depth, chroma, status = future.result()
key = f"{bit_depth}-bit {chroma}"
if title not in results:
results[title] = {}
results[title][key] = status
finally:
set_stty_cfg(stty_cfg)

return results

Expand Down Expand Up @@ -423,14 +427,19 @@ def _run_decoder_bitdepth_tests(test_dir, max_workers, verbose, unsupported_deco
for pix_fmt, bit_depth, chroma, desc in PIXEL_FORMATS:
tasks.append((codec, hw_decoder, pix_fmt, bit_depth, chroma, test_dir, verbose, unsupported_decoders))

with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(_run_decoder_bitdepth_test, task) for task in tasks]
for future in tqdm(as_completed(futures), total=len(tasks), desc="Running decoder bit-depth tests"):
title, pix_fmt, bit_depth, chroma, status = future.result()
key = f"{bit_depth}-bit {chroma}"
if title not in results:
results[title] = {}
results[title][key] = status
stty_cfg = get_stty_cfg()
try:
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(_run_decoder_bitdepth_test, task) for task in tasks]
for future in tqdm(as_completed(futures), total=len(tasks), desc="Running decoder bit-depth tests"):
title, pix_fmt, bit_depth, chroma, status = future.result()
key = f"{bit_depth}-bit {chroma}"
if title not in results:
results[title] = {}
results[title][key] = status
finally:
set_stty_cfg(stty_cfg)

return results


Expand Down Expand Up @@ -464,15 +473,17 @@ def _print_bitdepth_chroma_table(results, table_type="Encoder"):
col_width = max(len(col) for col in format_columns)
row_header_width = max([_get_display_width(t) for t in titles] + [20, _get_display_width(table_type)])

print("\n" + "=" * (row_header_width + 3 + (col_width + 3) * len(format_columns)))
print("\n" + "=" * (row_header_width + 4 + (col_width + 3) * len(format_columns)))
header_text = f"Bit-depth/Chroma {table_type} Support"
padding_left = (row_header_width - _get_display_width(header_text)) // 2
padding_right = row_header_width - _get_display_width(header_text) - padding_left
header_row = f"| {' ' * padding_left}{header_text}{' ' * padding_right} |"
line_row = f"|-{'-' * row_header_width}-|"
for col in format_columns:
header_row += f" {col.center(col_width)} |"
line_row += f"-{'-' * col_width}-|"
print(header_row)
print("-" * (row_header_width + 3 + (col_width + 3) * len(format_columns)))
print(line_row)

for title in titles:
padding_needed = row_header_width - _get_display_width(title)
Expand All @@ -485,7 +496,7 @@ def _print_bitdepth_chroma_table(results, table_type="Encoder"):
padding_right = col_width - symbol_width - padding_left
row_string += f" {' ' * padding_left}{symbol}{' ' * padding_right} |"
print(row_string)
print("=" * (row_header_width + 3 + (col_width + 3) * len(format_columns)))
print("=" * (row_header_width + 4 + (col_width + 3) * len(format_columns)))


def run_bitdepth_chroma_tests(encoder_count, decoder_count, verbose):
Expand Down
48 changes: 30 additions & 18 deletions src/HwCodecDetect/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from collections import defaultdict
from .install_ffmpeg_if_needed import install_ffmpeg_if_needed
from .bitdepth_chroma_detect import run_bitdepth_chroma_tests, print_bitdepth_chroma_results
from .utils import check_codec_support
from .utils import check_codec_support, get_stty_cfg, set_stty_cfg
from colorama import init, Fore, Style
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
Expand Down Expand Up @@ -390,12 +390,16 @@ def _run_encoder_tests(test_dir, max_workers, verbose, unsupported_encoders=None
for res_name, res_size in RESOLUTIONS.items():
tasks.append((codec, encoder, res_name, res_size, test_dir, verbose, unsupported_encoders))

with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(_run_encoder_test_single, task) for task in tasks]

for future in tqdm(as_completed(futures), total=len(tasks), desc="Running encoder tests"):
title, res_name, status = future.result()
results[title][res_name] = status
stty_cfg = get_stty_cfg()
try:
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(_run_encoder_test_single, task) for task in tasks]

for future in tqdm(as_completed(futures), total=len(tasks), desc="Running encoder tests"):
title, res_name, status = future.result()
results[title][res_name] = status
finally:
set_stty_cfg(stty_cfg)

return results

Expand Down Expand Up @@ -517,12 +521,16 @@ def _run_decoder_tests(test_dir, max_workers, verbose, unsupported_decoders=None
for res_name, res_size in RESOLUTIONS.items():
tasks.append((codec, hw_decoder, res_name, res_size, test_dir, verbose, unsupported_decoders))

with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(_run_decoder_test_single, task) for task in tasks]
stty_cfg = get_stty_cfg()
try:
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(_run_decoder_test_single, task) for task in tasks]

for future in tqdm(as_completed(futures), total=len(tasks), desc="Running decoder tests"):
title, res_name, status = future.result()
results[title][res_name] = status
for future in tqdm(as_completed(futures), total=len(tasks), desc="Running decoder tests"):
title, res_name, status = future.result()
results[title][res_name] = status
finally:
set_stty_cfg(stty_cfg)

return results

Expand All @@ -548,15 +556,17 @@ def _print_summary_table(results):
row_header_width = max([_get_display_width(t) for t in results.keys()] + [20, _get_display_width("Decoder"), _get_display_width("Encoder")])

if decoder_titles:
print("\n" + "-" * (row_header_width + 3 + (res_width + 3) * len(resolutions)))
print("\n" + "=" * (row_header_width + 4 + (res_width + 3) * len(resolutions)))
header_text = "Decoder"
padding_left = (row_header_width - _get_display_width(header_text)) // 2
padding_right = row_header_width - _get_display_width(header_text) - padding_left
header_row = f"| {' ' * padding_left}{header_text}{' ' * padding_right} |"
line_row = f"|-{'-' * row_header_width}-|"
for res in resolutions:
header_row += f" {res.center(res_width)} |"
line_row += f"-{'-' * res_width}-|"
print(header_row)
print("-" * (row_header_width + 3 + (res_width + 3) * len(resolutions)))
print(line_row)

for title in decoder_titles:
padding_needed = row_header_width - _get_display_width(title)
Expand All @@ -569,18 +579,20 @@ def _print_summary_table(results):
padding_right = res_width - symbol_width - padding_left
row_string += f" {' ' * padding_left}{symbol}{' ' * padding_right} |"
print(row_string)
print("-" * (row_header_width + 3 + (res_width + 3) * len(resolutions)))
print("=" * (row_header_width + 4 + (res_width + 3) * len(resolutions)))

if encoder_titles:
print("\n" + "-" * (row_header_width + 3 + (res_width + 3) * len(resolutions)))
print("\n" + "=" * (row_header_width + 4 + (res_width + 3) * len(resolutions)))
header_text = "Encoder"
padding_left = (row_header_width - _get_display_width(header_text)) // 2
padding_right = row_header_width - _get_display_width(header_text) - padding_left
header_row = f"| {' ' * padding_left}{header_text}{' ' * padding_right} |"
line_row = f"|-{'-' * row_header_width}-|"
for res in resolutions:
header_row += f" {res.center(res_width)} |"
line_row += f"-{'-' * res_width}-|"
print(header_row)
print("-" * (row_header_width + 3 + (res_width + 3) * len(resolutions)))
print(line_row)

for title in encoder_titles:
padding_needed = row_header_width - _get_display_width(title)
Expand All @@ -593,7 +605,7 @@ def _print_summary_table(results):
padding_right = res_width - symbol_width - padding_left
row_string += f" {' ' * padding_left}{symbol}{' ' * padding_right} |"
print(row_string)
print("-" * (row_header_width + 3 + (res_width + 3) * len(resolutions)))
print("=" * (row_header_width + 4 + (res_width + 3) * len(resolutions)))


def run_all_tests(args):
Expand Down
15 changes: 14 additions & 1 deletion src/HwCodecDetect/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,4 +169,17 @@ def check_codec_support(encoders_dict, decoders_dict):
unsupported_decoders.add(decoder)
print(f"{Fore.YELLOW}Warning: Decoder '{decoder}' is not supported by current FFmpeg version{Style.RESET_ALL}")

return unsupported_encoders, unsupported_decoders
return unsupported_encoders, unsupported_decoders

def get_stty_cfg():
try:
return subprocess.check_output(["stty", "--save"], text=True).strip()
except:
return None

def set_stty_cfg(cfg):
if not cfg: return
try:
subprocess.run(["stty", cfg])
except:
pass