"""Bridge between an MQTT broker and the lecture-hall lighting REST API.

Commands received on ``<room>/lucke/...`` MQTT topics are forwarded to the
REST API, and the state of the lights reported by the API is periodically
published back to MQTT as retained messages.
"""
from http import HTTPStatus
import asyncio
import os

import aiohttp
import aiomqtt
import toml

# Runtime configuration; populated by main() from the TOML config file.
lucke_bearer_token: str
room: str
url: str
roomId: int|str


def _auth_headers() -> dict:
    """Return the Authorization header sent with every REST request."""
    return {"Authorization": "Bearer " + lucke_bearer_token}


def _parse_intensity(payload) -> int | None:
    """Decode an MQTT payload into a light intensity.

    Returns the integer value when the payload decodes to an integer in the
    range 0..100 (inclusive), otherwise ``None``.
    """
    try:
        # UnicodeDecodeError is a ValueError subclass; AttributeError covers
        # payloads that have no .decode() (e.g. an empty/None payload).
        intensity = int(payload.decode())
    except (AttributeError, ValueError):
        return None
    # Explicit range check instead of `assert`, which is stripped under -O.
    if not 0 <= intensity <= 100:
        return None
    return intensity


async def sendSceneRequest(client, scene):
    """Activate a lighting scene through the REST API.

    On a 204 response the scene is published (retained, QoS 1) on
    ``<room>/lucke/preset/current``; any other status is printed.
    """
    endpoint = url + f"/rest/v2/fri-fkkt/lecture-halls/{roomId}/scenes/{scene}/activate"
    async with aiohttp.request("GET", endpoint, headers=_auth_headers()) as resp:
        if resp.status == HTTPStatus.NO_CONTENT:
            await client.publish(f'{room}/lucke/preset/current', payload=scene, qos=1, retain=True)
        else:
            print(resp.status, await resp.text())


async def setLight(client, lightNum, intensity: int):
    """Set the brightness of a single light through the REST API.

    An intensity of 0 switches the light off; any other value switches it on
    with that dim value. On a 204 response the intensity is published
    (retained, QoS 1) on ``<room>/lucke/brightness/<lightNum>``; any other
    status is printed.
    """
    endpoint = url + f"/rest/v2/fri-fkkt/lecture-halls/{roomId}/lights/{lightNum}"
    body = {
        "stateOn": intensity != 0,
        "dimmValue": intensity,
    }
    async with aiohttp.request("PUT", endpoint, headers=_auth_headers(), json=body) as resp:
        if resp.status == HTTPStatus.NO_CONTENT:
            await client.publish(f'{room}/lucke/brightness/{lightNum}', payload=intensity, qos=1, retain=True)
        else:
            print("setLight error:", resp.status, await resp.text())


async def _handle_message(controlClient, mesg):
    """Dispatch one MQTT message to the matching REST call, if any."""
    if mesg.topic.matches(f"{room}/lucke/preset/recall"):
        try:
            sceneNum = mesg.payload.decode()
        except (AttributeError, UnicodeDecodeError):
            # A malformed payload must not take the whole command task down.
            print("Invalid message:", mesg)
            return
        print("Received: " + str(mesg.topic) + " payload: [" + sceneNum + "]")
        await sendSceneRequest(controlClient, sceneNum)
    elif mesg.topic.matches(f"{room}/lucke/set/+"):
        # The last topic level is the light identifier.
        lightNum = mesg.topic.value.rsplit("/", maxsplit=1)[-1]
        intensity = _parse_intensity(mesg.payload)
        if intensity is None:
            print("Invalid message:", mesg)
        else:
            await setLight(controlClient, lightNum, intensity)


async def task_luckeCommand(controlClient):
    """Subscribe to ``<room>/lucke/#`` and execute incoming light commands.

    Handles ``preset/recall`` (activate a scene) and ``set/<light>`` (set a
    light's intensity, 0..100). Other topics under the subscription are
    ignored.
    """
    await controlClient.subscribe(f"{room}/lucke/#")
    async for mesg in controlClient.messages:
        mesg: aiomqtt.Message
        await _handle_message(controlClient, mesg)
        await asyncio.sleep(0.01)


async def task_luckePoll(client):
    """Poll the REST API and publish each light's state to MQTT.

    Every 0.5 s the list of lights is fetched; for each light the
    ``is_dimmable`` flag and the current brightness are published as
    retained QoS 1 messages. Non-200 responses are printed.
    """
    # Both are loop-invariant: build the endpoint once and reuse one session
    # instead of opening a new one on every poll.
    endpoint = url + f"/rest/v2/fri-fkkt/lecture-halls/{roomId}/lights/"
    async with aiohttp.ClientSession() as session:
        while True:
            async with session.get(endpoint, headers={"Authorization": f"Bearer {lucke_bearer_token}"}) as response:
                if response.status == HTTPStatus.OK:
                    lights = await response.json()
                    for light in lights:
                        await client.publish(f'{room}/lucke/is_dimmable/{light["id"]}', payload=light["dimmable"], qos=1, retain=True)
                        # TODO: find a better way to handle non-dimmable lights
                        if light["stateOn"]:
                            # Lights without a "dimmValue" are reported as fully on.
                            dimValue = light.get("dimmValue", 100)
                        else:
                            dimValue = 0
                        await client.publish(f'{room}/lucke/brightness/{light["id"]}', payload=dimValue, qos=1, retain=True)
                else:
                    print(f"Failed to fetch lights: {response.status}", await response.text())
            await asyncio.sleep(.5)


async def main():
    """Load the configuration, connect to MQTT and run both worker tasks.

    The config path is taken from the ``MM_CONFIG_PATH`` environment variable
    and defaults to ``./malinaConfig.toml``. An exception raised by either
    task propagates out of this coroutine.
    """
    global room, lucke_bearer_token, url, roomId
    config_file = os.getenv('MM_CONFIG_PATH', './malinaConfig.toml')
    conf = toml.load(config_file)

    room = conf['global']['room']
    mqttHost = conf['global']['mqttHost']
    mqttPort = conf['global']['mqttPort']
    url = conf["lucke"]['url']
    roomId = conf["lucke"]['roomId']
    lucke_bearer_token = conf["lucke"]['bearer_token']

    async with aiomqtt.Client(mqttHost, mqttPort) as client:
        # Keep a reference to BOTH tasks and await both, so that neither can
        # fail silently or be garbage-collected while running.
        task_control = asyncio.create_task(task_luckeCommand(client))
        task_poll = asyncio.create_task(task_luckePoll(client))
        await asyncio.gather(task_control, task_poll)


if __name__ == '__main__':
    asyncio.run(main())