Skip to content

Commit

Permalink
Merge pytest-xdist latest changes + correct lint errors in nTop fork (#6
Browse files Browse the repository at this point in the history
)

* [pre-commit.ci] pre-commit autoupdate (#1120)

updates:
- [github.com/astral-sh/ruff-pre-commit: v0.6.1 → v0.6.2](astral-sh/ruff-pre-commit@v0.6.1...v0.6.2)
- [github.com/pre-commit/mirrors-mypy: v1.11.1 → v1.11.2](pre-commit/mirrors-mypy@v1.11.1...v1.11.2)

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>

* build(deps): bump pypa/gh-action-pypi-publish (#1123)

Bumps the github-actions group with 1 update: [pypa/gh-action-pypi-publish](https://github.com/pypa/gh-action-pypi-publish).


Updates `pypa/gh-action-pypi-publish` from 1.9.0 to 1.10.0
- [Release notes](https://github.com/pypa/gh-action-pypi-publish/releases)
- [Commits](pypa/gh-action-pypi-publish@v1.9.0...v1.10.0)

---
updated-dependencies:
- dependency-name: pypa/gh-action-pypi-publish
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: github-actions
...

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* [pre-commit.ci] pre-commit autoupdate (#1124)

updates:
- [github.com/astral-sh/ruff-pre-commit: v0.6.2 → v0.6.3](astral-sh/ruff-pre-commit@v0.6.2...v0.6.3)

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>

* Publish package with attestations (#1125)

Follow up to #1123.

* build(deps): bump the github-actions group with 2 updates (#1127)

Bumps the github-actions group with 2 updates: [hynek/build-and-inspect-python-package](https://github.com/hynek/build-and-inspect-python-package) and [pypa/gh-action-pypi-publish](https://github.com/pypa/gh-action-pypi-publish).


Updates `hynek/build-and-inspect-python-package` from 2.8 to 2.9
- [Release notes](https://github.com/hynek/build-and-inspect-python-package/releases)
- [Changelog](https://github.com/hynek/build-and-inspect-python-package/blob/main/CHANGELOG.md)
- [Commits](hynek/build-and-inspect-python-package@v2.8...v2.9)

Updates `pypa/gh-action-pypi-publish` from 1.10.0 to 1.10.1
- [Release notes](https://github.com/pypa/gh-action-pypi-publish/releases)
- [Commits](pypa/gh-action-pypi-publish@v1.10.0...v1.10.1)

---
updated-dependencies:
- dependency-name: hynek/build-and-inspect-python-package
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: github-actions
- dependency-name: pypa/gh-action-pypi-publish
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: github-actions
...

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* [pre-commit.ci] pre-commit autoupdate (#1128)

updates:
- [github.com/astral-sh/ruff-pre-commit: v0.6.3 → v0.6.4](astral-sh/ruff-pre-commit@v0.6.3...v0.6.4)

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>

* [pre-commit.ci] pre-commit autoupdate (#1129)

updates:
- [github.com/astral-sh/ruff-pre-commit: v0.6.4 → v0.6.5](astral-sh/ruff-pre-commit@v0.6.4...v0.6.5)

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>

* all ruff checks should pass

* mypy fixes

* undo protocol/other scheduler changes, add asserts for mypy check

* undo newline add/remove changes in diff for other schedulers

* remove unused function from dsession.py

---------

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Bruno Oliveira <[email protected]>
  • Loading branch information
4 people authored Sep 20, 2024
1 parent f73e306 commit c82840f
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 41 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
- uses: actions/checkout@v4

- name: Build and Check Package
uses: hynek/build-and-inspect-python-package@v2.8
uses: hynek/build-and-inspect-python-package@v2.9

deploy:
needs: package
Expand All @@ -39,7 +39,9 @@ jobs:
path: dist

- name: Publish package to PyPI
uses: pypa/[email protected]
uses: pypa/[email protected]
with:
attestations: true

- name: Push tag
run: |
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Build and Check Package
uses: hynek/build-and-inspect-python-package@v2.8
uses: hynek/build-and-inspect-python-package@v2.9

test:

Expand Down
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: "v0.6.1"
rev: "v0.6.5"
hooks:
- id: ruff
args: ["--fix"]
Expand All @@ -23,7 +23,7 @@ repos:
language: python
additional_dependencies: [pygments, restructuredtext_lint]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.11.1
rev: v1.11.2
hooks:
- id: mypy
files: ^(src/|testing/)
Expand Down
53 changes: 32 additions & 21 deletions src/xdist/dsession.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,25 @@
from queue import Empty
from queue import Queue
import sys
import traceback
from typing import Any
from typing import Callable
from typing import Sequence
import warnings
import traceback

import execnet
import pytest

from xdist.remote import Producer
from xdist.remote import WorkerInfo
from xdist.scheduler import CustomGroup
from xdist.scheduler import EachScheduling
from xdist.scheduler import LoadFileScheduling
from xdist.scheduler import LoadGroupScheduling
from xdist.scheduler import LoadScheduling
from xdist.scheduler import LoadScopeScheduling
from xdist.scheduler import Scheduling
from xdist.scheduler import WorkStealingScheduling
from xdist.scheduler import CustomGroup
from xdist.workermanage import NodeManager
from xdist.workermanage import WorkerController

Expand Down Expand Up @@ -60,14 +61,14 @@ def __init__(self, config: pytest.Config) -> None:
self._failed_collection_errors: dict[object, bool] = {}
self._active_nodes: set[WorkerController] = set()
self._failed_nodes_count = 0
self.saved_put = None
self.saved_put: Callable[[tuple[str, dict[str, Any]]], None]
self.remake_nodes = False
self.ready_to_run_tests = False
self._max_worker_restart = get_default_max_worker_restart(self.config)
# summary message to print at the end of the session
self._summary_report: str | None = None
self.terminal = config.pluginmanager.getplugin("terminalreporter")
self.worker_status: dict[WorkerController, str] = {}
self.worker_status: dict[str, str] = {}
if self.terminal:
self.trdist = TerminalDistReporter(config)
config.pluginmanager.register(self.trdist, "terminaldistreporter")
Expand Down Expand Up @@ -180,68 +181,75 @@ def loop_once(self) -> None:
self.triggershutdown()


def is_node_finishing(self, node: WorkerController):
def is_node_finishing(self, node: WorkerController) -> bool:
"""Check if a test worker is considered to be finishing.
Evaluate whether it's on its last test, or if no tests are pending.
"""
assert self.sched is not None
assert type(self.sched) is CustomGroup
pending = self.sched.node2pending.get(node)
return pending is not None and len(pending) < 2


def is_node_clear(self, node: WorkerController):
"""Check if a test worker has no pending tests."""
pending = self.sched.node2pending.get(node)
return pending is None or len(pending) == 0


def are_all_nodes_finishing(self):
def are_all_nodes_finishing(self) -> bool:
"""Check if all workers are finishing (See 'is_node_finishing' above)."""
assert self.sched is not None
return all(self.is_node_finishing(node) for node in self.sched.nodes)


def are_all_nodes_done(self):
def are_all_nodes_done(self) -> bool:
"""Check if all nodes have reported to finish."""
return all(s == "finished" for s in self.worker_status.values())


def are_all_active_nodes_collected(self):
def are_all_active_nodes_collected(self) -> bool:
"""Check if all nodes have reported collection to be complete."""
if not all(n.gateway.id in self.worker_status for n in self._active_nodes):
return False
return all(self.worker_status[n.gateway.id] == "collected" for n in self._active_nodes)


def reset_nodes_if_needed(self):
def reset_nodes_if_needed(self) -> None:
assert self.sched is not None
assert type(self.sched) is CustomGroup
if self.are_all_nodes_finishing() and self.ready_to_run_tests and not self.sched.do_resched:
self.reset_nodes()


def reset_nodes(self):
def reset_nodes(self) -> None:
"""Issue shutdown notices to workers for rescheduling purposes."""
assert self.sched is not None
assert type(self.sched) is CustomGroup
if len(self.sched.pending) != 0:
self.remake_nodes = True
for node in self.sched.nodes:
if self.is_node_finishing(node):
node.shutdown()


def reschedule(self):
def reschedule(self) -> None:
"""Reschedule tests."""
assert self.sched is not None
assert type(self.sched) is CustomGroup
self.sched.do_resched = False
self.sched.check_schedule(self.sched.nodes[0], 1.0, True)


def prepare_for_reschedule(self):
def prepare_for_reschedule(self) -> None:
"""Update test workers and their status tracking so rescheduling is ready."""
assert type(self.sched) is CustomGroup
assert self.sched is not None
self.remake_nodes = False
num_workers = self.sched.dist_groups[self.sched.pending_groups[0]]['group_workers']
self.trdist._status = {}
assert self.nodemanager is not None
new_nodes = self.nodemanager.setup_nodes(self.saved_put, num_workers)
self.worker_status = {}
self._active_nodes = set()
self._active_nodes.update(new_nodes)
self.sched.node2pending = {}
assert type(self.sched) is CustomGroup
self.sched.do_resched = True

#
Expand Down Expand Up @@ -287,7 +295,9 @@ def worker_workerfinished(self, node: WorkerController) -> None:
try:
self.prepare_for_reschedule()
except Exception as e:
self.shouldstop = f"Exception caught during preparation for rescheduling. Giving up.\n{''.join(traceback.format_exception(e))}"
msg = ("Exception caught during preparation for rescheduling. Giving up."
f"\n{''.join(traceback.format_exception(e))}")
self.shouldstop = msg
return
self.config.hook.pytest_testnodedown(node=node, error=None)
if node.workeroutput["exitstatus"] == 2: # keyboard-interrupt
Expand All @@ -308,10 +318,11 @@ def worker_workerfinished(self, node: WorkerController) -> None:
assert not crashitem, (crashitem, node)
self._active_nodes.remove(node)

def update_worker_status(self, node, status):
def update_worker_status(self, node: WorkerController, status: str) -> None:
"""Track the worker status.
Can be used at callbacks like 'worker_workerfinished' so we remember wchic event was reported last by each worker.
Can be used at callbacks like 'worker_workerfinished' so we remember wchic event
was reported last by each worker.
"""
self.worker_status[node.workerinfo["id"]] = status

Expand Down
2 changes: 1 addition & 1 deletion src/xdist/scheduler/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from xdist.scheduler.customgroup import CustomGroup as CustomGroup
from xdist.scheduler.each import EachScheduling as EachScheduling
from xdist.scheduler.load import LoadScheduling as LoadScheduling
from xdist.scheduler.customgroup import CustomGroup as CustomGroup
from xdist.scheduler.loadfile import LoadFileScheduling as LoadFileScheduling
from xdist.scheduler.loadgroup import LoadGroupScheduling as LoadGroupScheduling
from xdist.scheduler.loadscope import LoadScopeScheduling as LoadScopeScheduling
Expand Down
28 changes: 17 additions & 11 deletions src/xdist/scheduler/customgroup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from __future__ import annotations

from itertools import cycle
from typing import Sequence, Any
from typing import Any
from typing import Sequence

import pytest

Expand All @@ -10,6 +11,7 @@
from xdist.workermanage import parse_spec_config
from xdist.workermanage import WorkerController


class CustomGroup:
"""Implement grouped load scheduling across a variable number of nodes.
Expand Down Expand Up @@ -203,7 +205,7 @@ def remove_pending_tests_from_node(
) -> None:
raise NotImplementedError()

def check_schedule(self, node: WorkerController, duration: float = 0, from_dsession=False) -> None:
def check_schedule(self, node: WorkerController, duration: float = 0, from_dsession: bool = False) -> None:
"""Maybe schedule new items on the node.
If there are any globally pending nodes left then this will
Expand All @@ -226,7 +228,7 @@ def check_schedule(self, node: WorkerController, duration: float = 0, from_dsess
dist_group_key = self.pending_groups.pop(0)
dist_group = self.dist_groups[dist_group_key]
nodes = cycle(self.nodes[0:dist_group['group_workers']])
schedule_log = {n.gateway.id:[] for n in self.nodes[0:dist_group['group_workers']]}
schedule_log: dict[str, Any] = {n.gateway.id:[] for n in self.nodes[0:dist_group['group_workers']]}
for _ in range(len(dist_group['test_indices'])):
n = next(nodes)
#needs cleaner way to be identified
Expand All @@ -235,13 +237,16 @@ def check_schedule(self, node: WorkerController, duration: float = 0, from_dsess

self._send_tests_group(n, 1, dist_group_key)
del self.dist_groups[dist_group_key]
message = f"\n[-] [csg] check_schedule: processed scheduling for {dist_group_key}: {' '.join([f'{nid} ({len(nt)})' for nid,nt in schedule_log.items()])}"
message = (f"\n[-] [csg] check_schedule: processed scheduling for {dist_group_key}:"
f" {' '.join([f'{nid} ({len(nt)})' for nid,nt in schedule_log.items()])}")
self.report_line(message)

else:
pending = self.node2pending.get(node)
pending = self.node2pending.get(node, [])
if len(pending) < 2:
self.report_line(f"[-] [csg] Shutting down {node.workerinput['workerid']} because only one case is pending")
self.report_line(
f"[-] [csg] Shutting down {node.workerinput['workerid']} because only one case is pending"
)
node.shutdown()

self.log("num items waiting for node:", len(self.pending))
Expand Down Expand Up @@ -301,7 +306,7 @@ def schedule(self) -> None:
if not self.collection:
return

dist_groups = {}
dist_groups: dict[str, dict[Any, Any]] = {}

if self.is_first_time:
for i, test in enumerate(self.collection):
Expand Down Expand Up @@ -338,15 +343,16 @@ def schedule(self) -> None:
dist_group_key = self.pending_groups.pop(0)
dist_group = self.dist_groups[dist_group_key]
nodes = cycle(self.nodes[0:dist_group['group_workers']])
schedule_log = {n.gateway.id: [] for n in self.nodes[0:dist_group['group_workers']]}
schedule_log: dict[str, Any] = {n.gateway.id: [] for n in self.nodes[0:dist_group['group_workers']]}
for _ in range(len(dist_group['test_indices'])):
n = next(nodes)
# needs cleaner way to be identified
tests_per_node = self.dist_groups[dist_group_key]['pending_indices'][:1]
schedule_log[n.gateway.id].extend(tests_per_node)
self._send_tests_group(n, 1, dist_group_key)
del self.dist_groups[dist_group_key]
message = f"\n[-] [csg] schedule: processed scheduling for {dist_group_key}: {' '.join([f'{nid} ({len(nt)})' for nid, nt in schedule_log.items()])}"
message = ("\n[-] [csg] schedule: processed scheduling for "
f"{dist_group_key}: {' '.join([f'{nid} ({len(nt)})' for nid, nt in schedule_log.items()])}")
self.report_line(message)

def _send_tests(self, node: WorkerController, num: int) -> None:
Expand All @@ -356,7 +362,7 @@ def _send_tests(self, node: WorkerController, num: int) -> None:
self.node2pending[node].extend(tests_per_node)
node.send_runtest_some(tests_per_node)

def _send_tests_group(self, node: WorkerController, num: int, dist_group_key) -> None:
def _send_tests_group(self, node: WorkerController, num: int, dist_group_key: str) -> None:
tests_per_node = self.dist_groups[dist_group_key]['pending_indices'][:num]
if tests_per_node:
del self.dist_groups[dist_group_key]['pending_indices'][:num]
Expand Down Expand Up @@ -396,4 +402,4 @@ def _check_nodes_have_same_collection(self) -> bool:

def report_line(self, line: str) -> None:
if self.terminal and self.config.option.verbose >= 0:
self.terminal.write_line(line)
self.terminal.write_line(line)
3 changes: 1 addition & 2 deletions src/xdist/workermanage.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from typing import Literal
from typing import Sequence
from typing import Union
from typing import Optional
import uuid
import warnings

Expand Down Expand Up @@ -83,7 +82,7 @@ def rsync_roots(self, gateway: execnet.Gateway) -> None:
def setup_nodes(
self,
putevent: Callable[[tuple[str, dict[str, Any]]], None],
max_nodes: Optional[int] = None
max_nodes: int | None = None
) -> list[WorkerController]:
self.config.hook.pytest_xdist_setupnodes(config=self.config, specs=self.specs)
self.trace("setting up nodes")
Expand Down
3 changes: 2 additions & 1 deletion xdist-testing-ntop/test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pytest
import time

import pytest


@pytest.mark.xdist_custom(name="low_4")
def test_1():
Expand Down

0 comments on commit c82840f

Please sign in to comment.