Source code for fabulous.processpool

"""A custom ProcessPoolExecutor that uses dill for serialization."""

import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from multiprocessing.reduction import ForkingPickler
from typing import Any

import dill


def _init_worker() -> None:
    """Initialize worker process to use dill for pickling."""
    # Override ForkingPickler with dill
    ForkingPickler.dumps = dill.dumps
    ForkingPickler.loads = dill.loads


[docs] class DillProcessPoolExecutor(ProcessPoolExecutor): """ProcessPoolExecutor that uses dill for serialization. This executor patches both the main process and worker processes to use dill instead of pickle, allowing serialization of thread locks and other complex objects that standard pickle cannot handle. """ def __init__( self, max_workers: int | None = None, initargs: tuple[Any, ...] = (), max_tasks_per_child: int | None = None, ) -> None: ForkingPickler.dumps = dill.dumps ForkingPickler.loads = dill.loads super().__init__( max_workers=max_workers, mp_context=multiprocessing.get_context("spawn"), initializer=_init_worker, initargs=initargs, max_tasks_per_child=max_tasks_per_child, )