Skip to content

Commit 9de2fd9

Browse files
clin1234lysnikolaouferdnycmethane
authored
Add no-GIL interpreter support (#641)
Co-authored-by: Lysandros Nikolaou <lisandrosnik@gmail.com> Co-authored-by: Frank Dana <ferdnyc@gmail.com> Co-authored-by: Inada Naoki <songofacandy@gmail.com>
1 parent 284782d commit 9de2fd9

2 files changed

Lines changed: 71 additions & 0 deletions

File tree

.github/workflows/test.yml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ jobs:
1414
exclude:
1515
- os: windows-11-arm
1616
py: "3.10"
17+
1718
runs-on: ${{ matrix.os }}
1819
name: Run test with Python ${{ matrix.py }} on ${{ matrix.os }}
1920

@@ -33,22 +34,41 @@ jobs:
3334
run: |
3435
python -m pip install -r requirements.txt pytest
3536
37+
- name: Install pytest-run-parallel under free-threading
38+
if: contains(matrix.py, 't')
39+
run: |
40+
pip install pytest-run-parallel
41+
3642
- name: Build
3743
shell: bash
3844
run: |
3945
make cython
4046
pip install .
4147
4248
- name: Test (C extension)
49+
if: ${{ ! contains(matrix.py, 't') }}
4350
shell: bash
4451
run: |
4552
pytest -v test
4653
4754
- name: Test (pure Python fallback)
55+
if: ${{ ! contains(matrix.py, 't') }}
4856
shell: bash
4957
run: |
5058
MSGPACK_PUREPYTHON=1 pytest -v test
5159
60+
- name: Test (C extension) in parallel under free-threading
61+
if: contains(matrix.py, 't')
62+
shell: bash
63+
run: |
64+
pytest -v --parallel-threads=auto --iterations=20 test
65+
66+
- name: Test (pure Python fallback) in parallel under free-threading
67+
if: contains(matrix.py, 't')
68+
shell: bash
69+
run: |
70+
MSGPACK_PUREPYTHON=1 pytest -v --parallel-threads=auto --iterations=20 test
71+
5272
- name: build packages
5373
shell: bash
5474
run: |
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
#!/usr/bin/env python3
2+
import threading
3+
from concurrent.futures import ThreadPoolExecutor
4+
5+
from msgpack import Packer
6+
7+
8+
def run_threaded(
9+
func,
10+
num_threads=8,
11+
pass_count=False,
12+
pass_barrier=False,
13+
outer_iterations=1,
14+
prepare_args=None,
15+
):
16+
"""Runs a function many times in parallel"""
17+
for _ in range(outer_iterations):
18+
with ThreadPoolExecutor(max_workers=num_threads) as tpe:
19+
if prepare_args is None:
20+
args = []
21+
else:
22+
args = prepare_args()
23+
if pass_barrier:
24+
barrier = threading.Barrier(num_threads)
25+
args.append(barrier)
26+
if pass_count:
27+
all_args = [(func, i, *args) for i in range(num_threads)]
28+
else:
29+
all_args = [(func, *args) for i in range(num_threads)]
30+
try:
31+
futures = []
32+
for arg in all_args:
33+
futures.append(tpe.submit(*arg))
34+
finally:
35+
if len(futures) < num_threads and pass_barrier:
36+
barrier.abort()
37+
for f in futures:
38+
f.result()
39+
40+
41+
def test_multithread_packing():
42+
output = []
43+
test_data = "abcd" * 10_000_000
44+
packer = Packer()
45+
46+
def closure(b):
47+
data = packer.pack(test_data)
48+
output.append(data)
49+
b.wait()
50+
51+
run_threaded(closure, num_threads=10, pass_barrier=True, pass_count=False)

0 commit comments

Comments
 (0)