98 lines
2.3 KiB
Python
98 lines
2.3 KiB
Python
import re
|
|
import asyncio
|
|
import telnetlib3
|
|
from dataclasses import dataclass
|
|
from enum import StrEnum
|
|
|
|
# TCP port constant. NOTE(review): nothing in the visible code reads this
# value -- the connection code uses its own port number; confirm which one
# is intended.
PORT = 43680

# Matches a feedback line of the form "OP <key>" or "OP <key> = <value>".
# Group 1 is the key/command; group 2 is the value and only participates
# in the match when the " = <value>" part is present.
RE_STATUS = re.compile(r"OP (.+?)(?:$| = (.+)$)")
|
|
|
|
|
|
@dataclass(frozen=True)
class ACK:
    """Immutable message carrying the command text that was acknowledged."""

    # Command string taken from the feedback line.
    command: str
|
|
|
|
@dataclass(frozen=True)
class ValueUpdate:
    """Immutable message pairing a key with the value reported for it."""

    # Name of the setting, e.g. the text before " = " in a feedback line.
    key: str
    # Reported value, kept as the raw string.
    value: str
|
|
|
|
class Status(StrEnum):
|
|
standby = "0"
|
|
warmup = "1"
|
|
imaging = "2"
|
|
cooling = "3"
|
|
warning = "4"
|
|
|
|
class LampMode(StrEnum):
|
|
economy = "0"
|
|
standard = "1"
|
|
dimming = "2"
|
|
|
|
class ValueType(StrEnum):
|
|
status = "status"
|
|
picture_mute = "picture.mute"
|
|
lamp_mode = "lamp.mode"
|
|
|
|
|
|
class BarcoRLMW_TCP:
    """Client that talks to a projector over a telnetlib3 connection.

    Outgoing commands are written as ``op <command>`` terminated by a
    carriage return. Incoming feedback lines are parsed with ``RE_STATUS``
    into ``ACK`` or ``ValueUpdate`` messages.

    Call :meth:`init` to open the connection before sending anything.
    """

    def __init__(self, projector_ip, port=3023):
        """Store the connection parameters; no I/O happens here.

        Args:
            projector_ip: Host name or IP address to connect to.
            port: TCP port to connect to. Defaults to 3023, the port this
                class has always connected to (the module-level ``PORT``
                constant is not used here).
        """
        self.projector_ip = projector_ip
        self.port = port
        self.session = None  # kept for compatibility; not used by this class
        # Populated by init(); defined here so a premature send fails clearly.
        self.reader = None
        self.writer = None

    async def init(self):
        """Open the telnet connection and store its reader/writer pair."""
        self.reader, self.writer = await telnetlib3.open_connection(
            self.projector_ip, self.port
        )

    async def _send(self, command):
        """Write ``op <command>`` plus a carriage return and flush it.

        Raises:
            RuntimeError: if :meth:`init` has not been called yet.
        """
        if self.writer is None:
            raise RuntimeError("not connected; call init() first")
        # The writer's write() is a plain synchronous call (it returns
        # nothing awaitable); drain() is the coroutine that waits for the
        # transport to accept the data.
        self.writer.write(f"op {command}\r")
        await self.writer.drain()

    async def exec(self, cmd):
        """Send a bare command, e.g. ``power.on``."""
        await self._send(cmd)

    async def set(self, key, val):
        """Send ``<key> = <val>``.

        ``val`` is formatted with ``str()``, so ints (as passed by
        :meth:`set_shutter`) and enum members are accepted as well as
        plain strings.
        """
        await self._send(f"{key} = {val}")

    async def query(self, key):
        """Send ``<key> ?`` to request the current value of ``key``."""
        await self._send(f"{key} ?")

    async def set_power(self, power):
        """Send ``power.on`` when ``power`` is truthy, else ``power.off``."""
        await self.exec(f"power.{onoff(power)}")

    async def set_shutter(self, shutter):
        """Set ``picture.mute`` to 1 or 0 according to ``shutter``."""
        await self.set("picture.mute", int(shutter))

    async def iter_messages(self):
        """Async iterator that processes feedback from the projector.

        Yields ACK (acknowledgement) and ValueUpdate (value has changed)
        messages. Lines that do not parse are skipped. The loop never ends
        on its own; any exception raised by the reader (for example when
        the connection closes) propagates to the caller.
        """
        while True:
            # Await the read first, then post-process the result. The
            # separator is passed as bytes, which is what the telnetlib3
            # reader searches its buffer for.
            raw = await self.reader.readuntil(b"\r")
            # NOTE(review): decode only when bytes come back, in case the
            # reader flavour in use already returns str -- confirm.
            if isinstance(raw, (bytes, bytearray)):
                raw = bytes(raw).decode()
            msg = self.parse_response(raw.strip())
            if msg is not None:
                yield msg

    def parse_response(self, line) -> "ACK | ValueUpdate | None":
        """Parse one feedback line.

        Returns:
            ``ACK(key)`` for a line of the form ``OP <key>``,
            ``ValueUpdate(key, value)`` for ``OP <key> = <value>``,
            or ``None`` when the line does not match ``RE_STATUS``.
        """
        found = RE_STATUS.search(line)
        if found is None:
            return None

        # Group 2 only participates when the " = <value>" part is present;
        # otherwise it is None and the line is a plain acknowledgement.
        key, value = found.groups()
        if value is None:
            return ACK(key)
        return ValueUpdate(key, value)
|
|
|
|
|
|
def onoff(state: bool) -> str:
    """Return ``"on"`` for a truthy ``state`` and ``"off"`` otherwise."""
    return "on" if state else "off"
|
|
|
|
if __name__ == "__main__":

    async def main():
        """Manual smoke test: connect, ask for the status, print one reply."""
        barco = BarcoRLMW_TCP("192.168.192.12")
        # The connection must be opened explicitly before any command is sent.
        await barco.init()

        # The class has no get_status(); the status is requested with
        # query() and the answer arrives through iter_messages().
        await barco.query(ValueType.status)
        async for message in barco.iter_messages():
            print(message)
            break

        # NOTE(review): an earlier version also tried to send a "kymenu"
        # remote-control click through a click_remote() method that does
        # not exist on the class; restore it once such a method is added.

    asyncio.run(main())
|