80 lines
1.8 KiB
Python
80 lines
1.8 KiB
Python
|
#!/bin/env python
|
||
|
import zigate
|
||
|
from zigate import dispatcher
|
||
|
import time
|
||
|
import os
|
||
|
from datetime import datetime
|
||
|
from settings import MAGNET_ATTRIBUTE, MAGNET_ADDR, MAGNET_ATTRIBUTE_OPENED_VALUE, OPENING_TRIGGER_DELAY, LOL_OPENED_CMD, LOL_CLOSED_CMD, CLOSING_TRIGGER_DELAY
|
||
|
|
||
|
# Open the ZiGate connection shared by the whole script.
# NOTE(review): port=None presumably lets the zigate library choose the
# port on its own -- confirm against the library documentation.
z = zigate.connect(port=None)
z.get_devices_list()

# Module-level state flags shared by trigger_opening() and trigger_closing():
# each one is True while the matching countdown is running.
is_closing = is_opening = False
|
||
|
|
||
|
def trigger_opening():
    """Run the "opened" command after a blinking countdown, unless cancelled.

    Clears ``is_closing`` first, then returns immediately if a countdown is
    already flagged as running. Otherwise the LED is blinked once per
    iteration (0.25 s off, 0.75 s on) for OPENING_TRIGGER_DELAY iterations.
    If ``is_opening`` is found cleared after a blink, the function returns
    without running anything. On completion it clears ``is_opening``, runs
    LOL_OPENED_CMD through ``os.system`` and prints a timestamped
    "LOL opened" line.

    NOTE(review): in the code visible here ``is_opening`` is only cleared at
    the very end of trigger_closing(), which itself returns early while
    ``is_opening`` is set -- confirm whether a closing event was meant to
    cancel this countdown.
    """
    global is_opening, is_closing

    # Clearing the flag makes a running trigger_closing() loop give up.
    is_closing = False

    # A countdown is already in progress: nothing to do.
    if is_opening or is_closing:
        return

    is_opening = True
    for _ in range(OPENING_TRIGGER_DELAY):
        # One blink per iteration, roughly one second in total.
        z.set_led(on=False)
        time.sleep(0.25)
        z.set_led(on=True)
        time.sleep(0.75)

        # Stop as soon as the opening flag has been cleared elsewhere.
        if not is_opening:
            return

    is_opening = False

    os.system(LOL_OPENED_CMD)
    stamp = datetime.now().isoformat()
    print(stamp + " LOL opened")
|
||
|
|
||
|
def trigger_closing():
    """Run the "closed" command after a silent countdown, unless cancelled.

    Returns immediately while ``is_opening`` is set. Otherwise sets
    ``is_closing`` and waits CLOSING_TRIGGER_DELAY iterations of one second
    each. If ``is_closing`` is found cleared after a sleep (trigger_opening()
    clears it), the function returns without running the command. On
    completion it resets both flags, switches the LED on, runs
    LOL_CLOSED_CMD through ``os.system`` and prints a timestamped
    "LOL closed" line.
    """
    global is_opening, is_closing

    if is_opening:
        return

    is_closing = True

    for _ in range(CLOSING_TRIGGER_DELAY):
        time.sleep(1)

        # Give up if the closing has been cancelled in the meantime.
        if not is_closing:
            return

    is_closing = is_opening = False
    z.set_led(on=True)

    os.system(LOL_CLOSED_CMD)
    stamp = datetime.now().isoformat()
    print(stamp + " LOL closed")
|
||
|
|
||
|
def on_zigbee_attribute_updated(sender, signal, attribute, **kwargs):
    """Route a matching attribute update to the opening or closing trigger.

    Only updates whose ``"addr"`` equals MAGNET_ADDR and whose ``"name"``
    equals MAGNET_ATTRIBUTE are handled; anything else is ignored. For a
    matching update, trigger_opening() is called when ``"value"`` equals
    MAGNET_ATTRIBUTE_OPENED_VALUE, and trigger_closing() otherwise.

    Args:
        sender: Unused.
        signal: Unused.
        attribute: Mapping read through its "addr", "name" and "value" keys.
        **kwargs: Ignored.
    """
    from_magnet = (
        attribute["addr"] == MAGNET_ADDR
        and attribute["name"] == MAGNET_ATTRIBUTE
    )
    if not from_magnet:
        return

    opened = attribute["value"] == MAGNET_ATTRIBUTE_OPENED_VALUE
    handler = trigger_opening if opened else trigger_closing
    handler()
|
||
|
|
||
|
# Register the callback for attribute-update signals sent through the
# zigate dispatcher.
dispatcher.connect(on_zigbee_attribute_updated, zigate.ZIGATE_ATTRIBUTE_UPDATED)

# Idle until interrupted; the loop itself only sleeps.
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print('Interrupted by user')
finally:
    # Always save state and close the connection, even when the loop is left
    # through an exception other than KeyboardInterrupt. The nested
    # try/finally makes sure close() still runs if save_state() raises.
    try:
        z.save_state()
    finally:
        z.close()
|