"""Bridge MQTT lift commands to the I2C relay boards.

Subscribes to ``p01/projectors/+/lift/#`` on a local MQTT broker and, for
every ``.../lift/move/<command>`` message, sets or clears the matching relay
bit on the board belonging to the addressed projector.
"""
#import gpiozero.pins.mock
#from gpiozero import *
#from grove.factory import Factory
#from grove.grove_relay import GroveRelay
import asyncio

import aiomqtt
#from i2cpy import I2C
from smbus2 import SMBus
#import i2cpy

# Shell command to change a board's I2C address from 0x11 to 0x42:
#i2cset -y 1 0x11 0x11 0x42

# ONLY FOR TESTING ON NON RPI
#Device.pin_factory = gpiozero.pins.mock.MockFactory()

#relays = [LED(17), LED(27), LED(22), LED(23), LED(5), LED(6), LED(24), LED(25)]

# I2C addresses of the two relay boards.
relayBoardMain = 0x42
relayBoardSide = 0x43

# Relay bit positions: 0 1 2 3
relayMasks = [0b0001, 0b0010, 0b0100, 0b1000]  # probably not needed

bus = SMBus(1)

# Command name (last topic level) -> bit position in the relay register.
relMapping = {
    'service_down': 0,
    'service_up': 1,
    'down': 2,
    'up': 3
}

# Last relay bitmask successfully written to each board, keyed by projector.
currentState = {
    'glavni': 0b0000,
    'stranski': 0b0000
}

# Register on the relay board that holds the relay bitmask.
_RELAY_REGISTER = 0x10

# Topic layout: p01/projectors/<projector>/lift/move/<command>
_PROJECTOR_LEVEL = 2
_COMMAND_LEVEL = 5


async def msgRelayBoard(projSelect: str, command: str, state: bool) -> None:
    """Switch one relay on the board of the selected projector.

    Args:
        projSelect: Projector key, ``'glavni'`` (main board, 0x42) or
            ``'stranski'`` (side board, 0x43).
        command: A key of ``relMapping`` selecting which relay bit to change.
        state: ``True`` sets the relay bit, ``False`` clears it.

    Raises:
        KeyError: If ``command`` is not in ``relMapping`` or ``projSelect``
            is not in ``currentState``.
        OSError: Propagated from the I2C write if the bus transfer fails
            (presumably; depends on smbus2 - confirm).
    """
    #TODO this is not optimal, check for more edge cases
    i2c_addr = relayBoardSide if projSelect == 'stranski' else relayBoardMain

    mask = 1 << relMapping[command]
    previous = currentState[projSelect]
    updated = (previous | mask) if state else (previous & ~mask)

    # NOTE(review): this is a blocking call inside a coroutine; it is a
    # single short register write, so it is left synchronous here.
    bus.write_byte_data(i2c_addr, _RELAY_REGISTER, updated)
    # Commit the cached state only after the write went through, so a failed
    # transfer cannot leave the cache out of step with the hardware.
    currentState[projSelect] = updated
    print('testirovano jako')


# Bit layout on the current boards:
#
#              SrvDwn  SrvUp  OpDwn  OpUp
#  MAIN: 0x42   0001    0010   0100  1000
#  SIDE: 0x43   0001    0010   0100  1000

# Old board (one relay index per output):
#
#         SrvDwn  SrvUp  OpDwn  OpUp
#  MAIN:    0       1      2     3
#  SIDE:    4       5      6     7

# TODO: document this better


def _decode_payload(payload) -> str:
    """Return the MQTT payload as text without raising on odd input."""
    if isinstance(payload, (bytes, bytearray)):
        # Undecodable bytes are replaced rather than crashing the listener.
        return payload.decode(errors='replace')
    return '' if payload is None else str(payload)


async def task_command2relays(controlClient: aiomqtt.Client):
    """Consume lift-move MQTT messages and forward them to the relay boards.

    Messages for unknown projectors are ignored. Messages whose topic has no
    command level, or whose command is not in ``relMapping``, are reported
    and skipped so that a single malformed message cannot stop the listener.

    Args:
        controlClient: A connected aiomqtt client.
    """
    #relayCtrl = lambda cmd, relay: relays[relay].on() if cmd == 1 else relays[relay].off()
    #relayCtrl = lambda cmd, relay: [relays[r].on() if cmd == 1 and r == relay else relays[r].off for r in range(len(relays))]
    await controlClient.subscribe("p01/projectors/+/lift/#")

    async for mesg in controlClient.messages:
        mesg: aiomqtt.Message
        if not mesg.topic.matches('p01/projectors/+/lift/move/#'):
            continue

        msgTopicArr = mesg.topic.value.split('/')
        state = _decode_payload(mesg.payload)
        print(f"Received: {msgTopicArr} payload: [{state}]")

        #testCase = (msgTopicArr[2], msgTopicArr[4])
        projSel = msgTopicArr[_PROJECTOR_LEVEL]
        if projSel not in currentState:
            continue

        # The '#' wildcard also matches the bare '.../lift/move' topic, which
        # has no command level at all.
        if len(msgTopicArr) <= _COMMAND_LEVEL:
            print("Ignoring message without a command level")
            continue

        command = msgTopicArr[_COMMAND_LEVEL]
        if command not in relMapping:
            print("Ignoring unknown command: " + command)
            continue

        await msgRelayBoard(projSel, command, state == '1')
        #print("Pushing \'off\' to other relays to prevent conflicts")
        # NOTE(review): short pause after each relay write; the original
        # indentation was lost, so confirm this belongs here and not at
        # loop level.
        await asyncio.sleep(0.01)


async def main():
    """Connect to the local MQTT broker and run the relay task until it ends."""
    async with aiomqtt.Client('localhost', 1883) as client:
        await task_command2relays(client)


if __name__ == '__main__':
    asyncio.run(main())