Skip to content

Commit bcb63cf

Browse files
committed
refactor: drop mp.Queue() for error handling
1 parent e34d233 commit bcb63cf

1 file changed

Lines changed: 9 additions & 13 deletions

File tree

taskqueue/taskqueue.py

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -524,11 +524,10 @@ def multiprocess_upload(QueueClass, queue_name, tasks, parallel=True, total=None
524524

525525
def capturing_soloprocess_upload(*args, **kwargs):
526526
try:
527-
return soloprocess_upload(*args, **kwargs)
527+
return (soloprocess_upload(*args, **kwargs), None)
528528
except Exception as err:
529529
print(err)
530-
error_queue.put(err)
531-
return 0
530+
return (0, err)
532531

533532
uploadfn = partial(
534533
capturing_soloprocess_upload, QueueClass, queue_name
@@ -568,12 +567,15 @@ def capturing_soloprocess_upload(*args, **kwargs):
568567
# Don't fork, spawn entirely new processes. This
569568
# avoids accidental deadlocks.
570569
spawn_ctx = mp.get_context("spawn")
571-
error_queue = spawn_ctx.Manager().Queue()
572570

573571
ct = 0
572+
errors = []
574573
with tqdm(desc="Upload", total=total) as pbar:
575574
with pathos.pools.ProcessPool(parallel, context=spawn_ctx) as pool:
576-
for num_inserted in pool.imap(uploadfn, sip(tasks, block_size)):
575+
for num_inserted, err in pool.imap(uploadfn, sip(tasks, block_size)):
576+
if err is not None:
577+
errors.append(err)
578+
continue
577579
pbar.update(num_inserted)
578580
ct += num_inserted
579581

@@ -583,14 +585,8 @@ def capturing_soloprocess_upload(*args, **kwargs):
583585
os.environ["no_proxy"] = no_proxy
584586
# task.__class__.__module__ = cls_module
585587

586-
if not error_queue.empty():
587-
errors = []
588-
while not error_queue.empty():
589-
err = error_queue.get()
590-
if err is not StopIteration:
591-
errors.append(err)
592-
if len(errors):
593-
raise Exception(errors)
588+
if errors:
589+
raise Exception(errors)
594590

595591
return ct
596592

0 commit comments

Comments
 (0)