# SPDX-FileCopyrightText: 2015 Eric Larson # # SPDX-License-Identifier: Apache-2.0 """ The httplib2 algorithms ported for use with requests. """ from __future__ import annotations import calendar import logging import re import time from email.utils import parsedate_tz from typing import TYPE_CHECKING, Collection, Mapping from requests.structures import CaseInsensitiveDict from cachecontrol.cache import DictCache, SeparateBodyBaseCache from cachecontrol.serialize import Serializer if TYPE_CHECKING: from typing import Literal from requests import PreparedRequest from urllib3 import HTTPResponse from cachecontrol.cache import BaseCache logger = logging.getLogger(__name__) URI = re.compile(r"^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?") PERMANENT_REDIRECT_STATUSES = (301, 308) def parse_uri(uri: str) -> tuple[str, str, str, str, str]: """Parses a URI using the regex given in Appendix B of RFC 3986. (scheme, authority, path, query, fragment) = parse_uri(uri) """ match = URI.match(uri) assert match is not None groups = match.groups() return (groups[1], groups[3], groups[4], groups[6], groups[8]) class CacheController: """An interface to see if request should cached or not.""" def __init__( self, cache: BaseCache | None = None, cache_etags: bool = True, serializer: Serializer | None = None, status_codes: Collection[int] | None = None, ): self.cache = DictCache() if cache is None else cache self.cache_etags = cache_etags self.serializer = serializer or Serializer() self.cacheable_status_codes = status_codes or (200, 203, 300, 301, 308) @classmethod def _urlnorm(cls, uri: str) -> str: """Normalize the URL to create a safe key for the cache""" (scheme, authority, path, query, fragment) = parse_uri(uri) if not scheme or not authority: raise Exception("Only absolute URIs are allowed. uri = %s" % uri) scheme = scheme.lower() authority = authority.lower() if not path: path = "/" # Could do syntax based normalization of the URI before # computing the digest. See Section 6.2.2 of Std 66. request_uri = query and "?".join([path, query]) or path defrag_uri = scheme + "://" + authority + request_uri return defrag_uri @classmethod def cache_url(cls, uri: str) -> str: return cls._urlnorm(uri) def parse_cache_control(self, headers: Mapping[str, str]) -> dict[str, int | None]: known_directives = { # https://tools.ietf.org/html/rfc7234#section-5.2 "max-age": (int, True), "max-stale": (int, False), "min-fresh": (int, True), "no-cache": (None, False), "no-store": (None, False), "no-transform": (None, False), "only-if-cached": (None, False), "must-revalidate": (None, False), "public": (None, False), "private": (None, False), "proxy-revalidate": (None, False), "s-maxage": (int, True), } cc_headers = headers.get("cache-control", headers.get("Cache-Control", "")) retval: dict[str, int | None] = {} for cc_directive in cc_headers.split(","): if not cc_directive.strip(): continue parts = cc_directive.split("=", 1) directive = parts[0].strip() try: typ, required = known_directives[directive] except KeyError: logger.debug("Ignoring unknown cache-control directive: %s", directive) continue if not typ or not required: retval[directive] = None if typ: try: retval[directive] = typ(parts[1].strip()) except IndexError: if required: logger.debug( "Missing value for cache-control " "directive: %s", directive, ) except ValueError: logger.debug( "Invalid value for cache-control directive " "%s, must be %s", directive, typ.__name__, ) return retval def _load_from_cache(self, request: PreparedRequest) -> HTTPResponse | None: """ Load a cached response, or return None if it's not available. """ # We do not support caching of partial content: so if the request contains a # Range header then we don't want to load anything from the cache. if "Range" in request.headers: return None cache_url = request.url assert cache_url is not None cache_data = self.cache.get(cache_url) if cache_data is None: logger.debug("No cache entry available") return None if isinstance(self.cache, SeparateBodyBaseCache): body_file = self.cache.get_body(cache_url) else: body_file = None result = self.serializer.loads(request, cache_data, body_file) if result is None: logger.warning("Cache entry deserialization failed, entry ignored") return result def cached_request(self, request: PreparedRequest) -> HTTPResponse | Literal[False]: """ Return a cached response if it exists in the cache, otherwise return False. """ assert request.url is not None cache_url = self.cache_url(request.url) logger.debug('Looking up "%s" in the cache', cache_url) cc = self.parse_cache_control(request.headers) # Bail out if the request insists on fresh data if "no-cache" in cc: logger.debug('Request header has "no-cache", cache bypassed') return False if "max-age" in cc and cc["max-age"] == 0: logger.debug('Request header has "max_age" as 0, cache bypassed') return False # Check whether we can load the response from the cache: resp = self._load_from_cache(request) if not resp: return False # If we have a cached permanent redirect, return it immediately. We # don't need to test our response for other headers b/c it is # intrinsically "cacheable" as it is Permanent. # # See: # https://tools.ietf.org/html/rfc7231#section-6.4.2 # # Client can try to refresh the value by repeating the request # with cache busting headers as usual (ie no-cache). if int(resp.status) in PERMANENT_REDIRECT_STATUSES: msg = ( "Returning cached permanent redirect response " "(ignoring date and etag information)" ) logger.debug(msg) return resp headers: CaseInsensitiveDict[str] = CaseInsensitiveDict(resp.headers) if not headers or "date" not in headers: if "etag" not in headers: # Without date or etag, the cached response can never be used # and should be deleted. logger.debug("Purging cached response: no date or etag") self.cache.delete(cache_url) logger.debug("Ignoring cached response: no date") return False now = time.time() time_tuple = parsedate_tz(headers["date"]) assert time_tuple is not None date = calendar.timegm(time_tuple[:6]) current_age = max(0, now - date) logger.debug("Current age based on date: %i", current_age) # TODO: There is an assumption that the result will be a # urllib3 response object. This may not be best since we # could probably avoid instantiating or constructing the # response until we know we need it. resp_cc = self.parse_cache_control(headers) # determine freshness freshness_lifetime = 0 # Check the max-age pragma in the cache control header max_age = resp_cc.get("max-age") if max_age is not None: freshness_lifetime = max_age logger.debug("Freshness lifetime from max-age: %i", freshness_lifetime) # If there isn't a max-age, check for an expires header elif "expires" in headers: expires = parsedate_tz(headers["expires"]) if expires is not None: expire_time = calendar.timegm(expires[:6]) - date freshness_lifetime = max(0, expire_time) logger.debug("Freshness lifetime from expires: %i", freshness_lifetime) # Determine if we are setting freshness limit in the # request. Note, this overrides what was in the response. max_age = cc.get("max-age") if max_age is not None: freshness_lifetime = max_age logger.debug( "Freshness lifetime from request max-age: %i", freshness_lifetime ) min_fresh = cc.get("min-fresh") if min_fresh is not None: # adjust our current age by our min fresh current_age += min_fresh logger.debug("Adjusted current age from min-fresh: %i", current_age) # Return entry if it is fresh enough if freshness_lifetime > current_age: logger.debug('The response is "fresh", returning cached response') logger.debug("%i > %i", freshness_lifetime, current_age) return resp # we're not fresh. If we don't have an Etag, clear it out if "etag" not in headers: logger.debug('The cached response is "stale" with no etag, purging') self.cache.delete(cache_url) # return the original handler return False def conditional_headers(self, request: PreparedRequest) -> dict[str, str]: resp = self._load_from_cache(request) new_headers = {} if resp: headers: CaseInsensitiveDict[str] = CaseInsensitiveDict(resp.headers) if "etag" in headers: new_headers["If-None-Match"] = headers["ETag"] if "last-modified" in headers: new_headers["If-Modified-Since"] = headers["Last-Modified"] return new_headers def _cache_set( self, cache_url: str, request: PreparedRequest, response: HTTPResponse, body: bytes | None = None, expires_time: int | None = None, ) -> None: """ Store the data in the cache. """ if isinstance(self.cache, SeparateBodyBaseCache): # We pass in the body separately; just put a placeholder empty # string in the metadata. self.cache.set( cache_url, self.serializer.dumps(request, response, b""), expires=expires_time, ) # body is None can happen when, for example, we're only updating # headers, as is the case in update_cached_response(). if body is not None: self.cache.set_body(cache_url, body) else: self.cache.set( cache_url, self.serializer.dumps(request, response, body), expires=expires_time, ) def cache_response( self, request: PreparedRequest, response: HTTPResponse, body: bytes | None = None, status_codes: Collection[int] | None = None, ) -> None: """ Algorithm for caching requests. This assumes a requests Response object. """ # From httplib2: Don't cache 206's since we aren't going to # handle byte range requests cacheable_status_codes = status_codes or self.cacheable_status_codes if response.status not in cacheable_status_codes: logger.debug( "Status code %s not in %s", response.status, cacheable_status_codes ) return response_headers: CaseInsensitiveDict[str] = CaseInsensitiveDict( response.headers ) if "date" in response_headers: time_tuple = parsedate_tz(response_headers["date"]) assert time_tuple is not None date = calendar.timegm(time_tuple[:6]) else: date = 0 # If we've been given a body, our response has a Content-Length, that # Content-Length is valid then we can check to see if the body we've # been given matches the expected size, and if it doesn't we'll just # skip trying to cache it. if ( body is not None and "content-length" in response_headers and response_headers["content-length"].isdigit() and int(response_headers["content-length"]) != len(body) ): return cc_req = self.parse_cache_control(request.headers) cc = self.parse_cache_control(response_headers) assert request.url is not None cache_url = self.cache_url(request.url) logger.debug('Updating cache with response from "%s"', cache_url) # Delete it from the cache if we happen to have it stored there no_store = False if "no-store" in cc: no_store = True logger.debug('Response header has "no-store"') if "no-store" in cc_req: no_store = True logger.debug('Request header has "no-store"') if no_store and self.cache.get(cache_url): logger.debug('Purging existing cache entry to honor "no-store"') self.cache.delete(cache_url) if no_store: return # https://tools.ietf.org/html/rfc7234#section-4.1: # A Vary header field-value of "*" always fails to match. # Storing such a response leads to a deserialization warning # during cache lookup and is not allowed to ever be served, # so storing it can be avoided. if "*" in response_headers.get("vary", ""): logger.debug('Response header has "Vary: *"') return # If we've been given an etag, then keep the response if self.cache_etags and "etag" in response_headers: expires_time = 0 if response_headers.get("expires"): expires = parsedate_tz(response_headers["expires"]) if expires is not None: expires_time = calendar.timegm(expires[:6]) - date expires_time = max(expires_time, 14 * 86400) logger.debug(f"etag object cached for {expires_time} seconds") logger.debug("Caching due to etag") self._cache_set(cache_url, request, response, body, expires_time) # Add to the cache any permanent redirects. We do this before looking # that the Date headers. elif int(response.status) in PERMANENT_REDIRECT_STATUSES: logger.debug("Caching permanent redirect") self._cache_set(cache_url, request, response, b"") # Add to the cache if the response headers demand it. If there # is no date header then we can't do anything about expiring # the cache. elif "date" in response_headers: time_tuple = parsedate_tz(response_headers["date"]) assert time_tuple is not None date = calendar.timegm(time_tuple[:6]) # cache when there is a max-age > 0 max_age = cc.get("max-age") if max_age is not None and max_age > 0: logger.debug("Caching b/c date exists and max-age > 0") expires_time = max_age self._cache_set( cache_url, request, response, body, expires_time, ) # If the request can expire, it means we should cache it # in the meantime. elif "expires" in response_headers: if response_headers["expires"]: expires = parsedate_tz(response_headers["expires"]) if expires is not None: expires_time = calendar.timegm(expires[:6]) - date else: expires_time = None logger.debug( "Caching b/c of expires header. expires in {} seconds".format( expires_time ) ) self._cache_set( cache_url, request, response, body, expires_time, ) def update_cached_response( self, request: PreparedRequest, response: HTTPResponse ) -> HTTPResponse: """On a 304 we will get a new set of headers that we want to update our cached value with, assuming we have one. This should only ever be called when we've sent an ETag and gotten a 304 as the response. """ assert request.url is not None cache_url = self.cache_url(request.url) cached_response = self._load_from_cache(request) if not cached_response: # we didn't have a cached response return response # Lets update our headers with the headers from the new request: # http://tools.ietf.org/html/draft-ietf-httpbis-p4-conditional-26#section-4.1 # # The server isn't supposed to send headers that would make # the cached body invalid. But... just in case, we'll be sure # to strip out ones we know that might be problmatic due to # typical assumptions. excluded_headers = ["content-length"] cached_response.headers.update( { k: v for k, v in response.headers.items() if k.lower() not in excluded_headers } ) # we want a 200 b/c we have content via the cache cached_response.status = 200 # update our cache self._cache_set(cache_url, request, cached_response) return cached_response