# Source: predavalnice-kontroler/controller/projector_motors/projector_motors.py
# (repository viewer metadata: 147 lines, 4.4 KiB, Python)

# Mock-mode switch.
# NOTE(review): nothing in the visible part of this file reads MOCK -- confirm
# whether it is still used before relying on it.
MOCK = False
import aiomqtt
import asyncio
from smbus2 import SMBus
import toml
import os
# Room name used as the MQTT topic prefix; assigned in main() from the config file.
room: str
use_offset = False

# Relay positions: 0 1 2 3
# One single-bit mask per relay position (probably not needed).
relayMasks = [1 << position for position in range(4)]

# SMBus handle (bus 1) shared by every relay-board write.
bus = SMBus(1)

# Relay-board I2C address per projector; -1 until main() fills it in from the config.
i2c_map = dict.fromkeys(('main', 'side'), -1)

# Bit position of the relay behind each command name.
relMapping = {
    name: position
    for position, name in enumerate(('service_down', 'service_up', 'down', 'up'))
}

# Last relay bit field written (or about to be written) to each board.
currentState = dict.fromkeys(('main', 'side'), 0b0000)
async def msgRelayBoard(projSelect, command, state: bool):
    """Switch a single relay on one projector's relay board.

    Args:
        projSelect: Key into ``i2c_map`` and ``currentState`` selecting the board.
        command: Key into ``relMapping`` naming the relay to switch.
        state: Truthy sets the relay's bit, falsy clears it.

    Raises:
        KeyError: If ``projSelect`` or ``command`` is not a known key.
    """
    board_addr = i2c_map[projSelect]
    relay_bit = 1 << relMapping[command]

    previous = currentState[projSelect]
    currentState[projSelect] = (previous | relay_bit) if state else (previous & ~relay_bit)

    # Register 0x10 holds the relay states; the whole bit field is written at once.
    bus.write_byte_data(board_addr, 0x10, currentState[projSelect])
    print(projSelect, format(currentState[projSelect], "04b"))
"""
             SrvDwn  SrvUp  OpDwn  OpUp
MAIN: 0x42   0001    0010   0100   1000
SIDE: 0x43   0001    0010   0100   1000
"""
# Old board:
"""
        SrvDwn  SrvUp  OpDwn  OpUp
MAIN:   0       1      2      3
SIDE:   4       5      6      7
"""
# TODO: document this better.
async def task_command2relays(controlClient: aiomqtt.Client):
    """Read lift commands from MQTT and send them to the relays.

    Subscribes to ``{room}/projectors/+/lift/#`` and handles two topic shapes:

    * ``.../<projector>/lift/manual/<command>`` with payload ``"0"`` or ``"1"``:
      switches the relay named by ``<command>`` off or on. Service commands
      also publish ``"MOVING"``/``"STOPPED"`` on the lift status topic.
    * ``.../<projector>/lift/goto`` with payload ``"UP"`` or ``"DOWN"``:
      releases both service relays and the opposite direction relay, holds the
      requested direction relay for one second, releases it, then publishes
      the payload on the lift status topic.

    Messages on any other matching topic (including the status messages this
    task publishes itself) are ignored. Invalid projector names, commands or
    payloads are reported with ``print`` and skipped.

    Args:
        controlClient: Connected aiomqtt client, used both to receive the
            commands and to publish status updates.
    """
    projectors = ("main", "side")
    # Direction relay that must be released before pulsing the requested one.
    opposite = {"UP": "down", "DOWN": "up"}

    await controlClient.subscribe(f"{room}/projectors/+/lift/#")
    async for mesg in controlClient.messages:
        msgTopicArr = mesg.topic.value.split('/')

        # A payload that is not valid UTF-8 (or not bytes at all) must not
        # crash the service: turn it into a string that simply fails the
        # validation below.
        raw_payload = mesg.payload
        if isinstance(raw_payload, (bytes, bytearray)):
            value = raw_payload.decode(errors="replace")
        else:
            value = str(raw_payload)

        if mesg.topic.matches(f'{room}/projectors/+/lift/manual/+'):
            command = msgTopicArr[-1]
            projSel = msgTopicArr[-4]
            if projSel not in projectors or command not in relMapping or value not in ("0", "1"):
                print("Invalid manual command:", projSel, command, value)
                continue

            pressed = value == "1"
            await msgRelayBoard(projSel, command, pressed)

            if "service" in command:
                # Service move: manual control makes the position unknown, so
                # the status only says whether the lift is moving.
                status = "MOVING" if pressed else "STOPPED"
                await controlClient.publish(f'{room}/projectors/{projSel}/lift/status', payload=status, qos=1, retain=True)
            elif pressed:
                # Normal move.
                # HACK: if the press is too short it doesn't register, so we sleep for a bit
                await asyncio.sleep(.2)

        elif mesg.topic.matches(f'{room}/projectors/+/lift/goto'):
            projSel = msgTopicArr[-3]
            if projSel not in projectors or value not in opposite:
                print("Invalid goto command:", projSel, value)
                continue

            # Clear manual control
            await msgRelayBoard(projSel, "service_down", False)
            await msgRelayBoard(projSel, "service_up", False)
            await msgRelayBoard(projSel, opposite[value], False)

            # Click the button for a bit and release it, then publish that the lift has moved.
            direction = value.lower()
            await msgRelayBoard(projSel, direction, True)
            try:
                await asyncio.sleep(1)
            finally:
                # Release the relay even if the task is cancelled mid-press
                # (e.g. on shutdown), so it is never left energized.
                await msgRelayBoard(projSel, direction, False)
            await controlClient.publish(f'{room}/projectors/{projSel}/lift/status', payload=value, qos=1, retain=True)

        await asyncio.sleep(0.01)
async def main():
    """Load the configuration, connect to the MQTT broker and run the relay task.

    The config path is taken from the ``MM_CONFIG_PATH`` environment variable,
    falling back to ``./malinaConfig.toml``. The room name is stored in the
    module-level ``room`` and the relay-board addresses in ``i2c_map`` before
    the MQTT client is opened.
    """
    global room

    settings = toml.load(os.getenv('MM_CONFIG_PATH', './malinaConfig.toml'))

    general = settings['global']
    room = general['room']
    broker_host = general['mqttHost']
    broker_port = general['mqttPort']

    motors = settings["projector_motors"]
    for projector in ('main', 'side'):
        i2c_map[projector] = motors[projector]['i2c_address']

    async with aiomqtt.Client(broker_host, broker_port) as client:
        # Runs until the command task ends or raises.
        await asyncio.gather(asyncio.create_task(task_command2relays(client)))


if __name__ == '__main__':
    asyncio.run(main())