#!/usr/bin/env python # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # import tornado.ioloop from proton.reactor import Container as BaseContainer from proton.handlers import IOHandler class TornadoLoopHandler: def __init__(self, loop=None, handler_base=None): self.loop = loop or tornado.ioloop.IOLoop.instance() self.io = handler_base or IOHandler() self.count = 0 def on_reactor_init(self, event): self.reactor = event.reactor def on_reactor_quiesced(self, event): event.reactor.yield_() def on_unhandled(self, name, event): event.dispatch(self.io) def _events(self, sel): events = self.loop.ERROR if sel.reading: events |= self.loop.READ if sel.writing: events |= self.loop.WRITE return events def _schedule(self, sel): if sel.deadline: self.loop.add_timeout(sel.deadline, lambda: self._expired(sel)) def _expired(self, sel): self.reactor.mark() sel.expired() self._process() def _process(self): self.reactor.process() if not self.reactor.quiesced: self.loop.add_callback(self._process) def _callback(self, sel, events): if self.loop.READ & events: sel.readable() if self.loop.WRITE & events: sel.writable() self._process() def on_selectable_init(self, event): sel = event.context if sel.fileno() >= 0: self.loop.add_handler(sel.fileno(), lambda fd, events: self._callback(sel, events), self._events(sel)) self._schedule(sel) self.count += 1 def on_selectable_updated(self, event): sel = event.context if sel.fileno() >= 0: self.loop.update_handler(sel.fileno(), self._events(sel)) self._schedule(sel) def on_selectable_final(self, event): sel = event.context if sel.fileno() >= 0: self.loop.remove_handler(sel.fileno()) sel.close() self.count -= 1 if self.count == 0: self.loop.add_callback(self._stop) def _stop(self): # We could have received a new selectable since we sent the stop if self.count == 0: self.reactor.stop() self.loop.stop() class Container(object): def __init__(self, *handlers, **kwargs): self.tornado_loop = kwargs.get('loop', tornado.ioloop.IOLoop.instance()) kwargs['global_handler'] = TornadoLoopHandler(self.tornado_loop, kwargs.get('handler_base', None)) self.container = BaseContainer(*handlers, **kwargs) def initialise(self): self.container.start() self.container.process() def run(self): self.initialise() self.tornado_loop.start() def stop(self): self.container.stop() def touch(self): self._process() def _process(self): self.container.process() if not self.container.quiesced: self.tornado_loop.add_callback(self._process)