from functools import wraps
import re
import socket
import struct
import time
import gevent
import requests
[docs]def tz_hours():
delta = time.localtime().tm_hour - time.gmtime().tm_hour
sign = '-' if delta < 0 else ''
return "%s%02d.00" % (sign, abs(delta))
[docs]def is_dst():
return 1 if time.localtime().tm_isdst else 0
[docs]def get_timesync():
timesync = """
<?xml version="1.0" encoding="utf-8"?>
<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<s:Body>
<u:TimeSync xmlns:u="urn:Belkin:service:timesync:1">
<UTC>{utc}</UTC>
<TimeZone>{tz}</TimeZone>
<dst>{dst}</dst>
<DstSupported>{dstsupported}</DstSupported>
</u:TimeSync>
</s:Body>
</s:Envelope>""".format(
utc=int(time.time()),
tz=tz_hours(),
dst=is_dst(),
dstsupported=is_dst()).strip()
return timesync
[docs]def get_ip_address():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect(('1.2.3.4', 9))
return s.getsockname()[0]
except socket.error:
return None
finally:
del s
[docs]def matcher(match_string):
pattern = re.compile('.*?'.join(re.escape(c) for c in match_string.lower()))
def matches(s):
return pattern.search(s.lower()) is not None
return matches
# This is pretty arbitrary. I'm choosing, for no real reason, the length of
# a subscription.
_RETRIES = 1801/60
[docs]def get_retries():
return _RETRIES
[docs]def retry_with_delay(f, delay=60):
"""
Retry the wrapped requests.request function in case of ConnectionError.
Optionally limit the number of retries or set the delay between retries.
"""
@wraps(f)
def inner(*args, **kwargs):
kwargs['timeout'] = 5
remaining = get_retries() + 1
while remaining:
remaining -= 1
try:
return f(*args, **kwargs)
except (requests.ConnectionError, requests.Timeout):
if not remaining:
raise
gevent.sleep(delay)
return inner
requests_get = retry_with_delay(requests.get)
requests_post = retry_with_delay(requests.post)
requests_request = retry_with_delay(requests.request)