My local repo of qmk firmware.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
qmk_firmware/lib/python/qmk/cli/xap/xap_client.py

228 lines
6.0 KiB

3 years ago
"""Dummy XAP Client
"""
import json
import random
import gzip
import threading
import functools
from struct import Struct, pack, unpack
from collections import namedtuple
from enum import IntFlag, IntEnum
from platform import platform
RequestPacket = namedtuple('RequestPacket', 'token length data')
RequestStruct = Struct('<HB61s')
ResponsePacket = namedtuple('ResponsePacket', 'token flags length data')
ResponseStruct = Struct('<HBB60s')
def _gen_token():
"""Generate XAP token - cannot start with 00xx or 'reserved' (FFFE|FFFF)
3 years ago
"""
token = random.randrange(0x0100, 0xFFFD)
3 years ago
# swap endianness
return unpack('<H', pack('>H', token))[0]
def _u32toBCD(val): # noqa: N802
"""Create BCD string
"""
return f'{val>>24}.{val>>16 & 0xFF}.{val & 0xFFFF}'
class XAPSecureStatus(IntEnum):
LOCKED = 0x00
UNLOCKING = 0x01
UNLOCKED = 0x02
class XAPFlags(IntFlag):
FAILURE = 0
SUCCESS = 1 << 0
SECURE_FAILURE = 1 << 1
class XAPEventType(IntEnum):
SECURE = 0x01
KEYBOARD = 0x02
USER = 0x03
class XAPRouteError(Exception):
pass
3 years ago
class XAPDevice:
def __init__(self, dev):
"""Constructor opens hid device and starts dependent services
"""
self.responses = {}
self.dev = hid.Device(path=dev['path'])
self.bg = threading.Thread(target=self._read_loop, daemon=True)
self.bg.start()
def _read_loop(self):
"""Background thread to signal waiting transactions
"""
while 1:
array_alpha = self.dev.read(ResponseStruct.size, 100)
3 years ago
if array_alpha:
token = int.from_bytes(array_alpha[:2], 'little')
event = self.responses.get(token)
if event:
event._ret = array_alpha
event.set()
def _query_device_info(self):
datalen = int.from_bytes(self.transaction(b'\x01\x05') or bytes(0), 'little')
if not datalen:
return {}
data = []
offset = 0
while offset < datalen:
chunk = self.transaction(b'\x01\x06', offset)
data += chunk
offset += len(chunk)
str_data = gzip.decompress(bytearray(data[:datalen]))
return json.loads(str_data)
def listen(self):
"""Receive a 'broadcast' message
"""
token = 0xFFFF
event = threading.Event()
self.responses[token] = event
while not hasattr(event, '_ret'):
event.wait(timeout=0.25)
3 years ago
r = ResponsePacket._make(ResponseStruct.unpack(event._ret))
return (r.flags, r.data[:r.length])
def _transaction(self, *args):
3 years ago
"""Request/Receive
"""
# convert args to array of bytes
data = bytes()
for arg in args:
if isinstance(arg, (bytes, bytearray)):
data += arg
if isinstance(arg, int): # TODO: remove terrible assumption of u16
data += arg.to_bytes(2, byteorder='little')
token = _gen_token()
p = RequestPacket(token, len(data), data)
buffer = RequestStruct.pack(*list(p))
event = threading.Event()
self.responses[token] = event
# prepend 0 on windows because reasons...
if 'windows' in platform().lower():
buffer = b'\x00' + buffer
self.dev.write(buffer)
event.wait(timeout=1)
self.responses.pop(token, None)
if not hasattr(event, '_ret'):
return None
r = ResponsePacket._make(ResponseStruct.unpack(event._ret))
if r.flags & XAPFlags.SUCCESS == 0:
3 years ago
return None
return r.data[:r.length]
@functools.lru_cache
def capability(self, route):
cap = int.from_bytes(self._transaction(route) or bytes(0), 'little')
return cap
@functools.lru_cache
def subsystem(self):
sub = int.from_bytes(self._transaction(b'\x00\x02') or bytes(0), 'little')
return sub
@functools.lru_cache
3 years ago
def version(self):
ver = int.from_bytes(self._transaction(b'\x00\x00') or bytes(0), 'little')
3 years ago
return {'xap': _u32toBCD(ver)}
def _ensure_route(self, route):
(sub, rt) = route
cap = bytes([sub, 1])
if self.subsystem() & (1 << sub) == 0:
raise XAPRouteError("subsystem not available")
if self.capability(cap) & (1 << rt) == 0:
raise XAPRouteError("route not available")
def transaction(self, route, *args):
self._ensure_route(route)
return self._transaction(route, *args)
@functools.lru_cache
3 years ago
def info(self):
data = self._query_device_info()
data['_id'] = self.transaction(b'\x01\x08')
data['xap'] = self.version()['xap']
return data
def status(self):
lock = int.from_bytes(self.transaction(b'\x00\x03') or bytes(0), 'little')
data = {}
data['lock'] = XAPSecureStatus(lock).name
return data
3 years ago
def unlock(self):
self.transaction(b'\x00\x04')
def lock(self):
self.transaction(b'\x00\x05')
def reset(self):
status = int.from_bytes(self.transaction(b'\x01\x07') or bytes(0), 'little')
return status == 1
3 years ago
class XAPClient:
@staticmethod
def _lazy_imports():
# Lazy load to avoid missing dependency issues
global hid
import hid
@staticmethod
def list(search=None):
"""Find compatible XAP devices
"""
XAPClient._lazy_imports()
def _is_xap_usage(x):
return x['usage_page'] == 0xFF51 and x['usage'] == 0x0058
def _is_filtered_device(x):
name = '%04x:%04x' % (x['vendor_id'], x['product_id'])
return name.lower().startswith(search.lower())
devices = filter(_is_xap_usage, hid.enumerate())
if search:
devices = filter(_is_filtered_device, devices)
return list(devices)
def connect(self, dev):
"""Connect to a given XAP device
"""
XAPClient._lazy_imports()
return XAPDevice(dev)