"""Base class for implementing servers""" # Copyright (c) 2009-2012 Denis Bilenko. See LICENSE for details. import sys import _socket import errno from gevent.greenlet import Greenlet from gevent.event import Event from gevent.hub import string_types, integer_types, get_hub, xrange __all__ = ['BaseServer'] # We define a helper function to handle closing the socket in # do_handle; We'd like to bind it to a kwarg to avoid *any* lookups at # all, but that's incompatible with the calling convention of # do_handle. On CPython, this is ~20% faster than creating and calling # a closure and ~10% faster than using a @staticmethod. (In theory, we # could create a closure only once in set_handle, to wrap self._handle, # but this is safer from a backwards compat standpoint.) # we also avoid unpacking the *args tuple when calling/spawning this object # for a tiny improvement (benchmark shows a wash) def _handle_and_close_when_done(handle, close, args_tuple): try: return handle(*args_tuple) finally: close(*args_tuple) class BaseServer(object): """ An abstract base class that implements some common functionality for the servers in gevent. :param listener: Either be an address that the server should bind on or a :class:`gevent.socket.socket` instance that is already bound (and put into listening mode in case of TCP socket). :keyword handle: If given, the request handler. The request handler can be defined in a few ways. Most commonly, subclasses will implement a ``handle`` method as an instance method. Alternatively, a function can be passed as the ``handle`` argument to the constructor. In either case, the handler can later be changed by calling :meth:`set_handle`. When the request handler returns, the socket used for the request will be closed. :keyword spawn: If provided, is called to create a new greenlet to run the handler. By default, :func:`gevent.spawn` is used (meaning there is no artificial limit on the number of concurrent requests). Possible values for *spawn*: - a :class:`gevent.pool.Pool` instance -- ``handle`` will be executed using :meth:`gevent.pool.Pool.spawn` only if the pool is not full. While it is full, no new connections are accepted; - :func:`gevent.spawn_raw` -- ``handle`` will be executed in a raw greenlet which has a little less overhead then :class:`gevent.Greenlet` instances spawned by default; - ``None`` -- ``handle`` will be executed right away, in the :class:`Hub` greenlet. ``handle`` cannot use any blocking functions as it would mean switching to the :class:`Hub`. - an integer -- a shortcut for ``gevent.pool.Pool(integer)`` .. versionchanged:: 1.1a1 When the *handle* function returns from processing a connection, the client socket will be closed. This resolves the non-deterministic closing of the socket, fixing ResourceWarnings under Python 3 and PyPy. """ #: the number of seconds to sleep in case there was an error in accept() call #: for consecutive errors the delay will double until it reaches max_delay #: when accept() finally succeeds the delay will be reset to min_delay again min_delay = 0.01 max_delay = 1 #: Sets the maximum number of consecutive accepts that a process may perform on #: a single wake up. High values give higher priority to high connection rates, #: while lower values give higher priority to already established connections. #: Default is 100. Note, that in case of multiple working processes on the same #: listening value, it should be set to a lower value. (pywsgi.WSGIServer sets it #: to 1 when environ["wsgi.multiprocess"] is true) max_accept = 100 _spawn = Greenlet.spawn #: the default timeout that we wait for the client connections to close in stop() stop_timeout = 1 fatal_errors = (errno.EBADF, errno.EINVAL, errno.ENOTSOCK) def __init__(self, listener, handle=None, spawn='default'): self._stop_event = Event() self._stop_event.set() self._watcher = None self._timer = None self.pool = None try: self.set_listener(listener) self.set_spawn(spawn) self.set_handle(handle) self.delay = self.min_delay self.loop = get_hub().loop if self.max_accept < 1: raise ValueError('max_accept must be positive int: %r' % (self.max_accept, )) except: self.close() raise def set_listener(self, listener): if hasattr(listener, 'accept'): if hasattr(listener, 'do_handshake'): raise TypeError('Expected a regular socket, not SSLSocket: %r' % (listener, )) self.family = listener.family self.address = listener.getsockname() self.socket = listener else: self.family, self.address = parse_address(listener) def set_spawn(self, spawn): if spawn == 'default': self.pool = None self._spawn = self._spawn elif hasattr(spawn, 'spawn'): self.pool = spawn self._spawn = spawn.spawn elif isinstance(spawn, integer_types): from gevent.pool import Pool self.pool = Pool(spawn) self._spawn = self.pool.spawn else: self.pool = None self._spawn = spawn if hasattr(self.pool, 'full'): self.full = self.pool.full if self.pool is not None: self.pool._semaphore.rawlink(self._start_accepting_if_started) def set_handle(self, handle): if handle is not None: self.handle = handle if hasattr(self, 'handle'): self._handle = self.handle else: raise TypeError("'handle' must be provided") def _start_accepting_if_started(self, _event=None): if self.started: self.start_accepting() def start_accepting(self): if self._watcher is None: # just stop watcher without creating a new one? self._watcher = self.loop.io(self.socket.fileno(), 1) self._watcher.start(self._do_read) def stop_accepting(self): if self._watcher is not None: self._watcher.stop() self._watcher = None if self._timer is not None: self._timer.stop() self._timer = None def do_handle(self, *args): spawn = self._spawn handle = self._handle close = self.do_close try: if spawn is None: _handle_and_close_when_done(handle, close, args) else: spawn(_handle_and_close_when_done, handle, close, args) except: close(*args) raise def do_close(self, *args): pass def _do_read(self): for _ in xrange(self.max_accept): if self.full(): self.stop_accepting() return try: args = self.do_read() self.delay = self.min_delay if not args: return except: self.loop.handle_error(self, *sys.exc_info()) ex = sys.exc_info()[1] if self.is_fatal_error(ex): self.close() sys.stderr.write('ERROR: %s failed with %s\n' % (self, str(ex) or repr(ex))) return if self.delay >= 0: self.stop_accepting() self._timer = self.loop.timer(self.delay) self._timer.start(self._start_accepting_if_started) self.delay = min(self.max_delay, self.delay * 2) break else: try: self.do_handle(*args) except: self.loop.handle_error((args[1:], self), *sys.exc_info()) if self.delay >= 0: self.stop_accepting() self._timer = self.loop.timer(self.delay) self._timer.start(self._start_accepting_if_started) self.delay = min(self.max_delay, self.delay * 2) break def full(self): return False def __repr__(self): return '<%s at %s %s>' % (type(self).__name__, hex(id(self)), self._formatinfo()) def __str__(self): return '<%s %s>' % (type(self).__name__, self._formatinfo()) def _formatinfo(self): if hasattr(self, 'socket'): try: fileno = self.socket.fileno() except Exception as ex: fileno = str(ex) result = 'fileno=%s ' % fileno else: result = '' try: if isinstance(self.address, tuple) and len(self.address) == 2: result += 'address=%s:%s' % self.address else: result += 'address=%s' % (self.address, ) except Exception as ex: result += str(ex) or '' handle = self.__dict__.get('handle') if handle is not None: fself = getattr(handle, '__self__', None) try: if fself is self: # Checks the __self__ of the handle in case it is a bound # method of self to prevent recursivly defined reprs. handle_repr = '' % ( self.__class__.__name__, handle.__name__, ) else: handle_repr = repr(handle) result += ' handle=' + handle_repr except Exception as ex: result += str(ex) or '' return result @property def server_host(self): """IP address that the server is bound to (string).""" if isinstance(self.address, tuple): return self.address[0] @property def server_port(self): """Port that the server is bound to (an integer).""" if isinstance(self.address, tuple): return self.address[1] def init_socket(self): """If the user initialized the server with an address rather than socket, then this function will create a socket, bind it and put it into listening mode. It is not supposed to be called by the user, it is called by :meth:`start` before starting the accept loop.""" pass @property def started(self): return not self._stop_event.is_set() def start(self): """Start accepting the connections. If an address was provided in the constructor, then also create a socket, bind it and put it into the listening mode. """ self.init_socket() self._stop_event.clear() try: self.start_accepting() except: self.close() raise def close(self): """Close the listener socket and stop accepting.""" self._stop_event.set() try: self.stop_accepting() finally: try: self.socket.close() except Exception: pass finally: self.__dict__.pop('socket', None) self.__dict__.pop('handle', None) self.__dict__.pop('_handle', None) self.__dict__.pop('_spawn', None) self.__dict__.pop('full', None) if self.pool is not None: self.pool._semaphore.unlink(self._start_accepting_if_started) @property def closed(self): return not hasattr(self, 'socket') def stop(self, timeout=None): """ Stop accepting the connections and close the listening socket. If the server uses a pool to spawn the requests, then :meth:`stop` also waits for all the handlers to exit. If there are still handlers executing after *timeout* has expired (default 1 second, :attr:`stop_timeout`), then the currently running handlers in the pool are killed. If the server does not use a pool, then this merely stops accepting connections; any spawned greenlets that are handling requests continue running until they naturally complete. """ self.close() if timeout is None: timeout = self.stop_timeout if self.pool: self.pool.join(timeout=timeout) self.pool.kill(block=True, timeout=1) def serve_forever(self, stop_timeout=None): """Start the server if it hasn't been already started and wait until it's stopped.""" # add test that serve_forever exists on stop() if not self.started: self.start() try: self._stop_event.wait() finally: Greenlet.spawn(self.stop, timeout=stop_timeout).join() def is_fatal_error(self, ex): return isinstance(ex, _socket.error) and ex.args[0] in self.fatal_errors def _extract_family(host): if host.startswith('[') and host.endswith(']'): host = host[1:-1] return _socket.AF_INET6, host return _socket.AF_INET, host def _parse_address(address): if isinstance(address, tuple): if ':' in address[0]: return _socket.AF_INET6, address return _socket.AF_INET, address elif isinstance(address, string_types): if ':' in address: host, port = address.rsplit(':', 1) family, host = _extract_family(host) if host == '*': host = '' return family, (host, int(port)) else: return _socket.AF_INET, ('', int(address)) elif isinstance(address, integer_types): return _socket.AF_INET, ('', int(address)) else: raise TypeError('Expected tuple or string, got %s' % type(address)) def parse_address(address): try: return _parse_address(address) except ValueError as ex: raise ValueError('Failed to parse address %r: %s' % (address, ex))