crystalfontz

AtxPowerSwitchFunction

Bases: Enum

An ATX power switch function.

Refer to your device's datasheet for the effects of each of these functions.

Source code in crystalfontz/atx.py
10
11
12
13
14
15
16
17
18
19
20
class AtxPowerSwitchFunction(Enum):
    """
    A function that can be enabled for the ATX power switch.

    Each member's value is a single bit in the upper nibble of a byte. What each
    function actually does is described in your device's datasheet.
    """

    LCD_OFF_IF_HOST_IS_OFF = 1 << 4
    KEYPAD_RESET = 1 << 5
    KEYPAD_POWER_ON = 1 << 6
    KEYPAD_POWER_OFF = 1 << 7

AtxPowerSwitchFunctionalitySettings dataclass

Source code in crystalfontz/atx.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
@dataclass
class AtxPowerSwitchFunctionalitySettings:
    """
    Settings for command 28 (0x1C): Set ATX Power Switch Functionality.

    Parameters:
        functions (Set[AtxPowerSwitchFunction]): A set of enabled power switch
                                                 functions.
        auto_polarity (bool): When True, automatically detects polarity for reset and/or
                              power (recommended)
        reset_invert (bool): When True, the reset pin drives high instead of low
        power_invert (bool): When True, the power pin drives high instead of low
        power_pulse_length_seconds (Optional[float]): Length of power on and off
                                                      pulses in seconds. When set to 8
                                                      seconds or higher, asserts power
                                                      control line until host power
                                                      state changes.
    """

    # NOTE: the docstring must come before the fields. Placed after them it is
    # just a string expression, and the class ends up with an auto-generated
    # signature as its __doc__.
    functions: Set[AtxPowerSwitchFunction]
    auto_polarity: bool = True
    reset_invert: bool = False
    power_invert: bool = False
    power_pulse_length_seconds: Optional[float] = None

    @classmethod
    def from_bytes(cls: Type[Self], settings: bytes) -> Self:
        """
        Build settings from their packed byte representation.

        The first byte is a bit field holding the enabled functions and the
        polarity flags. An optional second byte holds the pulse length in units
        of 1/32 of a second; when it is absent the pulse length is None.
        """
        flags: int = settings[0]

        functions: Set[AtxPowerSwitchFunction] = {
            function for function in AtxPowerSwitchFunction if flags & function.value
        }
        power_pulse_length_seconds: Optional[float] = (
            settings[1] / 32 if len(settings) > 1 else None
        )

        return cls(
            functions=functions,
            auto_polarity=bool(flags & AUTO_POLARITY),
            reset_invert=bool(flags & RESET_INVERT),
            power_invert=bool(flags & POWER_INVERT),
            power_pulse_length_seconds=power_pulse_length_seconds,
        )

    def to_bytes(self: Self) -> bytes:
        """
        Pack the settings into one flag byte, plus a pulse length byte when a
        pulse length is set.

        Raises:
            ValueError: If the pulse length is shorter than 1/32 of a second.
        """
        # Combine flags with OR rather than XOR: OR can only ever set a bit,
        # so a flag can never be cleared by accident if two masks overlap.
        flags: int = 0
        for function in self.functions:
            flags |= function.value
        if self.auto_polarity:
            flags |= AUTO_POLARITY
        if self.reset_invert:
            flags |= RESET_INVERT
        if self.power_invert:
            flags |= POWER_INVERT

        packed: bytes = flags.to_bytes(1, "big")

        if self.power_pulse_length_seconds is not None:
            # The pulse length travels as a count of 1/32 second ticks.
            pulse_length = int(self.power_pulse_length_seconds * 32)

            if pulse_length < 1:
                raise ValueError(f"Pulse length can not be less than {1/32}")

            # A single byte is available, so longer pulses are clamped to 255.
            packed += min(pulse_length, 255).to_bytes(1, "big")

        return packed

    def as_dict(self: Self) -> Dict[str, Any]:
        """
        Return the settings as a plain dict, with the enabled functions given as
        a list of their integer values in ascending order.
        """
        as_ = asdict(self)

        # Sets iterate in hash order, which varies between runs; sort so the
        # output is stable.
        as_["functions"] = sorted(fn.value for fn in self.functions)

        return as_

    def __repr__(self: Self) -> str:
        """
        Return a human-readable, multi-line summary of the settings.
        """

        def yes_no(flag: bool) -> str:
            return "yes" if flag else "no"

        # Order the names by flag value so the summary is stable between runs.
        names = [
            fn.name for fn in sorted(self.functions, key=lambda fn: fn.value)
        ]

        return "\n".join(
            [
                f"Functions enabled: {', '.join(names)}",
                f"Auto-Polarity Enabled: {yes_no(self.auto_polarity)}",
                f"Reset Inverted: {yes_no(self.reset_invert)}",
                f"Power Inverted: {yes_no(self.power_invert)}",
                f"Power Pulse Length (seconds): {self.power_pulse_length_seconds}",
            ]
        )

power_pulse_length_seconds = None class-attribute instance-attribute

Settings for command 28 (0x1C): Set ATX Power Switch Functionality.

Parameters:
  • functions (Set[AtxPowerSwitchFunction]) –

    A set of enabled power switch functions.

  • auto_polarity (bool) –

    When True, automatically detects polarity for reset and/or power (recommended)

  • reset_invert (bool) –

    When True, the reset pin drives high instead of low

  • power_invert (bool) –

    When True, the power pin drives high instead of low

  • power_pulse_length_seconds (Optional[float]) –

    Length of power on and off pulses in seconds. When set to 8 seconds or higher, asserts power control line until host power state changes.

Client

Bases: Protocol

A crystalfontz client. Typically created through a call to connection or create_connection.

This client has methods for every command supported by the CFA533. For more details, refer to the datasheet for your device.

In addition, this client will accept a ReportHandler class, and will call the appropriate method on it whenever a key activity or temperature report is received.

Also supported are configurations for command timeouts and retry behavior. The default behavior is a timeout of 0.25 seconds with no retries. This 250ms timeout is based on the datasheet for the CFA533.

Source code in crystalfontz/client.py
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
class Client(asyncio.Protocol):
    """
    A crystalfontz client. Typically created through a call to `connection` or
    `create_connection`.

    This client has methods for every command supported by the CFA533. For more
    details, refer to the datasheet for your device.

    In addition, this client will accept a `ReportHandler` class, and will call
    the appropriate method on it whenever a key activity or temperature report is
    received.

    Also supported are configurations for command timeouts and retry behavior. The
    default behavior is a timeout of 0.25 seconds with no retries. This 250ms timeout
    is based on the datasheet for the CFA533.
    """

    def __init__(
        self: Self,
        device: Device,
        report_handler: ReportHandler,
        timeout: float,
        retry_times: int,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        """
        Set up the client's state. No transport is attached at this point.

        Parameters:
            device (Device): The device this client is configured for.
            report_handler (ReportHandler): Handler whose methods are called for
                                            key activity and temperature reports.
            timeout (float): Stored as the client's default timeout.
            retry_times (int): Stored as the client's default retry count.
            loop (asyncio.AbstractEventLoop): Event loop used to create the
                                              client's futures and tasks.
        """

        # Collaborators and defaults supplied by the caller.
        self.device: Device = device
        self.report_handler: ReportHandler = report_handler
        self.loop: asyncio.AbstractEventLoop = loop
        self._default_timeout: float = timeout
        self._default_retry_times: int = retry_times

        # Connection state. The transport is filled in by connection_made.
        self._transport: Optional[SerialTransport] = None
        self._buffer: bytes = b""
        self._connection_made: asyncio.Future[None] = self.loop.create_future()
        self._closed: asyncio.Future[None] = self.loop.create_future()

        # Command and subscription bookkeeping.
        self._lock: asyncio.Lock = asyncio.Lock()
        self._expect: Optional[Type[Response]] = None
        self._receivers: Dict[Type[Response], List[Receiver[Response]]] = defaultdict(
            list
        )
        self._receiving: Set[Receiver[Any]] = set()

    @property
    def model(self: Self) -> str:
        """
        The model of the device this client is configured for.
        """

        device = self.device
        return device.model

    @property
    def hardware_rev(self: Self) -> str:
        """
        The hardware revision of the device this client is configured for.
        """

        device = self.device
        return device.hardware_rev

    @property
    def firmware_rev(self: Self) -> str:
        """
        The firmware revision of the device this client is configured for.
        """

        device = self.device
        return device.firmware_rev

    @property
    def baud_rate(self: Self) -> BaudRate:
        """
        The baud rate currently set on the transport's serial port.

        Raises:
            ConnectionError: If there is no transport, or the transport has no
                             serial port.
        """

        transport = self._transport
        if not (transport and transport.serial):
            raise ConnectionError("Uninitialized transport has no baud rate")
        return transport.serial.baudrate

    @baud_rate.setter
    def baud_rate(self: Self, baud_rate: BaudRate) -> None:
        # Writes straight through to the serial port; fails the same way the
        # getter does when no transport or serial port is available.
        transport = self._transport
        if not (transport and transport.serial):
            raise ConnectionError("Uninitialized transport has no baud rate")
        transport.serial.baudrate = baud_rate

    #
    # pyserial callbacks
    #

    def _is_serial_transport(
        self: Self, transport: asyncio.BaseTransport
    ) -> TypeGuard[SerialTransport]:
        """
        Type guard: report whether the transport is a SerialTransport, letting
        type checkers narrow it accordingly.
        """
        is_serial = isinstance(transport, SerialTransport)
        return is_serial

    def connection_made(self: Self, transport: asyncio.BaseTransport) -> None:
        """
        Protocol callback: adopt the transport and start handling reports.

        Subscribes to key activity and temperature reports, starts one task per
        report type that feeds the report handler, and then resolves the
        connection-made future.

        Raises:
            ConnectionError: If the transport is not a SerialTransport.
        """
        if not self._is_serial_transport(transport):
            raise ConnectionError("Transport is not a SerialTransport")

        self._transport = transport
        self._running = True

        # Report subscriptions are created with expect=False, so they get their
        # own tracking set instead of sharing the client's.
        key_reports: Receiver[KeyActivityReport] = self.subscribe(
            KeyActivityReport, expect=False
        )
        temperature_reports: Receiver[TemperatureReport] = self.subscribe(
            TemperatureReport, expect=False
        )
        self._key_activity_queue: Receiver[KeyActivityReport] = key_reports
        self._temperature_queue: Receiver[TemperatureReport] = temperature_reports

        # One long-running task per report type.
        key_activity_handler = self._handle_report(
            "key_activity",
            key_reports,
            self.report_handler.on_key_activity,
        )
        self._key_activity_task: asyncio.Task[None] = self.loop.create_task(
            key_activity_handler
        )

        temperature_handler = self._handle_report(
            "temperature",
            temperature_reports,
            self.report_handler.on_temperature,
        )
        self._temperature_task: asyncio.Task[None] = self.loop.create_task(
            temperature_handler
        )

        self._connection_made.set_result(None)

    def connection_lost(self: Self, exc: Optional[Exception]) -> None:
        """
        Protocol callback: the connection was lost or closed.

        When an exception is supplied, it is wrapped in a ConnectionError and
        routed through the client's error handling. Otherwise the client is
        closed without an error.
        """
        self._running = False
        try:
            # Raising and immediately catching gives us a ConnectionError that
            # is chained to the original cause and carries a traceback.
            if exc:
                raise ConnectionError("Connection lost") from exc
        except Exception as exc:
            # Note that this rebinds exc to the new ConnectionError.
            self._error(exc)
        else:
            self._close()

    @property
    def closed(self: Self) -> asyncio.Future:
        """
        An asyncio.Future that resolves once the connection has been closed,
        whether through a call to `client.close()` or because of an Exception.
        """
        closed_future = self._closed
        return closed_future

    def close(self: Self) -> None:
        """
        Close the connection.

        Closes the transport if there is one, then runs the client's internal
        close routine.
        """

        transport = self._transport
        if transport:
            transport.close()
        self._close()

    # Internal method to close the connection, potentially due to an exception.
    def _close(self: Self, exc: Optional[Exception] = None) -> None:
        """
        Stop the client, cancel the report tasks and arrange for the `closed`
        future to be resolved once they have finished.

        Parameters:
            exc (Optional[Exception]): The exception that caused the close, if
                                       any.

        Raises:
            Exception: `exc` itself, when the `closed` future is already done
                       and so can no longer carry it.
        """
        self._running = False

        # The report tasks are only created in connection_made. If we are
        # closed before ever connecting (close() explicitly tolerates a missing
        # transport), there is nothing to wait for, so finish right away
        # instead of failing with an AttributeError.
        key_task = getattr(self, "_key_activity_task", None)
        temperature_task = getattr(self, "_temperature_task", None)

        if key_task is None or temperature_task is None:
            for task in (key_task, temperature_task):
                if task is not None:
                    task.cancel()
            self._finish_close(exc)
            return

        # A clean exit requires that we cancel these tasks and then wait
        # for them to finish before killing the event loop
        key_task.cancel()
        temperature_task.cancel()

        tasks_done = asyncio.gather(key_task, temperature_task)
        tasks_done.add_done_callback(self._finish_tasks(exc))

        if self.closed.done() and exc:
            raise exc

    def _finish_tasks(
        self: Self,
        exc: Optional[Exception],
    ) -> Callable[[asyncio.Future[Tuple[None, None]]], None]:
        """
        Build the done-callback for the future that gathers the report tasks.

        The returned callback inspects how the tasks ended and then completes
        the close, passing along either `exc` or an unexpected exception raised
        by the tasks themselves.

        Parameters:
            exc (Optional[Exception]): The exception that caused the close, if
                                       any.
        """

        def callback(tasks_done: asyncio.Future[Tuple[None, None]]) -> None:
            task_exc = tasks_done.exception()
            try:
                # The tasks should have failed with a CancelledError
                if task_exc:
                    raise task_exc
            except asyncio.CancelledError:
                # This error is expected, wrap it up
                self._finish_close(exc)
            except Exception as task_exc:
                # An unexpected error of some kind was raised by the tasks.
                # Do our best to handle them...
                if exc:
                    # We have two exceptions. We don't want to mask the
                    # exception that actually caused us to close, so we
                    # warn and hope for the best.
                    warnings.warn(traceback.format_exc())
                    self._finish_close(exc)
                else:
                    # This is our new exception.
                    self._finish_close(task_exc)
            # NOTE(review): when the gathered future carries no exception at
            # all, neither handler runs and _finish_close is never called —
            # confirm that the tasks cannot finish without being cancelled.

        return callback

    def _finish_close(self: Self, exc: Optional[BaseException]) -> None:
        """
        Complete the close by settling the `closed` future.

        If the future is still pending it receives the exception, or None when
        there was none. If it has already been settled, an exception can no
        longer be stored on it and is raised instead.
        """
        closed = self.closed

        if not closed.done():
            if exc:
                closed.set_exception(exc)
            else:
                closed.set_result(None)
            return

        if exc:
            raise exc

    def data_received(self: Self, data: bytes) -> None:
        """
        Protocol callback: buffer incoming bytes and handle every complete
        packet that can be parsed out of the buffer.
        """
        try:
            self._buffer += data

            # Keep parsing until the parser no longer yields a packet; whatever
            # it hands back as the remainder becomes the new buffer.
            while True:
                packet, self._buffer = parse_packet(self._buffer)
                if not packet:
                    break
                self._packet_received(packet)
        except Exception as exc:
            # Exceptions here would have come from the packet parser, not
            # the packet handler
            self._error(exc)

    def _error(self: Self, exc: Exception) -> None:
        """
        Route an error that has no specific subscriber.

        If any receiver is currently being waited on, the error is delivered to
        one of them. Otherwise the client is closed with the error.
        """
        if not self._receiving:
            self._close(exc)
            return

        receiver = next(iter(self._receiving))
        receiver.put_nowait((exc, None))

    def _packet_received(self: Self, packet: Packet) -> None:
        """
        Decode a parsed packet and dispatch the outcome to subscribers.

        A successfully decoded response is emitted under its own class. When
        there are subscriptions for RawResponse, a raw version of the packet is
        emitted as well. Decode errors and device errors are emitted to the
        subscribers of the response they relate to; any other exception goes to
        the client's general error handling.
        """
        # Log through the module logger, as the rest of this class does, so
        # that the package's logging configuration applies to this message.
        logger.debug("Packet received: %s", packet)
        try:
            res = Response.from_packet(packet)
            # Only build the raw response when somebody has asked for it.
            raw_res = (
                RawResponse.from_packet(packet)
                if RawResponse in self._receivers
                else None
            )
        except ResponseDecodeError as exc:
            self._emit_response_decode_error(exc)
        except DeviceError as exc:
            self._emit_device_error(exc)
        except Exception as exc:
            self._error(exc)
        else:
            self._emit(type(res), (None, res))
            if raw_res:
                self._emit(RawResponse, (None, raw_res))

    def _emit(self: Self, response_cls: Type[Response], item: Result[Response]) -> None:
        """
        Deliver a result to every receiver subscribed to a response class.

        When nobody is subscribed, a result carrying an exception is passed to
        the client's general error handling; a successful result is dropped.

        Parameters:
            response_cls (Type[Response]): The response class the result is for.
            item (Result[Response]): An (exception, response) pair.
        """
        # Look at the list itself rather than testing for the key: unsubscribing
        # leaves the key behind with an empty list, and an error for such a
        # class would otherwise vanish instead of reaching the fallback below.
        # Using .get() also avoids creating entries in the defaultdict.
        receivers = self._receivers.get(response_cls)

        if receivers:
            for rcv in receivers:
                rcv.put_nowait(item)
        elif item[0]:
            self._error(item[0])

    def _emit_response_decode_error(self: Self, exc: ResponseDecodeError) -> None:
        """
        Emit a decode error to the subscribers of the response class that
        failed to decode.
        """
        # The error records which response class was being decoded.
        failed: Result[Response] = (exc, None)
        self._emit(exc.response_cls, failed)

    def _emit_device_error(self: Self, exc: DeviceError) -> None:
        """
        Emit a device error to the subscribers of the response it stands in
        for, or to the general error handling when that response is unknown.
        """
        expected = exc.expected_response

        if expected not in RESPONSE_CLASSES:
            self._error(exc)
            return

        self._emit(RESPONSE_CLASSES[expected], (exc, None))

    #
    # Event subscriptions
    #

    def subscribe(self: Self, cls: Type[R], expect: bool = True) -> Receiver[R]:
        """
        Start receiving results for a response class. Returns the new
        `Receiver[Response]`, which is also registered with the client.

        With `expect=True` (the default) the receiver shares the client's set of
        receivers being waited on; otherwise it gets a separate set of its own.

        This is a low level method. Most use cases not met by individual command
        methods or a ReportHandler are best handled with `client.expect`.
        """

        if expect:
            receiving: Set[Receiver[Any]] = self._receiving
        else:
            receiving = set()

        rcv: Receiver[R] = Receiver(receiving)

        # The registry is typed in terms of the Response base class.
        registered = self._receivers[cast(Type[Response], cls)]
        registered.append(cast(Receiver[Response], rcv))
        return rcv

    def unsubscribe(self: Self, cls: Type[R], receiver: Receiver[R]) -> None:
        """
        Stop delivering results of a response class to a receiver. The receiver
        is typically one returned by a call to `client.subscribe`.

        This is a low level method. Most use cases not met by individual command
        methods or a ReportHandler are best handled with `client.expect`.
        """

        key = cast(Type[Response], cls)
        unwanted = cast(Receiver[Response], receiver)

        # Replace the registered list with a copy that leaves the receiver out.
        remaining: List[Receiver[Response]] = []
        for rcv in self._receivers[key]:
            if rcv != unwanted:
                remaining.append(rcv)

        self._receivers[key] = remaining

    @timeout
    async def expect(self: Self, cls: Type[R], timeout: Optional[float] = None) -> R:
        """
        Wait for a response of an expected class, with a timeout.

        This method accepts a `timeout` parameter. If defined, it will override
        the client's default timeout.

        This is a low level method. Most use cases are met by individual command
        methods.

        Raises:
            Exception: Whatever exception was delivered in place of a response.
            CrystalfontzError: If the result holds neither an exception nor a
                               response.
        """
        q = self.subscribe(cls)
        try:
            exc, res = await q.get()
            q.task_done()
        finally:
            # Always drop the subscription, including when the wait is
            # cancelled or fails (for example because it timed out). Otherwise
            # the abandoned receiver stays registered and keeps collecting
            # results that nobody will ever read.
            self.unsubscribe(cls, q)
        if exc:
            raise exc
        elif res:
            return res
        raise CrystalfontzError("assert: result has either exception or response")

    #
    # Commands
    #

    @retry
    async def send_command(
        self: Self,
        command: Command,
        response_cls: Type[R],
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> R:
        """
        Send a `Command`, then wait for and return its expected `Response`.

        This method accepts `timeout` and `retry_times` parameters. If defined,
        they will override the client's default timeout and retry behavior.

        This is a low level method. Most use cases are met by individual command
        methods.
        """
        # Hold the lock for the whole exchange, so only one command at a time
        # is waiting for its response.
        async with self._lock:
            packet = command.to_packet()
            self.send_packet(packet)
            response = await self.expect(response_cls, timeout=timeout)
            return response

    def send_packet(self: Self, packet: Packet) -> None:
        """
        Serialize a packet and write it to the transport.

        Raises:
            ConnectionError: If the client has no transport.
        """
        transport = self._transport
        if not transport:
            raise ConnectionError("Must be connected to send data")
        transport.write(serialize_packet(packet))

    async def ping(
        self: Self,
        payload: bytes,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> Pong:
        """
        0 (0x00): Ping Command

        The device echoes the Ping Command back to the host.
        """

        command = Ping(payload)
        return await self.send_command(
            command, Pong, timeout=timeout, retry_times=retry_times
        )

    async def test_connection(
        self: Self, timeout: Optional[float] = None, retry_times: Optional[int] = None
    ) -> None:
        """
        Test the connection by pinging the device with 16 random lowercase
        letters and checking that the same payload comes back.

        Raises:
            ConnectionError: If the ping times out, or the response does not
                             match the payload.
        """

        letters = [random.choice(ascii_lowercase) for _ in range(16)]
        payload: bytes = "".join(letters).encode("ascii")

        try:
            pong = await self.ping(payload, timeout=timeout, retry_times=retry_times)
        except TimeoutError as exc:
            # Report whichever timeout was actually in effect.
            waited = self._default_timeout if timeout is None else timeout
            raise ConnectionError(
                f"Failed to receive packet within {waited} seconds"
            ) from exc

        if pong.response != payload:
            raise ConnectionError(f"{pong.response} != {payload}")

    async def detect_baud_rate(
        self: Self, timeout: Optional[float] = None, retry_times: Optional[int] = None
    ) -> None:
        """
        Detect the device's configured baud rate by testing the connection at
        the current baud rate and, if that fails, at the alternative one.

        Raises:
            ConnectionError: If the connection test fails at both baud rates.
        """

        current_rate = self.baud_rate
        try:
            logger.info(f"Testing connection at {current_rate} bps...")
            await self.test_connection(timeout, retry_times)
        except ConnectionError as exc:
            logger.debug(exc)

            # Switch the transport over to the alternative rate and try again.
            fallback_rate = OTHER_BAUD_RATE[current_rate]
            self.baud_rate = fallback_rate
            logger.info(
                f"Connection failed at {current_rate} bps. "
                f"Testing connection at {fallback_rate} bps..."
            )
            try:
                await self.test_connection(timeout, retry_times)
            except ConnectionError as exc:
                logger.info(
                    f"Connection failed for both {current_rate} bps "
                    f"and {fallback_rate} bps."
                )
                raise exc
        else:
            logger.info(f"Connection successful at {current_rate} bps.")

    async def versions(
        self: Self,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> Versions:
        """
        1 (0x01): Get Hardware & Firmware Version

        The device returns its hardware and firmware version information to
        the host.
        """

        command = GetVersions()
        return await self.send_command(
            command, Versions, timeout=timeout, retry_times=retry_times
        )

    async def detect_device(
        self: Self,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> None:
        """
        Ask the device for its model, hardware and firmware versions, look up
        the matching device and configure the client to use it. This is useful
        if you don't know a priori what device you're using.
        """

        reported = await self.versions(timeout=timeout, retry_times=retry_times)
        detected = lookup_device(
            reported.model, reported.hardware_rev, reported.firmware_rev
        )
        self.device = detected

    async def write_user_flash_area(
        self: Self,
        data: bytes,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> UserFlashAreaWritten:
        """
        2 (0x02): Write User Flash Area

        The CFA533 reserves 16 bytes of nonvolatile memory for arbitrary use by
        the host. It can hold a serial number, IP address, gateway address,
        netmask, or any other data required. All 16 bytes must be supplied.
        """

        command = WriteUserFlashArea(data)
        return await self.send_command(
            command,
            UserFlashAreaWritten,
            timeout=timeout,
            retry_times=retry_times,
        )

    async def read_user_flash_area(
        self: Self,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> UserFlashAreaRead:
        """
        3 (0x03): Read User Flash Area

        Reads the User Flash Area and returns the data to the host. For more
        information, review the documentation for
        `client.write_user_flash_area`.
        """

        command = ReadUserFlashArea()
        return await self.send_command(
            command,
            UserFlashAreaRead,
            timeout=timeout,
            retry_times=retry_times,
        )

    async def store_boot_state(
        self: Self,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> BootStateStored:
        """
        4 (0x04): Store Current State as Boot State

        When power is applied, the device loads its power-up configuration from
        nonvolatile memory. From the factory, that configuration displays a
        "welcome" screen. This command can be used to customize the "welcome"
        screen, along with the following items:

        - Characters shown on LCD
        - Special character font definitions
        - Cursor position
        - Cursor style
        - Contrast setting
        - LCD backlight setting
        - Settings of any "live" displays, such as temperature display
        - Key press and release masks
        - ATX function enable and pulse length settings
        - Baud rate
        - GPIO settings

        Temperature reporting cannot be stored (although the live display of
        temperatures can be saved), and neither can the host watchdog. The host
        software should enable these once the system is initialized and is
        ready to receive the data.
        """

        command = StoreBootState()
        return await self.send_command(
            command, BootStateStored, timeout=timeout, retry_times=retry_times
        )

    async def reboot_lcd(
        self: Self,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> PowerResponse:
        """
        Reboot the device, using 5 (0x05): Reboot Device, Reset Host, or Power Off
        Host.

        A reboot can be handy for testing the boot configuration, or for
        re-enumerating the devices on the One-Wire bus.
        """

        command = RebootLCD()
        return await self.send_command(
            command, PowerResponse, timeout=timeout, retry_times=retry_times
        )

    async def reset_host(
        self: Self,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> PowerResponse:
        """
        Reset the host, using 5 (0x05): Reboot Device, Reset Host, or Power Off Host.

        This command assumes the host's reset line is connected to GPIO[3]. For more
        information, review your device's datasheet.

        This method accepts `timeout` and `retry_times` parameters. If defined,
        they override the client's defaults for this command.
        """

        # Send the command the same way reboot_lcd and shutdown_host do.
        # send_command already waits for the PowerResponse, so waiting for a
        # second one afterwards would only run into the timeout, and the
        # caller's timeout and retry settings belong on the command itself.
        return await self.send_command(
            ResetHost(), PowerResponse, timeout=timeout, retry_times=retry_times
        )

    async def shutdown_host(
        self: Self,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> PowerResponse:
        """
        Turn off the host's power, using 5 (0x05): Reboot Device, Reset Host, or Power
        Off Host.

        The host's power control line is assumed to be connected to GPIO[2].
        For more information, review your device's datasheet.
        """

        command = ShutdownHost()
        return await self.send_command(
            command, PowerResponse, timeout=timeout, retry_times=retry_times
        )

    async def clear_screen(
        self: Self,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> ClearedScreen:
        """
        6 (0x06): Clear LCD Screen

        Sets the contents of the LCD screen DDRAM to ' ' = 0x20 = 32 and moves
        the cursor to the left-most column of the top line.
        """

        command = ClearScreen()
        return await self.send_command(
            command, ClearedScreen, timeout=timeout, retry_times=retry_times
        )

    async def set_line_1(
        self: Self,
        line: str | bytes,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> Line1Set:
        """
        7 (0x07): Set LCD Contents, Line 1

        Writes the center 16 characters shown on the top line of the LCD screen.

        This command exists for backwards compatibility with older devices. New
        applications should prefer the more flexible `client.send_data`.

        `timeout` and `retry_times` are forwarded to `send_command`.
        """

        command = SetLine1(line, self.device)
        return await self.send_command(
            command, Line1Set, timeout=timeout, retry_times=retry_times
        )

    async def set_line_2(
        self: Self,
        line: str | bytes,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> Line2Set:
        """
        8 (0x08): Set LCD Contents, Line 2

        Writes the center 16 characters shown on the bottom line of the LCD screen.

        This command exists for backwards compatibility with older devices. New
        applications should prefer the more flexible `client.send_data`.

        `timeout` and `retry_times` are forwarded to `send_command`.
        """

        command = SetLine2(line, self.device)
        return await self.send_command(
            command, Line2Set, timeout=timeout, retry_times=retry_times
        )

    async def set_special_character_data(
        self: Self,
        index: int,
        character: SpecialCharacter,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> SpecialCharacterDataSet:
        """
        9 (0x09): Set LCD Special Character Data

        Defines the font for one of the special characters stored in CGRAM.

        `timeout` and `retry_times` are forwarded to `send_command`.
        """

        command = SetSpecialCharacterData(index, character, self.device)
        return await self.send_command(
            command,
            SpecialCharacterDataSet,
            timeout=timeout,
            retry_times=retry_times,
        )

    def set_special_character_encoding(
        self: Self,
        character: str,
        index: int,
    ) -> None:
        """
        Register a unicode character so that it encodes to the index of a given
        special character on CGRAM.

        The mapping is stored on the current device's character ROM.
        """

        rom = self.device.character_rom
        rom.set_encoding(character, index)

    async def read_lcd_memory(
        self: Self,
        address: int,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> LcdMemory:
        """
        10 (0x0A): Read 8 bytes of LCD Memory

        Returns the contents of the LCD's DDRAM or CGRAM. This command is meant
        for debugging.

        `timeout` and `retry_times` are forwarded to `send_command`.
        """

        command = ReadLcdMemory(address)
        memory = await self.send_command(
            command, LcdMemory, timeout=timeout, retry_times=retry_times
        )
        return memory

    async def set_cursor_position(
        self: Self,
        row: int,
        column: int,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> CursorPositionSet:
        """
        11 (0x0B): Set LCD Cursor Position

        Places the cursor at the requested row and column of the device's LCD
        screen.

        `timeout` and `retry_times` are forwarded to `send_command`.
        """

        command = SetCursorPosition(row, column, self.device)
        return await self.send_command(
            command, CursorPositionSet, timeout=timeout, retry_times=retry_times
        )

    async def set_cursor_style(
        self: Self,
        style: CursorStyle,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> CursorStyleSet:
        """
        12 (0x0C): Set LCD Cursor Style

        Selects one of four hardware generated cursor options.

        `timeout` and `retry_times` are forwarded to `send_command`.
        """

        command = SetCursorStyle(style)
        return await self.send_command(
            command, CursorStyleSet, timeout=timeout, retry_times=retry_times
        )

    async def set_contrast(
        self: Self,
        contrast: float,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> ContrastSet:
        """
        13 (0x0D): Set LCD Contrast

        Sets the contrast, or vertical viewing angle, of the display.

        `timeout` and `retry_times` are forwarded to `send_command`.
        """

        command = SetContrast(contrast, self.device)
        return await self.send_command(
            command, ContrastSet, timeout=timeout, retry_times=retry_times
        )

    async def set_backlight(
        self: Self,
        lcd_brightness: float,
        keypad_brightness: Optional[float] = None,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> BacklightSet:
        """
        14 (0x0E): Set LCD & Keypad Backlight

        Sets the brightness of the LCD and keypad backlights.

        `timeout` and `retry_times` are forwarded to `send_command`.
        """

        command = SetBacklight(lcd_brightness, keypad_brightness, self.device)
        return await self.send_command(
            command, BacklightSet, timeout=timeout, retry_times=retry_times
        )

    async def read_dow_device_information(
        self: Self,
        index: int,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> DowDeviceInformation:
        """
        18 (0x12): Read DOW Device Information

        On power-up, the unit detects any devices attached to the Dallas
        Semiconductor One-Wire (DOW) bus and stores their information. This command
        lets the host read that stored information.

        Note: The GPIO pin used for DOW must not be configured as user GPIO. For more
        information, review your unit's datasheet.

        `timeout` and `retry_times` are forwarded to `send_command`.
        """

        command = ReadDowDeviceInformation(index)
        return await self.send_command(
            command,
            DowDeviceInformation,
            timeout=timeout,
            retry_times=retry_times,
        )

    async def setup_temperature_reporting(
        self: Self,
        enabled: Iterable[int],
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> TemperatureReportingSetUp:
        """
        19 (0x13): Set Up Temperature Reporting

        Configures the device to report temperature information to the host every
        second.

        `timeout` and `retry_times` are forwarded to `send_command`.
        """

        command = SetupTemperatureReporting(enabled, self.device)
        return await self.send_command(
            command,
            TemperatureReportingSetUp,
            timeout=timeout,
            retry_times=retry_times,
        )

    async def dow_transaction(
        self: Self,
        index: int,
        bytes_to_read: int,
        data_to_write: Optional[bytes] = None,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> DowTransactionResult:
        """
        20 (0x14): Arbitrary DOW Transaction

        The unit can act as an RS-232 to Dallas 1-Wire bridge, sending up to 15
        bytes and receiving up to 14 bytes. That is enough for many devices, but
        some devices need larger transactions and cannot be fully used with the
        unit.

        When `data_to_write` is None, an empty byte string is sent instead.

        For more information, review your unit's datasheet.

        `timeout` and `retry_times` are forwarded to `send_command`.
        """

        payload = b"" if data_to_write is None else data_to_write
        command = DowTransaction(index, bytes_to_read, payload)
        return await self.send_command(
            command,
            DowTransactionResult,
            timeout=timeout,
            retry_times=retry_times,
        )

    async def setup_live_temperature_display(
        self: Self,
        slot: int,
        item: TemperatureDisplayItem,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> LiveTemperatureDisplaySetUp:
        """
        21 (0x15): Set Up Live Temperature Display

        The device can automatically update a portion of the LCD with a "live"
        temperature reading. After the display has been configured with this
        command, the device keeps showing the live reading on the LCD without host
        intervention.

        `timeout` and `retry_times` are forwarded to `send_command`.
        """

        command = SetupLiveTemperatureDisplay(slot, item, self.device)
        return await self.send_command(
            command,
            LiveTemperatureDisplaySetUp,
            timeout=timeout,
            retry_times=retry_times,
        )

    async def send_command_to_lcd_controller(
        self: Self,
        location: LcdRegister,
        data: int | bytes,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> CommandSentToLcdController:
        """
        22 (0x16): Send Command Directly to the LCD Controller

        The CFA533's controller is HD44780 compatible. Low-level access to the LCD
        controller is rarely needed, but some arcane HD44780 functions are not
        exposed by the CFA533's command set. This command gives direct access to
        the CFA533's LCD controller.

        `timeout` and `retry_times` are forwarded to `send_command`.
        """

        command = SendCommandToLcdController(location, data)
        return await self.send_command(
            command,
            CommandSentToLcdController,
            timeout=timeout,
            retry_times=retry_times,
        )

    async def configure_key_reporting(
        self: Self,
        when_pressed: Set[KeyPress],
        when_released: Set[KeyPress],
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> KeyReportingConfigured:
        """
        23 (0x17): Configure Key Reporting

        By default, the device reports every key event to the host. This command
        lets key events be enabled or disabled individually.

        `timeout` and `retry_times` are forwarded to `send_command`.
        """

        command = ConfigureKeyReporting(when_pressed, when_released)
        return await self.send_command(
            command,
            KeyReportingConfigured,
            timeout=timeout,
            retry_times=retry_times,
        )

    async def poll_keypad(
        self: Self,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> KeypadPolled:
        """
        24 (0x18): Read Keypad, Polled Mode

        Sometimes it is more convenient for the host to poll the device for key
        activity. This command reports which keys are currently pressed, which keys
        have been pressed since the last poll, and which keys have been released
        since the last poll.

        `timeout` and `retry_times` are forwarded to `send_command`.
        """

        command = PollKeypad()
        polled = await self.send_command(
            command, KeypadPolled, timeout=timeout, retry_times=retry_times
        )
        return polled

    async def set_atx_power_switch_functionality(
        self: Self,
        settings: AtxPowerSwitchFunctionalitySettings,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> AtxPowerSwitchFunctionalitySet:
        """
        28 (0x1C): Set ATX Power Switch Functionality

        Combined with the Crystalfontz WR-PWR-Y14 cable, this device can replace
        the function of the power and reset switches in a standard ATX-compatible
        system.

        This functionality comes with a number of caveats. Please review your device's
        datasheet for more information.

        `timeout` and `retry_times` are forwarded to `send_command`.
        """

        command = SetAtxPowerSwitchFunctionality(settings)
        return await self.send_command(
            command,
            AtxPowerSwitchFunctionalitySet,
            timeout=timeout,
            retry_times=retry_times,
        )

    async def configure_watchdog(
        self: Self,
        timeout_seconds: int,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> WatchdogConfigured:
        """
        29 (0x1D): Enable/Disable and Reset the Watchdog

        Some high-availability systems rely on hardware watchdog timers so that a
        software or hardware failure does not cause an extended system outage.
        After the host system boots, a system monitor program starts and enables
        the watchdog timer on the device. If that program fails to reset the
        device's watchdog timer, the device will reset the host system.

        The GPIO pins used for ATX control must not be configured as user GPIO. For
        more details, review your device's datasheet.

        `timeout_seconds` configures the watchdog itself, while `timeout` and
        `retry_times` are forwarded to `send_command`.
        """

        command = ConfigureWatchdog(timeout_seconds)
        return await self.send_command(
            command,
            WatchdogConfigured,
            timeout=timeout,
            retry_times=retry_times,
        )

    async def read_status(
        self: Self,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> DeviceStatus:
        """
        30 (0x1E): Read Reporting & Status

        Use this command to verify which items are currently configured to report
        to the host, along with some other miscellaneous status information. The
        information returned is not identical between devices, and may even vary
        between firmware versions of the same model. As such, the return value of
        this function is not type-safe.

        The response's raw data is interpreted by the current device's `status`
        method. `timeout` and `retry_times` are forwarded to `send_command`.
        """

        command = ReadStatus()
        status_read: StatusRead = await self.send_command(
            command, StatusRead, timeout=timeout, retry_times=retry_times
        )
        return self.device.status(status_read.data)

    async def send_data(
        self: Self,
        row: int,
        column: int,
        data: str | bytes,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> DataSent:
        """
        31 (0x1F): Send Data to LCD

        Places data at any position on the LCD.

        `timeout` and `retry_times` are forwarded to `send_command`.
        """

        command = SendData(row, column, data, self.device)
        return await self.send_command(
            command, DataSent, timeout=timeout, retry_times=retry_times
        )

    async def set_baud_rate(
        self: Self,
        baud_rate: BaudRate,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> BaudRateSet:
        """
        33 (0x21): Set Baud Rate

        Changes the device's baud rate. This method sends the baud rate command,
        waits for a positive acknowledgement from the device at the old baud rate,
        and only then switches the client to the new baud rate. To have the device
        power up at the new baud rate, save it with a call to
        `client.store_boot_state`.

        `timeout` and `retry_times` are forwarded to `send_command`.
        """

        command = SetBaudRate(baud_rate)
        acknowledgement: BaudRateSet = await self.send_command(
            command, BaudRateSet, timeout=timeout, retry_times=retry_times
        )
        # The client's own rate is updated only after the command has returned.
        self.baud_rate = baud_rate
        return acknowledgement

    # Older versions of the CFA533 don't support GPIO, and future models might
    # support more GPIO pins. Therefore, we don't validate the index or
    # gatekeep based on the device's hardware or firmware revision.
    async def set_gpio(
        self: Self,
        index: int,
        output_state: int,
        settings: Optional[GpioSettings] = None,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> GpioSet:
        """
        34 (0x22): Set or Set and Configure GPIO Pins

        The CFA533 (hardware versions 1.4 and up, firmware versions 1.9 and up)
        provides five pins for user-definable general purpose input / output
        (GPIO). These pins are shared with the DOW and ATX functions, so take care
        when configuring GPIO if you also want to use ATX or DOW.

        This functionality comes with many caveats. Please review the documentation in
        your device's datasheet.

        `timeout` and `retry_times` are forwarded to `send_command`.
        """

        command = SetGpio(index, output_state, settings)
        return await self.send_command(
            command, GpioSet, timeout=timeout, retry_times=retry_times
        )

    async def read_gpio(
        self: Self,
        index: int,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> GpioRead:
        """
        35 (0x23): Read GPIO Pin Levels and Configuration State

        See method `client.set_gpio` for details on the GPIO architecture.

        This functionality comes with many caveats. Please review the documentation in
        your device's datasheet.

        `timeout` and `retry_times` are forwarded to `send_command`.
        """

        command = ReadGpio(index)
        pin_state = await self.send_command(
            command, GpioRead, timeout=timeout, retry_times=retry_times
        )
        return pin_state

    #
    # Report handlers
    #

    async def _handle_report(
        self: Self,
        name: str,
        queue: Receiver[R],
        handler: ReportHandlerMethod,
    ) -> None:
        while True:
            if not self._running:
                logging.debug(f"{name} background task exiting")
                return

            logging.debug(f"{name} background task getting a new report")
            exc, report = await queue.get()

            if exc:
                logging.debug(f"{name} background task encountered an exception: {exc}")
                if not self.closed.done():
                    self.closed.set_exception(exc)
                    queue.task_done()
                else:
                    queue.task_done()
                    raise exc
            elif report:
                logging.debug(f"{name} background task is calling {handler.__name__}")
                await handler(report)
                queue.task_done()
            else:
                raise CrystalfontzError(
                    "assert: result has either exception or response"
                )

    #
    # Effects
    #

    def marquee(
        self: Self,
        row: int,
        text: str,
        pause: Optional[float] = None,
        tick: Optional[float] = None,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> Marquee:
        """
        Create a marquee effect to display on the LCD screen.

        The effect is constructed with this client and the client's event loop;
        all other arguments are handed to `Marquee` unchanged.
        """

        effect = Marquee(
            client=self,
            row=row,
            text=text,
            pause=pause,
            tick=tick,
            timeout=timeout,
            retry_times=retry_times,
            loop=self.loop,
        )
        return effect

    def screensaver(
        self: Self,
        text: str,
        tick: Optional[float] = None,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> Screensaver:
        """
        Create a screensaver effect to display on the LCD screen.

        The effect is constructed with this client and the client's event loop;
        all other arguments are handed to `Screensaver` unchanged.
        """

        effect = Screensaver(
            client=self,
            text=text,
            tick=tick,
            timeout=timeout,
            retry_times=retry_times,
            loop=self.loop,
        )
        return effect

    def dance_party(
        self: Self,
        tick: Optional[float] = None,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> DanceParty:
        """
        Create a dance party effect to display on the LCD screen.

        The effect is constructed with this client and the client's event loop;
        all other arguments are handed to `DanceParty` unchanged.
        """

        effect = DanceParty(
            client=self,
            tick=tick,
            timeout=timeout,
            retry_times=retry_times,
            loop=self.loop,
        )
        return effect

baud_rate property writable

The transport's baud rate.

closed property

An asyncio.Future that resolves when the connection is closed. This may be due either to calling client.close() or an Exception.

firmware_rev property

The firmware revision of the current device.

hardware_rev property

The hardware revision of the current device.

model property

The model of the current device.

clear_screen(timeout=None, retry_times=None) async

6 (0x06): Clear LCD Screen

Sets the contents of the LCD screen DDRAM to ' ' = 0x20 = 32 and moves the cursor to the left-most column of the top line.

Source code in crystalfontz/client.py
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
async def clear_screen(
    self: Self,
    timeout: Optional[float] = None,
    retry_times: Optional[int] = None,
) -> ClearedScreen:
    """
    6 (0x06): Clear LCD Screen

    Fills the LCD screen's DDRAM with spaces (' ' = 0x20 = 32) and returns the
    cursor to the left-most column of the top line.

    `timeout` and `retry_times` are forwarded to `send_command`.
    """

    command = ClearScreen()
    response = await self.send_command(
        command, ClearedScreen, timeout=timeout, retry_times=retry_times
    )
    return response

close()

Close the connection.

Source code in crystalfontz/client.py
344
345
346
347
348
349
350
351
def close(self: Self) -> None:
    """
    Close the connection.

    Closes the underlying transport when one is set, then runs the client's
    own `_close` step.
    """

    transport = self._transport
    if transport:
        transport.close()
    self._close()

configure_key_reporting(when_pressed, when_released, timeout=None, retry_times=None) async

23 (0x17): Configure Key Reporting

By default, the device reports any key event to the host. This command allows the key events to be enabled or disabled on an individual basis.

Source code in crystalfontz/client.py
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
async def configure_key_reporting(
    self: Self,
    when_pressed: Set[KeyPress],
    when_released: Set[KeyPress],
    timeout: Optional[float] = None,
    retry_times: Optional[int] = None,
) -> KeyReportingConfigured:
    """
    23 (0x17): Configure Key Reporting

    By default, the device reports every key event to the host. This command
    lets key events be enabled or disabled individually.

    `timeout` and `retry_times` are forwarded to `send_command`.
    """

    command = ConfigureKeyReporting(when_pressed, when_released)
    return await self.send_command(
        command,
        KeyReportingConfigured,
        timeout=timeout,
        retry_times=retry_times,
    )

configure_watchdog(timeout_seconds, timeout=None, retry_times=None) async

29 (0x1D): Enable/Disable and Reset the Watchdog

Some high-availability systems use hardware watchdog timers to ensure that a software or hardware failure does not result in an extended system outage. Once the host system has booted, a system monitor program is started. The system monitor program would enable the watchdog timer on the device. If the system monitor program fails to reset the device's watchdog timer, the device will reset the host system.

The GPIO pins used for ATX control must not be configured as user GPIO. For more details, review your device's datasheet.

Source code in crystalfontz/client.py
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
async def configure_watchdog(
    self: Self,
    timeout_seconds: int,
    timeout: Optional[float] = None,
    retry_times: Optional[int] = None,
) -> WatchdogConfigured:
    """
    29 (0x1D): Enable/Disable and Reset the Watchdog

    Some high-availability systems rely on hardware watchdog timers so that a
    software or hardware failure does not cause an extended system outage.
    After the host system boots, a system monitor program starts and enables
    the watchdog timer on the device. If that program fails to reset the
    device's watchdog timer, the device will reset the host system.

    The GPIO pins used for ATX control must not be configured as user GPIO. For
    more details, review your device's datasheet.

    `timeout_seconds` configures the watchdog itself, while `timeout` and
    `retry_times` are forwarded to `send_command`.
    """

    command = ConfigureWatchdog(timeout_seconds)
    return await self.send_command(
        command,
        WatchdogConfigured,
        timeout=timeout,
        retry_times=retry_times,
    )

dance_party(tick=None, timeout=None, retry_times=None)

Display a dance party effect on the LCD screen.

Source code in crystalfontz/client.py
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
def dance_party(
    self: Self,
    tick: Optional[float] = None,
    timeout: Optional[float] = None,
    retry_times: Optional[int] = None,
) -> DanceParty:
    """
    Create a dance party effect to display on the LCD screen.

    The effect is constructed with this client and the client's event loop;
    all other arguments are handed to `DanceParty` unchanged.
    """

    effect = DanceParty(
        client=self,
        tick=tick,
        timeout=timeout,
        retry_times=retry_times,
        loop=self.loop,
    )
    return effect

detect_baud_rate(timeout=None, retry_times=None) async

Detect the device's configured baud rate by testing the connection at each potential baud setting.

Source code in crystalfontz/client.py
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
async def detect_baud_rate(
    self: Self, timeout: Optional[float] = None, retry_times: Optional[int] = None
) -> None:
    """
    Detect the device's configured baud rate by testing the connection at each
    potential baud setting.

    The connection is first tested at the client's current baud rate. If that
    raises a ConnectionError, the client switches to the alternate rate looked
    up in OTHER_BAUD_RATE and tests once more. A ConnectionError from the second
    test is re-raised; in that case `self.baud_rate` is left at the alternate
    rate.

    `timeout` and `retry_times` are passed positionally to `test_connection`.
    """

    # Remember the starting rate so log messages can name both candidates.
    baud_rate = self.baud_rate
    try:
        logger.info(f"Testing connection at {baud_rate} bps...")
        await self.test_connection(timeout, retry_times)
    except ConnectionError as exc:
        logger.debug(exc)
        # Switch the client to the other candidate rate before retrying.
        other_baud_rate = OTHER_BAUD_RATE[baud_rate]
        self.baud_rate = other_baud_rate
        logger.info(
            f"Connection failed at {baud_rate} bps. "
            f"Testing connection at {other_baud_rate} bps..."
        )
        try:
            await self.test_connection(timeout, retry_times)
        except ConnectionError as exc:
            logger.info(
                f"Connection failed for both {baud_rate} bps "
                f"and {other_baud_rate} bps."
            )
            raise exc
    else:
        # Only reached when the first test succeeded; a successful second test
        # does not log a success message.
        logger.info(f"Connection successful at {baud_rate} bps.")

detect_device(timeout=None, retry_times=None) async

Get model, hardware and firmware versions from the device, then configure the client to use that device. This is useful if you don't know a priori what device you're using.

Source code in crystalfontz/client.py
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
async def detect_device(
    self: Self,
    timeout: Optional[float] = None,
    retry_times: Optional[int] = None,
) -> None:
    """
    Query the device for its model, hardware and firmware versions, then point
    the client at the matching device. Handy when you don't know a priori which
    device is attached.

    `timeout` and `retry_times` are forwarded to `self.versions`.
    """

    info = await self.versions(timeout=timeout, retry_times=retry_times)
    model = info.model
    hardware_rev = info.hardware_rev
    firmware_rev = info.firmware_rev
    self.device = lookup_device(model, hardware_rev, firmware_rev)

dow_transaction(index, bytes_to_read, data_to_write=None, timeout=None, retry_times=None) async

20 (0x14): Arbitrary DOW Transaction

The unit can function as an RS-232 to Dallas 1-Wire bridge. The unit can send up to 15 bytes and receive up to 14 bytes. This will be sufficient for many devices, but some devices require larger transactions and cannot be fully used with the unit.

For more information, review your unit's datasheet.

Source code in crystalfontz/client.py
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
async def dow_transaction(
    self: Self,
    index: int,
    bytes_to_read: int,
    data_to_write: Optional[bytes] = None,
    timeout: Optional[float] = None,
    retry_times: Optional[int] = None,
) -> DowTransactionResult:
    """
    20 (0x14): Arbitrary DOW Transaction

    The unit can act as an RS-232 to Dallas 1-Wire bridge, sending up to 15
    bytes and receiving up to 14 bytes. That is enough for many devices, but
    some devices need larger transactions and cannot be fully used with the
    unit.

    When `data_to_write` is None, an empty byte string is sent instead.

    For more information, review your unit's datasheet.

    `timeout` and `retry_times` are forwarded to `send_command`.
    """

    payload = b"" if data_to_write is None else data_to_write
    command = DowTransaction(index, bytes_to_read, payload)
    return await self.send_command(
        command,
        DowTransactionResult,
        timeout=timeout,
        retry_times=retry_times,
    )

expect(cls, timeout=None) async

Wait for a response of an expected class, with a timeout.

This method accepts a timeout parameter. If defined, it will override the client's default timeout.

This is a low level method. Most use cases are met by individual command methods.

Source code in crystalfontz/client.py
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
@timeout
async def expect(self: Self, cls: Type[R], timeout: Optional[float] = None) -> R:
    """
    Wait for a response of an expected class, with a timeout.

    This method accepts a `timeout` parameter. If defined, it will override
    the client's default timeout.

    This is a low level method. Most use cases are met by individual command
    methods.

    Returns the received response. Raises the exception delivered through the
    subscription if there is one, or CrystalfontzError if the delivered result
    carries neither an exception nor a response.
    """
    q = self.subscribe(cls)
    try:
        exc, res = await q.get()
        q.task_done()
    finally:
        # Always drop the subscription, even if the wait is interrupted (for
        # example by cancellation), so an abandoned queue is never left
        # subscribed. NOTE(review): presumably the `timeout` decorator cancels
        # this coroutine on expiry - confirm.
        self.unsubscribe(cls, q)
    if exc:
        raise exc
    elif res:
        return res
    raise CrystalfontzError("assert: result has either exception or response")

marquee(row, text, pause=None, tick=None, timeout=None, retry_times=None)

Display a marquee effect on the LCD screen.

Source code in crystalfontz/client.py
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
def marquee(
    self: Self,
    row: int,
    text: str,
    pause: Optional[float] = None,
    tick: Optional[float] = None,
    timeout: Optional[float] = None,
    retry_times: Optional[int] = None,
) -> Marquee:
    """
    Create a marquee effect to display on the LCD screen.

    The effect is constructed with this client and the client's event loop;
    all other arguments are handed to `Marquee` unchanged.
    """

    effect = Marquee(
        client=self,
        row=row,
        text=text,
        pause=pause,
        tick=tick,
        timeout=timeout,
        retry_times=retry_times,
        loop=self.loop,
    )
    return effect

ping(payload, timeout=None, retry_times=None) async

0 (0x00): Ping Command

The device will return the Ping Command to the host.

Source code in crystalfontz/client.py
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
async def ping(
    self: Self,
    payload: bytes,
    timeout: Optional[float] = None,
    retry_times: Optional[int] = None,
) -> Pong:
    """
    0 (0x00): Ping Command

    The device returns the Ping Command to the host.

    `timeout` and `retry_times` are forwarded to `send_command`.
    """

    command = Ping(payload)
    pong = await self.send_command(
        command, Pong, timeout=timeout, retry_times=retry_times
    )
    return pong

poll_keypad(timeout=None, retry_times=None) async

24 (0x18): Read Keypad, Polled Mode

In some situations, it may be convenient for the host to poll the device for key activity. This command allows the host to detect which keys are currently pressed, which keys have been pressed since the last poll, and which keys have been released since the last poll.

Source code in crystalfontz/client.py
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
async def poll_keypad(
    self: Self,
    timeout: Optional[float] = None,
    retry_times: Optional[int] = None,
) -> KeypadPolled:
    """
    24 (0x18): Read Keypad, Polled Mode

    Sometimes it is more convenient for the host to poll the device for key
    activity. This command reports which keys are currently pressed, which keys
    have been pressed since the last poll, and which keys have been released
    since the last poll.

    `timeout` and `retry_times` are forwarded to `send_command`.
    """

    command = PollKeypad()
    polled = await self.send_command(
        command, KeypadPolled, timeout=timeout, retry_times=retry_times
    )
    return polled

read_dow_device_information(index, timeout=None, retry_times=None) async

18 (0x12): Read DOW Device Information

When power is applied to the unit, it detects any devices connected to the Dallas Semiconductor One-Wire (DOW) bus and stores the device's information. This command will allow the host to read the device's information.

Note: The GPIO pin used for DOW must not be configured as user GPIO. For more information, review your unit's datasheet.

Source code in crystalfontz/client.py
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
async def read_dow_device_information(
    self: Self,
    index: int,
    timeout: Optional[float] = None,
    retry_times: Optional[int] = None,
) -> DowDeviceInformation:
    """
    18 (0x12): Read DOW Device Information

    On power-up the unit detects devices attached to the Dallas Semiconductor
    One-Wire (DOW) bus and stores their information. This command reads back
    the stored information for one device.

    Note: The GPIO pin used for DOW must not be configured as user GPIO. For more
    information, review your unit's datasheet.

    Parameters:
        index (int): Passed to `ReadDowDeviceInformation` to select the device.
        timeout (Optional[float]): Passed through to `send_command`.
        retry_times (Optional[int]): Passed through to `send_command`.

    Returns:
        The `DowDeviceInformation` response awaited from `send_command`.
    """

    command = ReadDowDeviceInformation(index)
    info = await self.send_command(
        command, DowDeviceInformation, timeout=timeout, retry_times=retry_times
    )
    return info

read_gpio(index, timeout=None, retry_times=None) async

35 (0x23): Read GPIO Pin Levels and Configuration State

See method client.set_gpio for details on the GPIO architecture.

This functionality comes with many caveats. Please review the documentation in your device's datasheet.

Source code in crystalfontz/client.py
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
async def read_gpio(
    self: Self,
    index: int,
    timeout: Optional[float] = None,
    retry_times: Optional[int] = None,
) -> GpioRead:
    """
    35 (0x23): Read GPIO Pin Levels and Configuration State

    The GPIO architecture is described on `client.set_gpio`.

    This functionality comes with many caveats. Please review the documentation in
    your device's datasheet.

    Parameters:
        index (int): Passed to `ReadGpio` to select the pin.
        timeout (Optional[float]): Passed through to `send_command`.
        retry_times (Optional[int]): Passed through to `send_command`.

    Returns:
        The `GpioRead` response awaited from `send_command`.
    """

    command = ReadGpio(index)
    return await self.send_command(
        command,
        GpioRead,
        timeout=timeout,
        retry_times=retry_times,
    )

read_lcd_memory(address, timeout=None, retry_times=None) async

10 (0x0A): Read 8 bytes of LCD Memory

This command will return the contents of the LCD's DDRAM or CGRAM. This command is intended for debugging.

Source code in crystalfontz/client.py
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
async def read_lcd_memory(
    self: Self,
    address: int,
    timeout: Optional[float] = None,
    retry_times: Optional[int] = None,
) -> LcdMemory:
    """
    10 (0x0A): Read 8 bytes of LCD Memory

    Returns the contents of the LCD's DDRAM or CGRAM. Intended as a debugging
    aid.

    Parameters:
        address (int): Passed to `ReadLcdMemory` as the address to read.
        timeout (Optional[float]): Passed through to `send_command`.
        retry_times (Optional[int]): Passed through to `send_command`.

    Returns:
        The `LcdMemory` response awaited from `send_command`.
    """

    command = ReadLcdMemory(address)
    memory = await self.send_command(
        command, LcdMemory, timeout=timeout, retry_times=retry_times
    )
    return memory

read_status(timeout=None, retry_times=None) async

30 (0x1E): Read Reporting & Status

This command can be used to verify the current items configured to report to the host, as well as some other miscellaneous status information. Please note that the information returned is not identical between devices, and may in fact vary between firmware versions of the same model. As such, the return value of this function is not type-safe.

Source code in crystalfontz/client.py
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
async def read_status(
    self: Self,
    timeout: Optional[float] = None,
    retry_times: Optional[int] = None,
) -> DeviceStatus:
    """
    30 (0x1E): Read Reporting & Status

    Verifies which items are currently configured to report to the host, along
    with other miscellaneous status information. The information returned is
    not identical between devices, and may vary between firmware versions of
    the same model. As such, the return value of this function is not
    type-safe.

    Parameters:
        timeout (Optional[float]): Passed through to `send_command`.
        retry_times (Optional[int]): Passed through to `send_command`.

    Returns:
        The result of `self.device.status` applied to the response's `data`.
    """

    command = ReadStatus()
    status_read: StatusRead = await self.send_command(
        command,
        StatusRead,
        timeout=timeout,
        retry_times=retry_times,
    )
    # Interpretation of the raw status data is delegated to the device object.
    raw = status_read.data
    return self.device.status(raw)

read_user_flash_area(timeout=None, retry_times=None) async

3 (0x03): Read User Flash Area

This command will read the User Flash Area and return the data to the host. For more information, review the documentation for client.write_user_flash_area.

Source code in crystalfontz/client.py
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
async def read_user_flash_area(
    self: Self,
    timeout: Optional[float] = None,
    retry_times: Optional[int] = None,
) -> UserFlashAreaRead:
    """
    3 (0x03): Read User Flash Area

    Reads the User Flash Area and returns its data to the host. See
    `client.write_user_flash_area` for more information about this area.

    Parameters:
        timeout (Optional[float]): Passed through to `send_command`.
        retry_times (Optional[int]): Passed through to `send_command`.

    Returns:
        The `UserFlashAreaRead` response awaited from `send_command`.
    """

    command = ReadUserFlashArea()
    flash = await self.send_command(
        command, UserFlashAreaRead, timeout=timeout, retry_times=retry_times
    )
    return flash

reboot_lcd(timeout=None, retry_times=None) async

Reboot the device, using 5 (0x05): Reboot Device, Reset Host, or Power Off Host.

Rebooting the device may be useful for testing the boot configuration. It may also be useful to re-enumerate the devices on the One-Wire bus.

Source code in crystalfontz/client.py
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
async def reboot_lcd(
    self: Self,
    timeout: Optional[float] = None,
    retry_times: Optional[int] = None,
) -> PowerResponse:
    """
    Reboot the device, using 5 (0x05): Reboot Device, Reset Host, or Power Off
    Host.

    A reboot can help when testing the boot configuration, and can be used to
    re-enumerate the devices on the One-Wire bus.

    Parameters:
        timeout (Optional[float]): Passed through to `send_command`.
        retry_times (Optional[int]): Passed through to `send_command`.

    Returns:
        The `PowerResponse` awaited from `send_command`.
    """

    command = RebootLCD()
    return await self.send_command(
        command,
        PowerResponse,
        timeout=timeout,
        retry_times=retry_times,
    )

reset_host(timeout=None, retry_times=None) async

Reset the host, using 5 (0x05): Reboot Device, Reset Host, or Power Off Host.

This command assumes the host's reset line is connected to GPIO[3]. For more information, review your device's datasheet.

Source code in crystalfontz/client.py
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
async def reset_host(
    self: Self,
    timeout: Optional[float] = None,
    retry_times: Optional[int] = None,
) -> PowerResponse:
    """
    Reset the host, using 5 (0x05): Reboot Device, Reset Host, or Power Off Host.

    This command assumes the host's reset line is connected to GPIO[3]. For more
    information, review your device's datasheet.

    Parameters:
        timeout (Optional[float]): Applied to both the command round trip and
                                   the follow-up wait.
        retry_times (Optional[int]): Applied to both the command round trip and
                                     the follow-up wait.

    Returns:
        The `PowerResponse` received by the follow-up `expect` call.
    """

    # Forward the caller's overrides to the command round trip as well, matching
    # `reboot_lcd` and `shutdown_host`. Previously they only reached the
    # follow-up wait below, so this call always used the client defaults.
    await self.send_command(
        ResetHost(), PowerResponse, timeout=timeout, retry_times=retry_times
    )
    # NOTE(review): a second PowerResponse is awaited after the command has
    # already been acknowledged. Presumably the device emits another response
    # around the host reset -- confirm against the datasheet.
    return await self.expect(
        PowerResponse, timeout=timeout, retry_times=retry_times
    )

screensaver(text, tick=None, timeout=None, retry_times=None)

Display a screensaver effect on the LCD screen.

Source code in crystalfontz/client.py
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
def screensaver(
    self: Self,
    text: str,
    tick: Optional[float] = None,
    timeout: Optional[float] = None,
    retry_times: Optional[int] = None,
) -> Screensaver:
    """
    Display a screensaver effect on the LCD screen.

    Parameters:
        text (str): Passed to `Screensaver` as `text`.
        tick (Optional[float]): Passed to `Screensaver` as `tick`.
        timeout (Optional[float]): Passed to `Screensaver` as `timeout`.
        retry_times (Optional[int]): Passed to `Screensaver` as `retry_times`.

    Returns:
        A `Screensaver` constructed with this client and this client's `loop`.
    """

    effect = Screensaver(
        client=self,
        text=text,
        tick=tick,
        timeout=timeout,
        retry_times=retry_times,
        loop=self.loop,
    )
    return effect

send_command(command, response_cls, timeout=None, retry_times=None) async

Send a Command, then wait for and return its expected Response.

This method accepts timeout and retry_times parameters. If defined, they will override the client's default timeout and retry settings.

This is a low level method. Most use cases are met by individual command methods.

Source code in crystalfontz/client.py
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
@retry
async def send_command(
    self: Self,
    command: Command,
    response_cls: Type[R],
    timeout: Optional[float] = None,
    retry_times: Optional[int] = None,
) -> R:
    """
    Send a `Command`, then wait for and return its expected `Response`.

    This method accepts `timeout` and `retry_times` parameters. If defined, they
    will override the client's default timeout.

    This is a low level method. Most use cases are met by individual command
    methods.

    Parameters:
        command (Command): Converted with `to_packet()` and sent.
        response_cls (Type[R]): The response class passed to `self.expect`.
        timeout (Optional[float]): Passed through to `self.expect`.
        retry_times (Optional[int]): Not referenced in this body; presumably
                                     consumed by the `@retry` decorator
                                     (confirm against its definition).

    Returns:
        The response awaited from `self.expect`.
    """
    # Sending and waiting happen under the same lock acquisition.
    async with self._lock:
        packet = command.to_packet()
        self.send_packet(packet)
        response = await self.expect(response_cls, timeout=timeout)
    return response

send_command_to_lcd_controller(location, data, timeout=None, retry_times=None) async

22 (0x16): Send Command Directly to the LCD Controller

The controller on the CFA533 is HD44780 compatible. Generally, you will not need low-level access to the LCD controller but some arcane functions of the HD44780 are not exposed by the CFA533's command set. This command allows you to access the CFA533's LCD controller directly.

Source code in crystalfontz/client.py
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
async def send_command_to_lcd_controller(
    self: Self,
    location: LcdRegister,
    data: int | bytes,
    timeout: Optional[float] = None,
    retry_times: Optional[int] = None,
) -> CommandSentToLcdController:
    """
    22 (0x16): Send Command Directly to the LCD Controller

    The CFA533's controller is HD44780 compatible. Low-level access to it is
    generally unnecessary, but some arcane HD44780 functions are not exposed by
    the CFA533's command set. This command gives direct access to the CFA533's
    LCD controller.

    Parameters:
        location (LcdRegister): Passed to `SendCommandToLcdController`.
        data (int | bytes): Passed to `SendCommandToLcdController`.
        timeout (Optional[float]): Passed through to `send_command`.
        retry_times (Optional[int]): Passed through to `send_command`.

    Returns:
        The `CommandSentToLcdController` response awaited from `send_command`.
    """

    command = SendCommandToLcdController(location, data)
    ack = await self.send_command(
        command, CommandSentToLcdController, timeout=timeout, retry_times=retry_times
    )
    return ack

send_data(row, column, data, timeout=None, retry_times=None) async

31 (0x1F): Send Data to LCD

This command allows data to be placed at any position on the LCD.

Source code in crystalfontz/client.py
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
async def send_data(
    self: Self,
    row: int,
    column: int,
    data: str | bytes,
    timeout: Optional[float] = None,
    retry_times: Optional[int] = None,
) -> DataSent:
    """
    31 (0x1F): Send Data to LCD

    Places data at any position on the LCD.

    Parameters:
        row (int): Passed to `SendData`.
        column (int): Passed to `SendData`.
        data (str | bytes): Passed to `SendData`.
        timeout (Optional[float]): Passed through to `send_command`.
        retry_times (Optional[int]): Passed through to `send_command`.

    Returns:
        The `DataSent` response awaited from `send_command`.
    """

    # The command is constructed with this client's device object.
    command = SendData(row, column, data, self.device)
    sent = await self.send_command(
        command, DataSent, timeout=timeout, retry_times=retry_times
    )
    return sent

set_atx_power_switch_functionality(settings, timeout=None, retry_times=None) async

28 (0x1C): Set ATX Power Switch Functionality

The combination of this device with the Crystalfontz WR-PWR-Y14 cable can be used to replace the function of the power and reset switches in a standard ATX-compatible system.

This functionality comes with a number of caveats. Please review your device's datasheet for more information.

Source code in crystalfontz/client.py
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
async def set_atx_power_switch_functionality(
    self: Self,
    settings: AtxPowerSwitchFunctionalitySettings,
    timeout: Optional[float] = None,
    retry_times: Optional[int] = None,
) -> AtxPowerSwitchFunctionalitySet:
    """
    28 (0x1C): Set ATX Power Switch Functionality

    Together with the Crystalfontz WR-PWR-Y14 cable, this device can replace
    the function of the power and reset switches in a standard ATX-compatible
    system.

    This functionality comes with a number of caveats. Please review your device's
    datasheet for more information.

    Parameters:
        settings (AtxPowerSwitchFunctionalitySettings): Passed to
            `SetAtxPowerSwitchFunctionality`.
        timeout (Optional[float]): Passed through to `send_command`.
        retry_times (Optional[int]): Passed through to `send_command`.

    Returns:
        The `AtxPowerSwitchFunctionalitySet` response awaited from
        `send_command`.
    """

    command = SetAtxPowerSwitchFunctionality(settings)
    ack = await self.send_command(
        command,
        AtxPowerSwitchFunctionalitySet,
        timeout=timeout,
        retry_times=retry_times,
    )
    return ack

set_backlight(lcd_brightness, keypad_brightness=None, timeout=None, retry_times=None) async

14 (0x0E): Set LCD & Keypad Backlight

This command sets the brightness of the LCD and keypad backlights.

Source code in crystalfontz/client.py
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
async def set_backlight(
    self: Self,
    lcd_brightness: float,
    keypad_brightness: Optional[float] = None,
    timeout: Optional[float] = None,
    retry_times: Optional[int] = None,
) -> BacklightSet:
    """
    14 (0x0E): Set LCD & Keypad Backlight

    Sets the brightness of the LCD and keypad backlights.

    Parameters:
        lcd_brightness (float): Passed to `SetBacklight`.
        keypad_brightness (Optional[float]): Passed to `SetBacklight`.
        timeout (Optional[float]): Passed through to `send_command`.
        retry_times (Optional[int]): Passed through to `send_command`.

    Returns:
        The `BacklightSet` response awaited from `send_command`.
    """

    # The command is constructed with this client's device object.
    command = SetBacklight(lcd_brightness, keypad_brightness, self.device)
    ack = await self.send_command(
        command, BacklightSet, timeout=timeout, retry_times=retry_times
    )
    return ack

set_baud_rate(baud_rate, timeout=None, retry_times=None) async

33 (0x21): Set Baud Rate

This command will change the device's baud rate. This method sends the baud rate command, waits for a positive acknowledgement from the device at the old baud rate, and then switches to the new baud rate. The baud rate must be saved by a call to client.store_boot_state if you want the device to power up at the new baud rate.

Source code in crystalfontz/client.py
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
async def set_baud_rate(
    self: Self,
    baud_rate: BaudRate,
    timeout: Optional[float] = None,
    retry_times: Optional[int] = None,
) -> BaudRateSet:
    """
    33 (0x21): Set Baud Rate

    Changes the device's baud rate. The baud rate command is sent and its
    acknowledgement awaited before this client's `baud_rate` attribute is
    updated to the new value. Call `client.store_boot_state` afterwards if the
    device should power up at the new baud rate.

    Parameters:
        baud_rate (BaudRate): Passed to `SetBaudRate` and then stored on
                              `self.baud_rate`.
        timeout (Optional[float]): Passed through to `send_command`.
        retry_times (Optional[int]): Passed through to `send_command`.

    Returns:
        The `BaudRateSet` response awaited from `send_command`.
    """

    command = SetBaudRate(baud_rate)
    ack: BaudRateSet = await self.send_command(
        command, BaudRateSet, timeout=timeout, retry_times=retry_times
    )
    # Only reached when the awaited command did not raise.
    self.baud_rate = baud_rate
    return ack

set_contrast(contrast, timeout=None, retry_times=None) async

13 (0x0D): Set LCD Contrast

This command sets the contrast or vertical viewing angle of the display.

Source code in crystalfontz/client.py
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
async def set_contrast(
    self: Self,
    contrast: float,
    timeout: Optional[float] = None,
    retry_times: Optional[int] = None,
) -> ContrastSet:
    """
    13 (0x0D): Set LCD Contrast

    Sets the contrast or vertical viewing angle of the display.

    Parameters:
        contrast (float): Passed to `SetContrast`.
        timeout (Optional[float]): Passed through to `send_command`.
        retry_times (Optional[int]): Passed through to `send_command`.

    Returns:
        The `ContrastSet` response awaited from `send_command`.
    """

    # The command is constructed with this client's device object.
    command = SetContrast(contrast, self.device)
    ack = await self.send_command(
        command, ContrastSet, timeout=timeout, retry_times=retry_times
    )
    return ack

set_cursor_position(row, column, timeout=None, retry_times=None) async

11 (0x0B): Set LCD Cursor Position

This command allows the cursor to be placed at the desired location on the device's LCD screen.

Source code in crystalfontz/client.py
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
async def set_cursor_position(
    self: Self,
    row: int,
    column: int,
    timeout: Optional[float] = None,
    retry_times: Optional[int] = None,
) -> CursorPositionSet:
    """
    11 (0x0B): Set LCD Cursor Position

    Places the cursor at the desired location on the device's LCD screen.

    Parameters:
        row (int): Passed to `SetCursorPosition`.
        column (int): Passed to `SetCursorPosition`.
        timeout (Optional[float]): Passed through to `send_command`.
        retry_times (Optional[int]): Passed through to `send_command`.

    Returns:
        The `CursorPositionSet` response awaited from `send_command`.
    """

    # The command is constructed with this client's device object.
    command = SetCursorPosition(row, column, self.device)
    ack = await self.send_command(
        command, CursorPositionSet, timeout=timeout, retry_times=retry_times
    )
    return ack

set_cursor_style(style, timeout=None, retry_times=None) async

12 (0x0C): Set LCD Cursor Style

This command allows you to select among four hardware generated cursor options.

Source code in crystalfontz/client.py
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
async def set_cursor_style(
    self: Self,
    style: CursorStyle,
    timeout: Optional[float] = None,
    retry_times: Optional[int] = None,
) -> CursorStyleSet:
    """
    12 (0x0C): Set LCD Cursor Style

    Selects one of four hardware generated cursor options.

    Parameters:
        style (CursorStyle): Passed to `SetCursorStyle`.
        timeout (Optional[float]): Passed through to `send_command`.
        retry_times (Optional[int]): Passed through to `send_command`.

    Returns:
        The `CursorStyleSet` response awaited from `send_command`.
    """

    command = SetCursorStyle(style)
    ack = await self.send_command(
        command, CursorStyleSet, timeout=timeout, retry_times=retry_times
    )
    return ack

set_gpio(index, output_state, settings=None, timeout=None, retry_times=None) async

34 (0x22): Set or Set and Configure GPIO Pins

The CFA533 (hardware versions 1.4 and up, firmware versions 1.9 and up) has five pins for user-definable general purpose input / output (GPIO). These pins are shared with the DOW and ATX functions. Be careful when you configure GPIO if you want to use the ATX or DOW at the same time.

This functionality comes with many caveats. Please review the documentation in your device's datasheet.

Source code in crystalfontz/client.py
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
async def set_gpio(
    self: Self,
    index: int,
    output_state: int,
    settings: Optional[GpioSettings] = None,
    timeout: Optional[float] = None,
    retry_times: Optional[int] = None,
) -> GpioSet:
    """
    34 (0x22): Set or Set and Configure GPIO Pins

    The CFA533 (hardware versions 1.4 and up, firmware versions 1.9 and up)
    provides five user-definable general purpose input / output (GPIO) pins.
    They are shared with the DOW and ATX functions, so take care when
    configuring GPIO if ATX or DOW is in use at the same time.

    This functionality comes with many caveats. Please review the documentation in
    your device's datasheet.

    Parameters:
        index (int): Passed to `SetGpio`.
        output_state (int): Passed to `SetGpio`.
        settings (Optional[GpioSettings]): Passed to `SetGpio`.
        timeout (Optional[float]): Passed through to `send_command`.
        retry_times (Optional[int]): Passed through to `send_command`.

    Returns:
        The `GpioSet` response awaited from `send_command`.
    """

    command = SetGpio(index, output_state, settings)
    ack = await self.send_command(
        command, GpioSet, timeout=timeout, retry_times=retry_times
    )
    return ack

set_line_1(line, timeout=None, retry_times=None) async

7 (0x07): Set LCD Contents, Line 1

Sets the center 16 characters displayed on the top line of the LCD screen.

Please use this command only if you need backwards compatibility with older devices. For new applications, please use the more flexible command client.send_data.

Source code in crystalfontz/client.py
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
async def set_line_1(
    self: Self,
    line: str | bytes,
    timeout: Optional[float] = None,
    retry_times: Optional[int] = None,
) -> Line1Set:
    """
    7 (0x07): Set LCD Contents, Line 1

    Sets the center 16 characters displayed on the top line of the LCD screen.

    This command exists for backwards compatibility with older devices. New
    applications should prefer the more flexible `client.send_data`.

    Parameters:
        line (str | bytes): Passed to `SetLine1`.
        timeout (Optional[float]): Passed through to `send_command`.
        retry_times (Optional[int]): Passed through to `send_command`.

    Returns:
        The `Line1Set` response awaited from `send_command`.
    """

    # The command is constructed with this client's device object.
    command = SetLine1(line, self.device)
    ack = await self.send_command(
        command, Line1Set, timeout=timeout, retry_times=retry_times
    )
    return ack

set_line_2(line, timeout=None, retry_times=None) async

8 (0x08): Set LCD Contents, Line 2

Sets the center 16 characters displayed on the bottom line of the LCD screen.

Please use this command only if you need backwards compatibility with older devices. For new applications, please use the more flexible command client.send_data.

Source code in crystalfontz/client.py
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
async def set_line_2(
    self: Self,
    line: str | bytes,
    timeout: Optional[float] = None,
    retry_times: Optional[int] = None,
) -> Line2Set:
    """
    8 (0x08): Set LCD Contents, Line 2

    Sets the center 16 characters displayed on the bottom line of the LCD screen.

    This command exists for backwards compatibility with older devices. New
    applications should prefer the more flexible `client.send_data`.

    Parameters:
        line (str | bytes): Passed to `SetLine2`.
        timeout (Optional[float]): Passed through to `send_command`.
        retry_times (Optional[int]): Passed through to `send_command`.

    Returns:
        The `Line2Set` response awaited from `send_command`.
    """

    # The command is constructed with this client's device object.
    command = SetLine2(line, self.device)
    ack = await self.send_command(
        command, Line2Set, timeout=timeout, retry_times=retry_times
    )
    return ack

set_special_character_data(index, character, timeout=None, retry_times=None) async

9 (0x09): Set LCD Special Character Data

Sets the font definition for one of the special characters (CGRAM).

Source code in crystalfontz/client.py
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
async def set_special_character_data(
    self: Self,
    index: int,
    character: SpecialCharacter,
    timeout: Optional[float] = None,
    retry_times: Optional[int] = None,
) -> SpecialCharacterDataSet:
    """
    9 (0x09): Set LCD Special Character Data

    Sets the font definition for one of the special characters (CGRAM).

    Parameters:
        index (int): Passed to `SetSpecialCharacterData`.
        character (SpecialCharacter): Passed to `SetSpecialCharacterData`.
        timeout (Optional[float]): Passed through to `send_command`.
        retry_times (Optional[int]): Passed through to `send_command`.

    Returns:
        The `SpecialCharacterDataSet` response awaited from `send_command`.
    """

    # The command is constructed with this client's device object.
    command = SetSpecialCharacterData(index, character, self.device)
    ack = await self.send_command(
        command, SpecialCharacterDataSet, timeout=timeout, retry_times=retry_times
    )
    return ack

set_special_character_encoding(character, index)

Configure a unicode character to encode to the index of a given special character on CGRAM.

Source code in crystalfontz/client.py
866
867
868
869
870
871
872
873
874
875
876
def set_special_character_encoding(
    self: Self,
    character: str,
    index: int,
) -> None:
    """
    Configure a unicode character to encode to the index of a given special
    character on CGRAM.
    """

    self.device.character_rom.set_encoding(character, index)

setup_live_temperature_display(slot, item, timeout=None, retry_times=None) async

21 (0x15): Set Up Live Temperature Display

You can configure the device to automatically update a portion of the LCD with a "live" temperature reading. Once the display is configured using this command, the device will continue to display the live reading on the LCD without host intervention.

Source code in crystalfontz/client.py
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
async def setup_live_temperature_display(
    self: Self,
    slot: int,
    item: TemperatureDisplayItem,
    timeout: Optional[float] = None,
    retry_times: Optional[int] = None,
) -> LiveTemperatureDisplaySetUp:
    """
    21 (0x15): Set Up Live Temperature Display

    Configures the device to automatically update a portion of the LCD with a
    "live" temperature reading. Once configured, the device keeps displaying
    the live reading on the LCD without host intervention.

    Parameters:
        slot (int): Passed to `SetupLiveTemperatureDisplay`.
        item (TemperatureDisplayItem): Passed to `SetupLiveTemperatureDisplay`.
        timeout (Optional[float]): Passed through to `send_command`.
        retry_times (Optional[int]): Passed through to `send_command`.

    Returns:
        The `LiveTemperatureDisplaySetUp` response awaited from `send_command`.
    """

    # The command is constructed with this client's device object.
    command = SetupLiveTemperatureDisplay(slot, item, self.device)
    ack = await self.send_command(
        command,
        LiveTemperatureDisplaySetUp,
        timeout=timeout,
        retry_times=retry_times,
    )
    return ack

setup_temperature_reporting(enabled, timeout=None, retry_times=None) async

19 (0x13): Set Up Temperature Reporting

This command will configure the device to report the temperature information to the host every second.

Source code in crystalfontz/client.py
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
async def setup_temperature_reporting(
    self: Self,
    enabled: Iterable[int],
    timeout: Optional[float] = None,
    retry_times: Optional[int] = None,
) -> TemperatureReportingSetUp:
    """
    19 (0x13): Set Up Temperature Reporting

    Configures the device to report the temperature information to the host
    every second.

    Parameters:
        enabled (Iterable[int]): Passed to `SetupTemperatureReporting`.
        timeout (Optional[float]): Passed through to `send_command`.
        retry_times (Optional[int]): Passed through to `send_command`.

    Returns:
        The `TemperatureReportingSetUp` response awaited from `send_command`.
    """

    # The command is constructed with this client's device object.
    command = SetupTemperatureReporting(enabled, self.device)
    ack = await self.send_command(
        command, TemperatureReportingSetUp, timeout=timeout, retry_times=retry_times
    )
    return ack

shutdown_host(timeout=None, retry_times=None) async

Turn off the host's power, using 5 (0x05): Reboot Device, Reset Host, or Power Off Host.

This command assumes the host's power control line is connected to GPIO[2]. For more information, review your device's datasheet.

Source code in crystalfontz/client.py
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
async def shutdown_host(
    self: Self,
    timeout: Optional[float] = None,
    retry_times: Optional[int] = None,
) -> PowerResponse:
    """
    Turn off the host's power, using 5 (0x05): Reboot Device, Reset Host, or Power
    Off Host.

    This command assumes the host's power control line is connected to GPIO[2].
    For more information, review your device's datasheet.

    Parameters:
        timeout (Optional[float]): Passed through to `send_command`.
        retry_times (Optional[int]): Passed through to `send_command`.

    Returns:
        The `PowerResponse` awaited from `send_command`.
    """

    command = ShutdownHost()
    return await self.send_command(
        command,
        PowerResponse,
        timeout=timeout,
        retry_times=retry_times,
    )

store_boot_state(timeout=None, retry_times=None) async

4 (0x04): Store Current State as Boot State

The device loads its power-up configuration from nonvolatile memory when power is applied. The device is configured at the factory to display a "welcome" screen when power is applied. This command can be used to customize the "welcome" screen, as well as the following items:

  • Characters shown on LCD
  • Special character font definitions
  • Cursor position
  • Cursor style
  • Contrast setting
  • LCD backlight setting
  • Settings of any "live" displays, such as temperature display
  • Key press and release masks
  • ATX function enable and pulse length settings
  • Baud rate
  • GPIO settings

You cannot store the temperature reporting (although the live display of temperatures can be saved). You cannot store the host watchdog. The host software should enable this item once the system is initialized and is ready to receive the data.

Source code in crystalfontz/client.py
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
async def store_boot_state(
    self: Self,
    timeout: Optional[float] = None,
    retry_times: Optional[int] = None,
) -> BootStateStored:
    """
    4 (0x04): Store Current State as Boot State

    When power is applied, the device loads its power-up configuration from
    nonvolatile memory. From the factory, that configuration displays a
    "welcome" screen. Use this command to customize the "welcome" screen, as
    well as the following items:

    - Characters shown on LCD
    - Special character font definitions
    - Cursor position
    - Cursor style
    - Contrast setting
    - LCD backlight setting
    - Settings of any "live" displays, such as temperature display
    - Key press and release masks
    - ATX function enable and pulse length settings
    - Baud rate
    - GPIO settings

    You cannot store the temperature reporting (although the live display of
    temperatures can be saved). You cannot store the host watchdog. The host
    software should enable this item once the system is initialized and is ready
    to receive the data.

    Parameters:
        timeout (Optional[float]): Passed through to `send_command`.
        retry_times (Optional[int]): Passed through to `send_command`.

    Returns:
        The `BootStateStored` response awaited from `send_command`.
    """

    command = StoreBootState()
    return await self.send_command(
        command,
        BootStateStored,
        timeout=timeout,
        retry_times=retry_times,
    )

subscribe(cls, expect=True)

Subscribe to results of a given response class. Returns a Receiver[Response].

This is a low level method. Most use cases not met by individual command methods or a ReportHandler are best handled with client.expect.

Source code in crystalfontz/client.py
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
def subscribe(self: Self, cls: Type[R], expect: bool = True) -> Receiver[R]:
    """
    Subscribe to results of a given response class. Returns a
    `Receiver[Response]`.

    This is a low level method. Most use cases not met by individual command
    methods or a ReportHandler are best handled with `client.expect`.

    Parameters:
        cls (Type[R]): The response class used as the key in `self._receivers`.
        expect (bool): When True, the receiver is constructed with the client's
                       `_receiving` set; otherwise with a fresh empty set.

    Returns:
        The new `Receiver`, already appended to `self._receivers[cls]`.
    """

    if expect:
        tracked: Set[Receiver[Any]] = self._receiving
    else:
        tracked = set()

    receiver: Receiver[R] = Receiver(tracked)
    # The casts only satisfy the type checker: the registry is keyed by
    # Type[Response] and holds Receiver[Response] values.
    registry_key = cast(Type[Response], cls)
    self._receivers[registry_key].append(cast(Receiver[Response], receiver))
    return receiver

test_connection(timeout=None, retry_times=None) async

Test the connection by sending a ping and checking that the response matches.

Source code in crystalfontz/client.py
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
async def test_connection(
    self: Self, timeout: Optional[float] = None, retry_times: Optional[int] = None
) -> None:
    """
    Test the connection by sending a ping and checking that the response matches.

    Parameters:
        timeout (Optional[float]): Passed through to `self.ping`.
        retry_times (Optional[int]): Passed through to `self.ping`.

    Raises:
        ConnectionError: If the ping raises `TimeoutError`, or if the response
                         differs from the payload that was sent.
    """

    # A fresh random 16-letter payload is generated for each call.
    letters = [random.choice(ascii_lowercase) for _ in range(16)]
    payload: bytes = "".join(letters).encode("ascii")

    try:
        pong = await self.ping(payload, timeout=timeout, retry_times=retry_times)
    except TimeoutError as exc:
        # Report the timeout that applied: the override if given, else the default.
        waited = self._default_timeout if timeout is None else timeout
        raise ConnectionError(
            "Failed to receive packet within " f"{waited} seconds"
        ) from exc

    if pong.response == payload:
        return
    raise ConnectionError(f"{pong.response} != {payload}")

unsubscribe(cls, receiver)

Unsubscribe from results of a given response class and queue. This queue is typically created by a call to client.subscribe.

This is a low level method. Most use cases not met by individual command methods or a ReportHandler are best handled with client.expect.

Source code in crystalfontz/client.py
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
def unsubscribe(self: Self, cls: Type[R], receiver: Receiver[R]) -> None:
    """
    Unsubscribe from results of a given response class and queue. This queue is
    typically created by a call to `client.subscribe`.

    This is a low level method. Most use cases not met by individual command
    methods or a ReportHandler are best handled with `client.expect`.

    Parameters:
        cls (Type[R]): The response class used as the key in `self._receivers`.
        receiver (Receiver[R]): The receiver to drop; every registered receiver
                                comparing equal to it is removed.
    """

    registry_key = cast(Type[Response], cls)
    target = cast(Receiver[Response], receiver)

    # Rebuild the list without the target, then swap it into the registry.
    remaining: List[Receiver[Response]] = []
    for rcv in self._receivers[registry_key]:
        if rcv != target:
            remaining.append(rcv)

    self._receivers[registry_key] = remaining

versions(timeout=None, retry_times=None) async

1 (0x01): Get Hardware & Firmware Version

The device will return the hardware and firmware version information to the host.

Source code in crystalfontz/client.py
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
async def versions(
    self: Self,
    timeout: Optional[float] = None,
    retry_times: Optional[int] = None,
) -> Versions:
    """
    1 (0x01): Get Hardware & Firmware Version

    Asks the device to return its hardware and firmware version information to
    the host.

    Parameters:
        timeout (Optional[float]): Passed through to `send_command`.
        retry_times (Optional[int]): Passed through to `send_command`.

    Returns:
        The `Versions` response awaited from `send_command`.
    """

    command = GetVersions()
    return await self.send_command(
        command,
        Versions,
        timeout=timeout,
        retry_times=retry_times,
    )

write_user_flash_area(data, timeout=None, retry_times=None) async

2 (0x02): Write User Flash Area

The CFA533 reserves 16 bytes of nonvolatile memory for arbitrary use by the host. This memory can be used to store a serial number, IP address, gateway address, netmask, or any other data required. All 16 bytes must be supplied.

Source code in crystalfontz/client.py
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
async def write_user_flash_area(
    self: Self,
    data: bytes,
    timeout: Optional[float] = None,
    retry_times: Optional[int] = None,
) -> UserFlashAreaWritten:
    """
    2 (0x02): Write User Flash Area

    The CFA533 reserves 16 bytes of nonvolatile memory for arbitrary use by the
    host, for example to store a serial number, IP address, gateway address,
    netmask, or any other data required. All 16 bytes must be supplied.

    Parameters:
        data (bytes): Passed to `WriteUserFlashArea`.
        timeout (Optional[float]): Passed through to `send_command`.
        retry_times (Optional[int]): Passed through to `send_command`.

    Returns:
        The `UserFlashAreaWritten` response awaited from `send_command`.
    """

    command = WriteUserFlashArea(data)
    ack = await self.send_command(
        command, UserFlashAreaWritten, timeout=timeout, retry_times=retry_times
    )
    return ack

Config

Bases: BaseConfig

A configuration object. This class is typically used by the Crystalfontz CLI, but may also be useful for scripts or Jupyter notebooks using its configuration.

Source code in crystalfontz/config.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@config(APP_NAME)
class Config(BaseConfig):
    """
    A configuration object. This class is typically used by the Crystalfontz CLI, but
    may also be useful for scripts or Jupyter notebooks using its configuration.

    Each field declares a default and an `env_var` name through `field(...)`.
    """

    # Defaults to DEFAULT_PORT; presumably the serial port path -- confirm.
    port: str = field(default=DEFAULT_PORT, env_var="PORT")
    # Device model name; defaults to "CFA533".
    model: str = field(default="CFA533", env_var="MODEL")
    # Optional hardware and firmware revisions; both default to None.
    hardware_rev: Optional[str] = field(default=None, env_var="HARDWARE_REV")
    firmware_rev: Optional[str] = field(default=None, env_var="FIRMWARE_REV")
    # Defaults to SLOW_BAUD_RATE; loaded with load_baud_rate and dumped with str.
    baud_rate: BaudRate = field(
        default=SLOW_BAUD_RATE, env_var="BAUD_RATE", load=load_baud_rate, dump=str
    )
    # Defaults to DEFAULT_TIMEOUT and DEFAULT_RETRY_TIMES respectively.
    timeout: float = field(default=DEFAULT_TIMEOUT, env_var="TIMEOUT")
    retry_times: int = field(default=DEFAULT_RETRY_TIMES, env_var="RETRY_TIMES")

ConnectionError

Bases: CrystalfontzError

A connection error.

Source code in crystalfontz/error.py
12
13
14
15
16
17
class ConnectionError(CrystalfontzError):
    """
    A connection error in the Crystalfontz client.

    Note that this class shares its name with Python's builtin
    `ConnectionError` but derives from `CrystalfontzError`, so a handler
    written for the builtin will not catch it.
    """

CrystalfontzError

Bases: Exception

An error in the Crystalfontz client.

Source code in crystalfontz/error.py
4
5
6
7
8
9
class CrystalfontzError(Exception):
    """
    Base class for errors raised by the Crystalfontz client.

    It adds nothing to `Exception`; it exists so that callers can catch every
    client error with a single `except CrystalfontzError` clause.
    """

CursorStyle

Bases: Enum

A cursor style, as set with command 12 (0x0C): Set LCD Cursor Style.

  • NONE: No cursor.
  • BLINKING_BLOCK: Blinking block cursor.
  • STATIC_UNDERSCORE: Static underscore cursor.
  • BLINKING_UNDERSCORE: Blinking underscore cursor. On the CFA633, this represents a blinking block plus an underscore.
Source code in crystalfontz/cursor.py
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
class CursorStyle(Enum):
    """
    A cursor style, as set with command 12 (0x0C): Set LCD Cursor Style.

    - **NONE**:  No cursor.
    - **BLINKING_BLOCK**: Blinking block cursor.
    - **STATIC_UNDERSCORE**: Static underscore cursor.
    - **BLINKING_UNDERSCORE**: Blinking underscore cursor. On the CFA633, this
      represents a blinking block plus an underscore.
    """

    # NOTE(review): the integer values are presumably the style byte sent with
    # command 12 - confirm against the command encoder and the datasheet.
    NONE = 0
    BLINKING_BLOCK = 1
    STATIC_UNDERSCORE = 2
    BLINKING_UNDERSCORE = 3

DanceParty

Bases: Effect

A dance party effect. Randomly changes the backlight and contrast settings on an interval.

Source code in crystalfontz/effects.py
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
class DanceParty(Effect):
    """
    A dance party effect. Randomly changes the backlight and contrast settings on
    an interval.

    Parameters:
        client: The client used to send the contrast and backlight commands.
        tick: Seconds between changes. Defaults to 0.5 when None.
        timeout: Timeout passed to every command sent by this effect.
        retry_times: Retry count passed to every command sent by this effect.
        loop: Event loop handed to `Effect`.
    """

    def __init__(
        self: Self,
        client: EffectClient,
        tick: Optional[float] = None,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        super().__init__(
            client=client,
            tick=tick if tick is not None else 0.5,
            timeout=timeout,
            retry_times=retry_times,
            loop=loop,
        )

    def _random_contrast(self: Self) -> float:
        # Pick one of a few fixed contrast levels.
        return random.choice([0.4, 0.5, 0.6])

    def _random_brightness(self: Self) -> float:
        # Pick one of a few fixed LCD brightness levels.
        return random.choice([0.2, 0.4, 0.6, 0.8])

    async def render(self: Self) -> None:
        """
        Send one random contrast and one random backlight setting concurrently.
        """

        # Forward the effect's timeout/retry settings so that the values given
        # to the constructor actually apply to the commands it sends.
        await asyncio.gather(
            self.client.set_contrast(
                self._random_contrast(),
                timeout=self.timeout,
                retry_times=self.retry_times,
            ),
            self.client.set_backlight(
                self._random_brightness(),
                timeout=self.timeout,
                retry_times=self.retry_times,
            ),
        )

DecodeError

Bases: CrystalfontzError

An error while decoding incoming data.

Source code in crystalfontz/error.py
28
29
30
31
32
33
class DecodeError(CrystalfontzError):
    """
    Raised when incoming data cannot be decoded.
    """

Device

Bases: ABC

An abstract device. Subclasses of Device contain parameters and methods particular to a given model, hardware revision and firmware revision.

Source code in crystalfontz/device.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
class Device(ABC):
    """
    An abstract device. Subclasses of Device contain parameters and methods
    particular to a given model, hardware revision and firmware revision.

    The class attributes below are defaults that subclasses may override. The
    three methods are not marked abstract; their base implementations raise
    NotImplementedError.
    """

    # Identification; placeholders until a subclass sets real values.
    model: str = "<unknown>"
    hardware_rev: str = "<unknown>"
    firmware_rev: str = "<unknown>"

    # Display geometry: number of text rows and columns.
    lines: int = 2
    columns: int = 16
    # Character cell size. NOTE(review): presumably in pixels - confirm.
    character_width: int = 6
    character_height: int = 8
    # Character ROM used to encode text for the display.
    character_rom: CharacterRom = CFA533_CHARACTER_ROM
    n_temperature_sensors: int = 0

    def contrast(self: Self, contrast: float) -> bytes:
        """
        Set the contrast of the device. This is device-dependent.

        NOTE(review): the return type is bytes, presumably the device-specific
        encoding of the contrast value - confirm against a concrete subclass.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError("contrast")

    def brightness(
        self: Self, lcd_brightness: float, keypad_brightness: Optional[float]
    ) -> bytes:
        """
        Set the brightness of the device's LCD and keypad. This is device-dependent.

        NOTE(review): the return type is bytes, presumably the device-specific
        encoding of the brightness values - confirm against a concrete subclass.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """

        raise NotImplementedError("brightness")

    def status(self: Self, data: bytes) -> DeviceStatus:
        """
        Parse the status included in a device response into a status object.
        This is highly device-dependent.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """

        raise NotImplementedError("status")

brightness(lcd_brightness, keypad_brightness)

Set the brightness of the device's LCD and keypad. This is device-dependent.

Source code in crystalfontz/device.py
102
103
104
105
106
107
108
109
def brightness(
    self: Self, lcd_brightness: float, keypad_brightness: Optional[float]
) -> bytes:
    """
    Set the brightness of the device's LCD and keypad. This is device-dependent.

    NOTE(review): the return type is bytes, presumably the device-specific
    encoding of the brightness values - confirm against a concrete subclass.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """

    raise NotImplementedError("brightness")

contrast(contrast)

Set the contrast of the device. This is device-dependent.

Source code in crystalfontz/device.py
 96
 97
 98
 99
100
def contrast(self: Self, contrast: float) -> bytes:
    """
    Set the contrast of the device. This is device-dependent.

    NOTE(review): the return type is bytes, presumably the device-specific
    encoding of the contrast value - confirm against a concrete subclass.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError("contrast")

status(data)

Parse the status included in a device response into a status object. This is highly device-dependent.

Source code in crystalfontz/device.py
111
112
113
114
115
116
117
def status(self: Self, data: bytes) -> DeviceStatus:
    """
    Parse the status included in a device response into a status object.
    This is highly device-dependent.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """

    raise NotImplementedError("status")

DeviceError

Bases: CrystalfontzError

An error returned from the device.

Source code in crystalfontz/error.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
class DeviceError(CrystalfontzError):
    """
    An error returned from the device.

    Attributes:
        command (int): The command number taken from the low six bits of the
                       error code.
        expected_response (int): The code a successful response to that
                                 command would have carried (command + 0x40).
        payload (bytes): The payload that accompanied the error packet.
    """

    @classmethod
    def is_error_code(cls: Type[Self], code: int) -> bool:
        # An error code is one whose bits above the low six equal 0b11
        top_bits = code >> 6
        return top_bits == 0b11

    def __init__(self: Self, packet: Tuple[int, bytes]) -> None:
        error_code, body = packet
        # Keep only the low six bits: they identify the failed command
        self.command = error_code & 0b111111
        # Success responses use command + 0x40, which lets this error be
        # matched with the response that was being waited for
        self.expected_response = self.command + 0x40
        self.payload = body

        description = f"Error executing command 0x{self.command:02X}"
        if self.payload:
            description = f"{description}: {self.payload}"

        super().__init__(description)

DeviceLookupError

Bases: CrystalfontzError

An error while looking up a device.

Source code in crystalfontz/error.py
54
55
56
57
58
59
class DeviceLookupError(CrystalfontzError):
    """
    Raised when looking up a device fails.
    """

Effect

Bases: ABC

An effect. Effects are time-based actions implemented on top of the client, such as marquees and screensavers.

Source code in crystalfontz/effects.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
class Effect(ABC):
    """
    An effect. Effects are time-based actions implemented on top of the client,
    such as marquees and screensavers.

    Subclasses implement `render`, which `run` calls once per tick, and may
    override `start` and `finish` for setup and teardown.

    Parameters:
        client: The client the effect sends commands through.
        tick: Target number of seconds between the starts of two renders.
        timeout: Stored on the effect for subclasses to pass to client calls.
        retry_times: Stored on the effect for subclasses to pass to client calls.
        loop: Event loop to associate with the effect. When None, the running
              loop is used, so the effect must then be created inside one.
    """

    def __init__(
        self: Self,
        client: EffectClient,
        tick: float = 1.0,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        _loop = loop if loop is not None else asyncio.get_running_loop()
        self._event_loop: asyncio.AbstractEventLoop = _loop

        self.timeout: Optional[float] = timeout
        self.retry_times: Optional[int] = retry_times

        self.client: EffectClient = client
        self._running: bool = False
        self._tick: float = tick
        self._task: Optional[asyncio.Task[None]] = None
        # Monotonic clock: only differences are ever used, and they must not be
        # affected by adjustments to the system's wall clock.
        self._timer: float = time.monotonic()

    async def run(self: Self) -> None:
        """
        Run the effect until `stop` is called.

        Calls `start` once, then repeatedly calls `render` and sleeps for
        whatever is left of the tick. Once `stop` has been called, the next
        iteration calls `finish` and returns.
        """

        self._running = True

        self.reset_timer()
        await self.start()

        while True:
            # Restart the timer first so that the time spent rendering counts
            # against the tick.
            self.reset_timer()
            if not self._running:
                await self.finish()
                return
            await self.render()
            await asyncio.sleep(self.time_remaining(self._tick))

    def reset_timer(self: Self) -> None:
        """
        Restart the timer used by `time_elapsed` and `time_remaining`.
        """

        self._timer = time.monotonic()

    @property
    def time_elapsed(self: Self) -> float:
        """
        Seconds elapsed since the timer was last reset.
        """

        return time.monotonic() - self._timer

    def time_remaining(self: Self, wait_for: float) -> float:
        """
        Seconds left until `wait_for` seconds have passed since the last timer
        reset, never less than zero.
        """

        return max(wait_for - self.time_elapsed, 0)

    async def sleep_remaining(self: Self, wait_for: float) -> None:
        """
        Sleep until `wait_for` seconds have passed since the last timer reset.
        """

        await asyncio.sleep(self.time_remaining(wait_for))

    async def start(self: Self) -> None:
        """
        Hook called once by `run` before the first render. Does nothing by
        default.
        """

        pass

    @abstractmethod
    async def render(self: Self) -> None:
        """
        Render one frame of the effect. Called by `run` once per tick.
        """

        raise NotImplementedError("render")

    async def finish(self: Self) -> None:
        """
        Hook called once by `run` after the effect has been stopped. Does
        nothing by default.
        """

        pass

    def stop(self: Self) -> None:
        """
        Ask `run` to finish. Takes effect at the start of its next iteration.
        """

        self._running = False

EffectClient

Bases: Protocol

A protocol for any client used by effects.

This protocol covers a subset of the Client class which may be used by effects.

Source code in crystalfontz/effects.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
class EffectClient(Protocol):
    """
    A protocol for any client used by effects.

    This protocol covers a subset of the `Client` class which may be used by effects.
    Every method accepts optional `timeout` and `retry_times` keyword arguments.
    """

    # The device being driven; effects read its geometry and character ROM.
    device: Device

    # Clear the screen.
    async def clear_screen(
        self: Self, timeout: Optional[float] = None, retry_times: Optional[int] = None
    ) -> ClearedScreen: ...

    # Move the cursor to the given row and column.
    async def set_cursor_position(
        self: Self,
        row: int,
        column: int,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> CursorPositionSet: ...

    # Select the cursor style.
    async def set_cursor_style(
        self: Self,
        style: CursorStyle,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> CursorStyleSet: ...

    # Set the display contrast.
    async def set_contrast(
        self: Self,
        contrast: float,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> ContrastSet: ...

    # Set the LCD (and optionally keypad) backlight brightness.
    # NOTE(review): keypad_brightness is Optional[int] here while
    # Device.brightness declares it as Optional[float] - confirm which is
    # intended and align the two.
    async def set_backlight(
        self: Self,
        lcd_brightness: float,
        keypad_brightness: Optional[int] = None,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> BacklightSet: ...

    # Write text or raw bytes to the display starting at the given row/column.
    async def send_data(
        self: Self,
        row: int,
        column: int,
        data: str | bytes,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
    ) -> DataSent: ...

EncodeError

Bases: CrystalfontzError

An error while encoding outgoing data.

Source code in crystalfontz/error.py
46
47
48
49
50
51
class EncodeError(CrystalfontzError):
    """
    Raised when outgoing data cannot be encoded.
    """

GpioDriveMode

Bases: Enum

Pin drive mode, based on the output state.

For details on these settings and the supported combinations, refer to your device's datasheet.

Source code in crystalfontz/gpio.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
class GpioDriveMode(Enum):
    """
    Pin drive mode, based on the output state.

    For details on these settings and the supported combination, refer to your
    device's datasheet.
    """

    SLOW_STRONG = 1
    FAST_STRONG = 2
    RESISTIVE = 3
    HI_Z = 4

    @classmethod
    def from_byte(
        cls: Type[Self], mode: int
    ) -> "Tuple[Optional[GpioDriveMode], Optional[GpioDriveMode]]":
        up: Optional[GpioDriveMode] = None
        down: Optional[GpioDriveMode] = None
        if mode == 0b000:
            up = GpioDriveMode.FAST_STRONG
            down = GpioDriveMode.RESISTIVE
        elif mode == 0b001:
            up = GpioDriveMode.FAST_STRONG
            down = GpioDriveMode.FAST_STRONG
        elif mode == 0b010:
            up = GpioDriveMode.HI_Z
            down = None
        elif mode == 0b011:
            up = GpioDriveMode.RESISTIVE
            down = GpioDriveMode.FAST_STRONG
        elif mode == 0b100:
            up = GpioDriveMode.SLOW_STRONG
            down = GpioDriveMode.HI_Z
        elif mode == 0b101:
            up = GpioDriveMode.SLOW_STRONG
            down = GpioDriveMode.SLOW_STRONG
        elif mode == 0b110:
            warnings.warn(f"Drive mode {mode:0b} is reserved")
        else:
            up = GpioDriveMode.HI_Z
            down = GpioDriveMode.SLOW_STRONG

        return (up, down)

GpioFunction

Bases: Enum

Pin function.

  • UNUSED: Port unused for GPIO. It will take on the default function such as ATX, DOW or unused.
  • USED: Port used for GPIO under user control.
Source code in crystalfontz/gpio.py
34
35
36
37
38
39
40
41
42
43
44
class GpioFunction(Enum):
    """
    Pin function.

    - **UNUSED**: Port unused for GPIO. It will take on the default function such as
      ATX, DOW or unused.
    - **USED**: Port used for GPIO under user control.
    """

    # The value is the function flag in bit 3 (0b1000); the low three bits are
    # left clear so that a drive mode can be added to it.
    UNUSED = 0b0000
    USED = 0b1000

GpioSettings

GPIO pin settings.

Attributes:
  • function (GpioFunction) –

    The pin's function.

  • mode (Optional[int]) –

    The raw setting of the pin's modes.

  • up (Optional[GpioDriveMode]) –

    The pin's mode for drive-up.

  • down (Optional[GpioDriveMode]) –

    The pin's mode for drive-down.

Source code in crystalfontz/gpio.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
class GpioSettings:
    """
    GPIO pin settings.

    The drive mode may be given either as a raw 3-bit `mode`, or as an
    `up`/`down` pair which is translated into the raw mode. When `mode` is
    given it takes precedence and the `up`/`down` arguments are ignored.

    Attributes:
        function (GpioFunction): The pin's function.
        mode (int): The raw setting of the pin's modes, in the range
                    0b000-0b111.
        up (Optional[GpioDriveMode]): The pin's mode for drive-up.
        down (Optional[GpioDriveMode]): The pin's mode for drive-down.

    Raises:
        ValueError: If `mode` is outside 0b000-0b111, or if `up`/`down` is
                    not a supported combination.
    """

    def __init__(
        self: Self,
        function: GpioFunction,
        mode: Optional[int] = None,
        up: Optional[GpioDriveMode] = None,
        down: Optional[GpioDriveMode] = None,
    ) -> None:
        self.function: GpioFunction = function
        self.mode: int
        self.up: Optional[GpioDriveMode]
        self.down: Optional[GpioDriveMode]

        def invalid() -> NoReturn:
            raise ValueError(f"Unsupported combination up={up}, down={down}")

        if mode is not None:
            if not (0 <= mode <= 0b111):
                raise ValueError(f"Invalid mode {mode:0b}")
            self.mode = mode
            # Derive the drive modes from the raw mode and keep them, so the
            # documented attributes exist. This also warns on a reserved mode.
            self.up, self.down = GpioDriveMode.from_byte(mode)
            return

        if up == GpioDriveMode.FAST_STRONG:
            if down == GpioDriveMode.RESISTIVE:
                self.mode = 0b000
            elif down == GpioDriveMode.FAST_STRONG:
                self.mode = 0b001
            else:
                invalid()
        elif up == GpioDriveMode.SLOW_STRONG:
            if down == GpioDriveMode.HI_Z:
                self.mode = 0b100
            elif down == GpioDriveMode.SLOW_STRONG:
                self.mode = 0b101
            else:
                invalid()
        elif up == GpioDriveMode.RESISTIVE:
            if down == GpioDriveMode.FAST_STRONG:
                self.mode = 0b011
            else:
                invalid()
        elif up == GpioDriveMode.HI_Z:
            if down is None:
                self.mode = 0b010
            elif down == GpioDriveMode.SLOW_STRONG:
                self.mode = 0b111
            else:
                invalid()
        else:
            invalid()

        # Only reached for a supported combination; every other path raised.
        self.up = up
        self.down = down

    def __str__(self: Self) -> str:
        return f"GpioSettings(function={self.function}, mode={self.mode:0b})"

    def to_bytes(self: Self) -> bytes:
        """
        Encode the settings as one byte: the function flag plus the raw mode.
        """

        return (self.function.value + self.mode).to_bytes(1, "big")

    @classmethod
    def from_byte(cls: Type[Self], data: int) -> Self:
        """
        Decode settings from a byte: bit 3 selects the function and the low
        three bits are the raw mode. Higher bits are ignored.
        """

        function = GpioFunction.USED if data & 0b1000 else GpioFunction.UNUSED
        mode = data & 0b0111
        return cls(function=function, mode=mode)

    def as_dict(self: Self) -> Dict[str, Any]:
        """
        Return the settings as a dict, with the drive modes (derived from the
        current raw mode) given by name.
        """

        up, down = GpioDriveMode.from_byte(self.mode)
        return dict(
            function=self.function.value,
            mode=self.mode,
            up=up.name if up is not None else None,
            down=down.name if down is not None else None,
        )

    def __repr__(self: Self) -> str:
        # A two-line, human-readable summary rather than a constructor-style
        # repr.
        up, down = GpioDriveMode.from_byte(self.mode)
        repr_ = f"Function: {self.function.value}\n"
        repr_ += (
            f"Drive Mode: {up.name if up is not None else '<none>'}, "
            f"{down.name if down is not None else '<none>'}"
        )

        return repr_

GpioState dataclass

Pin state & changes since last poll.

Attributes:
  • state (bool) –

    State at the last reading. When True, the pin was high.

  • falling (bool) –

    At least one falling edge has been detected since the last poll.

  • rising (bool) –

    At least one rising edge has been detected since the last poll.

Source code in crystalfontz/gpio.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@dataclass
class GpioState:
    """
    Pin state & changes since last poll.

    Attributes:
        state: State at the last reading. When True, the pin was high.
        falling: At least one falling edge has been detected since the last poll.
        rising: At least one rising edge has been detected since the last poll.
    """

    state: bool
    falling: bool
    rising: bool

    @classmethod
    def from_byte(cls: Type[Self], data: int) -> Self:
        return cls(
            state=bool(data & 0b0001),
            falling=bool(data & 0b0010),
            rising=bool(data & 0b0100),
        )

KeyActivity

Bases: Enum

A key activity. This is either a "press" event or a "release" event for a given key.

Source code in crystalfontz/keys.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
class KeyActivity(Enum):
    """
    A key activity. This is either a "press" event or a "release" event for a
    given key.
    """

    KEY_UP_PRESS = 1
    KEY_DOWN_PRESS = 2
    KEY_LEFT_PRESS = 3
    KEY_RIGHT_PRESS = 4
    KEY_ENTER_PRESS = 5
    KEY_EXIT_PRESS = 6
    KEY_UP_RELEASE = 7
    KEY_DOWN_RELEASE = 8
    KEY_LEFT_RELEASE = 9
    KEY_RIGHT_RELEASE = 10
    KEY_ENTER_RELEASE = 11
    KEY_EXIT_RELEASE = 12

    @classmethod
    def from_byte(cls: Type[Self], activity: int) -> "KeyActivity":
        return KEY_ACTIVITIES[activity - 1]

    @classmethod
    def from_bytes(cls: Type[Self], activity: bytes) -> "KeyActivity":
        return cls.from_byte(activity[0])

    def to_byte(self: Self) -> int:
        return self.value

KeyActivityReport

Bases: Response

A key activity report from the Crystalfontz LCD.

Attributes:
  • activity (KeyActivity) –

    The reported key activity.

Source code in crystalfontz/response.py
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
@code(0x80)
class KeyActivityReport(Response):
    """
    A key activity report from the Crystalfontz LCD.

    Attributes:
        activity (KeyActivity): The reported key activity.
    """

    def __init__(self: Self, activity: KeyActivity) -> None:
        self.activity: KeyActivity = activity

    @classmethod
    def from_bytes(cls: Type[Self], data: bytes) -> Self:
        # The payload must be exactly one byte: the activity code.
        assert_len(1, data)
        return cls(KeyActivity.from_bytes(data))

    def __str__(self: Self) -> str:
        activity_name = self.activity.name
        return f"KeyActivityReport({activity_name})"

    def __repr__(self: Self) -> str:
        # Tab-separated "<class name>\t<activity name>"
        class_name = self.__class__.__name__
        return f"{class_name}\t{self.activity.name}"

    def as_dict(self: Self) -> Dict[str, Any]:
        return {
            "type": self.__class__.__name__,
            "activity": self.activity.name,
        }

KeyState dataclass

A key's state.

Attributes:
  • pressed (bool) –

    When True, the key is currently pressed.

  • pressed_since (bool) –

    When True, the key has been pressed since the last poll.

  • released_since (bool) –

    When True, the key has been released since the last poll.

Source code in crystalfontz/keys.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
@dataclass
class KeyState:
    """
    A key's state.

    Attributes:
        keypress (KeyPress): The key's mask. It is ANDed with each state byte
                             to extract this key's flags.
        pressed (bool): When True, the key is currently pressed.
        pressed_since (bool): When True, the key has been pressed since the last poll.
        released_since (bool): When True, the key has been released since the last
                               poll.
    """

    keypress: KeyPress
    pressed: bool
    pressed_since: bool
    released_since: bool

    @classmethod
    def from_bytes(cls: Type[Self], state: bytes, keypress: KeyPress) -> Self:
        """
        Extract one key's state from the first three bytes of `state`
        (pressed, pressed since, released since), using `keypress` as mask.
        """

        pressed_byte, pressed_since_byte, released_since_byte = (
            state[0],
            state[1],
            state[2],
        )

        return cls(
            keypress=keypress,
            pressed=bool(pressed_byte & keypress),
            pressed_since=bool(pressed_since_byte & keypress),
            released_since=bool(released_since_byte & keypress),
        )

    def to_bytes(self: Self) -> Tuple[int, int, int]:
        """
        Return this key's contribution to the three state bytes: the key's
        mask for every flag that is set, and 0x00 for every flag that is not.
        """

        def masked(flag: bool) -> int:
            return self.keypress if flag else 0x00

        return (
            masked(self.pressed),
            masked(self.pressed_since),
            masked(self.released_since),
        )

KeyStates dataclass

The state of all keys.

Attributes:
  • up (KeyState) –

    The state of the "up" key.

  • enter (KeyState) –

    The state of the "enter" key.

  • exit (KeyState) –

    The state of the "exit" key.

  • left (KeyState) –

    The state of the "left" key.

  • right (KeyState) –

    The state of the "right" key.

  • down (KeyState) –

    The state of the "down" key.

Source code in crystalfontz/keys.py
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
@dataclass
class KeyStates:
    """
    The state of all keys.

    Attributes:
        up: The state of the "up" key.
        enter: The state of the "enter" key.
        exit: The state of the "exit" key.
        left: The state of the "left" key.
        right: The state of the "right" key.
        down: The state of the "down" key.
    """

    up: KeyState
    enter: KeyState
    exit: KeyState
    left: KeyState
    right: KeyState
    down: KeyState

    @classmethod
    def from_bytes(cls: Type[Self], state: bytes) -> Self:
        """
        Decode the state of every key from the three state bytes.
        """

        masks = (
            ("up", KP_UP),
            ("enter", KP_ENTER),
            ("exit", KP_EXIT),
            ("left", KP_LEFT),
            ("right", KP_RIGHT),
            ("down", KP_DOWN),
        )
        decoded = {name: KeyState.from_bytes(state, mask) for name, mask in masks}
        return cls(**decoded)

    def to_bytes(self: Self) -> bytes:
        """
        Combine every key's contribution into the three state bytes
        (pressed, pressed since, released since).
        """

        totals = [0x00, 0x00, 0x00]

        for key in (self.up, self.enter, self.exit, self.left, self.right, self.down):
            # Each key contributes its mask (or zero) to each of the bytes
            for index, part in enumerate(key.to_bytes()):
                totals[index] ^= part

        return bytes(totals)

    def __repr__(self: Self) -> str:
        # One line per key: "<key>: <field>=<value>, ..." with booleans shown
        # as yes/no and everything else passed through keypress_repr.
        rows = []
        for key_name, fields in asdict(self).items():
            parts = []
            for field_name, value in fields.items():
                if isinstance(value, bool):
                    parts.append(f"{field_name}={'yes' if value else 'no'}")
                else:
                    parts.append(f"{field_name}={keypress_repr(value)}")
            rows.append(f"{key_name}: {', '.join(parts)}")
        return "\n".join(rows)

KeypadPolled

Bases: Response

Attributes:
  • states (KeyStates) –

    The keypad's key states.

Source code in crystalfontz/response.py
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
@code(0x58)
class KeypadPolled(Response):
    """
    The response to polling the keypad.

    Attributes:
        states (KeyStates): The keypad's key states.
    """

    def __init__(self: Self, states: KeyStates) -> None:
        self.states = states

    @classmethod
    def from_bytes(cls: Type[Self], data: bytes) -> Self:
        # The payload must be exactly the three key state bytes.
        assert_len(3, data)
        return cls(KeyStates.from_bytes(data))

    def __str__(self: Self) -> str:
        return f"KeypadPolled(states={self.states})"

    def as_dict(self: Self) -> Dict[str, Any]:
        return {"states": asdict(self.states)}

    def __repr__(self: Self) -> str:
        # A heading followed by the per-key lines, indented by two spaces.
        heading = "Keypad States:\n"
        body = textwrap.indent(repr(self.states), "  ")
        return heading + body

LcdMemory

Bases: Response

Attributes:
  • address (int) –

    The address read from LCD memory.

  • data (bytes) –

    The data read from the address in LCD memory.

Source code in crystalfontz/response.py
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
@code(0x4A)
class LcdMemory(Response):
    """
    The contents of LCD memory read from an address.

    Attributes:
        address (int): The address read from LCD memory.
        data (bytes): The data read from the address in LCD memory.
    """

    def __init__(self: Self, address: int, data: bytes) -> None:
        self.address: int = address
        self.data: bytes = data

    @classmethod
    def from_bytes(cls: Type[Self], data: bytes) -> Self:
        # Nine bytes are required: one address byte, then eight data bytes.
        assert_len(9, data)
        return cls(data[0], data[1:])

    def __str__(self: Self) -> str:
        address_hex = f"0x{self.address:02X}"
        return f"LcdMemory({address_hex}={self.data})"

    def as_dict(self: Self) -> Dict[str, Any]:
        return {
            "address": self.address,
            "data": format_json_bytes(self.data),
        }

    def __repr__(self: Self) -> str:
        address_hex = f"0x{self.address:02X}"
        return f"{address_hex}: {format_bytes(self.data)}"

LcdRegister

Bases: Enum

An LCD register. The LCD supports two registers, "DATA" and "CONTROL".

Source code in crystalfontz/lcd.py
 4
 5
 6
 7
 8
 9
10
class LcdRegister(Enum):
    """
    An LCD register. The LCD supports two registers, "DATA" and "CONTROL".
    """

    # NOTE(review): the values are presumably the register selector sent to
    # the device - confirm against the command that uses them.
    DATA = 0
    CONTROL = 1

LoggingReportHandler

Bases: ReportHandler

A report handler which logs, using Python's logging module.

Source code in crystalfontz/report.py
44
45
46
47
48
49
50
51
52
53
54
55
56
class LoggingReportHandler(ReportHandler):
    """
    A report handler which logs, using Python's logging module.

    Every report is logged at INFO level on the logger named after this
    module.
    """

    def __init__(self: Self) -> None:
        self.logger: logging.Logger = logging.getLogger(__name__)

    async def on_key_activity(self: Self, report: KeyActivityReport) -> None:
        # The report object itself is the log message.
        self.logger.info(report)

    async def on_temperature(self: Self, report: TemperatureReport) -> None:
        # The report object itself is the log message.
        self.logger.info(report)

Marquee

Bases: Effect

A marquee. Prints text to a row, and scrolls it across the screen.

Source code in crystalfontz/effects.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
class Marquee(Effect):
    """
    A marquee. Prints text to a row, and scrolls it across the screen.

    Parameters:
        client: The client used to write to the display.
        row: The row to write to. Must satisfy 0 <= row < device.lines.
        text: The text to scroll. It is encoded with the device's character
              ROM and padded with spaces up to the width of the display.
        pause: Seconds to hold the first frame before scrolling starts.
               Defaults to the tick.
        tick: Seconds between frames. Defaults to 0.3 when None.
        timeout: Timeout passed to every write.
        retry_times: Retry count passed to every write.
        loop: Event loop handed to `Effect`.

    Raises:
        ValueError: If `row` is not a valid row of the device.
    """

    def __init__(
        self: Self,
        client: EffectClient,
        row: int,
        text: str,
        pause: Optional[float] = None,
        tick: Optional[float] = None,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        device = client.device

        if not (0 <= row < device.lines):
            raise ValueError(f"Invalid row: {row}")
        _tick = tick if tick is not None else 0.3

        super().__init__(
            client=client,
            tick=_tick,
            timeout=timeout,
            retry_times=retry_times,
            loop=loop,
        )
        self._pause: float = pause if pause is not None else _tick

        self.row: int = row
        self.text: bytes = device.character_rom.encode(text).ljust(device.columns, b" ")
        self.shift: int = 0

    async def start(self: Self) -> None:
        """
        Show the first frame and hold it for the pause duration.
        """

        await self.render()
        await self.sleep_remaining(self._pause)

    async def render(self: Self) -> None:
        """
        Write the current frame to the row, then advance the scroll offset.
        """

        buffer = self._line()
        await self.client.send_data(
            self.row, 0, buffer, timeout=self.timeout, retry_times=self.retry_times
        )
        # A full cycle is the whole text plus the gap between repetitions, so
        # the offset wraps only after passing the end of the text. Wrapping at
        # the display width would cut long text short; for text that fits the
        # display, the text is padded to that width and the two are the same.
        self.shift += 1
        if self.shift > len(self.text):
            self.shift = 0

    def _line(self: Self) -> bytes:
        # Build the visible frame: the text from the current offset onwards, a
        # gap, then the start of the text wrapping around, cut to the width of
        # the display.
        device = self.client.device

        left: bytes = self.text[self.shift :]
        right: bytes = self.text[0 : self.shift]
        middle: bytes = b" " * max(device.columns - len(self.text), 1)
        return (left + middle + right)[0 : device.columns]

NoopReportHandler

Bases: ReportHandler

A report handler which does nothing.

Source code in crystalfontz/report.py
32
33
34
35
36
37
38
39
40
41
class NoopReportHandler(ReportHandler):
    """
    A report handler which ignores every report.
    """

    async def on_key_activity(self: Self, report: KeyActivityReport) -> None:
        """Ignore the key activity report."""

    async def on_temperature(self: Self, report: TemperatureReport) -> None:
        """Ignore the temperature report."""

Pong

Bases: Response

Attributes:
  • response (bytes) –

    The data sent in the ping command.

Source code in crystalfontz/response.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
@code(0x40)
class Pong(Response):
    """
    The response to a ping.

    Attributes:
        response (bytes): The data sent in the ping command.
    """

    def __init__(self: Self, response: bytes) -> None:
        self.response = response

    @classmethod
    def from_bytes(cls: Type[Self], data: bytes) -> Self:
        # The payload is kept exactly as received; no length check is applied.
        pong = cls(data)
        return pong

    def __str__(self: Self) -> str:
        payload = self.response
        return f"Pong({payload})"

RawResponse

Bases: Response

A raw response. This class may be used with client.expect to capture an otherwise unsupported response type.

Source code in crystalfontz/response.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
class RawResponse(Response):
    """
    A raw response. This class may be used with `client.expect` to capture
    an otherwise unsupported response type.

    Attributes:
        code (int): The response code. Holds the placeholder 0xFF until it is
                    filled in by `from_packet`.
        data (bytes): The undecoded payload.
    """

    def __init__(self: Self, data: bytes) -> None:
        self.code: int = 0xFF
        self.data: bytes = data

    @classmethod
    def from_bytes(cls: Type[Self], data: bytes) -> Self:
        """
        Wrap a bare payload without decoding it. The code is left at the
        0xFF placeholder because no packet code is available here.
        """
        # Response declares from_bytes as abstract. Without this concrete
        # override the ABC machinery refuses to instantiate RawResponse at
        # all, which would make from_packet below raise TypeError.
        return cls(data)

    @classmethod
    def from_packet(cls: Type[Self], packet: Packet) -> Self:
        """
        Build a raw response from a (code, data) packet, keeping both parts
        untouched.
        """
        code, data = packet
        res = cls(data)
        res.code = code
        return res

ReportHandler

Bases: ABC

Handle reporting. Reports are issued for key activities and temperature readings.

Source code in crystalfontz/report.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class ReportHandler(ABC):
    """
    Handle reporting. Reports are issued for key activities and temperature readings.

    This is an abstract base class: subclasses must implement both
    `on_key_activity` and `on_temperature` as coroutines.
    """

    @abstractmethod
    async def on_key_activity(self: Self, report: KeyActivityReport) -> None:
        """
        This method is called on any new key activity report.

        Parameters:
            report (KeyActivityReport): The key activity report to handle.
        """

        raise NotImplementedError("on_key_activity")

    @abstractmethod
    async def on_temperature(self: Self, report: TemperatureReport) -> None:
        """
        This method is called on any new temperature report.

        Parameters:
            report (TemperatureReport): The temperature report to handle.
        """

        raise NotImplementedError("on_temperature")

on_key_activity(report) abstractmethod async

This method is called on any new key activity report.

Source code in crystalfontz/report.py
15
16
17
18
19
20
21
@abstractmethod
async def on_key_activity(self: Self, report: KeyActivityReport) -> None:
    """
    This method is called on any new key activity report.

    Parameters:
        report (KeyActivityReport): The key activity report to handle.
    """

    # Abstract: concrete handlers must override this coroutine.
    raise NotImplementedError("on_key_activity")

on_temperature(report) abstractmethod async

This method is called on any new temperature report.

Source code in crystalfontz/report.py
23
24
25
26
27
28
29
@abstractmethod
async def on_temperature(self: Self, report: TemperatureReport) -> None:
    """
    This method is called on any new temperature report.

    Parameters:
        report (TemperatureReport): The temperature report to handle.
    """

    # Abstract: concrete handlers must override this coroutine.
    raise NotImplementedError("on_temperature")

Response

Bases: ABC

A response received from the Crystalfontz LCD.

To implement a new response type, subclass this class and implement the `__init__` method along with the abstract `from_bytes` classmethod.

Source code in crystalfontz/response.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class Response(ABC):
    """
    A response received from the Crystalfontz LCD.

    To implement a new response type, subclass this class and implement the
    __init__ method along with the abstract `from_bytes` classmethod.
    """

    @classmethod
    @abstractmethod
    def from_bytes(cls: Type[Self], data: bytes) -> Self:
        """Decode a response of this type from its payload bytes."""
        raise NotImplementedError("from_bytes")

    @classmethod
    def from_packet(cls: Type[Self], packet: Packet) -> "Response":
        """
        Decode a (code, data) packet into a response object.

        The class registered for the code in RESPONSE_CLASSES decodes the
        payload through its `from_bytes`.

        Raises:
            ResponseDecodeError: If the registered class fails to decode the
                                 payload. The original exception is chained.
            DeviceError: If no class is registered and the code is an error
                         code according to `DeviceError.is_error_code`.
            UnknownResponseError: If the code is neither registered nor an
                                  error code.
        """
        code, data = packet

        if code not in RESPONSE_CLASSES:
            # Nothing is registered for this code: it is either a device
            # error or something unrecognized.
            if DeviceError.is_error_code(code):
                raise DeviceError(packet)
            raise UnknownResponseError(packet)

        res_cls = RESPONSE_CLASSES[code]
        try:
            return res_cls.from_bytes(data)
        except Exception as exc:
            raise ResponseDecodeError(res_cls, str(exc)) from exc

Screensaver

Bases: Effect

A screensaver effect. Prints text at a random position, and moves it around the screen on an interval.

Source code in crystalfontz/effects.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
class Screensaver(Effect):
    """
    A screensaver effect. Prints text at a random position, and moves it around the
    screen on an interval.
    """

    def __init__(
        self: Self,
        client: EffectClient,
        text: str,
        tick: Optional[float] = None,
        timeout: Optional[float] = None,
        retry_times: Optional[int] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        """
        Parameters:
            client (EffectClient): The client used to draw on the device.
            text (str): The text to display. It is encoded with the device's
                        character ROM and must fit on a single line.
            tick (Optional[float]): The interval passed to the base effect.
                                    Defaults to 3.0 when not given.
            timeout (Optional[float]): Timeout forwarded to client commands.
            retry_times (Optional[int]): Retry count forwarded to client
                                         commands.
            loop (Optional[asyncio.AbstractEventLoop]): Event loop passed to
                                                        the base effect.

        Raises:
            ValueError: If the encoded text is wider than the device.
        """
        device = client.device
        buffer = device.character_rom.encode(text)

        if len(buffer) > device.columns:
            raise ValueError(
                f"Text length {len(buffer)} is too long to fit onto the device's "
                f"{device.columns} columns"
            )

        super().__init__(
            client=client,
            tick=tick if tick is not None else 3.0,
            timeout=timeout,
            retry_times=retry_times,
            loop=loop,
        )

        self.text: bytes = buffer

    async def render(self: Self) -> None:
        """
        Clear the screen and draw the text at a random row and column.
        """
        device = self.client.device

        await self.client.clear_screen(
            timeout=self.timeout, retry_times=self.retry_times
        )

        row = random.randrange(0, device.lines)
        # Valid start columns run from 0 through (columns - len(text))
        # inclusive. randrange excludes its upper bound, so add 1; this
        # keeps the range non-empty when the text is exactly as wide as the
        # display (which __init__ allows) and lets the text sit flush right.
        column = random.randrange(0, device.columns - len(self.text) + 1)

        await self.client.send_data(
            row, column, self.text, timeout=self.timeout, retry_times=self.retry_times
        )

StatusRead

Bases: Response

A raw status response. This status is parsed based on the device.

Source code in crystalfontz/response.py
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
@code(0x5E)
class StatusRead(Response):
    """
    A raw status response. This status is parsed based on the device.

    Attributes:
        data (bytes): The status payload, kept undecoded.
    """

    def __init__(self: Self, data: bytes) -> None:
        self.data: bytes = data

    @classmethod
    def from_bytes(cls: Type[Self], data: bytes) -> Self:
        """Wrap the payload as-is; no parsing is performed here."""
        status = cls(data)
        return status

    def __str__(self: Self) -> str:
        return "StatusRead({})".format(self.data)

TemperatureDisplayItem dataclass

A temperature display item, as used in command 21 (0x15): Set Up Live Temperature Display.

Parameters:
  • index (int) –

    The index of the display item.

  • n_digits (Literal[3] | Literal[5]) –

    The number of digits to display.

  • column (int) –

    The zero-indexed column to display the temperature on.

  • row (int) –

    The zero-indexed row to display the temperature on.

  • units (TemperatureUnit) –

    The units to use when displaying the temperature.

Source code in crystalfontz/temperature.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@dataclass
class TemperatureDisplayItem:
    """
    A temperature display item, as used in command 21 (0x15): Set Up Live
    Temperature Display.

    Parameters:
        index (int): The index of the display item.
        n_digits (Literal[3] | Literal[5]): The number of digits to display.
        column (int): The zero-indexed column to display the temperature on.
        row (int): The zero-indexed row to display the temperature on.
        units (TemperatureUnit): The units to use when displaying the temperature.
    """

    index: int
    # TODO: Device specific?
    n_digits: TemperatureDigits
    column: int
    row: int
    units: TemperatureUnit

    @classmethod
    def to_bytes(
        cls: Type[Self], item: Optional[Self], device: DeviceProtocol
    ) -> bytes:
        """
        Serialize a display item into its five one-byte fields: index,
        digit count, column, row and units, in that order.

        A missing item (None) is encoded as a single zero byte.

        Raises:
            ValueError: If the column or row lies outside the device's
                        dimensions.
        """
        if item is None:
            return b"\x00"

        fields = []

        # TODO: Validation. The documentation suggests that sensors 32+ are
        # actually for something else - fan speed?
        fields.append(item.index.to_bytes(1, "big"))
        fields.append(item.n_digits.to_bytes(1, "big"))

        column_in_range = 0 <= item.column < device.columns
        if not column_in_range:
            raise ValueError(f"Column {item.column} is invalid")
        fields.append(item.column.to_bytes(1, "big"))

        row_in_range = 0 <= item.row < device.lines
        if not row_in_range:
            raise ValueError(f"Row {item.row} is invalid")
        fields.append(item.row.to_bytes(1, "big"))

        fields.append(item.units.value.to_bytes(1, "big"))

        return b"".join(fields)

TemperatureReport

Bases: Response

A temperature sensor report from the Crystalfontz LCD.

Attributes:
  • index (int) –

    The index of the temperature sensor.

  • celsius (float) –

    The temperature in celsius.

  • fahrenheit (float) –

    The temperature in fahrenheit.

Source code in crystalfontz/response.py
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
@code(0x82)
class TemperatureReport(Response):
    """
    A temperature sensor report from the Crystalfontz LCD.

    Attributes:
        index (int): The index of the temperature sensor.
        celsius (float): The temperature in celsius.
        fahrenheit (float): The temperature in fahrenheit.
    """

    def __init__(self: Self, index: int, celsius: float, fahrenheit: float) -> None:
        self.index: int = index
        self.celsius: float = celsius
        self.fahrenheit: float = fahrenheit

    @classmethod
    def from_bytes(cls: Type[Self], data: bytes) -> Self:
        """
        Decode a four-byte payload: sensor index, a big-endian unsigned
        16-bit reading, and a CRC status byte.

        The reading is divided by 16 to obtain degrees celsius.

        Raises:
            DecodeError: If the CRC status byte is zero.
        """
        assert_len(4, data)

        index: int = data[0]
        (raw_reading,) = struct.unpack(">H", data[1:3])
        crc_status = data[3]

        # A zero status byte marks the reading as failing its CRC check.
        if crc_status == 0:
            raise DecodeError("Bad CRC from temperature sensor")

        celsius: float = raw_reading / 16.0
        fahrenheit: float = (9 / 5 * celsius) + 32.0

        return cls(index, celsius, fahrenheit)

    def __str__(self: Self) -> str:
        return "TemperatureReport({}, celsius={}, fahrenheit={})".format(
            self.index, self.celsius, self.fahrenheit
        )

    def __repr__(self: Self) -> str:
        # Tab-separated: class name, celsius, fahrenheit.
        return "{}\t{}\t{}".format(
            self.__class__.__name__, self.celsius, self.fahrenheit
        )

    def as_dict(self: Self) -> Dict[str, Any]:
        """Return the class name and both temperature values as a dict."""
        return {
            "type": self.__class__.__name__,
            "celsius": self.celsius,
            "fahrenheit": self.fahrenheit,
        }

TemperatureUnit

Bases: Enum

A temperature unit. Either CELSIUS or FAHRENHEIT.

Source code in crystalfontz/temperature.py
12
13
14
15
16
17
18
class TemperatureUnit(Enum):
    """
    A temperature unit. Either CELSIUS or FAHRENHEIT.

    Each member's value is an integer that fits in a single byte.
    """

    # Degrees celsius.
    CELSIUS = 0
    # Degrees fahrenheit.
    FAHRENHEIT = 1

UnknownResponseError

Bases: DecodeError

An error raised when the response code is unrecognized.

Source code in crystalfontz/error.py
62
63
64
65
66
67
68
69
70
71
72
73
74
class UnknownResponseError(DecodeError):
    """
    An error raised when the response code is unrecognized.

    Attributes:
        code (int): The unrecognized response code.
        command_code (Optional[int]): ``code - 0x40`` when the code is below
                                      0x80, otherwise None.
        payload (bytes): The payload that accompanied the code.
    """

    def __init__(self: Self, packet: Tuple[int, bytes]) -> None:
        code, payload = packet

        command_code: Optional[int] = None
        if code < 0x80:
            command_code = code - 0x40

        self.code: int = code
        self.command_code: Optional[int] = command_code
        self.payload = payload

        super().__init__("Unknown response (0x{:02X}, {})".format(code, payload))

UserFlashAreaRead

Bases: Response

Attributes:
  • data (bytes) –

    The data read from the user flash area.

Source code in crystalfontz/response.py
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
@code(0x43)
class UserFlashAreaRead(Response):
    """
    Response carrying the contents of the user flash area.

    Attributes:
        data (bytes): The data read from the user flash area.
    """

    def __init__(self: Self, data: bytes) -> None:
        self.data: bytes = data

    @classmethod
    def from_bytes(cls: Type[Self], data: bytes) -> Self:
        """Wrap the payload as-is; no parsing is performed."""
        flash = cls(data)
        return flash

    def __str__(self: Self) -> str:
        return "UserFlashAreaRead({})".format(self.data)

Versions

Bases: Response

Attributes:
  • model (str) –

    The device model.

  • hardware_rev (str) –

    The device's hardware revision.

  • firmware_rev (str) –

    The device's firmware revision.

Source code in crystalfontz/response.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
@code(0x41)
class Versions(Response):
    """
    Response describing the device model and its revisions.

    Attributes:
        model (str): The device model.
        hardware_rev (str): The device's hardware revision.
        firmware_rev (str): The device's firmware revision.
    """

    def __init__(self: Self, model: str, hardware_rev: str, firmware_rev: str) -> None:
        self.model: str = model
        self.hardware_rev: str = hardware_rev
        self.firmware_rev: str = firmware_rev

    @classmethod
    def from_bytes(cls: Type[Self], data: bytes) -> Self:
        """
        Parse an ASCII payload shaped like ``<model>:<hardware>,<firmware>``.

        Whitespace around the two revisions is stripped; the model is kept
        verbatim. A payload without exactly one ``:`` and one ``,`` fails
        to unpack and raises ValueError.
        """
        text = data.decode("ascii")
        model, revisions = text.split(":")
        hardware, firmware = revisions.split(",")

        return cls(model, hardware.strip(), firmware.strip())

    def __str__(self: Self) -> str:
        return "Versions(model={}, hardware_rev={}, firmware_rev={})".format(
            self.model, self.hardware_rev, self.firmware_rev
        )

    def as_dict(self: Self) -> Dict[str, Any]:
        """Return the model and both revisions as a dict."""
        return {
            "model": self.model,
            "hardware_rev": self.hardware_rev,
            "firmware_rev": self.firmware_rev,
        }

    def __repr__(self: Self) -> str:
        return "{}: {}, {}".format(self.model, self.hardware_rev, self.firmware_rev)

connection(port, model='CFA533', hardware_rev=None, firmware_rev=None, device=None, report_handler=None, timeout=DEFAULT_TIMEOUT, retry_times=DEFAULT_RETRY_TIMES, loop=None, baud_rate=SLOW_BAUD_RATE) async

Create a connection to the specified device, with an associated context.

This context will automatically close the connection on exit and wait for the connection to close.

Source code in crystalfontz/client.py
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
@asynccontextmanager
async def connection(
    port: str,
    model: str = "CFA533",
    hardware_rev: Optional[str] = None,
    firmware_rev: Optional[str] = None,
    device: Optional[Device] = None,
    report_handler: Optional[ReportHandler] = None,
    timeout: float = DEFAULT_TIMEOUT,
    retry_times: int = DEFAULT_RETRY_TIMES,
    loop: Optional[asyncio.AbstractEventLoop] = None,
    baud_rate: BaudRate = SLOW_BAUD_RATE,
) -> AsyncGenerator[Client, None]:
    """
    Create a connection to the specified device, with an associated context.

    This context will automatically close the connection on exit and wait for the
    connection to close. The connection is closed whether the body of the
    ``async with`` block finishes normally or raises.

    All parameters are forwarded unchanged to `create_connection`.
    """

    client = await create_connection(
        port,
        model=model,
        hardware_rev=hardware_rev,
        firmware_rev=firmware_rev,
        device=device,
        report_handler=report_handler,
        timeout=timeout,
        retry_times=retry_times,
        loop=loop,
        baud_rate=baud_rate,
    )

    try:
        yield client
    finally:
        # An exception raised inside the ``async with`` body is thrown into
        # this generator at the yield. Cleaning up in ``finally`` ensures
        # the connection is closed on that path too, not only on success.
        client.close()
        await client.closed

create_connection(port, model='CFA533', hardware_rev=None, firmware_rev=None, device=None, report_handler=None, timeout=DEFAULT_TIMEOUT, retry_times=DEFAULT_RETRY_TIMES, loop=None, baud_rate=SLOW_BAUD_RATE) async

Create a connection to the specified device. Returns a Client object.

To close the connection, call client.close(). The client.closed property is a Future that will resolve when the client is closed (either due to a call to client.close() or an error) and should be awaited.

Source code in crystalfontz/client.py
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
async def create_connection(
    port: str,
    model: str = "CFA533",
    hardware_rev: Optional[str] = None,
    firmware_rev: Optional[str] = None,
    device: Optional[Device] = None,
    report_handler: Optional[ReportHandler] = None,
    timeout: float = DEFAULT_TIMEOUT,
    retry_times: int = DEFAULT_RETRY_TIMES,
    loop: Optional[asyncio.AbstractEventLoop] = None,
    baud_rate: BaudRate = SLOW_BAUD_RATE,
) -> Client:
    """
    Open a serial connection to the specified device and return its Client.

    When no device is given, one is looked up from the model and revisions.
    When no report handler is given, a `NoopReportHandler` is used. When no
    loop is given, the running event loop is used. The function waits for
    the client's connection to be made before returning.

    To close the connection, call `client.close()`. The `client.closed` property is a
    Future that will resolve when the client is closed (either due to a call to
    `client.close()` or an error) and should be awaited.
    """

    _loop = loop or asyncio.get_running_loop()
    device = device or lookup_device(model, hardware_rev, firmware_rev)
    report_handler = report_handler or NoopReportHandler()

    logger.info(f"Connecting to {port} at {baud_rate} baud")

    def build_client() -> Client:
        # Protocol factory handed to create_serial_connection.
        return Client(
            device=device,
            report_handler=report_handler,
            timeout=timeout,
            retry_times=retry_times,
            loop=_loop,
        )

    # The serial line is configured as 8 data bits, no parity, one stop bit.
    _transport, client = await create_serial_connection(
        _loop,
        build_client,
        port,
        baudrate=baud_rate,
        bytesize=EIGHTBITS,
        parity=PARITY_NONE,
        stopbits=STOPBITS_ONE,
    )

    await client._connection_made

    return client