2023-08-01 19:17:05 +02:00
|
|
|
#!./.venv/bin/python
|
2023-07-19 10:20:07 +02:00
|
|
|
import zigate
|
|
|
|
from zigate import dispatcher
|
|
|
|
import time
|
|
|
|
import os
|
|
|
|
from datetime import datetime
|
|
|
|
from settings import MAGNET_ATTRIBUTE, MAGNET_ADDR, MAGNET_ATTRIBUTE_OPENED_VALUE, OPENING_TRIGGER_DELAY, LOL_OPENED_CMD, LOL_CLOSED_CMD, CLOSING_TRIGGER_DELAY
|
|
|
|
|
|
|
|
# Open the connection to the ZiGate gateway.
# NOTE(review): port=None presumably lets the library pick the serial port
# itself -- confirm against the zigate documentation.
z = zigate.connect(port=None)
z.get_devices_list()

# Shared state flags read and written by trigger_opening() and
# trigger_closing(): each one is True while the corresponding delayed
# transition is still waiting to fire.
is_closing = is_opening = False
|
|
|
|
|
|
|
|
def trigger_opening():
    """Wait out the opening delay, then announce that the room is open.

    Clears ``is_closing`` first, which makes a ``trigger_closing()`` that is
    still waiting abort at its next check. While waiting, the ZiGate LED is
    switched off then on once per iteration (0.25 s + 0.75 s, so roughly one
    second per iteration, ``OPENING_TRIGGER_DELAY`` iterations in total).
    If ``is_opening`` is cleared during the wait, the function returns
    without running the command. Otherwise ``LOL_OPENED_CMD`` is executed
    through the shell and a timestamped line is printed.
    """
    global is_opening
    global is_closing

    led_off_seconds = 0.25
    led_on_seconds = 0.75

    # A new opening cancels any closing that is still pending.
    is_closing = False

    # Do not start a second countdown while one is already running.
    if is_opening or is_closing:
        return

    is_opening = True
    for _ in range(OPENING_TRIGGER_DELAY):
        # One blink per iteration.
        z.set_led(on=False)
        time.sleep(led_off_seconds)
        z.set_led(on=True)
        time.sleep(led_on_seconds)

        # The flag is reset by trigger_closing(); presumably that happens
        # from another callback running concurrently -- TODO confirm the
        # dispatcher's threading model.
        if not is_opening:
            return

    is_opening = False

    os.system(LOL_OPENED_CMD)
    timestamp = datetime.now().isoformat()
    print(timestamp + " LOL opened")
|
|
|
|
|
|
|
|
def trigger_closing():
    """Wait out the closing delay, then announce that the room is closed.

    Returns immediately if a closing countdown is already running.
    Otherwise it marks the closing as pending, clears ``is_opening`` (which
    makes a waiting ``trigger_opening()`` abort at its next check) and
    sleeps one second per iteration for ``CLOSING_TRIGGER_DELAY``
    iterations. If ``is_closing`` is cleared during the wait, the function
    returns without running the command. Otherwise both flags are reset,
    the LED is switched on, ``LOL_CLOSED_CMD`` is executed through the
    shell and a timestamped line is printed.
    """
    global is_opening
    global is_closing

    # A closing countdown is already in progress.
    if is_closing:
        return

    is_closing = True
    # A closing cancels any opening that is still pending.
    is_opening = False

    for _ in range(CLOSING_TRIGGER_DELAY):
        time.sleep(1)

        # Check whether we are still closing the room: trigger_opening()
        # resets this flag when the magnet reports "opened" again.
        if not is_closing:
            return

    is_closing = False
    is_opening = False
    z.set_led(on=True)

    os.system(LOL_CLOSED_CMD)
    timestamp = datetime.now().isoformat()
    print(timestamp + " LOL closed")
|
|
|
|
|
|
|
|
def on_zigbee_attribute_updated(sender, signal, attribute, **kwargs):
    """React to an attribute update coming from the ZiGate dispatcher.

    Updates are ignored unless ``attribute["addr"]`` equals ``MAGNET_ADDR``
    and ``attribute["name"]`` equals ``MAGNET_ATTRIBUTE``. For a matching
    update, ``trigger_opening()`` is called when ``attribute["value"]``
    equals ``MAGNET_ATTRIBUTE_OPENED_VALUE``; any other value calls
    ``trigger_closing()``.

    ``sender``, ``signal`` and ``kwargs`` are accepted but not used.
    """
    from_magnet = (
        attribute["addr"] == MAGNET_ADDR
        and attribute["name"] == MAGNET_ATTRIBUTE
    )
    if not from_magnet:
        return

    opened = attribute["value"] == MAGNET_ATTRIBUTE_OPENED_VALUE
    handler = trigger_opening if opened else trigger_closing
    handler()
|
|
|
|
|
|
|
|
# Route every ZiGate "attribute updated" signal to our handler.
dispatcher.connect(on_zigbee_attribute_updated, zigate.ZIGATE_ATTRIBUTE_UPDATED)

# Keep the main thread alive; all the work happens in the signal handler.
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print('Interrupted by user')
finally:
    # Always persist state and release the gateway, whatever ended the loop.
    # The nested try/finally guarantees close() runs even if save_state()
    # raises.
    try:
        z.save_state()
    finally:
        z.close()
|