"""Proxy classes for forwarding tornado handlers to be run in separate processes. This module uses ZeroMQ/PyZMQ sockets (DEALER/ROUTER) to enable individual Tornado handlers to be run in a separate backend process. Through the usage of DEALER/ROUTER sockets, multiple backend processes for a given handler can be started and requests will be load balanced among the backend processes. Authors: * Brian Granger """ #----------------------------------------------------------------------------- # Copyright (c) 2012 Brian Granger, Min Ragan-Kelley # # This file is part of pyzmq # # Distributed under the terms of the New BSD License. The full license is in # the file COPYING.BSD, distributed as part of this software. #----------------------------------------------------------------------------- #----------------------------------------------------------------------------- # Imports #----------------------------------------------------------------------------- import logging import socket import time import uuid from tornado import web from tornado import stack_context import zmq from zmq.eventloop.zmqstream import ZMQStream from zmq.eventloop.ioloop import IOLoop, DelayedCallback from zmq.utils import jsonapi from .zmqweb import ZMQHTTPRequest #----------------------------------------------------------------------------- # Service client #----------------------------------------------------------------------------- class ZMQApplicationProxy(object): """A proxy for a ZeroMQ based ZMQApplication that is using ZMQHTTPRequest. This class is a proxy for a backend that is running a ZMQApplication and MUST be used with the ZMQHTTPRequest class. This version sends the reply parts (each generated by RequestHandler.flush) as a single multipart message for low latency replies. See ZMQStreamingApplicationProxy, for a version that has higher latency, but which sends each reply part as a separate zmq message. """ def __init__(self, loop=None, context=None): self.loop = loop if loop is not None else IOLoop.instance() self.context = context if context is not None else zmq.Context.instance() self._callbacks = {} self.socket = self.context.socket(zmq.DEALER) self.stream = ZMQStream(self.socket, self.loop) self.stream.on_recv(self._handle_reply) self.urls = [] def connect(self, url): """Connect the service client to the proto://ip:port given in the url.""" self.urls.append(url) self.socket.connect(url) def bind(self, url): """Bind the service client to the proto://ip:port given in the url.""" self.urls.append(url) self.socket.bind(url) def send_request(self, request, args, kwargs, handler, timeout): """Send a request to the service.""" req = {} req['method'] = request.method req['uri'] = request.uri req['version'] = request.version req['headers'] = dict(request.headers) body = request.body req['remote_ip'] = request.remote_ip req['protocol'] = request.protocol req['host'] = request.host req['files'] = request.files req['arguments'] = request.arguments req['args'] = args req['kwargs'] = kwargs msg_id = bytes(uuid.uuid4()) msg_list = [b'|', msg_id, jsonapi.dumps(req)] if body: msg_list.append(body) logging.debug('Sending request: %r', msg_list) self.stream.send_multipart(msg_list) if timeout > 0: def _handle_timeout(): handler.send_error(504) # Gateway timeout try: self._callbacks.pop(msg_id) except KeyError: logging.error('Unexpected error removing callbacks') dc = DelayedCallback(_handle_timeout, timeout, self.loop) dc.start() else: dc = None self._callbacks[msg_id] = (handler, dc) return msg_id def _handle_reply(self, msg_list): logging.debug('Handling reply: %r', msg_list) len_msg_list = len(msg_list) if len_msg_list < 3 or not msg_list[0] == b'|': logging.error('Unexpected reply in ZMQApplicationProxy._handle_reply') return msg_id = msg_list[1] replies = msg_list[2:] cb = self._callbacks.pop(msg_id, None) if cb is not None: handler, dc = cb if dc is not None: dc.stop() try: for reply in replies: handler.write(reply) # The backend has already processed the headers and they are # included in the above write calls, so we manually tell the # handler that the headers are already written. handler._headers_written = True # We set transforms to an empty list because the backend # has already applied all of the transforms. handler._transforms = [] handler.finish() except: logging.error('Unexpected error in ZMQApplicationProxy._handle_reply', exc_info=True) class ZMQStreamingApplicationProxy(ZMQApplicationProxy): """A proxy for a ZeroMQ based ZMQApplication that is using ZMQStreamingHTTPRequest. This class is a proxy for a backend that is running a ZMQApplication and MUST be used with the ZMQStreamingHTTPRequest class. This version sends the reply parts (each generated by RequestHandler.flush) as separate zmq messages to enable streaming replies. See ZMQApplicationProxy, for a version that has lower latency, but which sends all reply parts as a single zmq message. """ def _handle_reply(self, msg_list): logging.debug('Handling reply: %r', msg_list) len_msg_list = len(msg_list) if len_msg_list < 3 or not msg_list[0] == b'|': logging.error('Unexpected reply in ZMQStreamingApplicationProxy._handle_reply') return msg_id = msg_list[1] reply = msg_list[2] cb = self._callbacks.get(msg_id) if cb is not None: handler, dc = cb if reply == b'DATA' and len_msg_list == 4: if dc is not None: # Stop the timeout DelayedCallback and set it to None. dc.stop() self._callbacks[msg_id] = (handler, None) try: handler.write(msg_list[3]) # The backend has already processed the headers and they are # included in the above write calls, so we manually tell the # handler that the headers are already written. handler._headers_written = True # We set transforms to an empty list because the backend # has already applied all of the transforms. handler._transforms = [] handler.flush() except socket.error: # socket.error is raised if the client disconnects while # we are sending. pass except: logging.error('Unexpected write error', exc_info=True) elif reply == b'FINISH': # We are done so we can get rid of the callbacks for this msg_id. self._callbacks.pop(msg_id) try: handler.finish() except socket.error: # socket.error is raised if the client disconnects while # we are sending. pass except: logging.error('Unexpected finish error', exc_info=True) class ZMQRequestHandlerProxy(web.RequestHandler): """A handler for use with a ZeroMQ backend service client.""" SUPPORTED_METHODS = ("GET", "HEAD", "POST", "DELETE", "PUT", "OPTIONS") def initialize(self, proxy, timeout=0): """Initialize with a proxy and timeout. Parameters ---------- proxy : ZMQApplicationProxy. ZMQStreamingApplicationProxy A proxy instance that will be used to send requests to a backend process. timeout : int The timeout, in milliseconds. If this timeout is reached before the backend's first reply, then the server is sent a status code of 504 to the browser to indicate a gateway/proxy timeout. Set to 0 or a negative number to disable (infinite timeout). """ # zmqweb Note: This method is empty in the base class. self.proxy = proxy self.timeout = timeout def _execute(self, transforms, *args, **kwargs): """Executes this request with the given output transforms.""" # ZMQWEB NOTE: Transforms should be applied in the backend service so # we null any transforms passed in here. This may be a little too # silent, but there may be other handlers that do need the transforms. self._transforms = [] # ZMQWEB NOTE: This following try/except block is taken from the base # class, but is modified to send the request to the proxy. try: if self.request.method not in self.SUPPORTED_METHODS: raise web.HTTPError(405) # ZMQWEB NOTE: We have removed the XSRF cookie handling from here # as it will be handled in the backend. self.prepare() if not self._finished: # ZMQWEB NOTE: Here is where we send the request to the proxy. # We don't decode args or kwargs as that will be done in the # backen. self.proxy.send_request( self.request, args, kwargs, self, self.timeout ) except Exception: # ZMQWEB NOTE: We don't call the usual error handling logic # as that will be called by the backend process. logging.error('Unexpected error in _execute', exc_info=True)