# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # from __future__ import absolute_import import errno import logging import socket import time import weakref from ._condition import Condition from ._delivery import Delivery from ._endpoints import Endpoint from ._events import Event, Handler, _dispatch from ._exceptions import ProtonException from ._io import IO from ._message import Message from ._selectable import Selectable from ._transport import Transport from ._url import Url log = logging.getLogger("proton") class OutgoingMessageHandler(Handler): """ A utility for simpler and more intuitive handling of delivery events related to outgoing i.e. sent messages. :param auto_settle: If ``True``, settle all messages (default). Otherwise messages must be explicitly settled. :type auto_settle: ``bool`` :param delegate: A client handler for the endpoint event """ def __init__(self, auto_settle=True, delegate=None): self.auto_settle = auto_settle self.delegate = delegate def on_link_flow(self, event): if event.link.is_sender and event.link.credit \ and event.link.state & Endpoint.LOCAL_ACTIVE \ and event.link.state & Endpoint.REMOTE_ACTIVE: self.on_sendable(event) def on_delivery(self, event): dlv = event.delivery if dlv.link.is_sender and dlv.updated: if dlv.remote_state == Delivery.ACCEPTED: self.on_accepted(event) elif dlv.remote_state == Delivery.REJECTED: self.on_rejected(event) elif dlv.remote_state == Delivery.RELEASED or dlv.remote_state == Delivery.MODIFIED: self.on_released(event) if dlv.settled: self.on_settled(event) if self.auto_settle: dlv.settle() def on_sendable(self, event): """ Called when the sender link has credit and messages can therefore be transferred. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ if self.delegate is not None: _dispatch(self.delegate, 'on_sendable', event) def on_accepted(self, event): """ Called when the remote peer accepts an outgoing message. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ if self.delegate is not None: _dispatch(self.delegate, 'on_accepted', event) def on_rejected(self, event): """ Called when the remote peer rejects an outgoing message. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ if self.delegate is not None: _dispatch(self.delegate, 'on_rejected', event) def on_released(self, event): """ Called when the remote peer releases an outgoing message. Note that this may be in response to either the ``RELEASE`` or ``MODIFIED`` state as defined by the AMQP specification. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ if self.delegate is not None: _dispatch(self.delegate, 'on_released', event) def on_settled(self, event): """ Called when the remote peer has settled the outgoing message. This is the point at which it should never be retransmitted. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ if self.delegate is not None: _dispatch(self.delegate, 'on_settled', event) def recv_msg(delivery): msg = Message() msg.decode(delivery.link.recv(delivery.pending)) delivery.link.advance() return msg class Reject(ProtonException): """ An exception that indicates a message should be rejected. """ pass class Release(ProtonException): """ An exception that indicates a message should be released. """ pass class Acking(object): """ A class containing methods for handling received messages. """ def accept(self, delivery): """ Accepts a received message. .. note:: This method cannot currently be used in combination with transactions. See :class:`proton.reactor.Transaction` for transactional methods. :param delivery: The message delivery tracking object :type delivery: :class:`proton.Delivery` """ self.settle(delivery, Delivery.ACCEPTED) def reject(self, delivery): """ Rejects a received message that is considered invalid or unprocessable. .. note:: This method cannot currently be used in combination with transactions. See :class:`proton.reactor.Transaction` for transactional methods. :param delivery: The message delivery tracking object :type delivery: :class:`proton.Delivery` """ self.settle(delivery, Delivery.REJECTED) def release(self, delivery, delivered=True): """ Releases a received message, making it available at the source for any (other) interested receiver. The ``delivered`` parameter indicates whether this should be considered a delivery attempt (and the delivery count updated) or not. .. note:: This method cannot currently be used in combination with transactions. See :class:`proton.reactor.Transaction` for transactional methods. :param delivery: The message delivery tracking object :type delivery: :class:`proton.Delivery` :param delivered: If ``True``, the message will be annotated with a delivery attempt (setting delivery flag :const:`proton.Delivery.MODIFIED`). Otherwise, the message will be returned without the annotation and released (setting delivery flag :const:`proton.Delivery.RELEASED` :type delivered: ``bool`` """ if delivered: self.settle(delivery, Delivery.MODIFIED) else: self.settle(delivery, Delivery.RELEASED) def settle(self, delivery, state=None): """ Settles the message delivery, and optionally updating the delivery state. :param delivery: The message delivery tracking object :type delivery: :class:`proton.Delivery` :param state: The delivery state, or ``None`` if not update is to be performed. :type state: ``int`` or ``None`` """ if state: delivery.update(state) delivery.settle() class IncomingMessageHandler(Handler, Acking): """ A utility for simpler and more intuitive handling of delivery events related to incoming i.e. received messages. :type auto_accept: ``bool`` :param auto_accept: If ``True``, accept all messages (default). Otherwise messages must be individually accepted or rejected. :param delegate: A client handler for the endpoint event """ def __init__(self, auto_accept=True, delegate=None): self.delegate = delegate self.auto_accept = auto_accept def on_delivery(self, event): dlv = event.delivery if not dlv.link.is_receiver: return if dlv.aborted: self.on_aborted(event) dlv.settle() elif dlv.readable and not dlv.partial: event.message = recv_msg(dlv) if event.link.state & Endpoint.LOCAL_CLOSED: if self.auto_accept: dlv.update(Delivery.RELEASED) dlv.settle() else: try: self.on_message(event) if self.auto_accept: dlv.update(Delivery.ACCEPTED) dlv.settle() except Reject: dlv.update(Delivery.REJECTED) dlv.settle() except Release: dlv.update(Delivery.MODIFIED) dlv.settle() elif dlv.updated and dlv.settled: self.on_settled(event) def on_message(self, event): """ Called when a message is received. The message itself can be obtained as a property on the event. For the purpose of referring to this message in further actions (e.g. if explicitly accepting it, the ``delivery`` should be used, also obtainable via a property on the event. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ if self.delegate is not None: _dispatch(self.delegate, 'on_message', event) def on_settled(self, event): """ Callback for when a message delivery is settled by the remote peer. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ if self.delegate is not None: _dispatch(self.delegate, 'on_settled', event) def on_aborted(self, event): """ Callback for when a message delivery is aborted by the remote peer. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ if self.delegate is not None: _dispatch(self.delegate, 'on_aborted', event) class EndpointStateHandler(Handler): """ A utility that exposes 'endpoint' events - ie the open/close for links, sessions and connections in a more intuitive manner. A ``XXX_opened()`` method will be called when both local and remote peers have opened the link, session or connection. This can be used to confirm a locally initiated action for example. A ``XXX_opening()`` method will be called when the remote peer has requested an open that was not initiated locally. By default this will simply open locally, which then triggers the ``XXX_opened()`` call. The same applies to close. :param peer_close_is_error: If ``True``, a peer endpoint closing will be treated as an error with an error callback. Otherwise (default), the normal callbacks for the closing will occur. :type peer_close_is_error: ``bool`` :param delegate: A client handler for the endpoint event """ def __init__(self, peer_close_is_error=False, delegate=None): self.delegate = delegate self.peer_close_is_error = peer_close_is_error @classmethod def is_local_open(cls, endpoint): """ Test if local ``enpoint`` is open (ie has state :const:`proton.Endpoint.LOCAL_ACTIVE`). :param endpoint: The local endpoint to be tested. :type endpoint: Any child of :class:`proton.Endpoint` :return: ``True`` if local endpoint is in state :const:`proton.Endpoint.LOCAL_ACTIVE`, ``False`` otherwise. :rtype: ``bool`` """ return endpoint.state & Endpoint.LOCAL_ACTIVE @classmethod def is_local_uninitialised(cls, endpoint): """ Test if local ``enpoint`` is uninitialised (ie has state :const:`proton.Endpoint.LOCAL_UNINIT`). :param endpoint: The local endpoint to be tested. :type endpoint: Any child of :class:`proton.Endpoint` :return: ``True`` if local endpoint is in state :const:`proton.Endpoint.LOCAL_UNINIT`, ``False`` otherwise. :rtype: ``bool`` """ return endpoint.state & Endpoint.LOCAL_UNINIT @classmethod def is_local_closed(cls, endpoint): """ Test if local ``enpoint`` is closed (ie has state :const:`proton.Endpoint.LOCAL_CLOSED`). :param endpoint: The local endpoint to be tested. :type endpoint: Any child of :class:`proton.Endpoint` :return: ``True`` if local endpoint is in state :const:`proton.Endpoint.LOCAL_CLOSED`, ``False`` otherwise. :rtype: ``bool`` """ return endpoint.state & Endpoint.LOCAL_CLOSED @classmethod def is_remote_open(cls, endpoint): """ Test if remote ``enpoint`` is open (ie has state :const:`proton.Endpoint.LOCAL_ACTIVE`). :param endpoint: The remote endpoint to be tested. :type endpoint: Any child of :class:`proton.Endpoint` :return: ``True`` if remote endpoint is in state :const:`proton.Endpoint.LOCAL_ACTIVE`, ``False`` otherwise. :rtype: ``bool`` """ return endpoint.state & Endpoint.REMOTE_ACTIVE @classmethod def is_remote_closed(cls, endpoint): """ Test if remote ``enpoint`` is closed (ie has state :const:`proton.Endpoint.REMOTE_CLOSED`). :param endpoint: The remote endpoint to be tested. :type endpoint: Any child of :class:`proton.Endpoint` :return: ``True`` if remote endpoint is in state :const:`proton.Endpoint.REMOTE_CLOSED`, ``False`` otherwise. :rtype: ``bool`` """ return endpoint.state & Endpoint.REMOTE_CLOSED @classmethod def print_error(cls, endpoint, endpoint_type): """ Logs an error message related to an error condition at an endpoint. :param endpoint: The endpoint to be tested :type endpoint: :class:`proton.Endpoint` :param endpoint_type: The endpoint type as a string to be printed in the log message. :type endpoint_type: ``str`` """ if endpoint.remote_condition: log.error(endpoint.remote_condition.description or endpoint.remote_condition.name) elif cls.is_local_open(endpoint) and cls.is_remote_closed(endpoint): log.error("%s closed by peer" % endpoint_type) def on_link_remote_close(self, event): if event.link.remote_condition: self.on_link_error(event) elif self.is_local_closed(event.link): self.on_link_closed(event) else: self.on_link_closing(event) event.link.close() def on_session_remote_close(self, event): if event.session.remote_condition: self.on_session_error(event) elif self.is_local_closed(event.session): self.on_session_closed(event) else: self.on_session_closing(event) event.session.close() def on_connection_remote_close(self, event): if event.connection.remote_condition: if event.connection.remote_condition.name == "amqp:connection:forced": # Treat this the same as just having the transport closed by the peer without # sending any events. Allow reconnection to happen transparently. return self.on_connection_error(event) elif self.is_local_closed(event.connection): self.on_connection_closed(event) else: self.on_connection_closing(event) event.connection.close() def on_connection_local_open(self, event): if self.is_remote_open(event.connection): self.on_connection_opened(event) def on_connection_remote_open(self, event): if self.is_local_open(event.connection): self.on_connection_opened(event) elif self.is_local_uninitialised(event.connection): self.on_connection_opening(event) event.connection.open() def on_session_local_open(self, event): if self.is_remote_open(event.session): self.on_session_opened(event) def on_session_remote_open(self, event): if self.is_local_open(event.session): self.on_session_opened(event) elif self.is_local_uninitialised(event.session): self.on_session_opening(event) event.session.open() def on_link_local_open(self, event): if self.is_remote_open(event.link): self.on_link_opened(event) def on_link_remote_open(self, event): if self.is_local_open(event.link): self.on_link_opened(event) elif self.is_local_uninitialised(event.link): self.on_link_opening(event) event.link.open() def on_connection_opened(self, event): """ Callback for when both the local and remote endpoints of a connection have opened. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ if self.delegate is not None: _dispatch(self.delegate, 'on_connection_opened', event) def on_session_opened(self, event): """ Callback for when both the local and remote endpoints of a session have opened. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ if self.delegate is not None: _dispatch(self.delegate, 'on_session_opened', event) def on_link_opened(self, event): """ Callback for when both the local and remote endpoints of a link have opened. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ if self.delegate is not None: _dispatch(self.delegate, 'on_link_opened', event) def on_connection_opening(self, event): """ Callback for when a remote peer initiates the opening of a connection. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ if self.delegate is not None: _dispatch(self.delegate, 'on_connection_opening', event) def on_session_opening(self, event): """ Callback for when a remote peer initiates the opening of a session. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ if self.delegate is not None: _dispatch(self.delegate, 'on_session_opening', event) def on_link_opening(self, event): """ Callback for when a remote peer initiates the opening of a link. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ if self.delegate is not None: _dispatch(self.delegate, 'on_link_opening', event) def on_connection_error(self, event): """ Callback for when an initiated connection open fails. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ if self.delegate is not None: _dispatch(self.delegate, 'on_connection_error', event) else: self.print_error(event.connection, "connection") def on_session_error(self, event): """ Callback for when an initiated session open fails. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ if self.delegate is not None: _dispatch(self.delegate, 'on_session_error', event) else: self.print_error(event.session, "session") event.connection.close() def on_link_error(self, event): """ Callback for when an initiated link open fails. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ if self.delegate is not None: _dispatch(self.delegate, 'on_link_error', event) else: self.print_error(event.link, "link") event.connection.close() def on_connection_closed(self, event): """ Callback for when both the local and remote endpoints of a connection have closed. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ if self.delegate is not None: _dispatch(self.delegate, 'on_connection_closed', event) def on_session_closed(self, event): """ Callback for when both the local and remote endpoints of a session have closed. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ if self.delegate is not None: _dispatch(self.delegate, 'on_session_closed', event) def on_link_closed(self, event): """ Callback for when both the local and remote endpoints of a link have closed. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ if self.delegate is not None: _dispatch(self.delegate, 'on_link_closed', event) def on_connection_closing(self, event): """ Callback for when a remote peer initiates the closing of a connection. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ if self.delegate is not None: _dispatch(self.delegate, 'on_connection_closing', event) elif self.peer_close_is_error: self.on_connection_error(event) def on_session_closing(self, event): """ Callback for when a remote peer initiates the closing of a session. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ if self.delegate is not None: _dispatch(self.delegate, 'on_session_closing', event) elif self.peer_close_is_error: self.on_session_error(event) def on_link_closing(self, event): """ Callback for when a remote peer initiates the closing of a link. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ if self.delegate is not None: _dispatch(self.delegate, 'on_link_closing', event) elif self.peer_close_is_error: self.on_link_error(event) def on_transport_tail_closed(self, event): """ Callback for when the transport tail has closed (ie no further input will be accepted by the transport). :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ self.on_transport_closed(event) def on_transport_closed(self, event): """ Callback for when the transport has closed - ie both the head (input) and tail (output) of the transport pipeline are closed. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ if self.delegate is not None and event.connection and self.is_local_open(event.connection): _dispatch(self.delegate, 'on_disconnected', event) class MessagingHandler(Handler, Acking): """ A general purpose handler that makes the proton-c events somewhat simpler to deal with and/or avoids repetitive tasks for common use cases. :param prefetch: Initial flow credit for receiving messages, defaults to 10. :type prefetch: ``int`` :param auto_accept: If ``True``, accept all messages (default). Otherwise messages must be individually accepted or rejected. :type auto_accept: ``bool`` :param auto_settle: If ``True``, settle all messages (default). Otherwise messages must be explicitly settled. :type auto_settle: ``bool`` :param peer_close_is_error: If ``True``, a peer endpoint closing will be treated as an error with an error callback. Otherwise (default), the normal callbacks for the closing will occur. :type peer_close_is_error: ``bool`` """ def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False): self.handlers = [] if prefetch: self.handlers.append(FlowController(prefetch)) self.handlers.append(EndpointStateHandler(peer_close_is_error, weakref.proxy(self))) self.handlers.append(IncomingMessageHandler(auto_accept, weakref.proxy(self))) self.handlers.append(OutgoingMessageHandler(auto_settle, weakref.proxy(self))) self.fatal_conditions = ["amqp:unauthorized-access"] def on_transport_error(self, event): """ Called when some error is encountered with the transport over which the AMQP connection is to be established. This includes authentication errors as well as socket errors. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ if event.transport.condition: if event.transport.condition.info: log.error("%s: %s: %s" % ( event.transport.condition.name, event.transport.condition.description, event.transport.condition.info)) else: log.error("%s: %s" % (event.transport.condition.name, event.transport.condition.description)) if event.transport.condition.name in self.fatal_conditions: event.connection.close() else: logging.error("Unspecified transport error") def on_connection_error(self, event): """ Called when the peer closes the connection with an error condition. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ EndpointStateHandler.print_error(event.connection, "connection") def on_session_error(self, event): """ Called when the peer closes the session with an error condition. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ EndpointStateHandler.print_error(event.session, "session") event.connection.close() def on_link_error(self, event): """ Called when the peer closes the link with an error condition. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ EndpointStateHandler.print_error(event.link, "link") event.connection.close() def on_reactor_init(self, event): """ Called when the event loop - the reactor - starts. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ if hasattr(event.reactor, 'subclass'): setattr(event, event.reactor.subclass.__name__.lower(), event.reactor) self.on_start(event) def on_start(self, event): """ Called when the event loop starts. (Just an alias for on_reactor_init) :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ pass def on_connection_closed(self, event): """ Called when the connection is closed. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ pass def on_session_closed(self, event): """ Called when the session is closed. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ pass def on_link_closed(self, event): """ Called when the link is closed. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ pass def on_connection_closing(self, event): """ Called when the peer initiates the closing of the connection. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ pass def on_session_closing(self, event): """ Called when the peer initiates the closing of the session. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ pass def on_link_closing(self, event): """ Called when the peer initiates the closing of the link. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ pass def on_disconnected(self, event): """ Called when the socket is disconnected. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ pass def on_sendable(self, event): """ Called when the sender link has credit and messages can therefore be transferred. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ pass def on_accepted(self, event): """ Called when the remote peer accepts an outgoing message. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ pass def on_rejected(self, event): """ Called when the remote peer rejects an outgoing message. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ pass def on_released(self, event): """ Called when the remote peer releases an outgoing message. Note that this may be in response to either the RELEASE or MODIFIED state as defined by the AMQP specification. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ pass def on_settled(self, event): """ Called when the remote peer has settled the outgoing message. This is the point at which it should never be retransmitted. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ pass def on_message(self, event): """ Called when a message is received. The message itself can be obtained as a property on the event. For the purpose of referring to this message in further actions (e.g. if explicitly accepting it, the ``delivery`` should be used, also obtainable via a property on the event. :param event: The underlying event object. Use this to obtain further information on the event. In particular, the message itself may be obtained by accessing ``event.message``. :type event: :class:`proton.Event` """ pass class TransactionHandler(object): """ The interface for transaction handlers - ie objects that want to be notified of state changes related to a transaction. """ def on_transaction_declared(self, event): """ Called when a local transaction is declared. :param event: The underlying event object. Use this to obtain further information on the event. In particular, the :class:`proton.reactor.Transaction` object may be obtained by accessing ``event.transaction``. :type event: :class:`proton.Event` """ pass def on_transaction_committed(self, event): """ Called when a local transaction is discharged successfully (committed). :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ pass def on_transaction_aborted(self, event): """ Called when a local transaction is discharged unsuccessfully (aborted). :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ pass def on_transaction_declare_failed(self, event): """ Called when a local transaction declare fails. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ pass def on_transaction_commit_failed(self, event): """ Called when the commit of a local transaction fails. :param event: The underlying event object. Use this to obtain further information on the event. :type event: :class:`proton.Event` """ pass class TransactionalClientHandler(MessagingHandler, TransactionHandler): """ An extension to the MessagingHandler for applications using transactions. This handler provides all of the callbacks found in :class:`MessagingHandler` and :class:`TransactionHandler`, and provides a convenience method :meth:`accept` for performing a transactional acceptance of received messages. :param prefetch: Initial flow credit for receiving messages, defaults to 10. :type prefetch: ``int`` :param auto_accept: If ``True``, accept all messages (default). Otherwise messages must be individually accepted or rejected. :type auto_accept: ``bool`` :param auto_settle: If ``True``, settle all messages (default). Otherwise messages must be explicitly settled. :type auto_settle: ``bool`` :param peer_close_is_error: If ``True``, a peer endpoint closing will be treated as an error with an error callback. Otherwise (default), the normal callbacks for the closing will occur. :type peer_close_is_error: ``bool`` """ def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False): super(TransactionalClientHandler, self).__init__(prefetch, auto_accept, auto_settle, peer_close_is_error) def accept(self, delivery, transaction=None): """ A convenience method for accepting a received message as part of a transaction. If no transaction object is supplied, a regular non-transactional acceptance will be performed. :param delivery: Delivery tracking object for received message. :type delivery: :class:`proton.Delivery` :param transaction: Transaction tracking object which is required if the message is being accepted under the transaction. If ``None`` (default), then a normal non-transactional accept occurs. :type transaction: :class:`proton.reactor.Transaction` """ if transaction: transaction.accept(delivery) else: super(TransactionalClientHandler, self).accept(delivery) class FlowController(Handler): def __init__(self, window=1024): self._window = window self._drained = 0 def on_link_local_open(self, event): self._flow(event.link) def on_link_remote_open(self, event): self._flow(event.link) def on_link_flow(self, event): self._flow(event.link) def on_delivery(self, event): self._flow(event.link) def _flow(self, link): if link.is_receiver: self._drained += link.drained() if self._drained == 0: delta = self._window - link.credit link.flow(delta) class Handshaker(Handler): @staticmethod def on_connection_remote_open(event): conn = event.connection if conn.state & Endpoint.LOCAL_UNINIT: conn.open() @staticmethod def on_session_remote_open(event): ssn = event.session if ssn.state & Endpoint.LOCAL_UNINIT: ssn.open() @staticmethod def on_link_remote_open(event): link = event.link if link.state & Endpoint.LOCAL_UNINIT: link.source.copy(link.remote_source) link.target.copy(link.remote_target) link.open() @staticmethod def on_connection_remote_close(event): conn = event.connection if not conn.state & Endpoint.LOCAL_CLOSED: conn.close() @staticmethod def on_session_remote_close(event): ssn = event.session if not ssn.state & Endpoint.LOCAL_CLOSED: ssn.close() @staticmethod def on_link_remote_close(event): link = event.link if not link.state & Endpoint.LOCAL_CLOSED: link.close() # Back compatibility definitions CFlowController = FlowController CHandshaker = Handshaker class PythonIO: def __init__(self): self.selectables = [] self.delegate = IOHandler() def on_unhandled(self, method, event): event.dispatch(self.delegate) def on_selectable_init(self, event): self.selectables.append(event.context) def on_selectable_updated(self, event): pass def on_selectable_final(self, event): sel = event.context if sel.is_terminal: self.selectables.remove(sel) sel.close() def on_reactor_quiesced(self, event): reactor = event.reactor # check if we are still quiesced, other handlers of # on_reactor_quiesced could have produced events to process if not reactor.quiesced: return reading = [] writing = [] deadline = None for sel in self.selectables: if sel.reading: reading.append(sel) if sel.writing: writing.append(sel) if sel.deadline: if deadline is None: deadline = sel.deadline else: deadline = min(sel.deadline, deadline) if deadline is not None: timeout = deadline - time.time() else: timeout = reactor.timeout if timeout < 0: timeout = 0 timeout = min(timeout, reactor.timeout) readable, writable, _ = IO.select(reading, writing, [], timeout) now = reactor.mark() for s in readable: s.readable() for s in writable: s.writable() for s in self.selectables: if s.deadline and now > s.deadline: s.expired() reactor.yield_() # For C style IO handler need to implement Selector class IOHandler(Handler): def __init__(self): self._selector = IO.Selector() def on_selectable_init(self, event): s = event.selectable self._selector.add(s) s._reactor._selectables += 1 def on_selectable_updated(self, event): s = event.selectable self._selector.update(s) def on_selectable_final(self, event): s = event.selectable self._selector.remove(s) s._reactor._selectables -= 1 s.close() def on_reactor_quiesced(self, event): r = event.reactor if not r.quiesced: return d = r.timer_deadline readable, writable, expired = self._selector.select(r.timeout) now = r.mark() for s in readable: s.readable() for s in writable: s.writable() for s in expired: s.expired() r.yield_() def on_selectable_readable(self, event): s = event.selectable t = s._transport # If we're an acceptor we can't have a transport # and we don't want to do anything here in any case if not t: return capacity = t.capacity() if capacity > 0: try: b = s.recv(capacity) if len(b) > 0: n = t.push(b) else: # EOF handling self.on_selectable_error(event) except socket.error as e: # TODO: What's the error handling to be here? log.error("Couldn't recv: %r" % e) t.close_tail() # Always update as we may have gone to not reading or from # not writing to writing when processing the incoming bytes r = s._reactor self.update(t, s, r.now) def on_selectable_writable(self, event): s = event.selectable t = s._transport # If we're an acceptor we can't have a transport # and we don't want to do anything here in any case if not t: return pending = t.pending() if pending > 0: try: n = s.send(t.peek(pending)) t.pop(n) except socket.error as e: log.error("Couldn't send: %r" % e) # TODO: Error? or actually an exception t.close_head() newpending = t.pending() if newpending != pending: r = s._reactor self.update(t, s, r.now) def on_selectable_error(self, event): s = event.selectable t = s._transport t.close_head() t.close_tail() s.terminate() s._transport = None t._selectable = None s.update() def on_selectable_expired(self, event): s = event.selectable t = s._transport r = s._reactor self.update(t, s, r.now) def on_connection_local_open(self, event): c = event.connection if not c.state & Endpoint.REMOTE_UNINIT: return t = Transport() # It seems perverse, but the C code ignores bind errors too! # and this is required or you get errors because Connector() has already # bound the transport and connection! t.bind_nothrow(c) def on_connection_bound(self, event): c = event.connection t = event.transport reactor = c._reactor # link the new transport to its reactor: t._reactor = reactor if c._acceptor: # this connection was created by the acceptor. There is already a # socket assigned to this connection. Nothing needs to be done. return url = c.url or Url(c.hostname) url.defaults() host = url.host port = int(url.port) if not c.user: user = url.username if user: c.user = user password = url.password if password: c.password = password addrs = socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM) # Try first possible address log.debug("Connect trying first transport address: %s", addrs[0]) sock = IO.connect(addrs[0]) # At this point we need to arrange to be called back when the socket is writable ConnectSelectable(sock, reactor, addrs[1:], t, self) # TODO: Don't understand why we need this now - how can we get PN_TRANSPORT until the connection succeeds? t._selectable = None @staticmethod def update(transport, selectable, now): try: capacity = transport.capacity() selectable.reading = capacity > 0 except: if transport.closed: selectable.terminate() selectable._transport = None transport._selectable = None try: pending = transport.pending() selectable.writing = pending > 0 except: if transport.closed: selectable.terminate() selectable._transport = None transport._selectable = None selectable.deadline = transport.tick(now) selectable.update() def on_transport(self, event): t = event.transport r = t._reactor s = t._selectable if s and not s.is_terminal: self.update(t, s, r.now) def on_transport_closed(self, event): t = event.transport r = t._reactor s = t._selectable if s and not s.is_terminal: s.terminate() s._transport = None t._selectable = None r.update(s) t.unbind() class ConnectSelectable(Selectable): def __init__(self, sock, reactor, addrs, transport, iohandler): super(ConnectSelectable, self).__init__(sock, reactor) self.writing = True self._addrs = addrs self._transport = transport self._iohandler = iohandler transport._connect_selectable = self def readable(self): pass def writable(self): e = self._delegate.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) t = self._transport t._connect_selectable = None # Always cleanup this ConnectSelectable: either we failed or created a new one # Do it first to ensure the socket gets deregistered before being registered again # in the case of connecting self.terminate() self._transport = None self.update() if e == 0: log.debug("Connection succeeded") # Disassociate from the socket (which will be passed on) self.release() s = self._reactor.selectable(delegate=self._delegate) s._transport = t t._selectable = s self._iohandler.update(t, s, t._reactor.now) return elif e == errno.ECONNREFUSED: if len(self._addrs) > 0: log.debug("Connection refused: trying next transport address: %s", self._addrs[0]) sock = IO.connect(self._addrs[0]) # New ConnectSelectable for the new socket with rest of addresses ConnectSelectable(sock, self._reactor, self._addrs[1:], t, self._iohandler) return else: log.debug("Connection refused, but tried all transport addresses") t.condition = Condition("proton.pythonio", "Connection refused to all addresses") else: log.error("Couldn't connect: %s", e) t.condition = Condition("proton.pythonio", "Connection error: %s" % e) t.close_tail() t.close_head()