Pretty process
Track delegated processes with rich progress meters.
Based on: https://www.deanmontgomery.com/2022/03/24/rich-progress-and-multiprocessing
Functions⚓︎
____private ⚓︎
____private(task_id, shared_progress, data)
Return True for testing a long running task.
Note: this function can’t be in the if-block below
Source code in corallium/pretty_process.py
def ____private(task_id: int, shared_progress: DictProxy, data: list[_ItemT]) -> Any:  # type: ignore[type-arg]
    """Simulate a long running task for testing and return True.

    For every item in `data` the function sleeps for one second and then
    increments the counter stored under `task_id` in `shared_progress`.

    Note: this function can't be in the if-block below (presumably so that
    worker processes can locate it by name -- TODO confirm).

    Args:
        task_id: key of this task's counter in `shared_progress`
        shared_progress: mapping of task id to number of processed items
        data: the items to "process"; only the number of items matters

    Returns:
        Always True

    """
    for _item in data:
        sleep(1)
        # Explicit read-then-write of the counter (same operations as `+= 1`)
        processed_so_far = shared_progress[task_id]
        shared_progress[task_id] = processed_so_far + 1
    return True
pretty_process ⚓︎
pretty_process(delegated_task, *, data, num_workers=3, num_cpus=4)
Run a task in parallel to process all provided data.
Uses rich to display pretty progress bars
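| PARAMETER | DESCRIPTION |
|---|---|
| `delegated_task` | must call `shared_progress[task_id] += 1` on each item in data. TYPE: `_DelegatedTask` |
| `data` | the list of data to distribute. TYPE: `list[_ItemT]` |
| `num_workers` | number of worker processes. TYPE: `int` DEFAULT: `3` |
| `num_cpus` | number of CPUs. TYPE: `int` DEFAULT: `4` |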
| RETURNS | DESCRIPTION |
|---|---|
| `Any` | List of results |
Source code in corallium/pretty_process.py
def pretty_process(
    delegated_task: _DelegatedTask,  # type: ignore[type-arg]
    *,
    data: list[_ItemT],
    num_workers: int = 3,
    num_cpus: int = 4,
) -> Any:
    """Run a task in parallel to process all provided data.

    Uses `rich` to display pretty progress bars: one bar per submitted chunk
    of `data` plus one aggregate bar for all jobs.

    Args:
        delegated_task: submitted to the process pool as
            `delegated_task(task_id, shared_progress, chunk)`; must call
            `shared_progress[task_id] += 1` on each item in data
        data: the list of data to distribute
        num_workers: number of worker processes
        num_cpus: number of CPUs; passed to `_chunked` as `count` when
            splitting `data` (presumably the number of chunks, and therefore
            of progress bars -- verify against `_chunked`)

    Returns:
        List of results, one per submitted chunk, in submission order

    Raises:
        Exception: any exception raised inside `delegated_task` is re-raised
            here when that job's result is collected

    """
    # Docs: https://rich.readthedocs.io/en/latest/progress.html
    columns: list[str | ProgressColumn] = [
        '[progress.description]{task.description}',
        BarColumn(),
        '[progress.percentage]{task.percentage:>3.0f}%',
        TimeRemainingColumn(),
        TimeElapsedColumn(),
    ]
    with Progress(*columns, refresh_per_second=1) as progress:  # noqa: SIM117 (Py>3.9)
        # Share state between process and workers
        with multiprocessing.Manager() as manager:
            shared_progress = manager.dict()
            jobs = []
            totals = {}
            task_id_all = progress.add_task('[green]All jobs progress:')
            with ProcessPoolExecutor(max_workers=num_workers) as executor:
                for ix, chunk in enumerate(_chunked(data, count=num_cpus)):
                    task_id = progress.add_task(f'task {ix}')
                    shared_progress[task_id] = 0
                    totals[task_id] = len(chunk)
                    jobs.append(executor.submit(delegated_task, task_id, shared_progress, chunk))

                # Update progress bars from shared state. Polling must happen inside the
                # executor block: leaving it would wait for every job before any update.
                finished = not jobs
                while not finished:
                    # Check completion BEFORE reading the counters: if every job is already
                    # done, the counters read below are final, so the last pass always
                    # renders the true end state instead of a stale snapshot.
                    finished = all(job.done() for job in jobs)
                    n_done = 0
                    for task_id, latest in shared_progress.items():
                        n_done += latest
                        progress.update(task_id, completed=latest, total=totals[task_id])
                    progress.update(task_id_all, completed=n_done, total=len(data))
                    if not finished:
                        sleep(0.1)  # 100ms refresh rate to avoid busy-waiting

                # Collect results; `result()` re-raises any error from the worker
                return [job.result() for job in jobs]