predavalnice-kontroler/controller/lucke/luckeControl.py

110 lines
4.4 KiB
Python

from http import HTTPStatus
import aiomqtt
import asyncio
import toml
import aiohttp
import os
# Module-level settings. They are only declared here; main() assigns them
# from the TOML configuration before any task is started.

# Token sent as "Authorization: Bearer <token>" on every REST request
# (read from conf["lucke"]['bearer_token']).
lucke_bearer_token: str
# Prefix of every MQTT topic this module subscribes or publishes to
# (read from conf['global']['room']).
room: str
# Base URL that the REST endpoint paths are appended to
# (read from conf["lucke"]['url']).
url: str
# Lecture-hall identifier interpolated into the REST endpoint paths
# (read from conf["lucke"]['roomId']).
roomId: int|str
async def sendSceneRequest(client, scene):
    """Activate a stored scene over REST and announce it on MQTT.

    Sends a GET to the scene's ``activate`` endpoint. When the server answers
    204, ``scene`` is published (QoS 1, retained) to
    ``{room}/lucke/preset/current``. Any other status is printed together
    with the response body and nothing is published.

    Args:
        client: MQTT client used for publishing (must provide ``publish``).
        scene: Scene identifier interpolated into the endpoint path and used
            as the MQTT payload.
    """
    activate_url = url + f"/rest/v2/fri-fkkt/lecture-halls/{roomId}/scenes/{scene}/activate"
    auth_headers = {"Authorization": "Bearer " + lucke_bearer_token}

    async with aiohttp.request("GET", activate_url, headers=auth_headers) as reply:
        if reply.status != 204:
            # Unexpected answer: log status and body, do not touch MQTT state.
            print(reply.status, await reply.text())
            return
        await client.publish(f'{room}/lucke/preset/current', payload=scene, qos=1, retain=True)
async def setLight(client, lightNum, intensity: int):
    """Set one light's intensity over REST and mirror the value on MQTT.

    Sends a PUT with a JSON body containing ``stateOn`` (true for any
    non-zero intensity) and ``dimmValue`` (the intensity itself). When the
    server answers 204, the intensity is published (QoS 1, retained) to
    ``{room}/lucke/brightness/{lightNum}``. Any other status is printed
    together with the response body.

    Args:
        client: MQTT client used for publishing (must provide ``publish``).
        lightNum: Light identifier used in the endpoint path and MQTT topic.
        intensity: Requested intensity; 0 switches the light off.
    """
    light_url = url + f"/rest/v2/fri-fkkt/lecture-halls/{roomId}/lights/{lightNum}"
    auth_headers = {"Authorization": "Bearer " + lucke_bearer_token}
    body = {
        "stateOn": intensity != 0,
        "dimmValue": intensity,
    }

    async with aiohttp.request("PUT", light_url, headers=auth_headers, json=body) as reply:
        if reply.status != 204:
            print("setLight error:", reply.status, await reply.text())
            return
        await client.publish(f'{room}/lucke/brightness/{lightNum}', payload=intensity, qos=1, retain=True)
async def saveScene(client, sceneNum):
    """Ask the REST API to save the given scene.

    Sends a GET to the scene's ``save`` endpoint. A 204 answer is treated as
    success and produces no output; any other status is printed together
    with the response body.

    Args:
        client: MQTT client; accepted for signature parity with the other
            request helpers but not used here.
        sceneNum: Scene identifier interpolated into the endpoint path.
    """
    save_url = url + f"/rest/v2/fri-fkkt/lecture-halls/{roomId}/scenes/{sceneNum}/save"
    auth_headers = {"Authorization": "Bearer " + lucke_bearer_token}

    async with aiohttp.request("GET", save_url, headers=auth_headers) as reply:
        if reply.status != 204:
            print("saveScene error:", reply.status, await reply.text())
async def task_luckeCommand(controlClient):
    """Dispatch incoming MQTT lighting commands to the REST helpers.

    Subscribes to ``{room}/lucke/#`` and handles three topics:

    * ``{room}/lucke/preset/recall`` - payload is the scene to activate.
    * ``{room}/lucke/preset/save`` - payload is the scene to save.
    * ``{room}/lucke/set/<light>`` - payload is an integer intensity in the
      range 0..100 for the light named by the last topic segment.

    Messages on any other topic under the subscription are ignored.
    Intensity payloads that are not integers or fall outside 0..100 are
    reported with ``print`` and dropped.

    Args:
        controlClient: Connected MQTT client providing ``subscribe``,
            ``messages`` and ``publish``.
    """
    await controlClient.subscribe(f"{room}/lucke/#")
    async for mesg in controlClient.messages:
        mesg: aiomqtt.Message
        if mesg.topic.matches(f"{room}/lucke/preset/recall"):
            sceneNum = mesg.payload.decode()
            print("Received: " + str(mesg.topic) + " payload: [" + sceneNum + "]")
            await sendSceneRequest(controlClient, sceneNum)
        elif mesg.topic.matches(f"{room}/lucke/preset/save"):
            sceneNum = mesg.payload.decode()
            print("Received: " + str(mesg.topic) + " payload: [" + sceneNum + "]")
            await saveScene(controlClient, sceneNum)
        elif mesg.topic.matches(f"{room}/lucke/set/+"):
            # The light identifier is the last segment of the topic.
            lightNum = mesg.topic.value.rsplit("/", maxsplit=1)[-1]
            try:
                intensity = int(mesg.payload.decode())
            except (AttributeError, ValueError):
                # Payload without .decode(), undecodable bytes
                # (UnicodeDecodeError is a ValueError) or non-numeric text.
                print("Invalid message:", mesg)
            else:
                # Explicit range check instead of `assert`, which is removed
                # when Python runs with -O and would let any value through.
                if 0 <= intensity <= 100:
                    await setLight(controlClient, lightNum, intensity)
                else:
                    print("Invalid message:", mesg)
        # Short pause after each message before taking the next one.
        await asyncio.sleep(0.01)
async def task_luckePoll(client):
    """Poll the lights REST endpoint and publish each light's state to MQTT.

    Every 0.5 seconds the list of lights is fetched. For each light two
    retained QoS 1 messages are published:

    * ``{room}/lucke/is_dimmable/<id>`` - the light's ``dimmable`` value.
    * ``{room}/lucke/brightness/<id>`` - 0 when the light is off, otherwise
      its ``dimmValue`` (100 if the light reports no ``dimmValue``).

    A non-200 answer or a network-level failure is printed and the poll is
    retried on the next cycle. The function never returns on its own.

    Args:
        client: Connected MQTT client used for publishing.
    """
    endpoint = url + f"/rest/v2/fri-fkkt/lecture-halls/{roomId}/lights/"
    headers = {"Authorization": f"Bearer {lucke_bearer_token}"}

    # One session for the lifetime of the task: creating a session per poll
    # throws away the connection pool twice a second.
    async with aiohttp.ClientSession() as session:
        while True:
            try:
                async with session.get(endpoint, headers=headers) as response:
                    if response.status == 200:
                        lights = await response.json()
                        for light in lights:
                            await client.publish(f'{room}/lucke/is_dimmable/{light["id"]}', payload=light["dimmable"], qos=1, retain=True)
                            # TODO: find a better way to handle non-dimmable lights
                            if light["stateOn"]:
                                dimValue = light.get("dimmValue", 100)
                            else:
                                dimValue = 0
                            await client.publish(f'{room}/lucke/brightness/{light["id"]}', payload=dimValue, qos=1, retain=True)
                    else:
                        print(f"Failed to fetch lights: {response.status}", await response.text())
            except (aiohttp.ClientError, asyncio.TimeoutError) as err:
                # A transient HTTP/network problem must not end polling for
                # good; report it and try again after the usual delay.
                print(f"Failed to fetch lights: {err!r}")
            await asyncio.sleep(.5)
async def main():
    """Load the configuration and run the command and poll tasks.

    Reads the TOML file named by the ``MM_CONFIG_PATH`` environment variable
    (default ``./malinaConfig.toml``), fills the module-level ``room``,
    ``url``, ``roomId`` and ``lucke_bearer_token`` settings, connects to the
    MQTT broker and then runs ``task_luckeCommand`` and ``task_luckePoll``
    concurrently on the same client until one of them raises.
    """
    global room, lucke_bearer_token, url, roomId
    config_file = os.getenv('MM_CONFIG_PATH', './malinaConfig.toml')
    conf = toml.load(config_file)

    room = conf['global']['room']
    mqttHost = conf['global']['mqttHost']
    mqttPort = conf['global']['mqttPort']
    url = conf["lucke"]['url']
    roomId = conf["lucke"]['roomId']
    lucke_bearer_token = conf["lucke"]['bearer_token']

    async with aiomqtt.Client(mqttHost, mqttPort) as client:
        # Keep a distinct reference to each task and await both, so the
        # command task is neither left unreferenced nor able to fail
        # without the error being propagated.
        command_task = asyncio.create_task(task_luckeCommand(client))
        poll_task = asyncio.create_task(task_luckePoll(client))
        await asyncio.gather(command_task, poll_task)
# Start the controller only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())