import re import asyncio import telnetlib3 from dataclasses import dataclass from enum import StrEnum PORT = 43680 RE_STATUS = re.compile(r"OP (.+?)(?:$| = (.+)$)") @dataclass(frozen=True) class ACK: command: str @dataclass(frozen=True) class ValueUpdate: key: str value: str class Status(StrEnum): standby = "0" warmup = "1" imaging = "2" cooling = "3" warning = "4" class LampMode(StrEnum): economy = "0" standard = "1" dimming = "2" class ValueType(StrEnum): status = "status" picture_mute = "picture.mute" lamp_mode = "lamp.mode" class BarcoRLMW_TCP: def __init__(self, projector_ip): self.projector_ip = projector_ip self.session = None async def init(self): self.reader, self.writer = await telnetlib3.open_connection(self.projector_ip, 3023) async def exec(self, cmd): await self.writer.write("op " + cmd + "\r") async def set(self, key, val): await self.writer.write("op " + key + " = " + val + "\r") async def query(self, key): await self.writer.write("op " + key + " ?\r") async def set_power(self, power): await self.exec(f"power.{onoff(power)}") async def set_shutter(self, shutter): await self.set("picture.mute", int(shutter)) async def iter_messages(self): """Async iterator that processes feedback from the projector. Yields ACK (acknowledgement) and ValueUpdate (value has changed) messages.""" while True: line = await self.reader.readuntil('\r').decode().strip() msg = self.parse_response(line) if msg: yield msg def parse_response(self, line): matches = RE_STATUS.findall(line) if len(matches) == 0: return None match = matches[0] if len(match) == 2: return ACK(match[1]) if len(match) == 3: return ValueUpdate(match[1], match[2]) def onoff(state: bool) -> str: if state: return "on" else: return "off" if __name__ == "__main__": async def main(): barco = BarcoRLMW_TCP("192.168.192.12") status = await barco.get_status() print(status) await barco.click_remote("kymenu") asyncio.run(main())