"""Source code for ouimeaux.discovery."""

import logging

import gevent
from gevent import socket
from gevent.server import DatagramServer

from ouimeaux.utils import get_ip_address
from ouimeaux.pysignals import receiver
from ouimeaux.signals import discovered


# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)


class UPnPLoopbackException(Exception):
    """
    Using loopback interface as callback IP.

    Raised when the address chosen for the discovery callback starts with
    ``127.``.
    """
class UPnP(object):
    """
    Makes M-SEARCH requests, filters out non-WeMo responses, and dispatches
    signals with the results.
    """

    # Datagram server instance; created on first access of ``server``.
    _server = None

    def __init__(self, mcast_ip='239.255.255.250', mcast_port=1900, bind=None):
        """
        Store the multicast target and the local address to listen on.

        :param mcast_ip: multicast group the M-SEARCH request is sent to.
        :param mcast_port: UDP port of the multicast group.
        :param bind: ``"host:port"`` to bind the response listener to. When
            ``None``, the host comes from ``get_ip_address()`` and the port
            is 54321.
        :raises UPnPLoopbackException: if ``bind`` is ``None`` and the
            detected host address starts with ``127.``.
        """
        if bind is None:
            host = get_ip_address()
            if host.startswith('127.'):
                # Interpolate the offending address; the message previously
                # carried a literal, unformatted "%s".
                raise UPnPLoopbackException(
                    "Using %s as a callback IP for "
                    "discovery will not be successful." % host)
            port = 54321
            bind = '{0}:{1}'.format(host, port)
        self.bind = bind
        self.mcast_ip = mcast_ip
        self.mcast_port = mcast_port
        # Maps a device's LOCATION header value to its parsed headers, so each
        # location is announced only once.
        self.clients = {}

    @staticmethod
    def _parse_headers(message):
        """
        Parse the header lines of a raw response into a dict.

        The first line (the HTTP status line) is skipped, header names are
        lower-cased, and lines without a ``:`` separator are ignored. An empty
        message yields an empty dict.

        :param message: raw datagram payload as bytes.
        :returns: dict mapping lower-cased header names to stripped values.
        """
        # Datagrams come from arbitrary network peers: replace undecodable
        # bytes instead of raising on them.
        lines = [raw.decode('utf-8', 'replace') for raw in message.splitlines()]
        headers = {}
        # Slicing (rather than pop(0)) tolerates an empty payload.
        for line in lines[1:]:
            name, sep, value = line.partition(":")
            if not sep:
                continue
            headers[name.strip().lower()] = value.strip()
        return headers

    def _response_received(self, message, address):
        """
        Handle one datagram received by the server.

        Responses whose ``x-user-agent`` header is not ``redsonic`` are
        ignored. For a matching response with a ``location`` not seen before,
        the headers are remembered in ``self.clients`` and the ``discovered``
        signal is sent from a new greenlet.

        :param message: raw datagram payload as bytes.
        :param address: sender address; its first two items are logged as
            host and port.
        """
        log.debug("Received a response from %s:%s", address[0], address[1])
        headers = self._parse_headers(message)
        if headers.get('x-user-agent') != 'redsonic':
            return
        location = headers.get('location')
        if location is None or location in self.clients:
            return
        log.debug("Found WeMo at %s", location)
        self.clients[location] = headers
        gevent.spawn(discovered.send, self, address=address, headers=headers)

    @property
    def server(self):
        """
        UDP server to listen for responses.

        Created on first access and cached for subsequent accesses.
        """
        if self._server is None:
            log.debug("Binding datagram server to %s", self.bind)
            self._server = DatagramServer(self.bind, self._response_received)
        return self._server

    def broadcast(self):
        """
        Send a multicast M-SEARCH request asking for devices to report in.
        """
        log.debug("Broadcasting M-SEARCH to %s:%s",
                  self.mcast_ip, self.mcast_port)
        # The two trailing empty strings produce the blank line that ends the
        # request.
        request = '\r\n'.join((
            "M-SEARCH * HTTP/1.1",
            "HOST:{mcast_ip}:{mcast_port}",
            "ST:upnp:rootdevice",
            "MX:2",
            'MAN:"ssdp:discover"',
            "",
            "",
        )).format(mcast_ip=self.mcast_ip, mcast_port=self.mcast_port)
        self.server.sendto(request.encode(), (self.mcast_ip, self.mcast_port))
def test():
    """
    Manual discovery check: broadcast M-SEARCH requests and print responses.

    Enables debug logging, registers a receiver on the ``discovered`` signal
    that prints each result, starts the UPnP datagram server and broadcasts
    until the two-second timeout (delivered as ``KeyboardInterrupt``) or a
    real ``KeyboardInterrupt`` ends the loop. The server is stopped on exit.
    """
    logging.basicConfig(level=logging.DEBUG)

    @receiver(discovered)
    def handler(sender, **kwargs):
        print("I GOT ONE")
        print(kwargs['address'], kwargs['headers'])

    upnp = UPnP()
    upnp.server.set_spawn(1)
    upnp.server.start()
    log.debug("Started server, listening for responses")

    try:
        with gevent.Timeout(2, KeyboardInterrupt):
            while True:
                try:
                    upnp.broadcast()
                    gevent.sleep(2)
                except KeyboardInterrupt:
                    break
    finally:
        # Release the bound UDP socket however the loop ends.
        upnp.server.stop()
if __name__ == "__main__":
    # Run the manual discovery check when executed as a script.
    test()