ok_serial

A Python serial port library (based on PySerial) with improved port discovery and I/O semantics. (Usage guide)

 1"""
 2A Python serial port library
 3(based on [PySerial](https://www.pyserial.com/))
 4with improved port discovery
 5and I/O semantics.
 6[(Usage guide)](https://github.com/egnor/ok-py-serial#readme)
 7"""
 8
 9from beartype.claw import beartype_this_package as _beartype_me
10
11# ruff: noqa: E402
12_beartype_me()
13
14from ok_serial._connection import (
15    SerialConnection,
16    SerialConnectionOptions,
17    SerialControlSignals,
18)
19
20from ok_serial._scanning import scan_serial_ports, SerialPort
21from ok_serial._matcher import SerialPortMatcher
22from ok_serial._tracker import SerialPortTracker, TrackerOptions
23from ok_serial._locking import SerialSharingType
24
25from ok_serial._exceptions import (
26    SerialException,
27    SerialIoClosed,
28    SerialIoException,
29    SerialMatcherInvalid,
30    SerialOpenBusy,
31    SerialOpenException,
32    SerialScanException,
33)
34
35__all__ = [n for n in globals() if not n.startswith("_")]
class SerialConnection(contextlib.AbstractContextManager):
 53class SerialConnection(contextlib.AbstractContextManager):
 54    """An open connection to a serial port."""
 55
 56    def __init__(
 57        self,
 58        *,
 59        match: str | SerialPortMatcher | None = None,
 60        port: str | SerialPort | None = None,
 61        opts: SerialConnectionOptions = SerialConnectionOptions(),
 62        **kwargs,
 63    ):
 64        """
 65        Opens a serial port to make it available for use.
 66        - `match` can be a [port match expression](https://github.com/egnor/ok-py-serial#serial-port-match-expressions) matching exactly one port...
 67          - OR `port` must name a raw system serial device to open.
 68        - `opts` can define baud rate and other port parameters...
 69          - OR other keywords are forwarded to `SerialConnectionOptions`
 70
 71        Call `close` to release the port; use
 72        `SerialConnection` as the target of a
 73        [`with` statement](https://docs.python.org/3/reference/compound_stmts.html#with)
 74        to automatically close the port on exit from the `with` body.
 75
 76        Example:
 77        ```
 78        with SerialConnection(match="vid_pid=0403:6001", baud=115200, sharing="polite") as p:
 79            ... interact with `p` ...
 80            # automatically closed on exit from block
 81        ```
 82
 83        Raises:
 84        - `SerialOpenException`: I/O error opening the specified port
 85        - `SerialOpenBusy`: The port is already in use
 86        - `SerialScanException`: System error scanning ports to find `match`
 87        - `SerialMatcherInvalid`: Bad format of `match` string
 88        """
 89
 90        assert (match is not None) + (port is not None) == 1
 91        opts = dataclasses.replace(opts, **kwargs)
 92
 93        if match is not None:
 94            if isinstance(match, str):
 95                match = SerialPortMatcher(match)
 96            if not (found := scan_serial_ports()):
 97                raise _exceptions.SerialOpenException("No ports found")
 98            if not (matched := match.filter(found)):
 99                msg = f"No ports match {match!r}"
100                raise _exceptions.SerialOpenException(msg)
101            if len(matched) > 1:
102                matched_text = "".join(f"\n  {p}" for p in matched)
103                msg = f'Multiple ports match "{match}": {matched_text}'
104                raise _exceptions.SerialOpenException(msg)
105            port = matched[0].name
106            log.debug("Scanned %r, found %s", match, port)
107
108        assert port is not None
109        if isinstance(port, SerialPort):
110            port = port.name
111
112        with contextlib.ExitStack() as cleanup:
113            cleanup.enter_context(using_lock_file(port, opts.sharing))
114
115            try:
116                pyserial = cleanup.enter_context(
117                    serial.Serial(
118                        port=port,
119                        baudrate=opts.baud,
120                        write_timeout=0.1,
121                    )
122                )
123                log.debug("Opened %s %s", port, opts)
124            except OSError as ex:
125                if ex.errno == errno.EBUSY:
126                    msg = "Serial port busy (EBUSY)"
127                    raise _exceptions.SerialOpenBusy(msg, port) from ex
128                else:
129                    msg = "Serial port open error"
130                    raise _exceptions.SerialOpenException(msg, port) from ex
131
132            if hasattr(pyserial, "fileno"):
133                fd, share = pyserial.fileno(), opts.sharing
134                cleanup.enter_context(using_fd_lock(port, fd, share))
135
136            self._io = cleanup.enter_context(_IoThreads(pyserial))
137            self._io.start()
138            self._cleanup = cleanup.pop_all()
139
140    def __del__(self) -> None:
141        if hasattr(self, "_cleanup"):
142            self._cleanup.close()
143
144    def __exit__(self, exc_type, exc_value, traceback) -> None:
145        self._cleanup.__exit__(exc_type, exc_value, traceback)
146
147    def __repr__(self) -> str:
148        return f"SerialConnection({self._io.pyserial.port!r})"
149
150    def close(self) -> None:
151        """
152        Releases the serial port connection and any associated locks.
153
154        Any I/O operations in progress or attempted after closure will
155        raise an immediate `SerialIoClosed` exception.
156        """
157
158        self._cleanup.close()
159
160    def read_sync(
161        self,
162        *,
163        timeout: float | int | None = None,
164    ) -> bytes:
165        """
166        Waits up to `timeout` seconds (forever for `None`) for data,
167        then returns all of it (b"" on timeout).
168
169        Raises:
170        - `SerialIoException`: port I/O failed and there is no matching data
171        - `SerialIoClosed`: the port was closed and there is no matching data
172        """
173
174        deadline = to_deadline(timeout)
175        while True:
176            with self._io.monitor:
177                if self._io.incoming:
178                    output = bytes(self._io.incoming)
179                    self._io.incoming.clear()
180                    return output
181                elif self._io.exception:
182                    raise self._io.exception
183                elif (wait := from_deadline(deadline)) <= 0:
184                    return b""
185                else:
186                    self._io.monitor.wait(timeout=wait)
187
188    async def read_async(self) -> bytes:
189        """
190        Similar to `read_sync` but returns a
191        [`Future`](https://docs.python.org/3/library/asyncio-future.html#asyncio.Future)
192        instead of blocking the current thread.
193        """
194
195        while True:
196            future = self._io.create_future_in_loop()  # BEFORE read_sync
197            if out := self.read_sync(timeout=0):
198                return out
199            await future
200
201    def write(self, data: bytes | bytearray) -> None:
202        """
203        Adds data to the outgoing buffer to be sent immediately.
204        Never blocks; the buffer can grow indefinitely.
205        (Use `outgoing_size` and `drain_sync`/`drain_async` to manage
206        buffer size.)
207
208        Raises:
209        - `SerialIoException`: port I/O failed
210        - `SerialIoClosed`: the port was closed
211        """
212
213        with self._io.monitor:
214            if self._io.exception:
215                raise self._io.exception
216            elif data:
217                self._io.outgoing.extend(data)
218                self._io.monitor.notify_all()
219
220    def drain_sync(self, *, timeout: float | int | None = None) -> bool:
221        """
222        Waits up to `timeout` seconds (forever for `None`) until
223        all buffered data is transmitted.
224
225        Returns `True` if the drain completed, `False` on timeout.
226
227        Raises:
228        - `SerialIoException`: port I/O failed
229        - `SerialIoClosed`: the port was closed
230        """
231
232        deadline = to_deadline(timeout)
233        while True:
234            with self._io.monitor:
235                if self._io.exception:
236                    raise self._io.exception
237                elif not self._io.outgoing:
238                    return True
239                elif (wait := from_deadline(deadline)) <= 0:
240                    return False
241                else:
242                    self._io.monitor.wait(timeout=wait)
243
244    async def drain_async(self) -> bool:
245        """
246        Similar to `drain_sync` but returns a
247        [`Future`](https://docs.python.org/3/library/asyncio-future.html#asyncio.Future)
248        instead of blocking the current thread.
249        """
250
251        while True:
252            future = self._io.create_future_in_loop()  # BEFORE drain_sync
253            if self.drain_sync(timeout=0):
254                return True
255            await future
256
257    def incoming_size(self) -> int:
258        """
259        Returns the number of bytes waiting to be read.
260        """
261        with self._io.monitor:
262            return len(self._io.incoming)
263
264    def outgoing_size(self) -> int:
265        """
266        Returns the number of bytes waiting to be sent.
267        """
268        with self._io.monitor:
269            return len(self._io.outgoing)
270
271    def set_signals(
272        self,
273        dtr: bool | None = None,
274        rts: bool | None = None,
275        send_break: bool | None = None,
276    ) -> None:
277        """
278        Sets outgoing
279        [RS-232 modem control line](https://en.wikipedia.org/wiki/RS-232#Data_and_control_signals)
280        state (use `None` for no change):
281        - `dtr`: assert Data Terminal Ready
282        - `rts`: assert Ready To Send
283        - `send_break`: send a continuous BREAK condition
284
285        Raises:
286        - `SerialIoException`: port I/O failed
287        - `SerialIoClosed`: the port was closed
288        """
289
290        with self._io.monitor:
291            if self._io.exception:
292                raise self._io.exception
293            try:
294                if dtr is not None:
295                    self._io.pyserial.dtr = dtr
296                if rts is not None:
297                    self._io.pyserial.rts = rts
298                if send_break is not None:
299                    self._io.pyserial.break_condition = send_break
300            except OSError as ex:
301                msg, dev = "Can't set control signals", self._io.pyserial.port
302                self._io.exception = _exceptions.SerialIoException(msg, dev)
303                self._io.exception.__cause__ = ex
304                raise self._io.exception
305
306    def get_signals(self) -> SerialControlSignals:
307        """
308        Returns the current
309        [RS-232 modem control line](https://en.wikipedia.org/wiki/RS-232#Data_and_control_signals) state.
310
311        Raises:
312        - `SerialIoException`: port I/O failed
313        - `SerialIoClosed`: the port was closed
314        """
315
316        with self._io.monitor:
317            if self._io.exception:
318                raise self._io.exception
319            try:
320                return SerialControlSignals(
321                    dtr=self._io.pyserial.dtr,
322                    dsr=self._io.pyserial.dsr,
323                    cts=self._io.pyserial.cts,
324                    rts=self._io.pyserial.rts,
325                    ri=self._io.pyserial.ri,
326                    cd=self._io.pyserial.cd,
327                    sending_break=self._io.pyserial.break_condition,
328                )
329            except OSError as ex:
330                msg, dev = "Can't get control signals", self._io.pyserial.port
331                self._io.exception = _exceptions.SerialIoException(msg, dev)
332                self._io.exception.__cause__ = ex
333                raise self._io.exception
334
335    @property
336    def port_name(self) -> str:
337        """
338        The port's device name, eg. `/dev/ttyACM0` or `COM3`.
339        """
340        return self._io.pyserial.port
341
342    @property
343    def pyserial(self) -> serial.Serial:
344        """
345        The underlying
346        [`pyserial.Serial`](https://pyserial.readthedocs.io/en/latest/pyserial_api.html#serial.Serial)
347        object (API escape hatch).
348        """
349        return self._io.pyserial
350
351    def fileno(self) -> int:
352        """
353        The [Unix FD](https://en.wikipedia.org/wiki/File_descriptor)
354        for the serial connection, -1 if not available.
355        """
356        try:
357            return self._io.serial.fileno()
358        except AttributeError:
359            return -1

An open connection to a serial port.

SerialConnection( *, match: str | SerialPortMatcher | None = None, port: str | SerialPort | None = None, opts: SerialConnectionOptions = SerialConnectionOptions(baud=115200, sharing='exclusive'), **kwargs)
 56    def __init__(
 57        self,
 58        *,
 59        match: str | SerialPortMatcher | None = None,
 60        port: str | SerialPort | None = None,
 61        opts: SerialConnectionOptions = SerialConnectionOptions(),
 62        **kwargs,
 63    ):
 64        """
 65        Opens a serial port to make it available for use.
 66        - `match` can be a [port match expression](https://github.com/egnor/ok-py-serial#serial-port-match-expressions) matching exactly one port...
 67          - OR `port` must name a raw system serial device to open.
 68        - `opts` can define baud rate and other port parameters...
 69          - OR other keywords are forwarded to `SerialConnectionOptions`
 70
 71        Call `close` to release the port; use
 72        `SerialConnection` as the target of a
 73        [`with` statement](https://docs.python.org/3/reference/compound_stmts.html#with)
 74        to automatically close the port on exit from the `with` body.
 75
 76        Example:
 77        ```
 78        with SerialConnection(match="vid_pid=0403:6001", baud=115200, sharing="polite") as p:
 79            ... interact with `p` ...
 80            # automatically closed on exit from block
 81        ```
 82
 83        Raises:
 84        - `SerialOpenException`: I/O error opening the specified port
 85        - `SerialOpenBusy`: The port is already in use
 86        - `SerialScanException`: System error scanning ports to find `match`
 87        - `SerialMatcherInvalid`: Bad format of `match` string
 88        """
 89
 90        assert (match is not None) + (port is not None) == 1
 91        opts = dataclasses.replace(opts, **kwargs)
 92
 93        if match is not None:
 94            if isinstance(match, str):
 95                match = SerialPortMatcher(match)
 96            if not (found := scan_serial_ports()):
 97                raise _exceptions.SerialOpenException("No ports found")
 98            if not (matched := match.filter(found)):
 99                msg = f"No ports match {match!r}"
100                raise _exceptions.SerialOpenException(msg)
101            if len(matched) > 1:
102                matched_text = "".join(f"\n  {p}" for p in matched)
103                msg = f'Multiple ports match "{match}": {matched_text}'
104                raise _exceptions.SerialOpenException(msg)
105            port = matched[0].name
106            log.debug("Scanned %r, found %s", match, port)
107
108        assert port is not None
109        if isinstance(port, SerialPort):
110            port = port.name
111
112        with contextlib.ExitStack() as cleanup:
113            cleanup.enter_context(using_lock_file(port, opts.sharing))
114
115            try:
116                pyserial = cleanup.enter_context(
117                    serial.Serial(
118                        port=port,
119                        baudrate=opts.baud,
120                        write_timeout=0.1,
121                    )
122                )
123                log.debug("Opened %s %s", port, opts)
124            except OSError as ex:
125                if ex.errno == errno.EBUSY:
126                    msg = "Serial port busy (EBUSY)"
127                    raise _exceptions.SerialOpenBusy(msg, port) from ex
128                else:
129                    msg = "Serial port open error"
130                    raise _exceptions.SerialOpenException(msg, port) from ex
131
132            if hasattr(pyserial, "fileno"):
133                fd, share = pyserial.fileno(), opts.sharing
134                cleanup.enter_context(using_fd_lock(port, fd, share))
135
136            self._io = cleanup.enter_context(_IoThreads(pyserial))
137            self._io.start()
138            self._cleanup = cleanup.pop_all()

Opens a serial port to make it available for use.

  • match can be a port match expression matching exactly one port...
    • OR port must name a raw system serial device to open.
  • opts can define baud rate and other port parameters...
    • OR other keywords are forwarded to SerialConnectionOptions

Call close to release the port; use SerialConnection as the target of a with statement to automatically close the port on exit from the with body.

Example:

with SerialConnection(match="vid_pid=0403:6001", baud=115200, sharing="polite") as p:
    ... interact with `p` ...
    # automatically closed on exit from block

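Raises:

  • SerialOpenException: I/O error opening the specified port
  • SerialOpenBusy: The port is already in use
  • SerialScanException: System error scanning ports to find match
  • SerialMatcherInvalid: Bad format of match string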

def close(self) -> None:
150    def close(self) -> None:
151        """
152        Releases the serial port connection and any associated locks.
153
154        Any I/O operations in progress or attempted after closure will
155        raise an immediate `SerialIoClosed` exception.
156        """
157
158        self._cleanup.close()

Releases the serial port connection and any associated locks.

Any I/O operations in progress or attempted after closure will raise an immediate SerialIoClosed exception.

def read_sync(self, *, timeout: float | int | None = None) -> bytes:
160    def read_sync(
161        self,
162        *,
163        timeout: float | int | None = None,
164    ) -> bytes:
165        """
166        Waits up to `timeout` seconds (forever for `None`) for data,
167        then returns all of it (b"" on timeout).
168
169        Raises:
170        - `SerialIoException`: port I/O failed and there is no matching data
171        - `SerialIoClosed`: the port was closed and there is no matching data
172        """
173
174        deadline = to_deadline(timeout)
175        while True:
176            with self._io.monitor:
177                if self._io.incoming:
178                    output = bytes(self._io.incoming)
179                    self._io.incoming.clear()
180                    return output
181                elif self._io.exception:
182                    raise self._io.exception
183                elif (wait := from_deadline(deadline)) <= 0:
184                    return b""
185                else:
186                    self._io.monitor.wait(timeout=wait)

Waits up to timeout seconds (forever for None) for data, then returns all of it (b"" on timeout).

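Raises:

  • SerialIoException: port I/O failed and there is no matching data
  • SerialIoClosed: the port was closed and there is no matching data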

async def read_async(self) -> bytes:
188    async def read_async(self) -> bytes:
189        """
190        Similar to `read_sync` but returns a
191        [`Future`](https://docs.python.org/3/library/asyncio-future.html#asyncio.Future)
192        instead of blocking the current thread.
193        """
194
195        while True:
196            future = self._io.create_future_in_loop()  # BEFORE read_sync
197            if out := self.read_sync(timeout=0):
198                return out
199            await future

Similar to read_sync but returns a Future instead of blocking the current thread.

def write(self, data: bytes | bytearray) -> None:
201    def write(self, data: bytes | bytearray) -> None:
202        """
203        Adds data to the outgoing buffer to be sent immediately.
204        Never blocks; the buffer can grow indefinitely.
205        (Use `outgoing_size` and `drain_sync`/`drain_async` to manage
206        buffer size.)
207
208        Raises:
209        - `SerialIoException`: port I/O failed
210        - `SerialIoClosed`: the port was closed
211        """
212
213        with self._io.monitor:
214            if self._io.exception:
215                raise self._io.exception
216            elif data:
217                self._io.outgoing.extend(data)
218                self._io.monitor.notify_all()

Adds data to the outgoing buffer to be sent immediately. Never blocks; the buffer can grow indefinitely. (Use outgoing_size and drain_sync/drain_async to manage buffer size.)

Raises:

  • SerialIoException: port I/O failed
  • SerialIoClosed: the port was closed

def drain_sync(self, *, timeout: float | int | None = None) -> bool:
220    def drain_sync(self, *, timeout: float | int | None = None) -> bool:
221        """
222        Waits up to `timeout` seconds (forever for `None`) until
223        all buffered data is transmitted.
224
225        Returns `True` if the drain completed, `False` on timeout.
226
227        Raises:
228        - `SerialIoException`: port I/O failed
229        - `SerialIoClosed`: the port was closed
230        """
231
232        deadline = to_deadline(timeout)
233        while True:
234            with self._io.monitor:
235                if self._io.exception:
236                    raise self._io.exception
237                elif not self._io.outgoing:
238                    return True
239                elif (wait := from_deadline(deadline)) <= 0:
240                    return False
241                else:
242                    self._io.monitor.wait(timeout=wait)

Waits up to timeout seconds (forever for None) until all buffered data is transmitted.

Returns True if the drain completed, False on timeout.

Raises:

  • SerialIoException: port I/O failed
  • SerialIoClosed: the port was closed

async def drain_async(self) -> bool:
244    async def drain_async(self) -> bool:
245        """
246        Similar to `drain_sync` but returns a
247        [`Future`](https://docs.python.org/3/library/asyncio-future.html#asyncio.Future)
248        instead of blocking the current thread.
249        """
250
251        while True:
252            future = self._io.create_future_in_loop()  # BEFORE drain_sync
253            if self.drain_sync(timeout=0):
254                return True
255            await future

Similar to drain_sync but returns a Future instead of blocking the current thread.

def incoming_size(self) -> int:
257    def incoming_size(self) -> int:
258        """
259        Returns the number of bytes waiting to be read.
260        """
261        with self._io.monitor:
262            return len(self._io.incoming)

Returns the number of bytes waiting to be read.

def outgoing_size(self) -> int:
264    def outgoing_size(self) -> int:
265        """
266        Returns the number of bytes waiting to be sent.
267        """
268        with self._io.monitor:
269            return len(self._io.outgoing)

Returns the number of bytes waiting to be sent.

def set_signals( self, dtr: bool | None = None, rts: bool | None = None, send_break: bool | None = None) -> None:
271    def set_signals(
272        self,
273        dtr: bool | None = None,
274        rts: bool | None = None,
275        send_break: bool | None = None,
276    ) -> None:
277        """
278        Sets outgoing
279        [RS-232 modem control line](https://en.wikipedia.org/wiki/RS-232#Data_and_control_signals)
280        state (use `None` for no change):
281        - `dtr`: assert Data Terminal Ready
282        - `rts`: assert Ready To Send
283        - `send_break`: send a continuous BREAK condition
284
285        Raises:
286        - `SerialIoException`: port I/O failed
287        - `SerialIoClosed`: the port was closed
288        """
289
290        with self._io.monitor:
291            if self._io.exception:
292                raise self._io.exception
293            try:
294                if dtr is not None:
295                    self._io.pyserial.dtr = dtr
296                if rts is not None:
297                    self._io.pyserial.rts = rts
298                if send_break is not None:
299                    self._io.pyserial.break_condition = send_break
300            except OSError as ex:
301                msg, dev = "Can't set control signals", self._io.pyserial.port
302                self._io.exception = _exceptions.SerialIoException(msg, dev)
303                self._io.exception.__cause__ = ex
304                raise self._io.exception

Sets outgoing RS-232 modem control line state (use None for no change):

  • dtr: assert Data Terminal Ready
  • rts: assert Ready To Send
  • send_break: send a continuous BREAK condition

Raises:

  • SerialIoException: port I/O failed
  • SerialIoClosed: the port was closed

def get_signals(self) -> SerialControlSignals:
306    def get_signals(self) -> SerialControlSignals:
307        """
308        Returns the current
309        [RS-232 modem control line](https://en.wikipedia.org/wiki/RS-232#Data_and_control_signals) state.
310
311        Raises:
312        - `SerialIoException`: port I/O failed
313        - `SerialIoClosed`: the port was closed
314        """
315
316        with self._io.monitor:
317            if self._io.exception:
318                raise self._io.exception
319            try:
320                return SerialControlSignals(
321                    dtr=self._io.pyserial.dtr,
322                    dsr=self._io.pyserial.dsr,
323                    cts=self._io.pyserial.cts,
324                    rts=self._io.pyserial.rts,
325                    ri=self._io.pyserial.ri,
326                    cd=self._io.pyserial.cd,
327                    sending_break=self._io.pyserial.break_condition,
328                )
329            except OSError as ex:
330                msg, dev = "Can't get control signals", self._io.pyserial.port
331                self._io.exception = _exceptions.SerialIoException(msg, dev)
332                self._io.exception.__cause__ = ex
333                raise self._io.exception

Returns the current RS-232 modem control line state.

Raises:

  • SerialIoException: port I/O failed
  • SerialIoClosed: the port was closed

port_name: str
335    @property
336    def port_name(self) -> str:
337        """
338        The port's device name, eg. `/dev/ttyACM0` or `COM3`.
339        """
340        return self._io.pyserial.port

The port's device name, eg. /dev/ttyACM0 or COM3.

pyserial: serial.serialposix.Serial
342    @property
343    def pyserial(self) -> serial.Serial:
344        """
345        The underlying
346        [`pyserial.Serial`](https://pyserial.readthedocs.io/en/latest/pyserial_api.html#serial.Serial)
347        object (API escape hatch).
348        """
349        return self._io.pyserial

The underlying pyserial.Serial object (API escape hatch).

def fileno(self) -> int:
351    def fileno(self) -> int:
352        """
353        The [Unix FD](https://en.wikipedia.org/wiki/File_descriptor)
354        for the serial connection, -1 if not available.
355        """
356        try:
357            return self._io.serial.fileno()
358        except AttributeError:
359            return -1

The Unix FD for the serial connection, -1 if not available.

@dataclasses.dataclass(frozen=True)
class SerialConnectionOptions:
@dataclasses.dataclass(frozen=True)
class SerialConnectionOptions:
    """Optional parameters for `SerialConnection`."""

    baud: int = 115200
    """The [baud rate](https://en.wikipedia.org/wiki/Baud) to use."""
    sharing: SerialSharingType = "exclusive"
    """
    Port access negotiation strategy:
    - `"oblivious"`: Don't perform any locking.
    - `"polite"`: Defer to other users, don't lock the port.
    - `"exclusive"`: Require exclusive access, lock the port or fail.
    - `"stomp"`: Try to kill other users, try to lock the port, open the
      port regardless. Use with care!
    """

Optional parameters for SerialConnection.

SerialConnectionOptions( baud: int = 115200, sharing: Literal['oblivious', 'polite', 'exclusive', 'stomp'] = 'exclusive')
baud: int = 115200

The baud rate to use.

sharing: Literal['oblivious', 'polite', 'exclusive', 'stomp'] = 'exclusive'

Port access negotiation strategy:

  • "oblivious": Don't perform any locking.
  • "polite": Defer to other users, don't lock the port.
  • "exclusive": Require exclusive access, lock the port or fail.
  • "stomp": Try to kill other users, try to lock the port, open the port regardless. Use with care!
@dataclasses.dataclass(frozen=True)
class SerialControlSignals:
@dataclasses.dataclass(frozen=True)
class SerialControlSignals:
    """
    [RS-232 modem control lines](https://en.wikipedia.org/wiki/RS-232#Data_and_control_signals),
    outgoing ("DTE to DCE") and incoming ("DCE to DTE").

    Instances are immutable snapshots. `SerialConnection.get_signals` fills
    each field from the pyserial attribute of the same name, except
    `sending_break`, which comes from pyserial's `break_condition`.
    """

    # Field order is part of the constructor signature; do not reorder.
    dtr: bool  # Data Terminal Ready (settable via set_signals)
    dsr: bool  # presumably Data Set Ready (standard RS-232 name)
    cts: bool  # presumably Clear To Send (standard RS-232 name)
    rts: bool  # Ready To Send (settable via set_signals)
    ri: bool  # presumably Ring Indicator (standard RS-232 name)
    cd: bool  # presumably Carrier Detect (standard RS-232 name)
    sending_break: bool  # BREAK condition (settable via set_signals)

RS-232 modem control lines, outgoing ("DTE to DCE") and incoming ("DCE to DTE").

SerialControlSignals( dtr: bool, dsr: bool, cts: bool, rts: bool, ri: bool, cd: bool, sending_break: bool)
dtr: bool
dsr: bool
cts: bool
rts: bool
ri: bool
cd: bool
sending_break: bool
def scan_serial_ports() -> list[SerialPort]:
def scan_serial_ports() -> list[SerialPort]:
    """
    List the serial ports currently attached to the system, sorted by name.

    For testing and encapsulation, if the environment variable
    `$OK_SERIAL_SCAN_OVERRIDE` is the pathname of a JSON file in
    `{"port-name": {"attr": "value", ...}, ...}` format, that port listing
    is returned instead of actual system scan results.

    Raises:
    - `SerialScanException`: System error scanning ports
    """

    override_path = os.getenv("OK_SERIAL_SCAN_OVERRIDE")
    if override_path:
        try:
            text = pathlib.Path(override_path).read_text()
            ports = _ports_from_json_text(text)
        except (OSError, ValueError) as ex:
            msg = f"Can't read $OK_SERIAL_SCAN_OVERRIDE {override_path}"
            raise SerialScanException(msg) from ex

        log.debug("Read $OK_SERIAL_SCAN_OVERRIDE %s", override_path)
    else:
        try:
            raw_ports = list_ports.comports()
        except OSError as ex:
            raise SerialScanException("Can't scan serial") from ex

        # Entries the converter rejects (falsy result) are dropped.
        ports = [
            converted
            for raw in raw_ports
            if (converted := _port_from_pyserial(raw))
        ]

    name_key = natsort.natsort_keygen(key=lambda p: p.name, alg=natsort.ns.P)
    ports.sort(key=name_key)
    log.debug("Found %d ports", len(ports))
    return ports

Returns a list of serial ports currently attached to the system.

For testing and encapsulation, if the environment variable $OK_SERIAL_SCAN_OVERRIDE is the pathname of a JSON file in {"port-name": {"attr": "value", ...}, ...} format, that port listing is returned instead of actual system scan results.

Raises:

  • SerialScanException: System error scanning ports

@dataclasses.dataclass(frozen=True)
class SerialPort:
@dataclasses.dataclass(frozen=True)
class SerialPort:
    """Metadata about a serial port found on the system."""

    name: str
    """The OS device identifier, eg. `/dev/ttyUSB3`, `COM4`, etc."""

    attr: dict[str, str]
    """
    [Metadata](https://github.com/egnor/ok-py-serial#serial-port-attributes)
    """

    def __str__(self) -> str:
        # A port prints as its bare device name.
        return self.name

Metadata about a serial port found on the system

SerialPort(name: str, attr: dict[str, str])
name: str

The OS device identifier, eg. /dev/ttyUSB3, 'COM4', etc.

attr: dict[str, str]

Metadata
class SerialPortMatcher:
 57class SerialPortMatcher:
 58    """A parsed expression for identifying serial ports of interest."""
 59
 60    def __init__(self, match: str):
 61        """Parses a
 62        [serial port match expression](https://github.com/egnor/ok-py-serial#serial-port-match-expressions)
 63        in preparation to identify matching `SerialPort` objects.
 64        """
 65
 66        self._input = match
 67        self._pos: list[_Rule] = []
 68        self._neg: list[_Rule] = []
 69        self._oldest = False
 70        self._newest = False
 71
 72        next_pos = 0
 73        while next_pos < len(match):
 74            tm = _TERM_RE.match(match, pos=next_pos)
 75            if not (tm and tm[0]):
 76                jmatch = json.dumps(match)  # use JSON for consistent quoting
 77                jpre = json.dumps(match[:next_pos])
 78                msg = f"Bad port expr:\n  {jmatch}\n -{'-' * (len(jpre) - 1)}^"
 79                raise SerialMatcherInvalid(msg)
 80
 81            next_pos, groups = tm.end(), tm.groups(default="")
 82            rx_att, rx_ex, rx_text, att, eq_ex, ex, qstr, num, naked = groups
 83            out_list = self._neg if bool(rx_ex or eq_ex or ex) else self._pos
 84            if rx_text:
 85                try:
 86                    out_list.append(_Rule(rx_att, re.compile(rx_text)))
 87                except re.error as ex:
 88                    msg = f"Bad port regex: /{rx_text}/"
 89                    raise SerialMatcherInvalid(msg) from ex
 90            elif qstr:
 91                rx = _qstr_rx(qstr, glob=False, full=bool(att))
 92                out_list.append(_Rule(att, rx))
 93            elif num:
 94                if num[-1:] in "hH":
 95                    value = int("{num[:-1]", 16)
 96                elif num.isdigit():
 97                    value = int(num, 10)  # int('0123', 0) is an error!
 98                else:
 99                    value = int(num, 0)
100                rx_text = f"({num}|0*{value}|(0x)?0*{value:x}h?)"
101                rx = _wrap_rx(rx_text, full=bool(att), wb=True, we=True)
102                out_list.append(_Rule(att, rx))
103            elif _OLDEST_RE.match(naked) and not att and out_list is self._pos:
104                self._oldest = True
105            elif _NEWEST_RE.match(naked) and not att and out_list is self._pos:
106                self._newest = True
107            elif naked:
108                rx = _qstr_rx(naked, glob=True, full=bool(att))
109                out_list.append(_Rule(att, rx))
110            else:
111                assert False, f"bad term match: {tm[0]!r}"
112
113        if self._oldest and self._newest:
114            msg = f"oldest & newest: {match!r}"
115            raise SerialMatcherInvalid(msg)
116        elif not self:
117            log.debug("Parsed %s (any port)", repr(match))
118        elif log.isEnabledFor(logging.DEBUG):
119            rules = "".join(f"\n  {r}" for r in self.patterns())
120            log.debug("Parsed %s:%s", repr(match), rules)
121
122    def __repr__(self) -> str:
123        return f"SerialPortMatcher({self._input!r})"
124
125    def __str__(self) -> str:
126        return self._input
127
128    def __bool__(self) -> bool:
129        return bool(self._oldest or self._newest or self._pos or self._neg)
130
131    def patterns(self) -> list[str]:
132        return [
133            *(["oldest"] if self._oldest else []),
134            *(["newest"] if self._newest else []),
135            *[str(r).replace("~/", "!~/", 1) for r in self._neg],
136            *[str(r) for r in self._pos],
137        ]
138
139    def filter(self, ports: list[SerialPort]) -> list[SerialPort]:
140        """Filters a list of ports according to the match criteria."""
141
142        matches = []
143        for p in ports:
144            attrs = list(p.attr.items())
145            pos = all(any(r.match(k, v) for k, v in attrs) for r in self._pos)
146            neg = any(r.match(k, v) for k, v in attrs for r in self._neg)
147            if pos and not neg:
148                matches.append(p)
149
150        if self._oldest or self._newest:
151            timed = [m for m in matches if "time" in p.attr]
152            timed.sort(key=lambda m: m.attr["time"])
153            return timed[:1] if self._oldest else timed[-1:]
154
155        return matches
156
157    def attr_hit(self, port: SerialPort, key: str) -> bool:
158        """True if the attribute given by 'key' is a positive match."""
159
160        if v := port.attr.get(key, ""):
161            return any(r.match(key, v) for r in self._pos)
162        return False

A parsed expression for identifying serial ports of interest.

SerialPortMatcher(match: str)
 60    def __init__(self, match: str):
 61        """Parses a
 62        [serial port match expression](https://github.com/egnor/ok-py-serial#serial-port-match-expressions)
 63        in preparation to identify matching `SerialPort` objects.
 64        """
 65
 66        self._input = match
 67        self._pos: list[_Rule] = []
 68        self._neg: list[_Rule] = []
 69        self._oldest = False
 70        self._newest = False
 71
 72        next_pos = 0
 73        while next_pos < len(match):
 74            tm = _TERM_RE.match(match, pos=next_pos)
 75            if not (tm and tm[0]):
 76                jmatch = json.dumps(match)  # use JSON for consistent quoting
 77                jpre = json.dumps(match[:next_pos])
 78                msg = f"Bad port expr:\n  {jmatch}\n -{'-' * (len(jpre) - 1)}^"
 79                raise SerialMatcherInvalid(msg)
 80
 81            next_pos, groups = tm.end(), tm.groups(default="")
 82            rx_att, rx_ex, rx_text, att, eq_ex, ex, qstr, num, naked = groups
 83            out_list = self._neg if bool(rx_ex or eq_ex or ex) else self._pos
 84            if rx_text:
 85                try:
 86                    out_list.append(_Rule(rx_att, re.compile(rx_text)))
 87                except re.error as ex:
 88                    msg = f"Bad port regex: /{rx_text}/"
 89                    raise SerialMatcherInvalid(msg) from ex
 90            elif qstr:
 91                rx = _qstr_rx(qstr, glob=False, full=bool(att))
 92                out_list.append(_Rule(att, rx))
 93            elif num:
 94                if num[-1:] in "hH":
 95                    value = int("{num[:-1]", 16)
 96                elif num.isdigit():
 97                    value = int(num, 10)  # int('0123', 0) is an error!
 98                else:
 99                    value = int(num, 0)
100                rx_text = f"({num}|0*{value}|(0x)?0*{value:x}h?)"
101                rx = _wrap_rx(rx_text, full=bool(att), wb=True, we=True)
102                out_list.append(_Rule(att, rx))
103            elif _OLDEST_RE.match(naked) and not att and out_list is self._pos:
104                self._oldest = True
105            elif _NEWEST_RE.match(naked) and not att and out_list is self._pos:
106                self._newest = True
107            elif naked:
108                rx = _qstr_rx(naked, glob=True, full=bool(att))
109                out_list.append(_Rule(att, rx))
110            else:
111                assert False, f"bad term match: {tm[0]!r}"
112
113        if self._oldest and self._newest:
114            msg = f"oldest & newest: {match!r}"
115            raise SerialMatcherInvalid(msg)
116        elif not self:
117            log.debug("Parsed %s (any port)", repr(match))
118        elif log.isEnabledFor(logging.DEBUG):
119            rules = "".join(f"\n  {r}" for r in self.patterns())
120            log.debug("Parsed %s:%s", repr(match), rules)

Parses a serial port match expression in preparation to identify matching SerialPort objects.

def patterns(self) -> list[str]:
131    def patterns(self) -> list[str]:
132        return [
133            *(["oldest"] if self._oldest else []),
134            *(["newest"] if self._newest else []),
135            *[str(r).replace("~/", "!~/", 1) for r in self._neg],
136            *[str(r) for r in self._pos],
137        ]
def filter( self, ports: list[SerialPort]) -> list[SerialPort]:
139    def filter(self, ports: list[SerialPort]) -> list[SerialPort]:
140        """Filters a list of ports according to the match criteria."""
141
142        matches = []
143        for p in ports:
144            attrs = list(p.attr.items())
145            pos = all(any(r.match(k, v) for k, v in attrs) for r in self._pos)
146            neg = any(r.match(k, v) for k, v in attrs for r in self._neg)
147            if pos and not neg:
148                matches.append(p)
149
150        if self._oldest or self._newest:
151            timed = [m for m in matches if "time" in p.attr]
152            timed.sort(key=lambda m: m.attr["time"])
153            return timed[:1] if self._oldest else timed[-1:]
154
155        return matches

Filters a list of ports according to the match criteria.

def attr_hit(self, port: SerialPort, key: str) -> bool:
157    def attr_hit(self, port: SerialPort, key: str) -> bool:
158        """True if the attribute given by 'key' is a positive match."""
159
160        if v := port.attr.get(key, ""):
161            return any(r.match(key, v) for r in self._pos)
162        return False

True if the attribute given by 'key' is a positive match.

class SerialPortTracker(contextlib.AbstractContextManager):
 30class SerialPortTracker(contextlib.AbstractContextManager):
 31    """
 32    Utility class to maintain a connection to a serial port of interest,
 33    re-scanning and re-connecting as needed after errors, with periodic retry.
 34    This is used for robust communication with a serial device which might be
 35    plugged and unplugged during operation.
 36    """
 37
 38    def __init__(
 39        self,
 40        match: str | SerialPortMatcher,
 41        *,
 42        baud: int = 0,
 43        topts: TrackerOptions = TrackerOptions(),
 44        copts: SerialConnectionOptions = SerialConnectionOptions(),
 45    ):
 46        """
 47        Prepare to manage a serial port connection.
 48        - `match` must be a [port match expression](https://github.com/egnor/ok-py-serial#serial-port-match-expressions) matching the port of interest
 49        - `topts` can define parameters for tracking (eg. re-scan interval)
 50        - `copts` can define parameters for connecting (eg. baud rate)
 51          - OR `baud` can set the baud rate (as a shortcut)
 52
 53        Actual port scans and connections only happen after `find_*` or
 54        `connect_*` methods are called. Call `close` to end any open
 55        connection; use `SerialPortTracker` as the target of a
 56        [`with` statement](https://docs.python.org/3/reference/compound_stmts.html#with)
 57        to automatically close the port on exit from the `with` body.
 58
 59        Raises:
 60        - `SerialMatcherInvalid`: Bad format of `match` string
 61        """
 62
 63        if isinstance(match, str):
 64            match = SerialPortMatcher(match)
 65        if baud:
 66            copts = dataclasses.replace(copts, baud=baud)
 67
 68        self._match = match
 69        self._tracker_opts = topts
 70        self._conn_opts = copts
 71
 72        self._lock = threading.Lock()
 73        self._scan_keys: set[str] = set()
 74        self._scan_matched: list[SerialPort] = []
 75        self._scan_deadline = 0.0
 76        self._next_scan = 0.0
 77        self._conn: SerialConnection | None = None
 78
 79        log.debug("Tracking: %s", repr(str(match)) if match else "(any port)")
 80
 81    def __exit__(self, exc_type, exc_value, traceback):
 82        self.close()
 83
 84    def __repr__(self) -> str:
 85        return (
 86            f"SerialPortTracker({self._match!r}, "
 87            f"topts={self._tracker_opts!r}, "
 88            f"copts={self._conn_opts!r})"
 89        )
 90
 91    def close(self) -> None:
 92        """
 93        Closes any open connection with `SerialConnection.close`. A subsequent
 94        call to `connect_sync`/`connect_async` will establish a new connection.
 95        """
 96
 97        with self._lock:
 98            if self._conn:
 99                self._conn.close()
100
101    def find_sync(self, timeout: float | int | None = None) -> list[SerialPort]:
102        """
103        Waits up to `timeout` seconds (forever for `None`) until serial port(s)
104        exist matching this tracker's requirements. Rescans periodically while
105        waiting (see `TrackerOptions.scan_interval`).
106
107        Returns a list of matching `SerialPort` objects, or `[]` on timeout.
108
109        Raises:
110        - `SerialScanException`: System error scanning ports
111        """
112
113        def key(p: SerialPort) -> str:
114            return p.name + "@" + p.attr.get("time", "")
115
116        deadline = to_deadline(timeout)
117        while True:
118            with self._lock:
119                if (wait := from_deadline(self._next_scan)) <= 0:
120                    wait = self._tracker_opts.scan_interval
121                    found = scan_serial_ports()
122                    for p in found if self._next_scan else []:  # not first scan
123                        if key(p) not in self._scan_keys:
124                            p.attr["tracking"] = "new"
125
126                    matched = self._match.filter(found)
127                    self._next_scan = to_deadline(wait)
128                    self._scan_keys = set(key(p) for p in found)
129                    self._scan_matched = matched
130                    nf, nm = len(found), len(matched)
131                    log.debug("%d/%d ports match %r", nm, nf, str(self._match))
132
133                if self._scan_matched:
134                    return self._scan_matched
135
136            timeout_wait = from_deadline(deadline)
137            if timeout_wait < wait:
138                return []
139
140            log.debug("Next scan in %.2fs", wait)
141            time.sleep(wait)
142
143    async def find_async(self) -> list[SerialPort]:
144        """
145        Similar to `find_sync` but returns a
146        [`Future`](https://docs.python.org/3/library/asyncio-future.html#asyncio.Future)
147        instead of blocking the current thread.
148        """
149
150        while True:
151            with self._lock:
152                next_scan = self._next_scan
153            if ports := self.find_sync(timeout=0):
154                return ports
155            wait = from_deadline(next_scan)
156            log.debug("Next scan in %.2fs", wait)
157            await asyncio.sleep(wait)
158
159    def connect_sync(
160        self, timeout: float | int | None = None
161    ) -> SerialConnection | None:
162        """
163        If a connection is established and healthy, returns it immediately.
164
165        Otherwise, waits up to `timeout` seconds (forever for `None`) for
166        serial port(s) to appear matching this tracker's requirements, then
167        attempts a new connection. If the connection succeeds, it is remembered
168        and returned, otherwise scanning resumes.
169
170        If multiple ports match the requirements, connections are attempted
171        to each, and the first success (if any) is remembered and returned.
172
173        Returns `None` on timeout.
174
175        Raises:
176        - `SerialScanException`: System error scanning ports
177        """
178
179        deadline = to_deadline(timeout)
180        ports: list[SerialPort] = []
181        while True:
182            with self._lock:
183                if self._conn:
184                    try:
185                        self._conn.write(b"")  # check for liveness
186                        return self._conn
187                    except SerialIoClosed:
188                        log.debug("%s closed", self._conn.port_name)
189                        self._conn = None
190                    except SerialIoException as exc:
191                        name = self._conn.port_name
192                        log.warning("%s failed (%s)", name, exc)
193                        self._conn.close()
194                        self._conn = None
195
196                ports.sort(key=lambda p: p.attr.get("time", ""), reverse=True)
197                for port in ports:
198                    try:
199                        self._conn = SerialConnection(
200                            port=port, opts=self._conn_opts
201                        )
202                        return self._conn
203                    except SerialOpenException as exc:
204                        log.warning("Can't open %s (%s)", port, exc)
205                        self._scan_matched = []  # force re-scan on error
206
207            if not (ports := self.find_sync(timeout=from_deadline(deadline))):
208                return None
209
210    async def connect_async(self) -> SerialConnection:
211        """
212        Similar to `connect_sync` but returns a
213        [`Future`](https://docs.python.org/3/library/asyncio-future.html#asyncio.Future)
214        instead of blocking the current thread.
215        """
216
217        while True:
218            with self._lock:
219                next_scan = self._next_scan
220            if conn := self.connect_sync(timeout=0):
221                return conn
222            wait = from_deadline(next_scan)
223            log.debug("Next scan in %.2fs", wait)
224            await asyncio.sleep(wait)
225
226    @property
227    def matcher(self) -> SerialPortMatcher:
228        """The `SerialPortMatcher` used by this tracker."""
229        return self._match

Utility class to maintain a connection to a serial port of interest, re-scanning and re-connecting as needed after errors, with periodic retry. This is used for robust communication with a serial device which might be plugged and unplugged during operation.

SerialPortTracker( match: str | SerialPortMatcher, *, baud: int = 0, topts: TrackerOptions = TrackerOptions(scan_interval=0.5), copts: SerialConnectionOptions = SerialConnectionOptions(baud=115200, sharing='exclusive'))
38    def __init__(
39        self,
40        match: str | SerialPortMatcher,
41        *,
42        baud: int = 0,
43        topts: TrackerOptions = TrackerOptions(),
44        copts: SerialConnectionOptions = SerialConnectionOptions(),
45    ):
46        """
47        Prepare to manage a serial port connection.
48        - `match` must be a [port match expression](https://github.com/egnor/ok-py-serial#serial-port-match-expressions) matching the port of interest
49        - `topts` can define parameters for tracking (eg. re-scan interval)
50        - `copts` can define parameters for connecting (eg. baud rate)
51          - OR `baud` can set the baud rate (as a shortcut)
52
53        Actual port scans and connections only happen after `find_*` or
54        `connect_*` methods are called. Call `close` to end any open
55        connection; use `SerialPortTracker` as the target of a
56        [`with` statement](https://docs.python.org/3/reference/compound_stmts.html#with)
57        to automatically close the port on exit from the `with` body.
58
59        Raises:
60        - `SerialMatcherInvalid`: Bad format of `match` string
61        """
62
63        if isinstance(match, str):
64            match = SerialPortMatcher(match)
65        if baud:
66            copts = dataclasses.replace(copts, baud=baud)
67
68        self._match = match
69        self._tracker_opts = topts
70        self._conn_opts = copts
71
72        self._lock = threading.Lock()
73        self._scan_keys: set[str] = set()
74        self._scan_matched: list[SerialPort] = []
75        self._scan_deadline = 0.0
76        self._next_scan = 0.0
77        self._conn: SerialConnection | None = None
78
79        log.debug("Tracking: %s", repr(str(match)) if match else "(any port)")

Prepare to manage a serial port connection.

  • match must be a port match expression matching the port of interest
  • topts can define parameters for tracking (eg. re-scan interval)
  • copts can define parameters for connecting (eg. baud rate)
    • OR baud can set the baud rate (as a shortcut)

Actual port scans and connections only happen after find_* or connect_* methods are called. Call close to end any open connection; use SerialPortTracker as the target of a with statement to automatically close the port on exit from the with body.

Raises:

  • SerialMatcherInvalid: Bad format of match string

def close(self) -> None:
91    def close(self) -> None:
92        """
93        Closes any open connection with `SerialConnection.close`. A subsequent
94        call to `connect_sync`/`connect_async` will establish a new connection.
95        """
96
97        with self._lock:
98            if self._conn:
99                self._conn.close()

Closes any open connection with SerialConnection.close. A subsequent call to connect_sync/connect_async will establish a new connection.

def find_sync( self, timeout: float | int | None = None) -> list[SerialPort]:
101    def find_sync(self, timeout: float | int | None = None) -> list[SerialPort]:
102        """
103        Waits up to `timeout` seconds (forever for `None`) until serial port(s)
104        exist matching this tracker's requirements. Rescans periodically while
105        waiting (see `TrackerOptions.scan_interval`).
106
107        Returns a list of matching `SerialPort` objects, or `[]` on timeout.
108
109        Raises:
110        - `SerialScanException`: System error scanning ports
111        """
112
113        def key(p: SerialPort) -> str:
114            return p.name + "@" + p.attr.get("time", "")
115
116        deadline = to_deadline(timeout)
117        while True:
118            with self._lock:
119                if (wait := from_deadline(self._next_scan)) <= 0:
120                    wait = self._tracker_opts.scan_interval
121                    found = scan_serial_ports()
122                    for p in found if self._next_scan else []:  # not first scan
123                        if key(p) not in self._scan_keys:
124                            p.attr["tracking"] = "new"
125
126                    matched = self._match.filter(found)
127                    self._next_scan = to_deadline(wait)
128                    self._scan_keys = set(key(p) for p in found)
129                    self._scan_matched = matched
130                    nf, nm = len(found), len(matched)
131                    log.debug("%d/%d ports match %r", nm, nf, str(self._match))
132
133                if self._scan_matched:
134                    return self._scan_matched
135
136            timeout_wait = from_deadline(deadline)
137            if timeout_wait < wait:
138                return []
139
140            log.debug("Next scan in %.2fs", wait)
141            time.sleep(wait)

Waits up to timeout seconds (forever for None) until serial port(s) exist matching this tracker's requirements. Rescans periodically while waiting (see TrackerOptions.scan_interval).

Returns a list of matching SerialPort objects, or [] on timeout.

Raises:

  • SerialScanException: System error scanning ports

async def find_async(self) -> list[SerialPort]:
143    async def find_async(self) -> list[SerialPort]:
144        """
145        Similar to `find_sync` but returns a
146        [`Future`](https://docs.python.org/3/library/asyncio-future.html#asyncio.Future)
147        instead of blocking the current thread.
148        """
149
150        while True:
151            with self._lock:
152                next_scan = self._next_scan
153            if ports := self.find_sync(timeout=0):
154                return ports
155            wait = from_deadline(next_scan)
156            log.debug("Next scan in %.2fs", wait)
157            await asyncio.sleep(wait)

Similar to find_sync but returns a Future instead of blocking the current thread.

def connect_sync( self, timeout: float | int | None = None) -> SerialConnection | None:
159    def connect_sync(
160        self, timeout: float | int | None = None
161    ) -> SerialConnection | None:
162        """
163        If a connection is established and healthy, returns it immediately.
164
165        Otherwise, waits up to `timeout` seconds (forever for `None`) for
166        serial port(s) to appear matching this tracker's requirements, then
167        attempts a new connection. If the connection succeeds, it is remembered
168        and returned, otherwise scanning resumes.
169
170        If multiple ports match the requirements, connections are attempted
171        to each, and the first success (if any) is remembered and returned.
172
173        Returns `None` on timeout.
174
175        Raises:
176        - `SerialScanException`: System error scanning ports
177        """
178
179        deadline = to_deadline(timeout)
180        ports: list[SerialPort] = []
181        while True:
182            with self._lock:
183                if self._conn:
184                    try:
185                        self._conn.write(b"")  # check for liveness
186                        return self._conn
187                    except SerialIoClosed:
188                        log.debug("%s closed", self._conn.port_name)
189                        self._conn = None
190                    except SerialIoException as exc:
191                        name = self._conn.port_name
192                        log.warning("%s failed (%s)", name, exc)
193                        self._conn.close()
194                        self._conn = None
195
196                ports.sort(key=lambda p: p.attr.get("time", ""), reverse=True)
197                for port in ports:
198                    try:
199                        self._conn = SerialConnection(
200                            port=port, opts=self._conn_opts
201                        )
202                        return self._conn
203                    except SerialOpenException as exc:
204                        log.warning("Can't open %s (%s)", port, exc)
205                        self._scan_matched = []  # force re-scan on error
206
207            if not (ports := self.find_sync(timeout=from_deadline(deadline))):
208                return None

If a connection is established and healthy, returns it immediately.

Otherwise, waits up to timeout seconds (forever for None) for serial port(s) to appear matching this tracker's requirements, then attempts a new connection. If the connection succeeds, it is remembered and returned, otherwise scanning resumes.

If multiple ports match the requirements, connections are attempted to each, and the first success (if any) is remembered and returned.

Returns None on timeout.

Raises:

  • SerialScanException: System error scanning ports

async def connect_async(self) -> SerialConnection:
210    async def connect_async(self) -> SerialConnection:
211        """
212        Similar to `connect_sync` but returns a
213        [`Future`](https://docs.python.org/3/library/asyncio-future.html#asyncio.Future)
214        instead of blocking the current thread.
215        """
216
217        while True:
218            with self._lock:
219                next_scan = self._next_scan
220            if conn := self.connect_sync(timeout=0):
221                return conn
222            wait = from_deadline(next_scan)
223            log.debug("Next scan in %.2fs", wait)
224            await asyncio.sleep(wait)

Similar to connect_sync but returns a Future instead of blocking the current thread.

matcher: SerialPortMatcher
226    @property
227    def matcher(self) -> SerialPortMatcher:
228        """The `SerialPortMatcher` used by this tracker."""
229        return self._match

The SerialPortMatcher used by this tracker.

@dataclasses.dataclass(frozen=True)
class TrackerOptions:
22@dataclasses.dataclass(frozen=True)
23class TrackerOptions:
24    """Optional parameters for `SerialPortTracker`."""
25
26    scan_interval: float | int = 0.5
27    """Seconds between port re-scans when waiting for a match."""

Optional parameters for SerialPortTracker.

TrackerOptions(scan_interval: float | int = 0.5)
scan_interval: float | int = 0.5

Seconds between port re-scans when waiting for a match.

SerialSharingType = typing.Literal['oblivious', 'polite', 'exclusive', 'stomp']
class SerialException(builtins.OSError):
 5class SerialException(OSError):
 6    """Exception base class for `okserial` I/O errors."""
 7
 8    port: str | None
 9    """The device name of the serial port involved in the error."""
10
11    def __init__(self, message: str, port: str | None = None):
12        super().__init__(f"{port}: {message}" if port else message)
13        self.port = port

Exception base class for ok_serial I/O errors.

SerialException(message: str, port: str | None = None)
11    def __init__(self, message: str, port: str | None = None):
12        super().__init__(f"{port}: {message}" if port else message)
13        self.port = port
port: str | None

The device name of the serial port involved in the error.

class SerialIoClosed(ok_serial.SerialIoException):
class SerialIoClosed(SerialIoException):
    """Exception raised when I/O is attempted on a closed serial port."""

Exception raised when I/O is attempted on a closed serial port.

class SerialIoException(ok_serial.SerialException):
class SerialIoException(SerialException):
    """Exception raised for I/O errors communicating with serial ports."""

Exception raised for I/O errors communicating with serial ports.

class SerialMatcherInvalid(builtins.ValueError):
class SerialMatcherInvalid(ValueError):
    """Exception raised when a port matcher string is syntactically invalid."""

Exception raised when a port matcher string is syntactically invalid.

class SerialOpenBusy(ok_serial.SerialOpenException):
class SerialOpenBusy(SerialOpenException):
    """Exception raised if an open attempt fails due to port contention."""

Exception raised if an open attempt fails due to port contention.

class SerialOpenException(ok_serial.SerialException):
class SerialOpenException(SerialException):
    """Exception raised for system errors opening a serial port."""

Exception raised for system errors opening a serial port.

class SerialScanException(ok_serial.SerialException):
class SerialScanException(SerialException):
    """Exception raised for system errors scanning available ports."""

Exception raised for system errors scanning available ports.