import sys
import logging
import argparse
from .discovery import UPnPLoopbackException
from .environment import Environment
from .config import WemoConfiguration
from .utils import matcher
# Disable the connection-pool logger bundled with requests' urllib3 so its
# records do not appear in this tool's output.
reqlog = logging.getLogger("requests.packages.urllib3.connectionpool")
reqlog.disabled = True


def NOOP(*x):
    """Accept any positional arguments, do nothing, and return ``None``."""
    return None
def _state(device, readable=False):
    """Fetch *device*'s state, calling ``get_state`` with ``force_update=True``.

    Args:
        device: Any object exposing ``get_state(force_update=...)``.
        readable: When true, translate the value into a word.

    Returns:
        The value returned by ``device.get_state`` unchanged, or the string
        ``"on"`` / ``"off"`` (chosen by that value's truthiness) when
        *readable* is set.
    """
    current = device.get_state(force_update=True)
    if not readable:
        return current
    return "on" if current else "off"
def scan(args, on_switch=NOOP, on_motion=NOOP, on_bridge=NOOP, on_maker=NOOP):
    """Start an ``Environment`` and run a single discovery pass.

    The four callbacks are handed to ``Environment`` positionally (switch,
    motion, bridge, maker); each defaults to ``NOOP``.
    NOTE(review): presumably ``Environment`` invokes the matching callback for
    every device it discovers -- confirm against ``environment.py``.

    Args:
        args: Parsed command-line namespace; ``args.bind`` and
            ``args.timeout`` are read here.
        on_switch, on_motion, on_bridge, on_maker: Per-device-type callbacks.

    Exits the process with status 0 on ``KeyboardInterrupt`` and with status 1
    (after printing a hint) when ``UPnPLoopbackException`` is raised.
    """
    loopback_hint = """
Loopback interface is being used! You will probably not receive any responses
from devices. Use ifconfig to find your IP address, then either pass the
--bind argument or edit ~/.wemo/config.yml to specify the IP to which devices
should call back during discovery.""".strip()

    try:
        environment = Environment(
            on_switch,
            on_motion,
            on_bridge,
            on_maker,
            with_subscribers=False,
            bind=args.bind,
        )
        environment.start()
        environment.discover(args.timeout)
    except KeyboardInterrupt:
        sys.exit(0)
    except UPnPLoopbackException:
        print(loopback_hint)
        sys.exit(1)
def switch(args):
    """Turn a WeMo switch on or off, toggle it, or print its status.

    Args:
        args: Parsed command-line namespace. ``args.state`` is the requested
            action (``on``/``1``/``true``, ``off``/``0``/``false``, ``toggle``
            or ``status``, case-insensitive), ``args.device`` the name to
            match, and ``args.human_readable`` selects word output for
            ``status``. The namespace is also forwarded to ``scan``.

    Exit behaviour:
        * status 1 when the action is not recognised;
        * status 0 as soon as one matching switch has been handled, unless the
          device name is ``all`` (any letter case), in which case every
          matching switch is handled and the function returns normally;
        * status 1 when the scan finishes without a match.
    """
    requested = args.state.lower()
    if requested in ("on", "1", "true"):
        state = "on"
    elif requested in ("off", "0", "false"):
        state = "off"
    elif requested in ("toggle", "status"):
        state = requested
    else:
        print("No valid action specified.\n"
              "Usage: wemo switch NAME (on|off|toggle|status)")
        sys.exit(1)

    matches = make_matcher(args.device)
    # Decide once, case-insensitively, whether every device is targeted. The
    # callback and the post-scan check previously disagreed on letter case,
    # so "ALL" handled every switch and then still reported "not found".
    every_device = args.device.lower() == 'all'

    def on_switch(switch):
        if not matches(switch.name):
            return
        if state == "toggle":
            found_state = switch.get_state(force_update=True)
            switch.set_state(not found_state)
        elif state == "status":
            print(_state(switch, args.human_readable))
        else:
            # "on" / "off" are method names on the device object.
            getattr(switch, state)()
        if not every_device:
            sys.exit(0)

    scan(args, on_switch)
    if not every_device:
        # If we got here, we didn't find anything
        print("No device found with that name.")
        sys.exit(1)
def light(args):
    """Control a WeMo LED light or light group attached to a bridge.

    Args:
        args: Parsed command-line namespace. ``args.state`` is the requested
            action (``on``/``1``/``true``, ``off``/``0``/``false``, ``toggle``
            or ``status``, case-insensitive), ``args.name`` the light or group
            name to match, and ``args.dim`` an optional integer level that is
            only consulted for the ``on`` action. The namespace is also
            forwarded to ``scan``.

    For every bridge passed to the callback, lights are examined first, then
    groups.

    Exit behaviour:
        * status 1 when the action is not recognised or ``dim`` is outside
          0-255;
        * status 0 after the first matching light, unless the name is ``all``
          (any letter case);
        * status 0 after the first matching group;
        * status 1 when the scan finishes without a match and the name is not
          ``all``.
    """
    requested = args.state.lower()
    if requested in ("on", "1", "true"):
        action = "on"
    elif requested in ("off", "0", "false"):
        action = "off"
    elif requested in ("toggle", "status"):
        action = requested
    else:
        print("No valid action specified.\n"
              "Usage: wemo light NAME (on|off|toggle|status)")
        sys.exit(1)

    matches = make_matcher(args.name)
    # make_matcher treats the name case-insensitively, so do the same here.
    every_device = args.name.lower() == 'all'

    def checked_dim():
        """Return ``args.dim``, exiting with status 1 if it is not in 0-255."""
        if not 0 <= args.dim <= 255:
            print("Invalid dim specified.\n"
                  "Dim must be between 0 and 255")
            sys.exit(1)
        return args.dim

    def on_bridge(bridge):
        bridge.bridge_get_lights()
        bridge.bridge_get_groups()

        # The branches below test the normalised ``action``. Testing the raw
        # ``args.state`` made "1", "true" or "ON" fall through to "off".
        for light_name in bridge.Lights:
            if not matches(light_name):
                continue
            target = bridge.Lights[light_name]
            if action == "toggle":
                found_state = bridge.light_get_state(target).get('state')
                bridge.light_set_state(target, state=not found_state)
            elif action == "status":
                print(bridge.light_get_state(target))
            elif action == "off":
                bridge.light_set_state(target, state=0, dim=None)
            elif args.dim is None:
                bridge.light_set_state(target, state=1, dim=None)
            else:
                # With an explicit level, state stays None and only the dim
                # value is supplied (unchanged from the previous behaviour).
                bridge.light_set_state(target, state=None, dim=checked_dim())
            if not every_device:
                sys.exit(0)

        for group_name in bridge.Groups:
            if not matches(group_name):
                continue
            target = bridge.Groups[group_name]
            if action == "toggle":
                found_state = bridge.group_get_state(target).get('state')
                bridge.group_set_state(target, state=not found_state)
            elif action == "status":
                print(bridge.group_get_state(target))
            elif action == "off":
                bridge.group_set_state(target, state=0, dim=None)
            elif args.dim is None:
                # No level requested: re-send the group's current dim value.
                current_dim = bridge.group_get_state(target).get('dim')
                bridge.group_set_state(target, state=1, dim=current_dim)
            else:
                bridge.group_set_state(target, state=1, dim=checked_dim())
            # NOTE(review): unlike lights, the first matching group always
            # ends the run, even for "all". Kept as-is; confirm the intent.
            sys.exit(0)

    # Switch and motion devices are ignored; scan's defaults are no-ops.
    scan(args, on_bridge=on_bridge)
    if not every_device:
        # If we got here, we didn't find anything
        print("No device or group found with that name.")
        sys.exit(1)
def make_matcher(device_name):
    """Build a predicate that decides whether a discovered name is wanted.

    Selection order:
        1. ``all`` / ``any`` (case-insensitive): a predicate that accepts
           every name;
        2. a configured alias for *device_name*: exact equality with the
           alias target;
        3. any other non-empty name: whatever ``matcher(device_name)`` returns;
        4. an empty name: ``NOOP``, whose result is always ``None`` (falsy).

    Args:
        device_name: Name, alias, or ``all``/``any`` as typed by the user.

    Returns:
        A callable taking one candidate name.
    """
    # Alias lookup happens first, exactly as before, even if it ends up unused.
    alias = WemoConfiguration().aliases.get(device_name)

    if device_name.lower() in ('all', 'any'):
        return lambda x: True
    if alias:
        return lambda x: x == alias
    if device_name:
        return matcher(device_name)
    return NOOP
def maker(args):
    """Switch a WeMo Maker, toggle it, or report its sensor / switch state.

    Args:
        args: Parsed command-line namespace. ``args.state`` is the requested
            action (``on``/``1``/``true``, ``off``/``0``/``false``,
            ``toggle``, ``sensor`` or ``switch``, case-insensitive),
            ``args.device`` the name to match, and ``args.human_readable``
            selects word output. The namespace is also forwarded to ``scan``.

    Exit behaviour:
        * status 1 when the action is not recognised;
        * status 0 as soon as one matching Maker has been handled, unless the
          device name is ``all`` (any letter case);
        * status 1 when the scan finishes without a match and the name is not
          ``all``.
    """
    requested = args.state.lower()
    if requested in ("on", "1", "true"):
        state = "on"
    elif requested in ("off", "0", "false"):
        state = "off"
    elif requested in ("toggle", "sensor", "switch"):
        state = requested
    else:
        print("No valid action specified.\n"
              "Usage: wemo maker NAME (on|off|toggle|sensor|switch)")
        sys.exit(1)

    matches = make_matcher(args.device)
    # make_matcher lower-cases the name before testing for "all"; use the same
    # rule here so "ALL"/"All" neither stop after the first device nor end
    # with a spurious "No device found" message.
    every_device = args.device.lower() == 'all'

    def on_maker(maker):
        if not matches(maker.name):
            return
        if state == "toggle":
            found_state = maker.get_state(force_update=True)
            maker.set_state(not found_state)
        elif state == "sensor":
            if not maker.has_sensor:
                print("Sensor not present")
            elif args.human_readable:
                # A truthy sensor_state is reported as "not triggered".
                if maker.sensor_state:
                    print('Sensor not triggered')
                else:
                    print('Sensor triggered')
            else:
                print(maker.sensor_state)
        elif state == "switch":
            if maker.switch_mode:
                print("Momentary Switch")
            else:
                print(_state(maker, args.human_readable))
        else:
            # "on" / "off" are method names on the device object.
            getattr(maker, state)()
        if not every_device:
            sys.exit(0)

    # Other device types are ignored; scan's default callbacks are no-ops.
    scan(args, on_maker=on_maker)
    if not every_device:
        # If we got here, we didn't find anything
        print("No device found with that name.")
        sys.exit(1)
def list_(args):
    """Print one line per discovered device, prefixed with its type.

    Switches, motion sensors and Makers are printed as ``<Type>: <name>``.
    For a bridge, its own name is printed, its light and group tables are
    refreshed, and then every group followed by every light is printed.

    Args:
        args: Parsed command-line namespace, forwarded to ``scan``.
    """
    def named(label):
        """Return a callback printing *label* followed by the device name."""
        def callback(device):
            print(label, device.name)
        return callback

    def on_bridge(bridge):
        print("Bridge:", bridge.name)
        bridge.bridge_get_lights()
        bridge.bridge_get_groups()
        for group_name in bridge.Groups:
            print("Group:", group_name)
        for light_name in bridge.Lights:
            print("Light:", light_name)

    scan(args, named("Switch:"), named("Motion:"), on_bridge, named("Maker:"))
def status(args):
    """Print the current state of every discovered device.

    Switches and motion sensors print their name and state. A Maker prints
    its state labelled as momentary or persistent (by ``switch_mode``),
    followed by a sensor line. A bridge prints its own state and then the
    state of each of its lights and groups.

    Args:
        args: Parsed command-line namespace. ``args.human_readable`` selects
            word output; the namespace is also forwarded to ``scan``.
    """
    def on_switch(switch):
        print("Switch:", switch.name, '\t', _state(switch, args.human_readable))

    def on_motion(motion):
        print("Motion:", motion.name, '\t', _state(motion, args.human_readable))

    def on_maker(maker):
        if maker.switch_mode:
            label = "Momentary State:"
        else:
            label = "Persistent State:"
        print("Maker:", maker.name, '\t', label,
              _state(maker, args.human_readable))

        if not maker.has_sensor:
            print('\t\t\t' "Sensor not present")
        elif args.human_readable:
            # A truthy sensor_state is reported as "not triggered".
            if maker.sensor_state:
                sensorstate = 'Sensor not triggered'
            else:
                sensorstate = 'Sensor triggered'
            print('\t\t\t', "Sensor:", sensorstate)
        else:
            print('\t\t\t', "Sensor:", maker.sensor_state)

    def on_bridge(bridge):
        print("Bridge:", bridge.name, '\t', _state(bridge, args.human_readable))
        bridge.bridge_get_lights()
        # Refresh the group table as well before reading bridge.Groups, the
        # same way the list and light commands do; previously only the light
        # table was refreshed here.
        bridge.bridge_get_groups()
        for light in bridge.Lights:
            print("Light:", light, '\t',
                  bridge.light_get_state(bridge.Lights[light]))
        for group in bridge.Groups:
            print("Group:", group, '\t',
                  bridge.group_get_state(bridge.Groups[group]))

    scan(args, on_switch, on_motion, on_bridge, on_maker)
def server(args):
    """Run the ouimeaux API server and web app until interrupted.

    Args:
        args: Parsed command-line namespace; the optional attributes ``bind``
            and ``debug`` are read with ``getattr`` defaults.

    Exit behaviour:
        * status 1 when the optional server dependencies cannot be imported;
        * status 1 when the configured listen address is not ``host:port``
          with an integer port;
        * status 0 when serving is stopped by ``KeyboardInterrupt`` or
          ``SystemExit``.
    """
    try:
        from socketio.server import SocketIOServer
        from ouimeaux.server import app, initialize
    except ImportError:
        print("ouimeaux server dependencies are not installed. Please run, e.g., 'pip install ouimeaux[server]'")
        sys.exit(1)

    initialize(bind=getattr(args, 'bind', None), auth=(WemoConfiguration().auth or None))

    level = logging.INFO
    if getattr(args, 'debug', False):
        level = logging.DEBUG
    logging.basicConfig(level=level)

    # TODO: Move this to configuration
    listen = WemoConfiguration().listen or '0.0.0.0:5000'

    # Validate the address *before* entering the try block below. That block
    # catches SystemExit and exits 0, which used to turn the status-1 exit
    # for a bad address into a success status. The port conversion is
    # validated here too, so a non-numeric port gets the same message.
    try:
        host, port = listen.split(':')
        port = int(port)
    except (AttributeError, TypeError, ValueError):
        print("Invalid bind address configuration:", listen)
        sys.exit(1)

    try:
        SocketIOServer((host, port), app,
                       policy_server=False,
                       namespace="socket.io").serve_forever()
    except (KeyboardInterrupt, SystemExit):
        sys.exit(0)
def wemo():
    """Command-line entry point: parse ``sys.argv`` and run the sub-command.

    Global options: ``--bind``, ``--debug``, ``--exact-match``,
    ``--human-readable`` and ``--timeout``. Sub-commands: ``status``,
    ``switch``, ``maker``, ``light``, ``list`` and ``server``; each stores its
    handler in ``args.func``.

    When no sub-command is given the parser's usage message is printed and
    the process exits with status 2 (via ``parser.error``).
    """
    import ouimeaux.utils
    # Set the module-level retry count used by ouimeaux.utils to zero for
    # command-line runs.
    ouimeaux.utils._RETRIES = 0

    parser = argparse.ArgumentParser()
    parser.add_argument("-b", "--bind", default=None,
                        help="ip:port to which to bind the response server."
                             " Default is localhost:54321")
    parser.add_argument("-d", "--debug", action="store_true", default=False,
                        help="Enable debug logging")
    parser.add_argument("-e", "--exact-match", action="store_true",
                        default=False,
                        help="Disable fuzzy matching for device names")
    parser.add_argument("-v", "--human-readable", dest="human_readable",
                        action="store_true", default=False,
                        help="Print statuses as human-readable words")
    parser.add_argument("-t", "--timeout", type=int, default=5,
                        help="Time in seconds to allow for discovery")

    subparsers = parser.add_subparsers()

    statusparser = subparsers.add_parser(
        "status", help="Print status of WeMo devices")
    statusparser.set_defaults(func=status)

    switchparser = subparsers.add_parser(
        "switch", help="Turn a WeMo Switch on or off")
    switchparser.add_argument("device", help="Name or alias of the device")
    # The help text lists every action the switch command accepts.
    switchparser.add_argument("state",
                              help="'on' or 'off' or 'toggle' or 'status'")
    switchparser.set_defaults(func=switch)

    makerparser = subparsers.add_parser(
        "maker",
        help="Get sensor or switch state of a Maker or Turn on or off")
    makerparser.add_argument("device", help="Name or alias of the device")
    makerparser.add_argument(
        "state", help="'on' or 'off' or 'toggle' or 'sensor' or 'switch'")
    makerparser.set_defaults(func=maker)

    lightparser = subparsers.add_parser(
        "light", help="Turn a WeMo LED light on or off")
    lightparser.add_argument("name",
                             help="Name or alias of the device or group")
    # The help text lists every action the light command accepts.
    lightparser.add_argument("state",
                             help="'on' or 'off' or 'toggle' or 'status'")
    lightparser.add_argument("dim", nargs='?', type=int,
                             help="Dim value 0 to 255")
    lightparser.set_defaults(func=light)

    listparser = subparsers.add_parser(
        "list", help="List all devices found in the environment")
    listparser.set_defaults(func=list_)

    serverparser = subparsers.add_parser(
        "server", help="Run the API server and web app")
    serverparser.set_defaults(func=server)

    args = parser.parse_args()

    # On Python 3 a missing sub-command is not an argparse error, so ``func``
    # may be absent; report it as a usage error instead of an AttributeError.
    handler = getattr(args, 'func', None)
    if handler is None:
        parser.error("too few arguments: a command is required")

    if getattr(args, 'debug', False):
        logging.basicConfig(level=logging.DEBUG)
    handler(args)