Skip to content

Commit

Permalink
Merge branch 'main' into byzantium
Browse files Browse the repository at this point in the history
  • Loading branch information
claui committed Sep 4, 2024
2 parents 2cc1d9c + b8d440e commit 7f76fbc
Show file tree
Hide file tree
Showing 19 changed files with 312 additions and 61 deletions.
2 changes: 1 addition & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ ignore-comments=yes
ignore-docstrings=yes

# Ignore imports when computing similarities.
ignore-imports=no
ignore-imports=yes

# Minimum lines number of a similarity.
min-similarity-lines=4
Expand Down
9 changes: 9 additions & 0 deletions USAGE.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ One of the values `regular` or `sensitive`.

The default is `sensitive`, the safer setting of the two.

# Environment

Itchcraft supports the following environment variable:

`DEBUG`
: If set to a non-zero value, causes Itchcraft to enable debug-level
: logging. Also decreases some retry counters and prints stack traces
: for errors where it normally wouldn’t.

# Monitoring the bite healer’s state once activated

## Monitoring the state by observing the LED color (recommended)
Expand Down
17 changes: 12 additions & 5 deletions debian/changelog
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
itchcraft (0.3.0~bpo11+1) UNRELEASED; urgency=medium
itchcraft (0.4.1~bpo11+1) UNRELEASED; urgency=medium

* Work around missing Poetry package on Bullseye
* Work around missing `myst-parser` on Bullseye

-- Claudia Pellegrino <[email protected]> Wed, 24 Jul 2024 18:25:11 +0200
-- Claudia Pellegrino <[email protected]> Tue, 4 Sep 2024 20:16:08 +0200

itchcraft (0.3.0) UNRELEASED; urgency=medium
itchcraft (0.4.1) UNRELEASED; urgency=medium

* Initial release
* Detect and show connected bite healer models
* Add support for more models
* Make error handling more robust
* Bump Debian changelog, fix build deps
* Introduce debug mode
* Detach driver if interface busy
* Add typing stubs for pyusb
* Fix regression for drivers with no detach support

-- Claudia Pellegrino <[email protected]> Tue, 30 Jul 2024 16:00:12 +0200
-- Claudia Pellegrino <[email protected]> Tue, 4 Sep 2024 20:14:16 +0200
8 changes: 7 additions & 1 deletion itchcraft/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@

from . import prefs
from .devices import find_bite_healers
from .errors import CliError, BiteHealerError
from .errors import (
BackendInitializationError,
BiteHealerError,
CliError,
)
from .format import format_table
from .logging import get_logger
from .prefs import (
Expand Down Expand Up @@ -66,5 +70,7 @@ def start(
)
try:
start_with_preferences(preferences)
except BackendInitializationError as e:
raise CliError(e) from e
except BiteHealerError as e:
raise CliError(e) from e
105 changes: 78 additions & 27 deletions itchcraft/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,24 @@
from abc import ABC, abstractmethod
import array
from collections.abc import Callable
from typing import Optional, Union, cast
from typing import Optional

import usb.core # type: ignore
import usb.core
import usb.util

from .errors import BackendInitializationError, EndpointNotFound
from .logging import get_logger
from .types import SizedPayload, usb as usb_types

logger = get_logger(__name__)


class BulkTransferDevice(ABC):
"""Abstract base class for USB devices with two bulk transfer
endpoints."""

@abstractmethod
def bulk_transfer(
self,
request: Union[list[int], bytes, bytearray],
) -> bytes:
def bulk_transfer(self, request: SizedPayload) -> bytes:
"""Sends a payload via USB bulk transfer and waits for a response
from the device.
Expand Down Expand Up @@ -48,45 +50,80 @@ class UsbBulkTransferDevice(BulkTransferDevice):
endpoint_in: usb.core.Endpoint

def __init__(self, device: usb.core.Device) -> None:
device.set_configuration()
config = cast(
usb.core.Configuration,
device.get_active_configuration(),
)
interface = config[(0, 0)]
if (config := _get_config_if_exists(device)) is None:
try:
device.set_configuration()
except usb.core.USBError as ex:
raise BackendInitializationError(
f'Unable to connect to {device.product}: {ex}'
) from ex
logger.debug('Configuration successful')
config = device.get_active_configuration()

interface = config[(0, 0)]
self.device = device

_detach_driver_if_needed(device, interface.index)

try:
self.endpoint_out = _find_endpoint(interface, _match_out)
except EndpointNotFound as ex:
raise BackendInitializationError(
f'Outbound endpoint not found for {device}',
f'Outbound endpoint not found for {device.product}',
) from ex
logger.debug('Found outbound endpoint: %s', self.endpoint_out)
try:
self.endpoint_in = _find_endpoint(interface, _match_in)
except EndpointNotFound as ex:
raise BackendInitializationError(
f'Inbound endpoint not found for {device}',
f'Inbound endpoint not found for {device.product}',
) from ex
logger.debug('Found inbound endpoint: %s', self.endpoint_in)

def bulk_transfer(
self,
request: Union[list[int], bytes, bytearray],
) -> bytes:
response = array.array('B', bytearray(self.MAX_RESPONSE_LENGTH))
def bulk_transfer(self, request: SizedPayload) -> bytes:
buffer = array.array('B', bytearray(self.MAX_RESPONSE_LENGTH))
assert self.device.write(self.endpoint_out, request) == len(
request
)
bytes_received = self.device.read(self.endpoint_in, response)
return response[:bytes_received].tobytes()
num_bytes_received = self.device.read(self.endpoint_in, buffer)
response = buffer[:num_bytes_received].tobytes()
logger.debug(
'Got response: %s (%s)', response.hex(' '), response
)
return response

@property
def product_name(self) -> Optional[str]:
return cast(Optional[str], self.device.product)
return self.device.product

@property
def serial_number(self) -> Optional[str]:
return cast(Optional[str], self.device.serial_number)
return self.device.serial_number


def _detach_driver_if_needed(
device: usb.core.Device, interface_index: usb_types.InterfaceIndex
) -> None:
try:
want_to_detach_driver = device.is_kernel_driver_active(
interface_index
)
except NotImplementedError:
logger.debug(
'Note: unable to detach driver for interface #%d; proceeding',
interface_index,
)
want_to_detach_driver = False

if not want_to_detach_driver:
return

logger.debug(
'Detaching driver from interface #%d',
interface_index,
)
device.detach_kernel_driver(interface_index)
logger.debug('Driver successfully detached')


def _find_endpoint(
Expand All @@ -100,18 +137,32 @@ def _find_endpoint(
)
) is None:
raise EndpointNotFound('find_descriptor returned None')
return cast(usb.core.Endpoint, endpoint)
return endpoint


def _get_config_if_exists(
device: usb.core.Device,
) -> Optional[usb.core.Configuration]:
try:
config = device.get_active_configuration()
except usb.core.USBError:
logger.debug('Device has no active configuration')
config = None
else:
logger.debug('Device already configured')
logger.debug('Active configuration: %s', config)
return config


def _match_in(endpoint: usb.core.Endpoint) -> bool:
address = endpoint.bEndpointAddress # pyright: ignore
address = endpoint.bEndpointAddress
return bool(
usb.util.endpoint_direction(address) == usb.util.ENDPOINT_IN
)


def _match_out(device: usb.core.Device) -> bool:
address = device.bEndpointAddress # pyright: ignore
def _match_out(endpoint: usb.core.Endpoint) -> bool:
address = endpoint.bEndpointAddress
return bool(
usb.util.endpoint_direction(address) == usb.util.ENDPOINT_OUT
)
8 changes: 6 additions & 2 deletions itchcraft/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from . import __version__, api, fire_workarounds
from .errors import CliError
from .logging import get_logger
from .settings import PROJECT_ROOT, PYPROJECT_TOML
from .settings import debugMode, PROJECT_ROOT, PYPROJECT_TOML


logger = get_logger(__name__)
Expand All @@ -19,8 +19,10 @@ def _version_text() -> str:
if __version__ is None:
return 'Itchcraft (unknown version)'
if os.path.exists(PYPROJECT_TOML):
return f'Itchcraft v{__version__}' \
return (
f'Itchcraft v{__version__}'
+ f' (in development at {PROJECT_ROOT})'
)
return f'Itchcraft v{__version__}'


Expand All @@ -36,6 +38,8 @@ def run(*args: str) -> NoReturn:
try:
fire.Fire(api.Api, command=list(args) + sys.argv[1:])
except CliError as e:
if debugMode:
raise e
logger.error(e)
sys.exit(1)
sys.exit(0)
2 changes: 1 addition & 1 deletion itchcraft/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import functools
from typing import cast, Literal, Optional, Union

import usb.core # type: ignore
import usb.core

from .logging import get_logger
from .support import SupportStatement
Expand Down
7 changes: 6 additions & 1 deletion itchcraft/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
from collections.abc import Iterator, Generator
from typing import Any, cast

import usb.core # type: ignore
import usb.core

from .device import from_usb_device, BiteHealerMetadata
from .logging import get_logger
from .support import SUPPORT_STATEMENTS, SupportStatement, VidPid

logger = get_logger(__name__)


def find_bite_healers() -> Iterator[BiteHealerMetadata]:
"""Finds available bite healers."""
Expand All @@ -24,8 +27,10 @@ def find_bite_healers() -> Iterator[BiteHealerMetadata]:
vid_pid = VidPid(vid=device.idVendor, pid=device.idProduct)

if (statement := vid_pid_dict.get(vid_pid)) is None:
logger.debug('Ignoring USB device %s', vid_pid)
continue

logger.debug('Detected bite healer %s', vid_pid)
yield from_usb_device(
usb_device=device,
support_statement=statement,
Expand Down
19 changes: 8 additions & 11 deletions itchcraft/heat_it.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@

from collections.abc import Iterable
from functools import reduce
from typing import Optional, Union
from typing import Optional

from tenacity import retry
from tenacity.retry import retry_if_exception_type
from tenacity.stop import stop_after_attempt
from tenacity.wait import wait_fixed
import usb.core # type: ignore
import usb.core

from .backend import BulkTransferDevice
from .logging import get_logger
from .prefs import Preferences
from .types import BiteHealer
from .settings import debugMode
from .types import BiteHealer, SizedPayload


RESPONSE_LENGTH = 12
Expand All @@ -22,7 +23,7 @@


class HeatItDevice(BiteHealer):
"""A heat-it bite healer, configured over USB."""
"""A heat it” bite healer, configured over USB."""

device: BulkTransferDevice

Expand Down Expand Up @@ -73,9 +74,7 @@ def checksum(payload: Iterable[int]) -> int:
)

def _command(
self,
request: Union[list[int], bytes, bytearray],
command_name: Optional[str] = None,
self, request: SizedPayload, command_name: Optional[str] = None
) -> bytes:
if command_name is not None:
logger.info('Sending command: %s', command_name)
Expand All @@ -86,13 +85,11 @@ def _command(
@retry(
reraise=True,
retry=retry_if_exception_type(usb.core.USBError), # type: ignore
stop=stop_after_attempt(10), # type: ignore
stop=stop_after_attempt(3 if debugMode else 10), # type: ignore
wait=wait_fixed(1), # type: ignore
)
def self_test(self) -> None:
"""Tries up to five times to test the bootloader and obtain
the device status.
"""
"""Tests the bootloader and obtains the device status."""
logger.debug('Response: %s', self.test_bootloader().hex(' '))
logger.debug('Response: %s', self.get_status().hex(' '))

Expand Down
13 changes: 10 additions & 3 deletions itchcraft/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,33 @@

from colorama import Fore, Style

from .settings import debugMode


def get_logger(name: str) -> python_logging.Logger:
"""Instantiate a custom logger with color support."""
logger = python_logging.getLogger(name)
logger.setLevel(python_logging.DEBUG)
logger.setLevel(
python_logging.DEBUG if debugMode else python_logging.INFO
)
handler = python_logging.StreamHandler()
handler.setFormatter(_CustomFormatter())
logger.addHandler(handler)
return logger


class _CustomFormatter(python_logging.Formatter):
_format = "[%(levelname)s] %(message)s"
_format = '[%(levelname)s] %(message)s'

FORMATS = {
python_logging.DEBUG: Style.DIM + _format + Style.RESET_ALL,
python_logging.INFO: Style.DIM + _format + Style.RESET_ALL,
python_logging.WARNING: Fore.YELLOW + _format + Style.RESET_ALL,
python_logging.ERROR: Fore.RED + _format + Style.RESET_ALL,
python_logging.CRITICAL: Style.BRIGHT + Fore.RED + _format + Style.RESET_ALL
python_logging.CRITICAL: Style.BRIGHT
+ Fore.RED
+ _format
+ Style.RESET_ALL,
}

def format(self, record: python_logging.LogRecord) -> str:
Expand Down
Loading

0 comments on commit 7f76fbc

Please sign in to comment.