ok_serial
A Python serial port library (based on PySerial) with improved port discovery and I/O semantics. (Usage guide)
1""" 2A Python serial port library 3(based on [PySerial](https://www.pyserial.com/)) 4with improved port discovery 5and I/O semantics. 6[(Usage guide)](https://github.com/egnor/ok-py-serial#readme) 7""" 8 9from beartype.claw import beartype_this_package as _beartype_me 10 11# ruff: noqa: E402 12_beartype_me() 13 14from ok_serial._connection import ( 15 SerialConnection, 16 SerialConnectionOptions, 17 SerialControlSignals, 18) 19 20from ok_serial._scanning import scan_serial_ports, SerialPort 21from ok_serial._matcher import SerialPortMatcher 22from ok_serial._tracker import SerialPortTracker, TrackerOptions 23from ok_serial._locking import SerialSharingType 24 25from ok_serial._exceptions import ( 26 SerialException, 27 SerialIoClosed, 28 SerialIoException, 29 SerialMatcherInvalid, 30 SerialOpenBusy, 31 SerialOpenException, 32 SerialScanException, 33) 34 35__all__ = [n for n in globals() if not n.startswith("_")]
class SerialConnection(contextlib.AbstractContextManager):
    """An open connection to a serial port."""

    def __init__(
        self,
        *,
        match: str | SerialPortMatcher | None = None,
        port: str | SerialPort | None = None,
        opts: SerialConnectionOptions = SerialConnectionOptions(),
        **kwargs,
    ):
        """
        Opens a serial port to make it available for use.
        - `match` can be a [port match expression](https://github.com/egnor/ok-py-serial#serial-port-match-expressions) matching exactly one port...
        - OR `port` must name a raw system serial device to open.
        - `opts` can define baud rate and other port parameters...
        - OR other keywords are forwarded to `SerialConnectionOptions`

        Call `close` to release the port; use
        `SerialConnection` as the target of a
        [`with` statement](https://docs.python.org/3/reference/compound_stmts.html#with)
        to automatically close the port on exit from the `with` body.

        Example:
        ```
        with SerialConnection(match="vid_pid=0403:6001", baud=115200, sharing="polite") as p:
            ... interact with `p` ...
        # automatically closed on exit from block
        ```

        Raises:
        - `SerialOpenException`: I/O error opening the specified port
        - `SerialOpenBusy`: The port is already in use
        - `SerialScanException`: System error scanning ports to find `match`
        - `SerialMatcherInvalid`: Bad format of `match` string
        """

        # Exactly one of `match` / `port` must be given.
        assert (match is not None) + (port is not None) == 1
        opts = dataclasses.replace(opts, **kwargs)

        if match is not None:
            if isinstance(match, str):
                match = SerialPortMatcher(match)
            if not (found := scan_serial_ports()):
                raise _exceptions.SerialOpenException("No ports found")
            if not (matched := match.filter(found)):
                msg = f"No ports match {match!r}"
                raise _exceptions.SerialOpenException(msg)
            if len(matched) > 1:
                matched_text = "".join(f"\n {p}" for p in matched)
                msg = f'Multiple ports match "{match}": {matched_text}'
                raise _exceptions.SerialOpenException(msg)
            port = matched[0].name
            log.debug("Scanned %r, found %s", match, port)

        assert port is not None
        if isinstance(port, SerialPort):
            port = port.name

        # Everything acquired below is released again if any later step
        # fails; on success ownership moves to self._cleanup via pop_all().
        with contextlib.ExitStack() as cleanup:
            cleanup.enter_context(using_lock_file(port, opts.sharing))

            try:
                pyserial = cleanup.enter_context(
                    serial.Serial(
                        port=port,
                        baudrate=opts.baud,
                        write_timeout=0.1,
                    )
                )
                log.debug("Opened %s %s", port, opts)
            except OSError as ex:
                if ex.errno == errno.EBUSY:
                    msg = "Serial port busy (EBUSY)"
                    raise _exceptions.SerialOpenBusy(msg, port) from ex
                else:
                    msg = "Serial port open error"
                    raise _exceptions.SerialOpenException(msg, port) from ex

            # Only take the descriptor-level lock when the backend has one.
            if hasattr(pyserial, "fileno"):
                fd, share = pyserial.fileno(), opts.sharing
                cleanup.enter_context(using_fd_lock(port, fd, share))

            self._io = cleanup.enter_context(_IoThreads(pyserial))
            self._io.start()
            self._cleanup = cleanup.pop_all()

    def __del__(self) -> None:
        # `_cleanup` is missing when __init__ raised before finishing.
        if hasattr(self, "_cleanup"):
            self._cleanup.close()

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self._cleanup.__exit__(exc_type, exc_value, traceback)

    def __repr__(self) -> str:
        return f"SerialConnection({self._io.pyserial.port!r})"

    def close(self) -> None:
        """
        Releases the serial port connection and any associated locks.

        Any I/O operations in progress or attempted after closure will
        raise an immediate `SerialIoClosed` exception.
        """

        self._cleanup.close()

    def read_sync(
        self,
        *,
        timeout: float | int | None = None,
    ) -> bytes:
        """
        Waits up to `timeout` seconds (forever for `None`) for data,
        then returns all of it (b"" on timeout).

        Raises:
        - `SerialIoException`: port I/O failed and there is no matching data
        - `SerialIoClosed`: the port was closed and there is no matching data
        """

        deadline = to_deadline(timeout)
        while True:
            with self._io.monitor:
                # Buffered data is delivered before any stored error.
                if self._io.incoming:
                    output = bytes(self._io.incoming)
                    self._io.incoming.clear()
                    return output
                elif self._io.exception:
                    raise self._io.exception
                elif (wait := from_deadline(deadline)) <= 0:
                    return b""
                else:
                    self._io.monitor.wait(timeout=wait)

    async def read_async(self) -> bytes:
        """
        Similar to `read_sync` (with no timeout) but is a coroutine to be
        awaited instead of blocking the current thread.
        """

        while True:
            future = self._io.create_future_in_loop()  # BEFORE read_sync
            if out := self.read_sync(timeout=0):
                return out
            await future

    def write(self, data: bytes | bytearray) -> None:
        """
        Adds data to the outgoing buffer to be sent immediately.
        Never blocks; the buffer can grow indefinitely.
        (Use `outgoing_size` and `drain_sync`/`drain_async` to manage
        buffer size.)

        Raises:
        - `SerialIoException`: port I/O failed
        - `SerialIoClosed`: the port was closed
        """

        with self._io.monitor:
            if self._io.exception:
                raise self._io.exception
            elif data:
                self._io.outgoing.extend(data)
                self._io.monitor.notify_all()

    def drain_sync(self, *, timeout: float | int | None = None) -> bool:
        """
        Waits up to `timeout` seconds (forever for `None`) until
        all buffered data is transmitted.

        Returns `True` if the drain completed, `False` on timeout.

        Raises:
        - `SerialIoException`: port I/O failed
        - `SerialIoClosed`: the port was closed
        """

        deadline = to_deadline(timeout)
        while True:
            with self._io.monitor:
                if self._io.exception:
                    raise self._io.exception
                elif not self._io.outgoing:
                    return True
                elif (wait := from_deadline(deadline)) <= 0:
                    return False
                else:
                    self._io.monitor.wait(timeout=wait)

    async def drain_async(self) -> bool:
        """
        Similar to `drain_sync` (with no timeout) but is a coroutine to be
        awaited instead of blocking the current thread.
        """

        while True:
            future = self._io.create_future_in_loop()  # BEFORE drain_sync
            if self.drain_sync(timeout=0):
                return True
            await future

    def incoming_size(self) -> int:
        """
        Returns the number of bytes waiting to be read.
        """
        with self._io.monitor:
            return len(self._io.incoming)

    def outgoing_size(self) -> int:
        """
        Returns the number of bytes waiting to be sent.
        """
        with self._io.monitor:
            return len(self._io.outgoing)

    def set_signals(
        self,
        dtr: bool | None = None,
        rts: bool | None = None,
        send_break: bool | None = None,
    ) -> None:
        """
        Sets outgoing
        [RS-232 modem control line](https://en.wikipedia.org/wiki/RS-232#Data_and_control_signals)
        state (use `None` for no change):
        - `dtr`: assert Data Terminal Ready
        - `rts`: assert Ready To Send
        - `send_break`: send a continuous BREAK condition

        Raises:
        - `SerialIoException`: port I/O failed
        - `SerialIoClosed`: the port was closed
        """

        with self._io.monitor:
            if self._io.exception:
                raise self._io.exception
            try:
                if dtr is not None:
                    self._io.pyserial.dtr = dtr
                if rts is not None:
                    self._io.pyserial.rts = rts
                if send_break is not None:
                    self._io.pyserial.break_condition = send_break
            except OSError as ex:
                # The failure is stored, so later calls raise it as well.
                msg, dev = "Can't set control signals", self._io.pyserial.port
                self._io.exception = _exceptions.SerialIoException(msg, dev)
                self._io.exception.__cause__ = ex
                raise self._io.exception

    def get_signals(self) -> SerialControlSignals:
        """
        Returns the current
        [RS-232 modem control line](https://en.wikipedia.org/wiki/RS-232#Data_and_control_signals) state.

        Raises:
        - `SerialIoException`: port I/O failed
        - `SerialIoClosed`: the port was closed
        """

        with self._io.monitor:
            if self._io.exception:
                raise self._io.exception
            try:
                return SerialControlSignals(
                    dtr=self._io.pyserial.dtr,
                    dsr=self._io.pyserial.dsr,
                    cts=self._io.pyserial.cts,
                    rts=self._io.pyserial.rts,
                    ri=self._io.pyserial.ri,
                    cd=self._io.pyserial.cd,
                    sending_break=self._io.pyserial.break_condition,
                )
            except OSError as ex:
                # The failure is stored, so later calls raise it as well.
                msg, dev = "Can't get control signals", self._io.pyserial.port
                self._io.exception = _exceptions.SerialIoException(msg, dev)
                self._io.exception.__cause__ = ex
                raise self._io.exception

    @property
    def port_name(self) -> str:
        """
        The port's device name, eg. `/dev/ttyACM0` or `COM3`.
        """
        return self._io.pyserial.port

    @property
    def pyserial(self) -> serial.Serial:
        """
        The underlying
        [`pyserial.Serial`](https://pyserial.readthedocs.io/en/latest/pyserial_api.html#serial.Serial)
        object (API escape hatch).
        """
        return self._io.pyserial

    def fileno(self) -> int:
        """
        The [Unix FD](https://en.wikipedia.org/wiki/File_descriptor)
        for the serial connection, -1 if not available.
        """
        try:
            # The wrapped port lives at `_io.pyserial`; the previous
            # `_io.serial` lookup raised AttributeError every time, so this
            # method always returned -1.
            return self._io.pyserial.fileno()
        except AttributeError:
            return -1
An open connection to a serial port.
def __init__(
    self,
    *,
    match: str | SerialPortMatcher | None = None,
    port: str | SerialPort | None = None,
    opts: SerialConnectionOptions = SerialConnectionOptions(),
    **kwargs,
):
    """
    Opens a serial port and prepares it for I/O.
    - `match` can be a [port match expression](https://github.com/egnor/ok-py-serial#serial-port-match-expressions) matching exactly one port...
    - OR `port` must name a raw system serial device to open.
    - `opts` can define baud rate and other port parameters...
    - OR other keywords are forwarded to `SerialConnectionOptions`

    Call `close` to release the port, or use the `SerialConnection`
    as the target of a
    [`with` statement](https://docs.python.org/3/reference/compound_stmts.html#with)
    so the port is closed when the `with` body exits.

    Example:
    ```
    with SerialConnection(match="vid_pid=0403:6001", baud=115200, sharing="polite") as p:
        ... interact with `p` ...
    # automatically closed on exit from block
    ```

    Raises:
    - `SerialOpenException`: I/O error opening the specified port
    - `SerialOpenBusy`: The port is already in use
    - `SerialScanException`: System error scanning ports to find `match`
    - `SerialMatcherInvalid`: Bad format of `match` string
    """

    # Exactly one of `match` / `port` must be supplied.
    assert (match is not None) + (port is not None) == 1
    opts = dataclasses.replace(opts, **kwargs)

    if match is not None:
        if isinstance(match, str):
            match = SerialPortMatcher(match)

        scanned = scan_serial_ports()
        if not scanned:
            raise _exceptions.SerialOpenException("No ports found")

        candidates = match.filter(scanned)
        if not candidates:
            raise _exceptions.SerialOpenException(f"No ports match {match!r}")
        if len(candidates) > 1:
            listing = "".join(f"\n {p}" for p in candidates)
            raise _exceptions.SerialOpenException(
                f'Multiple ports match "{match}": {listing}'
            )

        port = candidates[0].name
        log.debug("Scanned %r, found %s", match, port)

    assert port is not None
    if isinstance(port, SerialPort):
        port = port.name

    # Resources are stacked as they are acquired; a failure part-way
    # unwinds them, and success hands the stack over to self._cleanup.
    with contextlib.ExitStack() as cleanup:
        cleanup.enter_context(using_lock_file(port, opts.sharing))

        try:
            pyserial = cleanup.enter_context(
                serial.Serial(port=port, baudrate=opts.baud, write_timeout=0.1)
            )
            log.debug("Opened %s %s", port, opts)
        except OSError as ex:
            if ex.errno == errno.EBUSY:
                raise _exceptions.SerialOpenBusy(
                    "Serial port busy (EBUSY)", port
                ) from ex
            raise _exceptions.SerialOpenException(
                "Serial port open error", port
            ) from ex

        # Descriptor-level locking only applies when a descriptor exists.
        if hasattr(pyserial, "fileno"):
            descriptor = pyserial.fileno()
            cleanup.enter_context(using_fd_lock(port, descriptor, opts.sharing))

        self._io = cleanup.enter_context(_IoThreads(pyserial))
        self._io.start()
        self._cleanup = cleanup.pop_all()
Opens a serial port to make it available for use.
- `match` can be a port match expression matching exactly one port...
- OR `port` must name a raw system serial device to open.
- `opts` can define baud rate and other port parameters...
- OR other keywords are forwarded to `SerialConnectionOptions`
Call close to release the port; use
SerialConnection as the target of a
with statement
to automatically close the port on exit from the with body.
Example:
with SerialConnection(match="vid_pid=0403:6001", baud=115200, sharing="polite") as p:
... interact with `p` ...
# automatically closed on exit from block
Raises:
- SerialOpenException: I/O error opening the specified port
- SerialOpenBusy: The port is already in use
- SerialScanException: System error scanning ports to find `match`
- SerialMatcherInvalid: Bad format of `match` string
def close(self) -> None:
    """
    Releases the serial port connection together with any locks
    held for it.

    I/O operations that are in progress, or attempted after closure,
    raise `SerialIoClosed` immediately.
    """

    teardown = self._cleanup
    teardown.close()
Releases the serial port connection and any associated locks.
Any I/O operations in progress or attempted after closure will
raise an immediate SerialIoClosed exception.
def read_sync(
    self,
    *,
    timeout: float | int | None = None,
) -> bytes:
    """
    Blocks for at most `timeout` seconds (`None` means forever) until
    data is available, then returns everything buffered (b"" on timeout).

    Raises:
    - `SerialIoException`: port I/O failed and there is no matching data
    - `SerialIoClosed`: the port was closed and there is no matching data
    """

    deadline = to_deadline(timeout)
    while True:
        with self._io.monitor:
            # Pending data is handed out before any stored error.
            if self._io.incoming:
                received = bytes(self._io.incoming)
                self._io.incoming.clear()
                return received

            if self._io.exception:
                raise self._io.exception

            remaining = from_deadline(deadline)
            if remaining <= 0:
                return b""

            self._io.monitor.wait(timeout=remaining)
Waits up to timeout seconds (forever for None) for data,
then returns all of it (b"" on timeout).
Raises:
- SerialIoException: port I/O failed and there is no matching data
- SerialIoClosed: the port was closed and there is no matching data
async def read_async(self) -> bytes:
    """
    Asynchronous counterpart of `read_sync`: awaits incoming data
    instead of blocking the current thread.
    """

    while True:
        # The wake-up future must exist BEFORE polling read_sync.
        wakeup = self._io.create_future_in_loop()
        data = self.read_sync(timeout=0)
        if data:
            return data
        await wakeup
201 def write(self, data: bytes | bytearray) -> None: 202 """ 203 Adds data to the outgoing buffer to be sent immediately. 204 Never blocks; the buffer can grow indefinitely. 205 (Use `outgoing_size` and `drain_sync`/`drain_async` to manage 206 buffer size.) 207 208 Raises: 209 - `SerialIoException`: port I/O failed 210 - `SerialIoClosed`: the port was closed 211 """ 212 213 with self._io.monitor: 214 if self._io.exception: 215 raise self._io.exception 216 elif data: 217 self._io.outgoing.extend(data) 218 self._io.monitor.notify_all()
Adds data to the outgoing buffer to be sent immediately.
Never blocks; the buffer can grow indefinitely.
(Use outgoing_size and drain_sync/drain_async to manage
buffer size.)
Raises:
- SerialIoException: port I/O failed
- SerialIoClosed: the port was closed
def drain_sync(self, *, timeout: float | int | None = None) -> bool:
    """
    Blocks for at most `timeout` seconds (`None` means forever) until
    the outgoing buffer is empty.

    Returns `True` if the drain completed, `False` on timeout.

    Raises:
    - `SerialIoException`: port I/O failed
    - `SerialIoClosed`: the port was closed
    """

    deadline = to_deadline(timeout)
    while True:
        with self._io.monitor:
            if self._io.exception:
                raise self._io.exception

            if not self._io.outgoing:
                return True

            remaining = from_deadline(deadline)
            if remaining <= 0:
                return False

            self._io.monitor.wait(timeout=remaining)
Waits up to timeout seconds (forever for None) until
all buffered data is transmitted.
Returns True if the drain completed, False on timeout.
Raises:
- SerialIoException: port I/O failed
- SerialIoClosed: the port was closed
async def drain_async(self) -> bool:
    """
    Asynchronous counterpart of `drain_sync`: awaits an empty outgoing
    buffer instead of blocking the current thread.
    """

    while True:
        # The wake-up future must exist BEFORE polling drain_sync.
        wakeup = self._io.create_future_in_loop()
        drained = self.drain_sync(timeout=0)
        if drained:
            return True
        await wakeup
Similar to drain_sync but returns a
Future
instead of blocking the current thread.
def incoming_size(self) -> int:
    """
    Returns how many received bytes are buffered and not yet read.
    """
    with self._io.monitor:
        pending = len(self._io.incoming)
    return pending
Returns the number of bytes waiting to be read.
def outgoing_size(self) -> int:
    """
    Returns how many bytes are buffered and not yet sent.
    """
    with self._io.monitor:
        queued = len(self._io.outgoing)
    return queued
Returns the number of bytes waiting to be sent.
271 def set_signals( 272 self, 273 dtr: bool | None = None, 274 rts: bool | None = None, 275 send_break: bool | None = None, 276 ) -> None: 277 """ 278 Sets outgoing 279 [RS-232 modem control line](https://en.wikipedia.org/wiki/RS-232#Data_and_control_signals) 280 state (use `None` for no change): 281 - `dtr`: assert Data Terminal Ready 282 - `rts`: assert Ready To Send 283 - `send_break`: send a continuous BREAK condition 284 285 Raises: 286 - `SerialIoException`: port I/O failed 287 - `SerialIoClosed`: the port was closed 288 """ 289 290 with self._io.monitor: 291 if self._io.exception: 292 raise self._io.exception 293 try: 294 if dtr is not None: 295 self._io.pyserial.dtr = dtr 296 if rts is not None: 297 self._io.pyserial.rts = rts 298 if send_break is not None: 299 self._io.pyserial.break_condition = send_break 300 except OSError as ex: 301 msg, dev = "Can't set control signals", self._io.pyserial.port 302 self._io.exception = _exceptions.SerialIoException(msg, dev) 303 self._io.exception.__cause__ = ex 304 raise self._io.exception
Sets outgoing
RS-232 modem control line
state (use None for no change):
- dtr: assert Data Terminal Ready
- rts: assert Ready To Send
- send_break: send a continuous BREAK condition
Raises:
- SerialIoException: port I/O failed
- SerialIoClosed: the port was closed
def get_signals(self) -> SerialControlSignals:
    """
    Reads the current
    [RS-232 modem control line](https://en.wikipedia.org/wiki/RS-232#Data_and_control_signals) state.

    Raises:
    - `SerialIoException`: port I/O failed
    - `SerialIoClosed`: the port was closed
    """

    with self._io.monitor:
        if self._io.exception:
            raise self._io.exception
        try:
            port = self._io.pyserial
            dtr = port.dtr
            dsr = port.dsr
            cts = port.cts
            rts = port.rts
            ri = port.ri
            cd = port.cd
            sending_break = port.break_condition
            return SerialControlSignals(
                dtr=dtr,
                dsr=dsr,
                cts=cts,
                rts=rts,
                ri=ri,
                cd=cd,
                sending_break=sending_break,
            )
        except OSError as ex:
            # Store the failure so later calls raise it as well.
            failure = _exceptions.SerialIoException(
                "Can't get control signals", self._io.pyserial.port
            )
            self._io.exception = failure
            failure.__cause__ = ex
            raise failure
Returns the current RS-232 modem control line state.
Raises:
- SerialIoException: port I/O failed
- SerialIoClosed: the port was closed
@property
def port_name(self) -> str:
    """
    Device name of the open port, eg. `/dev/ttyACM0` or `COM3`.
    """
    device = self._io.pyserial
    return device.port
The port's device name, eg. /dev/ttyACM0 or COM3.
@property
def pyserial(self) -> serial.Serial:
    """
    The wrapped
    [`pyserial.Serial`](https://pyserial.readthedocs.io/en/latest/pyserial_api.html#serial.Serial)
    object (API escape hatch).
    """
    io = self._io
    return io.pyserial
The underlying
pyserial.Serial
object (API escape hatch).
def fileno(self) -> int:
    """
    The [Unix FD](https://en.wikipedia.org/wiki/File_descriptor)
    for the serial connection, -1 if not available.
    """
    try:
        # The wrapped port lives at `_io.pyserial`; the previous
        # `_io.serial` lookup raised AttributeError every time, so this
        # method always returned -1.
        return self._io.pyserial.fileno()
    except AttributeError:
        # The port object has no fileno().
        return -1
The Unix FD for the serial connection, -1 if not available.
@dataclasses.dataclass(frozen=True)
class SerialConnectionOptions:
    """
    Optional parameters for `SerialConnection`.

    Instances are immutable (frozen dataclass); use `dataclasses.replace`
    to derive a modified copy.
    """

    baud: int = 115200
    """The [baud rate](https://en.wikipedia.org/wiki/Baud) to use."""
    sharing: SerialSharingType = "exclusive"
    """
    Port access negotiation strategy:
    - `"oblivious"`: Don't perform any locking.
    - `"polite"`: Defer to other users, don't lock the port.
    - `"exclusive":` Require exclusive access, lock the port or fail.
    - `"stomp"`: Try to kill other users, try to lock the port, open the
    port regardless. Use with care!
    """
Optional parameters for SerialConnection.
Port access negotiation strategy:
- "oblivious": Don't perform any locking.
- "polite": Defer to other users, don't lock the port.
- "exclusive": Require exclusive access, lock the port or fail.
- "stomp": Try to kill other users, try to lock the port, open the port regardless. Use with care!
@dataclasses.dataclass(frozen=True)
class SerialControlSignals:
    """
    [RS-232 modem control lines](https://en.wikipedia.org/wiki/RS-232#Data_and_control_signals),
    outgoing ("DTE to DCE") and incoming ("DCE to DTE").

    Immutable snapshot (frozen dataclass) of the line states.
    """

    # Data Terminal Ready
    dtr: bool
    # presumably Data Set Ready (standard RS-232 name)
    dsr: bool
    # presumably Clear To Send (standard RS-232 name)
    cts: bool
    # Ready To Send
    rts: bool
    # presumably Ring Indicator (standard RS-232 name)
    ri: bool
    # presumably Carrier Detect (standard RS-232 name)
    cd: bool
    # True while a BREAK condition is being sent
    sending_break: bool
RS-232 modem control lines, outgoing ("DTE to DCE") and incoming ("DCE to DTE").
def scan_serial_ports() -> list[SerialPort]:
    """
    Lists the serial ports currently attached to the system.

    For testing and encapsulation, if the environment variable
    `$OK_SERIAL_SCAN_OVERRIDE` is the pathname of a JSON file in
    `{"port-name": {"attr": "value", ...}, ...}` format, that port listing
    is returned instead of actual system scan results.

    Raises:
    - `SerialScanException`: System error scanning ports
    """

    override_path = os.getenv("OK_SERIAL_SCAN_OVERRIDE")
    if override_path:
        try:
            override_text = pathlib.Path(override_path).read_text()
            ports = _ports_from_json_text(override_text)
        except (OSError, ValueError) as ex:
            msg = f"Can't read $OK_SERIAL_SCAN_OVERRIDE {override_path}"
            raise SerialScanException(msg) from ex

        log.debug("Read $OK_SERIAL_SCAN_OVERRIDE %s", override_path)
    else:
        try:
            raw_ports = list_ports.comports()
        except OSError as ex:
            raise SerialScanException("Can't scan serial") from ex
        # Entries the converter returns as falsy are left out.
        ports = [
            converted
            for raw in raw_ports
            if (converted := _port_from_pyserial(raw))
        ]

    by_name = natsort.natsort_keygen(key=lambda p: p.name, alg=natsort.ns.P)
    ports.sort(key=by_name)
    log.debug("Found %d ports", len(ports))
    return ports
Returns a list of serial ports currently attached to the system.
For testing and encapsulation, if the environment variable
$OK_SERIAL_SCAN_OVERRIDE is the pathname of a JSON file in
{"port-name": {"attr": "value", ...}, ...} format, that port listing
is returned instead of actual system scan results.
Raises:
SerialScanException: System error scanning ports
@dataclasses.dataclass(frozen=True)
class SerialPort:
    """Metadata about a serial port found on the system"""

    name: str
    """The OS device identifier, eg. `/dev/ttyUSB3`, 'COM4', etc."""

    attr: dict[str, str]
    """
    [Metadata](https://github.com/egnor/py-ok-serial#serial-port-attributes)
    """

    def __str__(self) -> str:
        # A port prints as its bare device name.
        return self.name
Metadata about a serial port found on the system
class SerialPortMatcher:
    """A parsed expression for identifying serial ports of interest."""

    def __init__(self, match: str):
        """Parses a
        [serial port match expression](https://github.com/egnor/ok-py-serial#serial-port-match-expressions)
        in preparation to identify matching `SerialPort` objects.

        Raises:
        - `SerialMatcherInvalid`: Bad format of `match` string
        """

        self._input = match
        self._pos: list[_Rule] = []
        self._neg: list[_Rule] = []
        self._oldest = False
        self._newest = False

        next_pos = 0
        while next_pos < len(match):
            tm = _TERM_RE.match(match, pos=next_pos)
            if not (tm and tm[0]):
                jmatch = json.dumps(match)  # use JSON for consistent quoting
                jpre = json.dumps(match[:next_pos])
                # jmatch is indented one column more than the pointer line
                # so the caret lands under the offending character.
                msg = f"Bad port expr:\n  {jmatch}\n -{'-' * (len(jpre) - 1)}^"
                raise SerialMatcherInvalid(msg)

            next_pos, groups = tm.end(), tm.groups(default="")
            rx_att, rx_ex, rx_text, att, eq_ex, ex, qstr, num, naked = groups
            # A term with any exclusion group set is a negative rule.
            out_list = self._neg if bool(rx_ex or eq_ex or ex) else self._pos
            if rx_text:
                try:
                    out_list.append(_Rule(rx_att, re.compile(rx_text)))
                except re.error as err:
                    msg = f"Bad port regex: /{rx_text}/"
                    raise SerialMatcherInvalid(msg) from err
            elif qstr:
                rx = _qstr_rx(qstr, glob=False, full=bool(att))
                out_list.append(_Rule(att, rx))
            elif num:
                if num[-1:] in "hH":
                    # Trailing h/H marks hex: parse the digits before it.
                    # (This used to parse a stray string literal and
                    # always raised ValueError.)
                    value = int(num[:-1], 16)
                elif num.isdigit():
                    value = int(num, 10)  # int('0123', 0) is an error!
                else:
                    value = int(num, 0)
                rx_text = f"({num}|0*{value}|(0x)?0*{value:x}h?)"
                rx = _wrap_rx(rx_text, full=bool(att), wb=True, we=True)
                out_list.append(_Rule(att, rx))
            elif _OLDEST_RE.match(naked) and not att and out_list is self._pos:
                self._oldest = True
            elif _NEWEST_RE.match(naked) and not att and out_list is self._pos:
                self._newest = True
            elif naked:
                rx = _qstr_rx(naked, glob=True, full=bool(att))
                out_list.append(_Rule(att, rx))
            else:
                # Explicit raise so this check is not stripped under -O.
                raise AssertionError(f"bad term match: {tm[0]!r}")

        if self._oldest and self._newest:
            msg = f"oldest & newest: {match!r}"
            raise SerialMatcherInvalid(msg)
        elif not self:
            log.debug("Parsed %s (any port)", repr(match))
        elif log.isEnabledFor(logging.DEBUG):
            rules = "".join(f"\n {r}" for r in self.patterns())
            log.debug("Parsed %s:%s", repr(match), rules)

    def __repr__(self) -> str:
        return f"SerialPortMatcher({self._input!r})"

    def __str__(self) -> str:
        return self._input

    def __bool__(self) -> bool:
        # False when the expression has no terms at all.
        return bool(self._oldest or self._newest or self._pos or self._neg)

    def patterns(self) -> list[str]:
        """Returns the parsed terms as strings: age selectors, then
        negative rules, then positive rules."""
        return [
            *(["oldest"] if self._oldest else []),
            *(["newest"] if self._newest else []),
            *[str(r).replace("~/", "!~/", 1) for r in self._neg],
            *[str(r) for r in self._pos],
        ]

    def filter(self, ports: list[SerialPort]) -> list[SerialPort]:
        """Filters a list of ports according to the match criteria.

        A port is kept when every positive rule matches at least one of
        its attributes and no negative rule matches any of them. With
        `oldest`/`newest`, only ports that have a `"time"` attribute are
        considered and at most one is returned.
        """

        matches = []
        for p in ports:
            attrs = list(p.attr.items())
            pos = all(any(r.match(k, v) for k, v in attrs) for r in self._pos)
            neg = any(r.match(k, v) for k, v in attrs for r in self._neg)
            if pos and not neg:
                matches.append(p)

        if self._oldest or self._newest:
            # Test each candidate's own attributes; the old code tested the
            # leftover loop variable `p` (the last scanned port) instead.
            timed = [m for m in matches if "time" in m.attr]
            timed.sort(key=lambda m: m.attr["time"])
            return timed[:1] if self._oldest else timed[-1:]

        return matches

    def attr_hit(self, port: SerialPort, key: str) -> bool:
        """True if the attribute given by 'key' is a positive match."""

        if v := port.attr.get(key, ""):
            return any(r.match(key, v) for r in self._pos)
        return False
A parsed expression for identifying serial ports of interest.
def __init__(self, match: str):
    """Parses a
    [serial port match expression](https://github.com/egnor/ok-py-serial#serial-port-match-expressions)
    in preparation to identify matching `SerialPort` objects.

    Raises:
    - `SerialMatcherInvalid`: Bad format of `match` string
    """

    self._input = match
    self._pos: list[_Rule] = []
    self._neg: list[_Rule] = []
    self._oldest = False
    self._newest = False

    next_pos = 0
    while next_pos < len(match):
        tm = _TERM_RE.match(match, pos=next_pos)
        if not (tm and tm[0]):
            jmatch = json.dumps(match)  # use JSON for consistent quoting
            jpre = json.dumps(match[:next_pos])
            # jmatch is indented one column more than the pointer line
            # so the caret lands under the offending character.
            msg = f"Bad port expr:\n  {jmatch}\n -{'-' * (len(jpre) - 1)}^"
            raise SerialMatcherInvalid(msg)

        next_pos, groups = tm.end(), tm.groups(default="")
        rx_att, rx_ex, rx_text, att, eq_ex, ex, qstr, num, naked = groups
        # A term with any exclusion group set is a negative rule.
        out_list = self._neg if bool(rx_ex or eq_ex or ex) else self._pos
        if rx_text:
            try:
                out_list.append(_Rule(rx_att, re.compile(rx_text)))
            except re.error as err:
                msg = f"Bad port regex: /{rx_text}/"
                raise SerialMatcherInvalid(msg) from err
        elif qstr:
            rx = _qstr_rx(qstr, glob=False, full=bool(att))
            out_list.append(_Rule(att, rx))
        elif num:
            if num[-1:] in "hH":
                # Trailing h/H marks hex: parse the digits before it.
                # (This used to parse a stray string literal and always
                # raised ValueError.)
                value = int(num[:-1], 16)
            elif num.isdigit():
                value = int(num, 10)  # int('0123', 0) is an error!
            else:
                value = int(num, 0)
            rx_text = f"({num}|0*{value}|(0x)?0*{value:x}h?)"
            rx = _wrap_rx(rx_text, full=bool(att), wb=True, we=True)
            out_list.append(_Rule(att, rx))
        elif _OLDEST_RE.match(naked) and not att and out_list is self._pos:
            self._oldest = True
        elif _NEWEST_RE.match(naked) and not att and out_list is self._pos:
            self._newest = True
        elif naked:
            rx = _qstr_rx(naked, glob=True, full=bool(att))
            out_list.append(_Rule(att, rx))
        else:
            # Explicit raise so this check is not stripped under -O.
            raise AssertionError(f"bad term match: {tm[0]!r}")

    if self._oldest and self._newest:
        msg = f"oldest & newest: {match!r}"
        raise SerialMatcherInvalid(msg)
    elif not self:
        log.debug("Parsed %s (any port)", repr(match))
    elif log.isEnabledFor(logging.DEBUG):
        rules = "".join(f"\n {r}" for r in self.patterns())
        log.debug("Parsed %s:%s", repr(match), rules)
Parses a
serial port match expression
in preparation to identify matching SerialPort objects.
def filter(self, ports: list[SerialPort]) -> list[SerialPort]:
    """Filters a list of ports according to the match criteria.

    A port is kept when every positive rule matches at least one of its
    attributes and no negative rule matches any of them. With
    `oldest`/`newest`, only ports that have a `"time"` attribute are
    considered and at most one is returned.
    """

    matches = []
    for p in ports:
        attrs = list(p.attr.items())
        pos = all(any(r.match(k, v) for k, v in attrs) for r in self._pos)
        neg = any(r.match(k, v) for k, v in attrs for r in self._neg)
        if pos and not neg:
            matches.append(p)

    if self._oldest or self._newest:
        # Test each candidate's own attributes; the old code tested the
        # leftover loop variable `p` (the last scanned port) instead, which
        # could drop every match or raise KeyError during the sort.
        timed = [m for m in matches if "time" in m.attr]
        timed.sort(key=lambda m: m.attr["time"])
        return timed[:1] if self._oldest else timed[-1:]

    return matches
Filters a list of ports according to the match criteria.
class SerialPortTracker(contextlib.AbstractContextManager):
    """
    Utility class to maintain a connection to a serial port of interest,
    re-scanning and re-connecting as needed after errors, with periodic retry.
    This is used for robust communication with a serial device which might be
    plugged and unplugged during operation.
    """

    def __init__(
        self,
        match: str | SerialPortMatcher,
        *,
        baud: int = 0,
        topts: TrackerOptions = TrackerOptions(),
        copts: SerialConnectionOptions = SerialConnectionOptions(),
    ):
        """
        Prepare to manage a serial port connection.
        - `match` must be a [port match expression](https://github.com/egnor/ok-py-serial#serial-port-match-expressions) matching the port of interest
        - `topts` can define parameters for tracking (eg. re-scan interval)
        - `copts` can define parameters for connecting (eg. baud rate)
          - OR `baud` can set the baud rate (as a shortcut)

        Actual port scans and connections only happen after `find_*` or
        `connect_*` methods are called. Call `close` to end any open
        connection; use `SerialPortTracker` as the target of a
        [`with` statement](https://docs.python.org/3/reference/compound_stmts.html#with)
        to automatically close the port on exit from the `with` body.

        Raises:
        - `SerialMatcherInvalid`: Bad format of `match` string
        """

        if isinstance(match, str):
            match = SerialPortMatcher(match)
        if baud:
            # A nonzero `baud` overrides the baud rate carried in `copts`.
            copts = dataclasses.replace(copts, baud=baud)

        self._match = match
        self._tracker_opts = topts
        self._conn_opts = copts

        # `_lock` is held around accesses to the scan state and `_conn` below.
        self._lock = threading.Lock()
        # Keys (name + "@" + time attribute) of the ports seen by the last scan.
        self._scan_keys: set[str] = set()
        # Ports from the last scan that passed the matcher.
        self._scan_matched: list[SerialPort] = []
        # Not read by any method of this class.
        self._scan_deadline = 0.0
        # Deadline of the next scan; stays 0.0 until the first scan has run.
        self._next_scan = 0.0
        self._conn: SerialConnection | None = None

        log.debug("Tracking: %s", repr(str(match)) if match else "(any port)")

    def __exit__(self, exc_type, exc_value, traceback):
        # Leaving the `with` body closes any open connection; the implicit
        # `None` return means exceptions from the body are not suppressed.
        self.close()

    def __repr__(self) -> str:
        return (
            f"SerialPortTracker({self._match!r}, "
            f"topts={self._tracker_opts!r}, "
            f"copts={self._conn_opts!r})"
        )

    def close(self) -> None:
        """
        Closes any open connection with `SerialConnection.close`. A subsequent
        call to `connect_sync`/`connect_async` will establish a new connection.
        """

        with self._lock:
            # The reference is kept; `connect_sync` drops it once its
            # liveness check reports the connection as closed.
            if self._conn:
                self._conn.close()

    def find_sync(self, timeout: float | int | None = None) -> list[SerialPort]:
        """
        Waits up to `timeout` seconds (forever for `None`) until serial port(s)
        exist matching this tracker's requirements. Rescans periodically while
        waiting (see `TrackerOptions.scan_interval`).

        Returns a list of matching `SerialPort` objects, or `[]` on timeout.

        Raises:
        - `SerialScanException`: System error scanning ports
        """

        def key(p: SerialPort) -> str:
            # Identifies one appearance of a port: device name plus its
            # "time" attribute (empty when the attribute is missing).
            return p.name + "@" + p.attr.get("time", "")

        deadline = to_deadline(timeout)
        while True:
            # NOTE(review): nesting reconstructed from a flattened listing;
            # confirm which statements sit inside the lock.
            with self._lock:
                # Scan only when the previously scheduled scan time has passed
                # (presumably `from_deadline` yields seconds remaining; the
                # helper is not shown here).
                if (wait := from_deadline(self._next_scan)) <= 0:
                    wait = self._tracker_opts.scan_interval
                    found = scan_serial_ports()
                    for p in found if self._next_scan else []:  # not first scan
                        # Tag ports that were absent from the previous scan.
                        if key(p) not in self._scan_keys:
                            p.attr["tracking"] = "new"

                    matched = self._match.filter(found)
                    self._next_scan = to_deadline(wait)
                    self._scan_keys = set(key(p) for p in found)
                    self._scan_matched = matched
                    nf, nm = len(found), len(matched)
                    log.debug("%d/%d ports match %r", nm, nf, str(self._match))

                # When no scan was due this is the result cached by an
                # earlier scan.
                if self._scan_matched:
                    return self._scan_matched

            # Give up when the caller's deadline falls before the next scan.
            timeout_wait = from_deadline(deadline)
            if timeout_wait < wait:
                return []

            log.debug("Next scan in %.2fs", wait)
            time.sleep(wait)

    async def find_async(self) -> list[SerialPort]:
        """
        Similar to `find_sync`, but written as a coroutine: it waits between
        scan attempts with `asyncio.sleep` instead of `time.sleep`, and keeps
        trying until matching port(s) are found.
        """

        while True:
            with self._lock:
                next_scan = self._next_scan
            # Must run outside the lock: `find_sync` acquires it itself.
            if ports := self.find_sync(timeout=0):
                return ports
            # The wait is based on the scan time read before `find_sync` ran.
            wait = from_deadline(next_scan)
            log.debug("Next scan in %.2fs", wait)
            await asyncio.sleep(wait)

    def connect_sync(
        self, timeout: float | int | None = None
    ) -> SerialConnection | None:
        """
        If a connection is established and healthy, returns it immediately.

        Otherwise, waits up to `timeout` seconds (forever for `None`) for
        serial port(s) to appear matching this tracker's requirements, then
        attempts a new connection. If the connection succeeds, it is remembered
        and returned, otherwise scanning resumes.

        If multiple ports match the requirements, connections are attempted
        to each, and the first success (if any) is remembered and returned.

        Returns `None` on timeout.

        Raises:
        - `SerialScanException`: System error scanning ports
        """

        deadline = to_deadline(timeout)
        ports: list[SerialPort] = []
        while True:
            # NOTE(review): nesting reconstructed from a flattened listing;
            # confirm which statements sit inside the lock.
            with self._lock:
                if self._conn:
                    try:
                        self._conn.write(b"")  # check for liveness
                        return self._conn
                    except SerialIoClosed:
                        log.debug("%s closed", self._conn.port_name)
                        self._conn = None
                    except SerialIoException as exc:
                        name = self._conn.port_name
                        log.warning("%s failed (%s)", name, exc)
                        self._conn.close()
                        self._conn = None

                # Try ports with the largest "time" attribute first.
                ports.sort(key=lambda p: p.attr.get("time", ""), reverse=True)
                for port in ports:
                    try:
                        self._conn = SerialConnection(
                            port=port, opts=self._conn_opts
                        )
                        return self._conn
                    except SerialOpenException as exc:
                        log.warning("Can't open %s (%s)", port, exc)
                        self._scan_matched = []  # force re-scan on error

            # `find_sync` takes the lock itself, so it is called outside it.
            if not (ports := self.find_sync(timeout=from_deadline(deadline))):
                return None

    async def connect_async(self) -> SerialConnection:
        """
        Similar to `connect_sync`, but written as a coroutine: it waits
        between attempts with `asyncio.sleep` and keeps trying until a
        connection is available.
        """

        while True:
            with self._lock:
                next_scan = self._next_scan
            # Must run outside the lock: `connect_sync` acquires it itself.
            if conn := self.connect_sync(timeout=0):
                return conn
            wait = from_deadline(next_scan)
            log.debug("Next scan in %.2fs", wait)
            await asyncio.sleep(wait)

    @property
    def matcher(self) -> SerialPortMatcher:
        """The `SerialPortMatcher` used by this tracker."""
        return self._match
Utility class to maintain a connection to a serial port of interest, re-scanning and re-connecting as needed after errors, with periodic retry. This is used for robust communication with a serial device which might be plugged and unplugged during operation.
def __init__(
    self,
    match: str | SerialPortMatcher,
    *,
    baud: int = 0,
    topts: TrackerOptions = TrackerOptions(),
    copts: SerialConnectionOptions = SerialConnectionOptions(),
):
    """
    Set up tracking for a serial port; nothing is scanned or opened yet.
    - `match` is a [port match expression](https://github.com/egnor/ok-py-serial#serial-port-match-expressions)
      (string or `SerialPortMatcher`) selecting the port of interest
    - `topts` holds tracking parameters (eg. re-scan interval)
    - `copts` holds connection parameters (eg. baud rate)
      - OR a nonzero `baud` replaces the baud rate in `copts`

    Scans and connections happen only once `find_*` or `connect_*` methods
    are called. Call `close` to end any open connection, or use the
    tracker as the target of a
    [`with` statement](https://docs.python.org/3/reference/compound_stmts.html#with)
    so the port is closed when the `with` body exits.

    Raises:
    - `SerialMatcherInvalid`: Bad format of `match` string
    """

    matcher = SerialPortMatcher(match) if isinstance(match, str) else match
    conn_opts = dataclasses.replace(copts, baud=baud) if baud else copts

    self._match = matcher
    self._tracker_opts = topts
    self._conn_opts = conn_opts

    # Connection and scan bookkeeping, accessed under `_lock` by the methods.
    self._conn: SerialConnection | None = None
    self._lock = threading.Lock()
    self._next_scan = 0.0
    self._scan_deadline = 0.0
    self._scan_keys: set[str] = set()
    self._scan_matched: list[SerialPort] = []

    if matcher:
        description = repr(str(matcher))
    else:
        description = "(any port)"
    log.debug("Tracking: %s", description)
Prepare to manage a serial port connection.
- `match` must be a port match expression matching the port of interest
- `topts` can define parameters for tracking (eg. re-scan interval)
- `copts` can define parameters for connecting (eg. baud rate)
  - OR `baud` can set the baud rate (as a shortcut)
Actual port scans and connections only happen after find_* or
connect_* methods are called. Call close to end any open
connection; use SerialPortTracker as the target of a
with statement
to automatically close the port on exit from the with body.
Raises:
- `SerialMatcherInvalid`: Bad format of `match` string
def close(self) -> None:
    """
    Closes the current connection, if there is one, by calling
    `SerialConnection.close` on it while holding the tracker lock.
    A subsequent call to `connect_sync`/`connect_async` will establish
    a new connection.
    """

    with self._lock:
        current = self._conn
        if not current:
            return
        current.close()
Closes any open connection with SerialConnection.close. A subsequent
call to connect_sync/connect_async will establish a new connection.
def find_sync(self, timeout: float | int | None = None) -> list[SerialPort]:
    """
    Waits up to `timeout` seconds (forever for `None`) until serial port(s)
    exist matching this tracker's requirements. Rescans periodically while
    waiting (see `TrackerOptions.scan_interval`).

    Returns a list of matching `SerialPort` objects, or `[]` on timeout.

    Raises:
    - `SerialScanException`: System error scanning ports
    """

    def key(p: SerialPort) -> str:
        # Identifies one appearance of a port: device name plus its
        # "time" attribute (empty when the attribute is missing).
        return p.name + "@" + p.attr.get("time", "")

    deadline = to_deadline(timeout)
    while True:
        # NOTE(review): nesting reconstructed from a flattened listing;
        # confirm which statements sit inside the lock.
        with self._lock:
            # Scan only when the previously scheduled scan time has passed
            # (presumably `from_deadline` yields seconds remaining; the
            # helper is not shown here).
            if (wait := from_deadline(self._next_scan)) <= 0:
                wait = self._tracker_opts.scan_interval
                found = scan_serial_ports()
                for p in found if self._next_scan else []:  # not first scan
                    # Tag ports that were absent from the previous scan.
                    if key(p) not in self._scan_keys:
                        p.attr["tracking"] = "new"

                matched = self._match.filter(found)
                self._next_scan = to_deadline(wait)
                self._scan_keys = set(key(p) for p in found)
                self._scan_matched = matched
                nf, nm = len(found), len(matched)
                log.debug("%d/%d ports match %r", nm, nf, str(self._match))

            # When no scan was due this is the result cached by an
            # earlier scan.
            if self._scan_matched:
                return self._scan_matched

        # Give up when the caller's deadline falls before the next scan.
        timeout_wait = from_deadline(deadline)
        if timeout_wait < wait:
            return []

        log.debug("Next scan in %.2fs", wait)
        time.sleep(wait)
Waits up to timeout seconds (forever for None) until serial port(s)
exist matching this tracker's requirements. Rescans periodically while
waiting (see TrackerOptions.scan_interval).
Returns a list of matching SerialPort objects, or [] on timeout.
Raises:
SerialScanException: System error scanning ports
async def find_async(self) -> list[SerialPort]:
    """
    Coroutine counterpart of `find_sync`: repeatedly polls with
    `find_sync(timeout=0)` and waits between attempts with `asyncio.sleep`
    rather than `time.sleep`, returning once matching port(s) are found.
    """

    while True:
        # Snapshot the scheduled scan time before polling.
        with self._lock:
            scheduled = self._next_scan

        # Called outside the lock because `find_sync` acquires it itself.
        found = self.find_sync(timeout=0)
        if found:
            return found

        delay = from_deadline(scheduled)
        log.debug("Next scan in %.2fs", delay)
        await asyncio.sleep(delay)
def connect_sync(
    self, timeout: float | int | None = None
) -> SerialConnection | None:
    """
    If a connection is established and healthy, returns it immediately.

    Otherwise, waits up to `timeout` seconds (forever for `None`) for
    serial port(s) to appear matching this tracker's requirements, then
    attempts a new connection. If the connection succeeds, it is remembered
    and returned, otherwise scanning resumes.

    If multiple ports match the requirements, connections are attempted
    to each, and the first success (if any) is remembered and returned.

    Returns `None` on timeout.

    Raises:
    - `SerialScanException`: System error scanning ports
    """

    deadline = to_deadline(timeout)
    ports: list[SerialPort] = []
    while True:
        # NOTE(review): nesting reconstructed from a flattened listing;
        # confirm which statements sit inside the lock.
        with self._lock:
            if self._conn:
                try:
                    self._conn.write(b"")  # check for liveness
                    return self._conn
                except SerialIoClosed:
                    # Already closed: just forget the connection.
                    log.debug("%s closed", self._conn.port_name)
                    self._conn = None
                except SerialIoException as exc:
                    # Other I/O failure: close it before forgetting it.
                    name = self._conn.port_name
                    log.warning("%s failed (%s)", name, exc)
                    self._conn.close()
                    self._conn = None

            # Try ports with the largest "time" attribute first; `ports` is
            # empty on the first pass, so that pass goes straight to scanning.
            ports.sort(key=lambda p: p.attr.get("time", ""), reverse=True)
            for port in ports:
                try:
                    self._conn = SerialConnection(
                        port=port, opts=self._conn_opts
                    )
                    return self._conn
                except SerialOpenException as exc:
                    log.warning("Can't open %s (%s)", port, exc)
                    self._scan_matched = []  # force re-scan on error

        # `find_sync` takes the lock itself, so it is called outside it.
        if not (ports := self.find_sync(timeout=from_deadline(deadline))):
            return None
If a connection is established and healthy, returns it immediately.
Otherwise, waits up to timeout seconds (forever for None) for
serial port(s) to appear matching this tracker's requirements, then
attempts a new connection. If the connection succeeds, it is remembered
and returned, otherwise scanning resumes.
If multiple ports match the requirements, connections are attempted to each, and the first success (if any) is remembered and returned.
Returns None on timeout.
Raises:
SerialScanException: System error scanning ports
async def connect_async(self) -> SerialConnection:
    """
    Coroutine counterpart of `connect_sync`: repeatedly polls with
    `connect_sync(timeout=0)` and waits between attempts with
    `asyncio.sleep` rather than `time.sleep`, returning once a
    connection is available.
    """

    while True:
        # Snapshot the scheduled scan time before polling.
        with self._lock:
            scheduled = self._next_scan

        # Called outside the lock because `connect_sync` acquires it itself.
        connection = self.connect_sync(timeout=0)
        if connection:
            return connection

        delay = from_deadline(scheduled)
        log.debug("Next scan in %.2fs", delay)
        await asyncio.sleep(delay)
Similar to connect_sync but returns a
Future
instead of blocking the current thread.
@property
def matcher(self) -> SerialPortMatcher:
    """Read-only access to the `SerialPortMatcher` this tracker filters with."""
    return self._match
The SerialPortMatcher used by this tracker.
@dataclasses.dataclass(frozen=True)
class TrackerOptions:
    """Immutable bundle of optional settings for `SerialPortTracker`."""

    scan_interval: float | int = 0.5
    """Delay, in seconds, between port re-scans while waiting for a match."""
Optional parameters for SerialPortTracker.
class SerialException(OSError):
    """Common base class for the I/O errors raised by `ok_serial`."""

    port: str | None
    """Device name of the serial port the error relates to, if known."""

    def __init__(self, message: str, port: str | None = None):
        # Prefix the message with the port name when one is given.
        if port:
            text = f"{port}: {message}"
        else:
            text = message
        super().__init__(text)
        self.port = port
Exception base class for ok_serial I/O errors.
class SerialIoClosed(SerialIoException):
    """Raised when I/O is attempted on a serial port that has been closed."""
Exception raised when I/O is attempted on a closed serial port.
class SerialIoException(SerialException):
    """Raised for I/O errors while communicating with a serial port."""
Exception raised for I/O errors communicating with serial ports.
class SerialMatcherInvalid(ValueError):
    """Raised when a port matcher string is not syntactically valid."""
Exception raised when a port matcher string is syntactically invalid.
class SerialOpenBusy(SerialOpenException):
    """Raised when an open attempt fails due to port contention."""
Exception raised if an open attempt fails due to port contention.
class SerialOpenException(SerialException):
    """Raised for system errors encountered while opening a serial port."""
Exception raised for system errors opening a serial port.
class SerialScanException(SerialException):
    """Raised for system errors encountered while scanning available ports."""
Exception raised for system errors scanning available ports.