"""
Utilities
==============
Rate limiter classes.
These are basically callables that when called register that a request was
issued. Depending on how they are configured that may cause a pause or exception
if a rate limit has been exceeded. Obviously it is up to the calling code to ensure
that these callables are invoked with every (successful?) call to the backend
API. (There is probably a better way to hook these into the requests library
directly ... TBD.)
From the Strava docs:
Strava API usage is limited on a per-application basis using a short term,
15 minute, limit and a long term, daily, limit. The default rate limit allows
600 requests every 15 minutes, with up to 30,000 requests per day.
This limit allows applications to make 40 requests per minute for about
half the day.
"""
from __future__ import division, absolute_import, print_function, unicode_literals
import collections
import logging
import time
from datetime import datetime, timedelta
import arrow
from stravalib import exc
def total_seconds(td):
    """Return the length of a timedelta expressed in seconds.

    Stand-in for ``datetime.timedelta.total_seconds()``, which is only
    available since Python 2.7:
    https://docs.python.org/2/library/datetime.html#datetime.timedelta.total_seconds

    :param td: The duration to convert.
    :type td: datetime.timedelta
    :return: The total number of seconds, including the fractional (microsecond) part.
    """
    micros_per_second = 10**6
    # Work in integer microseconds first so the only rounding happens in the final division.
    whole_seconds = td.days * 24 * 3600 + td.seconds
    total_micros = whole_seconds * micros_per_second + td.microseconds
    return total_micros / micros_per_second
# Immutable record holding the usage counters and the limits for the
# short-term and long-term rate-limit windows.
RequestRate = collections.namedtuple(
    'RequestRate',
    ('short_usage', 'long_usage', 'short_limit', 'long_limit'),
)
def get_seconds_until_next_quarter(now=None):
    """
    Returns the number of seconds until the next quarter of an hour. This is the short-term rate limit used by Strava.

    The countdown runs from 899 (exactly at the start of a quarter) down to 0 (during the last second of
    the quarter).

    :param now: A (utc) timestamp; defaults to the current UTC time.
    :type now: arrow.arrow.Arrow
    :return: the number of seconds until the next quarter, as int
    """
    current = arrow.utcnow() if now is None else now
    # Snap back to the minute at which the running quarter (0, 15, 30 or 45) began.
    quarter_minute = current.minute - current.minute % 15
    quarter_start = current.replace(minute=quarter_minute, second=0, microsecond=0)
    elapsed = current - quarter_start
    return 899 - elapsed.seconds
def get_seconds_until_next_day(now=None):
    """
    Returns the number of seconds until the next day (utc midnight). This is the long-term rate limit used by Strava.

    :param now: A (utc) timestamp; defaults to the current UTC time.
    :type now: arrow.arrow.Arrow
    :return: the number of seconds until next day, as int
    """
    current = arrow.utcnow() if now is None else now
    # NOTE(review): relies on arrow's ``ceil('day')`` giving the last instant of the current day - confirm.
    end_of_day = current.ceil('day')
    remaining = end_of_day - current
    return remaining.seconds
class XRateLimitRule(object):
    """
    Rate limit rule that tracks usage and limits through the values parsed from API response headers.

    Calling the rule with the response headers refreshes the tracked usage and then checks every
    configured limit. This rule never pauses: it raises when a limit is reached, and keeps raising
    while the window of a previously exceeded limit has not yet elapsed.
    """

    def __init__(self, limits, force_limits=False):
        """
        :param limits: The limits structure: a dict with a ``'short'`` and a ``'long'`` entry, each of them a dict
            holding ``'usage'``, ``'limit'``, ``'time'`` (length of the window in seconds) and ``'lastExceeded'``
            (``None`` or the datetime at which the limit was last hit). The structure is updated in place.
        :param force_limits: If False (default), this rule will set/update its limits based on what the Strava API
            tells it. If True, the provided limits will be enforced, i.e. ignoring the limits given by the API.
        """
        self.log = logging.getLogger('{0.__module__}.{0.__name__}'.format(self.__class__))
        self.rate_limits = limits
        # should limit args be validated?
        self.limit_time_invalid = 0
        self.force_limits = force_limits

    @property
    def limit_timeout(self):
        """Seconds remaining for the most recently checked limit that is still timed out (0 if none)."""
        return self.limit_time_invalid

    def __call__(self, response_headers):
        """
        Register a response and enforce the limits.

        :param response_headers: The headers of the API response.
        :raises exc.RateLimitTimeout: if a limit was exceeded earlier and its window has not elapsed yet.
        :raises exc.RateLimitExceeded: if the usage has reached one of the limits.
        """
        self._update_usage(response_headers)
        for limit in self.rate_limits.values():
            self._check_limit_time_invalid(limit)
            self._check_limit_rates(limit)

    def _update_usage(self, response_headers):
        """Copy usage (and, unless limits are forced, the limits) from the response headers."""
        rates = get_rates_from_response_headers(response_headers)
        if rates:
            self.log.debug("Updating rate-limit limits and usage from headers: {}".format(rates))
            self.rate_limits['short']['usage'] = rates.short_usage
            self.rate_limits['long']['usage'] = rates.long_usage
            if not self.force_limits:
                self.rate_limits['short']['limit'] = rates.short_limit
                self.rate_limits['long']['limit'] = rates.long_limit

    def _check_limit_rates(self, limit):
        """Raise ``exc.RateLimitExceeded`` (and remember when) if the usage has reached the limit."""
        if limit['usage'] >= limit['limit']:
            self.log.debug("Rate limit of {0} reached.".format(limit['limit']))
            limit['lastExceeded'] = datetime.now()
            # Keyword arguments keep the window length (timeout) and the request limit from being
            # mixed up; they used to be passed positionally in the wrong order.
            self._raise_rate_limit_exception(timeout=limit['time'], limit_rate=limit['limit'])

    def _check_limit_time_invalid(self, limit):
        """Raise ``exc.RateLimitTimeout`` while the window of a previously exceeded limit is still running."""
        self.limit_time_invalid = 0
        if limit['lastExceeded'] is not None:
            delta = (datetime.now() - limit['lastExceeded']).total_seconds()
            if delta < limit['time']:
                self.limit_time_invalid = limit['time'] - delta
                self.log.debug("Rate limit invalid duration {0} seconds."
                               .format(self.limit_time_invalid))
                self._raise_rate_limit_timeout(timeout=self.limit_timeout, limit_rate=limit['limit'])

    def _raise_rate_limit_exception(self, timeout, limit_rate):
        """Raise ``exc.RateLimitExceeded`` describing the limit and the time to wait."""
        raise exc.RateLimitExceeded("Rate limit of {0} exceeded. "
                                    "Try again in {1} seconds.".format(limit_rate, timeout),
                                    limit=limit_rate, timeout=timeout)

    def _raise_rate_limit_timeout(self, timeout, limit_rate):
        """Raise ``exc.RateLimitTimeout`` describing the limit and the time to wait."""
        raise exc.RateLimitTimeout("Rate limit of {0} exceeded. "
                                   "Try again in {1} seconds.".format(limit_rate, timeout),
                                   limit=limit_rate, timeout=timeout)
class SleepingRateLimitRule(object):
    """
    A rate limit rule that can be prioritized and can dynamically adapt its limits based on API responses.
    Given its priority, it will enforce a variable "cool-down" period after each response. When rate limits
    are reached within their period, this limiter will wait until the end of that period. It will NOT raise
    any kind of exception in this case.
    """

    def __init__(self, priority='high', short_limit=10000, long_limit=1000000, force_limits=False):
        """
        Constructs a new SleepingRateLimitRule.

        :param priority: The priority for this rule. When 'low', the cool-down period after each request will be such
            that the long-term limits will not be exceeded. When 'medium', the cool-down period will be such that the
            short-term limits will not be exceeded. When 'high', there will be no cool-down period.
        :type priority: str
        :param short_limit: (Optional) explicit short-term limit
        :type short_limit: int
        :param long_limit: (Optional) explicit long-term limit
        :type long_limit: int
        :param force_limits: If False (default), this rule will set/update its limits based on what the Strava API
            tells it. If True, the provided limits will be enforced, i.e. ignoring the limits given by the API.
        :raises ValueError: if ``priority`` is not one of 'low', 'medium' or 'high'.
        """
        if priority not in ['low', 'medium', 'high']:
            raise ValueError('Invalid priority "{0}", expecting one of "low", "medium" or "high"'.format(priority))

        self.log = logging.getLogger('{0.__module__}.{0.__name__}'.format(self.__class__))
        self.priority = priority
        self.short_limit = short_limit
        self.long_limit = long_limit
        self.force_limits = force_limits

    def _get_wait_time(self, short_usage, long_usage, seconds_until_short_limit, seconds_until_long_limit):
        """
        Compute how long to pause after a response.

        :param short_usage: Requests used in the current short-term window.
        :param long_usage: Requests used in the current long-term window.
        :param seconds_until_short_limit: Seconds until the short-term window ends.
        :param seconds_until_long_limit: Seconds until the long-term window ends.
        :return: The number of seconds to sleep (0 when no pause is needed).
        """
        # An exhausted limit always wins over the priority: wait out the rest of that window.
        if long_usage >= self.long_limit:
            self.log.warning('Long term API rate limit exceeded')
            return seconds_until_long_limit
        if short_usage >= self.short_limit:
            self.log.warning('Short term API rate limit exceeded')
            return seconds_until_short_limit

        # Below the limits: spread the remaining requests evenly over the remaining time of the
        # window selected by the priority. The checks above guarantee a non-zero divisor.
        if self.priority == 'medium':
            return seconds_until_short_limit / (self.short_limit - short_usage)
        if self.priority == 'low':
            return seconds_until_long_limit / (self.long_limit - long_usage)

        # 'high' priority: no cool-down period.
        return 0

    def __call__(self, response_headers):
        """
        Register a response and sleep for the cool-down period it calls for.

        :param response_headers: The headers of the API response.
        """
        rates = get_rates_from_response_headers(response_headers)
        if not rates:
            return

        # Adopt the limits reported by the API *before* computing the wait time, so that the usage
        # from this response is compared against the limits from this same response rather than
        # against stale values (the constructor defaults on the very first response).
        if not self.force_limits:
            self.short_limit = rates.short_limit
            self.long_limit = rates.long_limit

        wait_time = self._get_wait_time(rates.short_usage, rates.long_usage,
                                        get_seconds_until_next_quarter(), get_seconds_until_next_day())
        time.sleep(wait_time)
class RateLimitRule(object):
    """
    Timer based rate limit rule: allows at most ``requests`` registered requests per ``seconds`` time frame.

    The timestamps of the most recent requests are kept in a bounded deque; once it is full, the age of
    the oldest entry decides whether the next request has to wait (or fail).
    """

    def __init__(self, requests, seconds, raise_exc=False):
        """
        :param requests: Number of requests for limit.
        :param seconds: The number of seconds for that number of requests (may be float)
        :param raise_exc: Whether to raise an exception when limit is reached (as opposed to pausing)
        """
        self.log = logging.getLogger('{0.__module__}.{0.__name__}'.format(self.__class__))
        self.timeframe = timedelta(seconds=seconds)
        self.requests = requests
        # Bounded: appending to a full deque silently drops the oldest timestamp.
        self.tab = collections.deque(maxlen=self.requests)
        self.raise_exc = raise_exc

    def __call__(self, args):
        """
        Register another request is being issued.

        Depending on configuration of the rule will pause if rate limit has
        been reached, or raise exception, etc.

        :param args: Unused by this rule; accepted so all rules can be invoked the same way.
        :raises exc.RateLimitExceeded: if the limit is reached and the rule was created with ``raise_exc=True``.
        """
        # First check if the deque is full; that indicates that we'd better check whether
        # we need to pause.
        if len(self.tab) == self.requests:
            # Grab the oldest (leftmost) timestamp and check whether it still falls inside the
            # configured time frame.
            delta = datetime.now() - self.tab[0]
            if delta < self.timeframe:
                # Time left until the oldest request leaves the time frame.
                remaining = self.timeframe - delta
                if self.raise_exc:
                    raise exc.RateLimitExceeded("Rate limit exceeded (can try again in {0})".format(remaining))
                sleeptime = remaining.total_seconds()
                self.log.debug("Rate limit triggered; sleeping for {0}".format(sleeptime))
                time.sleep(sleeptime)
        self.tab.append(datetime.now())
class RateLimiter(object):
    """Callable collection of rate limit rules; invoking it invokes every rule in ``rules``."""

    def __init__(self):
        logger_name = '{0.__module__}.{0.__name__}'.format(self.__class__)
        self.log = logging.getLogger(logger_name)
        # Rules are plain callables, run in the order in which they were added.
        self.rules = []

    def __call__(self, args):
        """
        Register another request is being issued.

        :param args: Passed unchanged to each rule.
        """
        for rule in self.rules:
            rule(args)
class DefaultRateLimiter(RateLimiter):
    """
    Implements something similar to the default rate limit for Strava apps.

    To do this correctly we would actually need to change our logic to reset
    the limit at midnight, etc. Will make this more complex in the future.

    Strava API usage is limited on a per-application basis using a short term,
    15 minute, limit and a long term, daily, limit. The default rate limit allows
    600 requests every 15 minutes, with up to 30,000 requests per day.
    """

    def __init__(self):
        """
        Registers a single XRateLimitRule preconfigured with the default limits.

        Strava API usage is limited on a per-application basis using a short term,
        15 minute, limit and a long term, daily, limit. The default rate limit
        allows 600 requests every 15 minutes, with up to 30,000 requests per day.
        This limit allows applications to make 40 requests per minute for about half the day.
        """
        super(DefaultRateLimiter, self).__init__()

        short_term = {
            'usageFieldIndex': 0,
            'usage': 0,
            'limit': 600,
            'time': (60*15),  # 60s * 15 = 15 min
            'lastExceeded': None,
        }
        long_term = {
            'usageFieldIndex': 1,
            'usage': 0,
            'limit': 30000,
            'time': (60*60*24),  # 60s * 60m * 24 = 1 day
            'lastExceeded': None,
        }
        header_rule = XRateLimitRule({'short': short_term, 'long': long_term})
        self.rules.append(header_rule)

        # XRateLimitRule used instead of timer based RateLimitRule
        # self.rules.append(RateLimitRule(requests=40, seconds=60, raise_exc=False))
        # self.rules.append(RateLimitRule(requests=30000, seconds=(3600 * 24), raise_exc=True))