Skip to content

Make sleeping between scheduling dynamic. #46

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Aug 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ repos:
rev: v2.0.0
hooks:
- id: setup-cfg-fmt
- repo: https://github.com./myint/docformatter
rev: v1.5.0-rc1
- repo: https://github.com./PYCQA/docformatter
rev: v1.5.0
hooks:
- id: docformatter
args: [--in-place, --wrap-summaries, "88", --wrap-descriptions, "88", --blank]
Expand Down Expand Up @@ -67,7 +67,7 @@ repos:
- id: interrogate
args: [-v, --fail-under=40, src, tests]
- repo: https://github.com./executablebooks/mdformat
rev: 0.7.14
rev: 0.7.15
hooks:
- id: mdformat
additional_dependencies: [
Expand All @@ -76,7 +76,7 @@ repos:
]
args: [--wrap, "88"]
- repo: https://github.com./codespell-project/codespell
rev: v2.1.0
rev: v2.2.1
hooks:
- id: codespell
- repo: https://github.com./pre-commit/mirrors-mypy
Expand Down
5 changes: 4 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ chronological order. Releases follow [semantic versioning](https://semver.org/)
releases are available on [PyPI](https://pypi.org/project/pytask-parallel) and
[Anaconda.org](https://anaconda.org/conda-forge/pytask-parallel).

## 0.2.1 - 2022-08-xx
## 0.2.1 - 2022-08-19

- {pull}`43` adds docformatter.
- {pull}`44` allows to capture warnings from subprocesses. Fixes {issue}`41`.
- {pull}`45` replaces the delay command line option with an internal, dynamic parameter.
Fixes {issue}`41`.
- {pull}`46` adds a dynamic sleep duration during the execution. Fixes {issue}`42`.

## 0.2.0 - 2022-04-15

Expand Down
9 changes: 0 additions & 9 deletions src/pytask_parallel/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,5 @@ def pytask_extend_command_line_interface(cli: click.Group) -> None:
),
default=None,
),
click.Option(
["--delay"],
help=(
"Delay between checking whether tasks have finished. [default: 0.1 "
"(seconds)]"
),
metavar="NUMBER > 0",
default=None,
),
]
cli.commands["build"].params.extend(additional_parameters)
16 changes: 0 additions & 16 deletions src/pytask_parallel/callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,3 @@ def parallel_backend_callback(value: Any) -> str | None:
f"parallel_backend has to be one of {list(PARALLEL_BACKENDS)}."
)
return value


def delay_callback(value: Any) -> float | None:
"""Validate the delay option."""
if value in [None, "None", "none"]:
value = None
else:
try:
value = float(value)
except ValueError:
pass

if not (isinstance(value, float) and value > 0):
raise ValueError("delay has to be a number greater than 0.")

return value
9 changes: 1 addition & 8 deletions src/pytask_parallel/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

from pytask import hookimpl
from pytask_parallel.backends import PARALLEL_BACKENDS_DEFAULT
from pytask_parallel.callbacks import delay_callback
from pytask_parallel.callbacks import n_workers_callback
from pytask_parallel.callbacks import parallel_backend_callback

Expand All @@ -29,13 +28,7 @@ def pytask_parse_config(
if config["n_workers"] == "auto":
config["n_workers"] = max(os.cpu_count() - 1, 1)

config["delay"] = _get_first_non_none_value(
config_from_cli,
config_from_file,
key="delay",
default=0.1,
callback=delay_callback,
)
config["delay"] = 0.1

config["parallel_backend"] = _get_first_non_none_value(
config_from_cli,
Expand Down
32 changes: 31 additions & 1 deletion src/pytask_parallel/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
from types import TracebackType
from typing import Any
from typing import Callable
from typing import List

import attr
import cloudpickle
from pybaum.tree_util import tree_map
from pytask import console
Expand Down Expand Up @@ -65,6 +67,7 @@ def pytask_execute_build(session: Session) -> bool | None:
with parallel_backend(max_workers=session.config["n_workers"]) as executor:

session.executor = executor
sleeper = _Sleeper()

while session.scheduler.is_active():

Expand Down Expand Up @@ -96,6 +99,10 @@ def pytask_execute_build(session: Session) -> bool | None:
running_tasks[task_name] = session.hook.pytask_execute_task(
session=session, task=task
)
sleeper.reset()

if not ready_tasks:
sleeper.increment()

for task_name in list(running_tasks):
future = running_tasks[task_name]
Expand Down Expand Up @@ -146,7 +153,7 @@ def pytask_execute_build(session: Session) -> bool | None:
if session.should_stop:
break
else:
time.sleep(session.config["delay"])
sleeper.sleep()
except KeyboardInterrupt:
break

Expand Down Expand Up @@ -316,3 +323,26 @@ def _create_kwargs_for_task(task: Task) -> dict[Any, Any]:
kwargs[arg_name] = tree_map(lambda x: x.value, attribute)

return kwargs


@attr.s(kw_only=True)
class _Sleeper:
"""A sleeper that always sleeps a bit and up to 1 second if you don't wake it up.

This class controls when the next iteration of the execution loop starts. If new
tasks are scheduled, the time spent sleeping is reset to a lower value.

"""

timings = attr.ib(type=List[float], default=[(i / 10) ** 2 for i in range(1, 11)])
timing_idx = attr.ib(type=int, default=0)

def reset(self) -> None:
self.timing_idx = 0

def increment(self) -> None:
if self.timing_idx < len(self.timings) - 1:
self.timing_idx += 1

def sleep(self) -> None:
time.sleep(self.timings[self.timing_idx])
19 changes: 0 additions & 19 deletions tests/test_callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import pytest
from pytask_parallel.backends import PARALLEL_BACKENDS
from pytask_parallel.callbacks import delay_callback
from pytask_parallel.callbacks import n_workers_callback
from pytask_parallel.callbacks import parallel_backend_callback

Expand Down Expand Up @@ -45,21 +44,3 @@ def test_n_workers_callback(value, expectation):
def test_parallel_backend_callback(value, expectation):
with expectation:
parallel_backend_callback(value)


@pytest.mark.unit
@pytest.mark.parametrize(
"value, expectation",
[
(-1, pytest.raises(ValueError)),
(0.1, does_not_raise()),
(1, does_not_raise()),
("asdad", pytest.raises(ValueError)),
(None, does_not_raise()),
("None", does_not_raise()),
("none", does_not_raise()),
],
)
def test_delay_callback(value, expectation):
with expectation:
delay_callback(value)
27 changes: 0 additions & 27 deletions tests/test_cli.py

This file was deleted.

2 changes: 0 additions & 2 deletions tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ def test_interplay_between_debugging_and_parallel(tmp_path, pdb, n_workers, expe
("n_workers", "auto", ExitCode.OK),
("n_workers", 1, ExitCode.OK),
("n_workers", 2, ExitCode.OK),
("delay", 0.1, ExitCode.OK),
("delay", 1, ExitCode.OK),
("parallel_backend", "unknown_backend", ExitCode.CONFIGURATION_FAILED),
]
+ [
Expand Down
51 changes: 22 additions & 29 deletions tests/test_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from pytask import main
from pytask import Task
from pytask_parallel.backends import PARALLEL_BACKENDS
from pytask_parallel.execute import _Sleeper
from pytask_parallel.execute import DefaultBackendNameSpace
from pytask_parallel.execute import ProcessesNameSpace

Expand Down Expand Up @@ -141,35 +142,6 @@ def myfunc():
assert exception is None


@pytest.mark.end_to_end
@pytest.mark.parametrize("parallel_backend", PARALLEL_BACKENDS)
def test_parallel_execution_delay(tmp_path, parallel_backend):
source = """
import pytask

@pytask.mark.produces("out_1.txt")
def task_1(produces):
produces.write_text("1")

@pytask.mark.produces("out_2.txt")
def task_2(produces):
produces.write_text("2")
"""
tmp_path.joinpath("task_dummy.py").write_text(textwrap.dedent(source))

session = main(
{
"paths": tmp_path,
"delay": 3,
"n_workers": 2,
"parallel_backend": parallel_backend,
}
)

assert session.exit_code == ExitCode.OK
assert 3 < session.execution_end - session.execution_start < 10


@pytest.mark.end_to_end
@pytest.mark.parametrize("parallel_backend", PARALLEL_BACKENDS)
def test_stop_execution_when_max_failures_is_reached(tmp_path, parallel_backend):
Expand Down Expand Up @@ -325,3 +297,24 @@ def task_example(produces):
warnings_block = result.output.split("Warnings")[1]
assert "task_example.py::task_example[0]" in warnings_block
assert "task_example.py::task_example[1]" in warnings_block


def test_sleeper():
sleeper = _Sleeper(timings=[1, 2, 3], timing_idx=0)

assert sleeper.timings == [1, 2, 3]
assert sleeper.timing_idx == 0

sleeper.increment()
assert sleeper.timing_idx == 1

sleeper.increment()
assert sleeper.timing_idx == 2

sleeper.reset()
assert sleeper.timing_idx == 0

start = time()
sleeper.sleep()
end = time()
assert 1 <= end - start <= 2