Pretty process

Track delegated processes with rich progress meters.

Based on: https://www.deanmontgomery.com/2022/03/24/rich-progress-and-multiprocessing

Functions

____private

____private(task_id, shared_progress, data)

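Return True after simulating a long-running task (one second of sleep per item). Intended for testing.

Note: this function can’t be defined inside the if-block that follows it in the source file.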

Source code in corallium/pretty_process.py
def ____private(task_id: int, shared_progress: DictProxy, data: list[_ItemT]) -> Any:  # type: ignore[type-arg]
    """Return True for testing a long running task.

    Note: this function can't be in the if-block below

    """
    for _val in data:
        sleep(1)
        shared_progress[task_id] += 1
    return True
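
A delegated task written by a caller follows the same shape as the test function above. The sketch below is illustrative only: `double_items` is a hypothetical name, and a plain dict stands in for the manager-backed `shared_progress` proxy so the function can be exercised without starting any processes.

```python
from collections.abc import MutableMapping


def double_items(
    task_id: int,
    shared_progress: MutableMapping[int, int],
    data: list[int],
) -> list[int]:
    """Hypothetical delegated task: double each item and report progress."""
    results = []
    for value in data:
        results.append(value * 2)
        # One increment per processed item, as pretty_process expects
        shared_progress[task_id] += 1
    return results


# Assumption: a plain dict stands in for the manager dict when calling directly
progress = {7: 0}
doubled = double_items(7, progress, [1, 2, 3])
```

A function like this would be passed as `pretty_process(double_items, data=[...])`, which would return one list per chunk. Because the task is handed to a process pool, it needs to be defined at module level so that worker processes can import it.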

pretty_process

pretty_process(delegated_task, *, data, num_workers=3, num_cpus=4)

Run a task in parallel to process all provided data.

Uses rich to display one progress bar per chunk of data, plus an overall bar for all jobs.

PARAMETER DESCRIPTION
delegated_task

callable invoked as delegated_task(task_id, shared_progress, chunk); it must call shared_progress[task_id] += 1 once for each item it processes

TYPE: _DelegatedTask

data

the list of items to split into chunks and distribute across workers

TYPE: list[_ItemT]

num_workers

maximum number of worker processes (passed to ProcessPoolExecutor as max_workers)

TYPE: int DEFAULT: 3

num_cpus

number of CPUs (passed to the internal _chunked helper as count when splitting data)

TYPE: int DEFAULT: 4

RETURNS DESCRIPTION
Any

List of results, one per submitted chunk, in submission order

Source code in corallium/pretty_process.py
def pretty_process(
    delegated_task: _DelegatedTask,  # type: ignore[type-arg]
    *,
    data: list[_ItemT],
    num_workers: int = 3,
    num_cpus: int = 4,
) -> Any:
    """Run a task in parallel to process all provided data.

    Uses `rich` to display pretty progress bars

    Args:
        delegated_task: must call `shared_progress[task_id] += 1` on each item in data
        data: the list of data to distribute
        num_workers: number of worker processes
        num_cpus: number of CPUs

    Returns:
        List of results

    """
    # Docs: https://rich.readthedocs.io/en/latest/progress.html
    columns: list[str | ProgressColumn] = [
        '[progress.description]{task.description}',
        BarColumn(),
        '[progress.percentage]{task.percentage:>3.0f}%',
        TimeRemainingColumn(),
        TimeElapsedColumn(),
    ]
    with Progress(*columns, refresh_per_second=1) as progress:  # noqa: SIM117 (Py>3.9)
        # Share state between process and workers
        with multiprocessing.Manager() as manager:
            shared_progress = manager.dict()
            jobs = []
            totals = {}
            task_id_all = progress.add_task('[green]All jobs progress:')

            with ProcessPoolExecutor(max_workers=num_workers) as executor:
                for ix, chunk in enumerate(_chunked(data, count=num_cpus)):
                    task_id = progress.add_task(f'task {ix}')
                    shared_progress[task_id] = 0
                    totals[task_id] = len(chunk)
                    jobs.append(executor.submit(delegated_task, task_id, shared_progress, chunk))

                # Update progress bar from shared state
                remaining = len(jobs)
                while remaining:
                    n_done = 0
                    for task_id, latest in shared_progress.items():
                        n_done += latest
                        progress.update(task_id, completed=latest, total=totals[task_id])
                    progress.update(task_id_all, completed=n_done, total=len(data))
                    remaining = len(jobs) - sum(job.done() for job in jobs)
                    if remaining:
                        sleep(0.1)  # 100ms refresh rate to avoid busy-waiting

                # Collect results and catch any errors
                return [job.result() for job in jobs]
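
The polling loop near the end of the source can be tried in isolation. The sketch below is a simplification under stated assumptions, not the library's code: it swaps the process pool for a `ThreadPoolExecutor`, replaces the manager dict with a plain dict (threads share memory directly), and records snapshots of the summed counters instead of drawing rich progress bars. The names `work` and `run_with_polling` are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep


def work(task_id, shared_progress, chunk):
    """Process a chunk, incrementing the shared counter once per item."""
    for _item in chunk:
        sleep(0.01)  # stand-in for real work
        shared_progress[task_id] += 1
    return len(chunk)


def run_with_polling(chunks):
    """Submit one job per chunk, then poll shared counters until all jobs finish."""
    shared_progress = {}  # assumption: plain dict instead of manager.dict()
    snapshots = []
    with ThreadPoolExecutor(max_workers=3) as executor:
        jobs = []
        for task_id, chunk in enumerate(chunks):
            shared_progress[task_id] = 0
            jobs.append(executor.submit(work, task_id, shared_progress, chunk))

        # Poll until every future reports done, sleeping to avoid busy-waiting
        while not all(job.done() for job in jobs):
            snapshots.append(sum(shared_progress.values()))
            sleep(0.005)

        # One extra reading after the loop so the last snapshot is the final state
        snapshots.append(sum(shared_progress.values()))
        return [job.result() for job in jobs], snapshots


results, snapshots = run_with_polling([[1, 2, 3], [4, 5], [6]])
```

Each worker writes only to its own key, so the main thread can read the counters without extra locking in this sketch. With real processes, the manager dict plays the same role across process boundaries.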