"""Provides the 1.0, 1.1 and 1.2 protocol classes. """ from stomp.constants import * from stomp.exception import ConnectFailedException from stomp.listener import * import stomp.utils as utils log = logging.getLogger('stomp.py') class Protocol10(ConnectionListener): """ Represents version 1.0 of the protocol (see https://stomp.github.io/stomp-specification-1.0.html). Most users should not instantiate the protocol directly. See :py:mod:`stomp.connect` for connection classes. :param transport: :param bool auto_content_length: Whether to calculate and send the content-length header automatically if it has not been set """ def __init__(self, transport, auto_content_length=True): self.transport = transport self.auto_content_length = auto_content_length transport.set_listener('protocol-listener', self) self.version = '1.0' def send_frame(self, cmd, headers=None, body=''): """ Encode and send a stomp frame through the underlying transport. :param str cmd: the protocol command :param dict headers: a map of headers to include in the frame :param body: the content of the message """ frame = utils.Frame(cmd, headers, body) self.transport.transmit(frame) def abort(self, transaction, headers=None, **keyword_headers): """ Abort a transaction. :param str transaction: the identifier of the transaction :param dict headers: a map of any additional headers the broker requires :param keyword_headers: any additional headers the broker requires """ assert transaction is not None, "'transaction' is required" headers = utils.merge_headers([headers, keyword_headers]) headers[HDR_TRANSACTION] = transaction self.send_frame(CMD_ABORT, headers) def ack(self, id, transaction=None, receipt=None): """ Acknowledge 'consumption' of a message by id. :param str id: identifier of the message :param str transaction: include the acknowledgement in the specified transaction """ assert id is not None, "'id' is required" headers = {HDR_MESSAGE_ID: id} if transaction: headers[HDR_TRANSACTION] = transaction if receipt: headers[HDR_RECEIPT] = receipt self.send_frame(CMD_ACK, headers) def begin(self, transaction=None, headers=None, **keyword_headers): """ Begin a transaction. :param str transaction: the identifier for the transaction (optional - if not specified a unique transaction id will be generated) :param dict headers: a map of any additional headers the broker requires :param keyword_headers: any additional headers the broker requires :return: the transaction id :rtype: str """ headers = utils.merge_headers([headers, keyword_headers]) if not transaction: transaction = utils.get_uuid() headers[HDR_TRANSACTION] = transaction self.send_frame(CMD_BEGIN, headers) return transaction def commit(self, transaction=None, headers=None, **keyword_headers): """ Commit a transaction. :param str transaction: the identifier for the transaction :param dict headers: a map of any additional headers the broker requires :param keyword_headers: any additional headers the broker requires """ assert transaction is not None, "'transaction' is required" headers = utils.merge_headers([headers, keyword_headers]) headers[HDR_TRANSACTION] = transaction self.send_frame(CMD_COMMIT, headers) def connect(self, username=None, passcode=None, wait=False, headers=None, **keyword_headers): """ Start a connection. :param str username: the username to connect with :param str passcode: the password used to authenticate with :param bool wait: if True, wait for the connection to be established/acknowledged :param dict headers: a map of any additional headers the broker requires :param keyword_headers: any additional headers the broker requires """ cmd = CMD_CONNECT headers = utils.merge_headers([headers, keyword_headers]) headers[HDR_ACCEPT_VERSION] = self.version if username is not None: headers[HDR_LOGIN] = username if passcode is not None: headers[HDR_PASSCODE] = passcode self.send_frame(cmd, headers) if wait: self.transport.wait_for_connection() if self.transport.connection_error: raise ConnectFailedException() def disconnect(self, receipt=None, headers=None, **keyword_headers): """ Disconnect from the server. :param str receipt: the receipt to use (once the server acknowledges that receipt, we're officially disconnected; optional - if not specified a unique receipt id will be generated) :param dict headers: a map of any additional headers the broker requires :param keyword_headers: any additional headers the broker requires """ if not self.transport.is_connected(): log.debug('Not sending disconnect, already disconnected') return headers = utils.merge_headers([headers, keyword_headers]) rec = receipt or utils.get_uuid() headers[HDR_RECEIPT] = rec self.set_receipt(rec, CMD_DISCONNECT) self.send_frame(CMD_DISCONNECT, headers) def send(self, destination, body, content_type=None, headers=None, **keyword_headers): """ Send a message to a destination. :param str destination: the destination of the message (e.g. queue or topic name) :param body: the content of the message :param str content_type: the content type of the message :param dict headers: a map of any additional headers the broker requires :param keyword_headers: any additional headers the broker requires """ assert destination is not None, "'destination' is required" assert body is not None, "'body' is required" headers = utils.merge_headers([headers, keyword_headers]) headers[HDR_DESTINATION] = destination if content_type: headers[HDR_CONTENT_TYPE] = content_type if self.auto_content_length and body and HDR_CONTENT_LENGTH not in headers: headers[HDR_CONTENT_LENGTH] = len(body) self.send_frame(CMD_SEND, headers, body) def subscribe(self, destination, id=None, ack='auto', headers=None, **keyword_headers): """ Subscribe to a destination. :param str destination: the topic or queue to subscribe to :param str id: a unique id to represent the subscription :param str ack: acknowledgement mode, either auto, client, or client-individual (see http://stomp.github.io/stomp-specification-1.2.html#SUBSCRIBE_ack_Header) for more information :param dict headers: a map of any additional headers the broker requires :param keyword_headers: any additional headers the broker requires """ assert destination is not None, "'destination' is required" headers = utils.merge_headers([headers, keyword_headers]) headers[HDR_DESTINATION] = destination if id: headers[HDR_ID] = id headers[HDR_ACK] = ack self.send_frame(CMD_SUBSCRIBE, headers) def unsubscribe(self, destination=None, id=None, headers=None, **keyword_headers): """ Unsubscribe from a destination by either id or the destination name. :param str destination: the name of the topic or queue to unsubscribe from :param str id: the unique identifier of the topic or queue to unsubscribe from :param dict headers: a map of any additional headers the broker requires :param keyword_headers: any additional headers the broker requires """ assert id is not None or destination is not None, "'id' or 'destination' is required" headers = utils.merge_headers([headers, keyword_headers]) if id: headers[HDR_ID] = id if destination: headers[HDR_DESTINATION] = destination self.send_frame(CMD_UNSUBSCRIBE, headers) class Protocol11(HeartbeatListener, ConnectionListener): """ Represents version 1.1 of the protocol (see https://stomp.github.io/stomp-specification-1.1.html). Most users should not instantiate the protocol directly. See :py:mod:`stomp.connect` for connection classes. :param transport: :param (int,int) heartbeats: :param bool auto_content_length: Whether to calculate and send the content-length header automatically if it has not been set :param float heart_beat_receive_scale: how long to wait for a heartbeat before timing out, as a scale factor of receive time """ def __init__(self, transport, heartbeats=(0, 0), auto_content_length=True, heart_beat_receive_scale=1.5): HeartbeatListener.__init__(self, heartbeats) self.transport = transport self.auto_content_length = auto_content_length transport.set_listener('protocol-listener', self) self.version = '1.1' self.heart_beat_receive_scale = heart_beat_receive_scale def _escape_headers(self, headers): """ :param dict(str,str) headers: """ for key, val in headers.items(): try: val = val.replace('\\', '\\\\').replace('\n', '\\n').replace(':', '\\c') except: pass headers[key] = val def send_frame(self, cmd, headers=None, body=''): """ Encode and send a stomp frame through the underlying transport: :param str cmd: the protocol command :param dict headers: a map of headers to include in the frame :param body: the content of the message """ if cmd != CMD_CONNECT: if headers is None: headers = {} self._escape_headers(headers) frame = utils.Frame(cmd, headers, body) self.transport.transmit(frame) def abort(self, transaction, headers=None, **keyword_headers): """ Abort a transaction. :param str transaction: the identifier of the transaction :param dict headers: a map of any additional headers the broker requires :param keyword_headers: any additional headers the broker requires """ assert transaction is not None, "'transaction' is required" headers = utils.merge_headers([headers, keyword_headers]) headers[HDR_TRANSACTION] = transaction self.send_frame(CMD_ABORT, headers) def ack(self, id, subscription, transaction=None, receipt=None): """ Acknowledge 'consumption' of a message by id. :param str id: identifier of the message :param str subscription: the subscription this message is associated with :param str transaction: include the acknowledgement in the specified transaction """ assert id is not None, "'id' is required" assert subscription is not None, "'subscription' is required" headers = {HDR_MESSAGE_ID: id, HDR_SUBSCRIPTION: subscription} if transaction: headers[HDR_TRANSACTION] = transaction if receipt: headers[HDR_RECEIPT] = receipt self.send_frame(CMD_ACK, headers) def begin(self, transaction=None, headers=None, **keyword_headers): """ Begin a transaction. :param str transaction: the identifier for the transaction (optional - if not specified a unique transaction id will be generated) :param dict headers: a map of any additional headers the broker requires :param keyword_headers: any additional headers the broker requires :return: the transaction id :rtype: str """ headers = utils.merge_headers([headers, keyword_headers]) if not transaction: transaction = utils.get_uuid() headers[HDR_TRANSACTION] = transaction self.send_frame(CMD_BEGIN, headers) return transaction def commit(self, transaction=None, headers=None, **keyword_headers): """ Commit a transaction. :param str transaction: the identifier for the transaction :param dict headers: a map of any additional headers the broker requires :param keyword_headers: any additional headers the broker requires """ assert transaction is not None, "'transaction' is required" headers = utils.merge_headers([headers, keyword_headers]) headers[HDR_TRANSACTION] = transaction self.send_frame(CMD_COMMIT, headers) def connect(self, username=None, passcode=None, wait=False, headers=None, **keyword_headers): """ Start a connection. :param str username: the username to connect with :param str passcode: the password used to authenticate with :param bool wait: if True, wait for the connection to be established/acknowledged :param dict headers: a map of any additional headers the broker requires :param keyword_headers: any additional headers the broker requires """ cmd = CMD_STOMP headers = utils.merge_headers([headers, keyword_headers]) headers[HDR_ACCEPT_VERSION] = self.version if self.transport.vhost: headers[HDR_HOST] = self.transport.vhost if username is not None: headers[HDR_LOGIN] = username if passcode is not None: headers[HDR_PASSCODE] = passcode self.send_frame(cmd, headers) if wait: self.transport.wait_for_connection() if self.transport.connection_error: raise ConnectFailedException() def disconnect(self, receipt=None, headers=None, **keyword_headers): """ Disconnect from the server. :param str receipt: the receipt to use (once the server acknowledges that receipt, we're officially disconnected; optional - if not specified a unique receipt id will be generated) :param dict headers: a map of any additional headers the broker requires :param keyword_headers: any additional headers the broker requires """ if not self.transport.is_connected(): log.debug('Not sending disconnect, already disconnected') return headers = utils.merge_headers([headers, keyword_headers]) rec = receipt or utils.get_uuid() headers[HDR_RECEIPT] = rec self.set_receipt(rec, CMD_DISCONNECT) self.send_frame(CMD_DISCONNECT, headers) def nack(self, id, subscription, transaction=None, receipt=None): """ Let the server know that a message was not consumed. :param str id: the unique id of the message to nack :param str subscription: the subscription this message is associated with :param str transaction: include this nack in a named transaction """ assert id is not None, "'id' is required" assert subscription is not None, "'subscription' is required" headers = {HDR_MESSAGE_ID: id, HDR_SUBSCRIPTION: subscription} if transaction: headers[HDR_TRANSACTION] = transaction if receipt: headers[HDR_RECEIPT] = receipt self.send_frame(CMD_NACK, headers) def send(self, destination, body, content_type=None, headers=None, **keyword_headers): """ Send a message to a destination in the messaging system (as per https://stomp.github.io/stomp-specification-1.2.html#SEND) :param str destination: the destination (such as a message queue - for example '/queue/test' - or a message topic) :param body: the content of the message :param str content_type: the MIME type of message :param dict headers: additional headers to send in the message frame :param keyword_headers: any additional headers the broker requires """ assert destination is not None, "'destination' is required" assert body is not None, "'body' is required" headers = utils.merge_headers([headers, keyword_headers]) headers[HDR_DESTINATION] = destination if content_type: headers[HDR_CONTENT_TYPE] = content_type if self.auto_content_length and body and HDR_CONTENT_LENGTH not in headers: headers[HDR_CONTENT_LENGTH] = len(body) self.send_frame(CMD_SEND, headers, body) def subscribe(self, destination, id, ack='auto', headers=None, **keyword_headers): """ Subscribe to a destination :param str destination: the topic or queue to subscribe to :param str id: the identifier to uniquely identify the subscription :param str ack: either auto, client or client-individual (see https://stomp.github.io/stomp-specification-1.2.html#SUBSCRIBE for more info) :param dict headers: a map of any additional headers to send with the subscription :param keyword_headers: any additional headers to send with the subscription """ assert destination is not None, "'destination' is required" assert id is not None, "'id' is required" headers = utils.merge_headers([headers, keyword_headers]) headers[HDR_DESTINATION] = destination headers[HDR_ID] = id headers[HDR_ACK] = ack self.send_frame(CMD_SUBSCRIBE, headers) def unsubscribe(self, id, headers=None, **keyword_headers): """ Unsubscribe from a destination by its unique identifier :param str id: the unique identifier to unsubscribe from :param dict headers: additional headers to send with the unsubscribe :param keyword_headers: any additional headers to send with the subscription """ assert id is not None, "'id' is required" headers = utils.merge_headers([headers, keyword_headers]) headers[HDR_ID] = id self.send_frame(CMD_UNSUBSCRIBE, headers) class Protocol12(Protocol11): """ Represents version 1.2 of the protocol (see https://stomp.github.io/stomp-specification-1.2.html). Most users should not instantiate the protocol directly. See :py:mod:`stomp.connect` for connection classes. :param transport: :param (int,int) heartbeats: :param bool auto_content_length: Whether to calculate and send the content-length header automatically if it has not been set :param float heart_beat_receive_scale: how long to wait for a heartbeat before timing out, as a scale factor of receive time """ def __init__(self, transport, heartbeats=(0, 0), auto_content_length=True, heart_beat_receive_scale=1.5): Protocol11.__init__(self, transport, heartbeats, auto_content_length, heart_beat_receive_scale=heart_beat_receive_scale) self.version = '1.2' def _escape_headers(self, headers): """ :param dict(str,str) headers: """ for key, val in headers.items(): try: val = val.replace('\\', '\\\\').replace('\n', '\\n').replace(':', '\\c').replace('\r', '\\r') except: pass headers[key] = val def ack(self, id, transaction=None, receipt=None): """ Acknowledge 'consumption' of a message by id. :param str id: identifier of the message :param str transaction: include the acknowledgement in the specified transaction """ assert id is not None, "'id' is required" headers = {HDR_ID: id} if transaction: headers[HDR_TRANSACTION] = transaction if receipt: headers[HDR_RECEIPT] = receipt self.send_frame(CMD_ACK, headers) def nack(self, id, transaction=None, receipt=None): """ Let the server know that a message was not consumed. :param str id: the unique id of the message to nack :param str transaction: include this nack in a named transaction """ assert id is not None, "'id' is required" headers = {HDR_ID: id} if transaction: headers[HDR_TRANSACTION] = transaction if receipt: headers[HDR_RECEIPT] = receipt self.send_frame(CMD_NACK, headers) def connect(self, username=None, passcode=None, wait=False, headers=None, **keyword_headers): """ Send a STOMP CONNECT frame. Differs from 1.0 and 1.1 versions in that the HOST header is enforced. :param str username: optionally specify the login user :param str passcode: optionally specify the user password :param bool wait: wait for the connection to complete before returning :param dict headers: a map of any additional headers to send with the subscription :param keyword_headers: any additional headers to send with the subscription """ cmd = CMD_STOMP headers = utils.merge_headers([headers, keyword_headers]) headers[HDR_ACCEPT_VERSION] = self.version headers[HDR_HOST] = self.transport.current_host_and_port[0] if self.transport.vhost: headers[HDR_HOST] = self.transport.vhost if username is not None: headers[HDR_LOGIN] = username if passcode is not None: headers[HDR_PASSCODE] = passcode self.send_frame(cmd, headers) if wait: self.transport.wait_for_connection() if self.transport.connection_error: raise ConnectFailedException()