Package proton :: Module reactor
[frames] | no frames]

Source Code for Module proton.reactor

  1  # 
  2  # Licensed to the Apache Software Foundation (ASF) under one 
  3  # or more contributor license agreements.  See the NOTICE file 
  4  # distributed with this work for additional information 
  5  # regarding copyright ownership.  The ASF licenses this file 
  6  # to you under the Apache License, Version 2.0 (the 
  7  # "License"); you may not use this file except in compliance 
  8  # with the License.  You may obtain a copy of the License at 
  9  # 
 10  #   http://www.apache.org/licenses/LICENSE-2.0 
 11  # 
 12  # Unless required by applicable law or agreed to in writing, 
 13  # software distributed under the License is distributed on an 
 14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 15  # KIND, either express or implied.  See the License for the 
 16  # specific language governing permissions and limitations 
 17  # under the License. 
 18  # 
 19  import logging, os, Queue, socket, time, types 
 20  from heapq import heappush, heappop, nsmallest 
 21  from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch 
 22  from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message 
 23  from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, SSL, SSLDomain, SSLUnavailable, symbol 
 24  from proton import Terminus, Timeout, Transport, TransportException, ulong, Url 
 25  from select import select 
 26  from proton.handlers import OutgoingMessageHandler 
 27  from proton import unicode2utf8, utf82unicode 
 28   
 29  import traceback 
 30  from proton import WrappedHandler, _chandler, secs2millis, millis2secs, timeout2millis, millis2timeout, Selectable 
 31  from wrapper import Wrapper, PYCTX 
 32  from cproton import * 
class Task(Wrapper):
    """
    Python wrapper for a task handle produced by the underlying C
    reactor (Reactor.schedule() wraps its result with this class).
    """

    @staticmethod
    def wrap(impl):
        """
        Return a Task around ``impl``, or None when ``impl`` is None.
        """
        return None if impl is None else Task(impl)

    def __init__(self, impl):
        # The attachments accessor is handed to Wrapper alongside the handle.
        Wrapper.__init__(self, impl, pn_task_attachments)

    def _init(self):
        """
        Nothing to initialise for a task.
        NOTE(review): presumably a hook invoked by Wrapper -- confirm.
        """
        pass
48
class Acceptor(Wrapper):
    """
    Wrapper for the acceptor handle that Reactor.acceptor() obtains
    from pn_reactor_acceptor().
    """

    def __init__(self, impl):
        Wrapper.__init__(self, impl)

    def close(self):
        """
        Close the wrapped acceptor by calling pn_acceptor_close() on it.
        """
        handle = self._impl
        pn_acceptor_close(handle)
56
57 -class Reactor(Wrapper):
58 59 @staticmethod
60 - def wrap(impl):
61 if impl is None: 62 return None 63 else: 64 record = pn_reactor_attachments(impl) 65 attrs = pn_void2py(pn_record_get(record, PYCTX)) 66 if attrs and 'subclass' in attrs: 67 return attrs['subclass'](impl=impl) 68 else: 69 return Reactor(impl=impl)
70
71 - def __init__(self, *handlers, **kwargs):
72 Wrapper.__init__(self, kwargs.get("impl", pn_reactor), pn_reactor_attachments) 73 for h in handlers: 74 self.handler.add(h)
75
76 - def _init(self):
77 self.errors = []
78
79 - def on_error(self, info):
80 self.errors.append(info) 81 self.yield_()
82
83 - def _get_global(self):
84 return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error)
85
86 - def _set_global(self, handler):
87 impl = _chandler(handler, self.on_error) 88 pn_reactor_set_global_handler(self._impl, impl) 89 pn_decref(impl)
90 91 global_handler = property(_get_global, _set_global) 92
93 - def _get_timeout(self):
94 return millis2timeout(pn_reactor_get_timeout(self._impl))
95
96 - def _set_timeout(self, secs):
97 return pn_reactor_set_timeout(self._impl, timeout2millis(secs))
98 99 timeout = property(_get_timeout, _set_timeout) 100
101 - def yield_(self):
102 pn_reactor_yield(self._impl)
103
104 - def mark(self):
105 pn_reactor_mark(self._impl)
106
107 - def _get_handler(self):
108 return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error)
109
110 - def _set_handler(self, handler):
111 impl = _chandler(handler, self.on_error) 112 pn_reactor_set_handler(self._impl, impl) 113 pn_decref(impl)
114 115 handler = property(_get_handler, _set_handler) 116
117 - def run(self):
118 self.timeout = 3.14159265359 119 self.start() 120 while self.process(): pass 121 self.stop()
122
123 - def wakeup(self):
124 n = pn_reactor_wakeup(self._impl) 125 if n: raise IOError(pn_error_text(pn_io_error(pn_reactor_io(self._impl))))
126
127 - def start(self):
128 pn_reactor_start(self._impl)
129 130 @property
131 - def quiesced(self):
132 return pn_reactor_quiesced(self._impl)
133
134 - def process(self):
135 result = pn_reactor_process(self._impl) 136 if self.errors: 137 for exc, value, tb in self.errors[:-1]: 138 traceback.print_exception(exc, value, tb) 139 exc, value, tb = self.errors[-1] 140 raise exc, value, tb 141 return result
142
143 - def stop(self):
144 pn_reactor_stop(self._impl)
145
146 - def schedule(self, delay, task):
147 impl = _chandler(task, self.on_error) 148 task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl)) 149 pn_decref(impl) 150 return task
151
152 - def acceptor(self, host, port, handler=None):
153 impl = _chandler(handler, self.on_error) 154 aimpl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), impl) 155 pn_decref(impl) 156 if aimpl: 157 return Acceptor(aimpl) 158 else: 159 raise IOError("%s (%s:%s)" % (pn_error_text(pn_io_error(pn_reactor_io(self._impl))), host, port))
160
161 - def connection(self, handler=None):
162 impl = _chandler(handler, self.on_error) 163 result = Connection.wrap(pn_reactor_connection(self._impl, impl)) 164 pn_decref(impl) 165 return result
166
167 - def selectable(self, handler=None):
168 impl = _chandler(handler, self.on_error) 169 result = Selectable.wrap(pn_reactor_selectable(self._impl)) 170 if impl: 171 record = pn_selectable_attachments(result._impl) 172 pn_record_set_handler(record, impl) 173 pn_decref(impl) 174 return result
175
176 - def update(self, sel):
177 pn_reactor_update(self._impl, sel._impl)
178
179 - def push_event(self, obj, etype):
180 pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
181 182 from proton import wrappers as _wrappers 183 _wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x)) 184 _wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
class EventInjector(object):
    """
    Can be added to a reactor to allow events to be triggered by an
    external thread but handled on the event thread associated with
    the reactor. An instance of this class can be passed to the
    Reactor.selectable() method of the reactor in order to activate
    it. The close() method should be called when it is no longer
    needed, to allow the event loop to end if needed.
    """

    def __init__(self):
        self.queue = Queue.Queue()
        # (read end, write end); a byte written to the write end makes
        # the read end readable, which leads to on_selectable_readable().
        self.pipe = os.pipe()
        self._closed = False

    def trigger(self, event):
        """
        Request that the given event be dispatched on the event thread
        of the reactor to which this EventInjector was added.
        """
        self.queue.put(event)
        write_end = self.pipe[1]
        os.write(write_end, "!")

    def close(self):
        """
        Request that this EventInjector be closed. Existing events
        will be dispatched on the reactor's event dispatch thread,
        then this will be removed from the set of interest.
        """
        self._closed = True
        write_end = self.pipe[1]
        os.write(write_end, "!")

    def fileno(self):
        """Return the file descriptor of the pipe's read end."""
        read_end = self.pipe[0]
        return read_end

    def on_selectable_init(self, event):
        """
        Attach the pipe's read end to the selectable in ``event.context``,
        mark it as interested in reading and tell the reactor about it.
        """
        selectable = event.context
        selectable.fileno(self.fileno())
        selectable.reading = True
        event.reactor.update(selectable)

    def on_selectable_readable(self, event):
        """
        Consume pending wake-up bytes (up to 512), push every queued event
        onto the reactor and, once close() has been requested, terminate
        the selectable.
        """
        os.read(self.pipe[0], 512)
        pending = self.queue
        while not pending.empty():
            queued = pending.get()
            event.reactor.push_event(queued.context, queued.type)
        if not self._closed:
            return
        selectable = event.context
        selectable.terminate()
        event.reactor.update(selectable)
236
class ApplicationEvent(EventBase):
    """
    Application defined event, which can optionally be associated with
    an engine object and/or an arbitrary subject.
    """

    def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
        """
        :param typename: name used to build the EventType of this event.
        :param connection: optional connection to associate.
        :param session: optional session to associate.
        :param link: optional link to associate.
        :param delivery: optional delivery to associate.
        :param subject: optional arbitrary application object.

        Parent objects are derived from the most specific object given:
        a delivery supplies the link, a link supplies the session and a
        session supplies the connection, replacing any value passed
        explicitly for those.
        """
        super(ApplicationEvent, self).__init__(PN_PYREF, self, EventType(typename))
        self.connection = connection
        self.session = session
        self.link = link
        self.delivery = delivery
        if self.delivery:
            self.link = self.delivery.link
        if self.link:
            self.session = self.link.session
        if self.session:
            self.connection = self.session.connection
        self.subject = subject

    def __repr__(self):
        """
        Return ``<type>(<obj>, ...)`` listing the associated objects that
        are not None.
        """
        objects = [self.connection, self.session, self.link, self.delivery, self.subject]
        # Was the bare name `typename`, which is not in scope here and
        # raised NameError.  The event type is read from `self.type`, the
        # attribute EventInjector also reads on triggered events.
        # NOTE(review): assumes EventBase stores the EventType passed to
        # its constructor as `self.type` -- confirm.
        return "%s(%s)" % (self.type, ", ".join([str(o) for o in objects if o is not None]))
260
class Transaction(object):
    """
    Class to track state of an AMQP 1.0 transaction.

    Constructing an instance immediately sends the declare request over
    ``txn_ctrl``.  Outcomes of the declare/discharge requests are fed in
    through handle_outcome(), which calls the ``on_transaction_*``
    callbacks on ``handler``.
    """

    def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
        """
        :param txn_ctrl: sender used for the declare/discharge messages.
        :param handler: object receiving the ``on_transaction_*`` calls.
        :param settle_before_discharge: when true, accept() settles each
            delivery at once instead of keeping it pending.
        """
        self.txn_ctrl = txn_ctrl
        self.handler = handler
        self.settle_before_discharge = settle_before_discharge
        self.id = None
        self.failed = False
        self._declare = None
        self._discharge = None
        self._pending = []
        self.declare()

    def commit(self):
        """Discharge the transaction with ``failed`` set to False."""
        self.discharge(False)

    def abort(self):
        """Discharge the transaction with ``failed`` set to True."""
        self.discharge(True)

    def declare(self):
        """Send the declare request and remember its delivery."""
        self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])

    def discharge(self, failed):
        """
        Send the discharge request for this transaction.

        :param failed: True to abort, False to commit.
        """
        self.failed = failed
        self._discharge = self._send_ctrl(symbol(u'amqp:discharge:list'), [self.id, failed])

    def _send_ctrl(self, descriptor, value):
        """
        Send a Described(descriptor, value) body over the control link and
        tag the resulting delivery with this transaction.
        """
        body = Described(descriptor, value)
        ctrl_delivery = self.txn_ctrl.send(Message(body=body))
        ctrl_delivery.transaction = self
        return ctrl_delivery

    def send(self, sender, msg, tag=None):
        """
        Send ``msg`` on ``sender`` with the transaction id as local
        delivery data, and return the delivery.
        """
        sent = sender.send(msg, tag=tag)
        sent.local.data = [self.id]
        # NOTE(review): 0x34 is presumably the AMQP transactional-state
        # code -- confirm against the specification.
        sent.update(0x34)
        return sent

    def accept(self, delivery):
        """
        Accept ``delivery`` under this transaction, then either settle it
        right away or keep it pending, depending on
        ``settle_before_discharge``.
        """
        self.update(delivery, PN_ACCEPTED)
        if not self.settle_before_discharge:
            self._pending.append(delivery)
        else:
            delivery.settle()

    def update(self, delivery, state=None):
        """
        Set ``state`` on ``delivery`` as part of this transaction.  Does
        nothing when ``state`` is false.
        """
        if not state:
            return
        delivery.local.data = [self.id, Described(ulong(state), [])]
        delivery.update(0x34)

    def _release_pending(self):
        """Release and settle every pending delivery, then forget them."""
        for held in self._pending:
            held.update(Delivery.RELEASED)
            held.settle()
        self._clear_pending()

    def _clear_pending(self):
        """Forget all pending deliveries without touching them."""
        self._pending = []

    def handle_outcome(self, event):
        """
        React to the outcome of the declare or discharge delivery carried
        by ``event``; outcomes for any other delivery are ignored.
        """
        outcome = event.delivery
        if outcome == self._declare:
            if outcome.remote.data:
                # The first element of the remote data is taken as the id.
                self.id = outcome.remote.data[0]
                self.handler.on_transaction_declared(event)
            else:
                if outcome.remote_state != Delivery.REJECTED:
                    logging.warning("Unexpected outcome for declare: %s" % outcome.remote_state)
                self.handler.on_transaction_declare_failed(event)
        elif outcome == self._discharge:
            if outcome.remote_state == Delivery.REJECTED:
                if not self.failed:
                    self.handler.on_transaction_commit_failed(event)
                    self._release_pending() # make this optional?
            elif self.failed:
                self.handler.on_transaction_aborted(event)
                self._release_pending()
            else:
                self.handler.on_transaction_committed(event)
            self._clear_pending()
343
class LinkOption(object):
    """
    Abstract interface for link configuration options.
    """

    def apply(self, link):
        """
        Configure ``link``.  The base implementation does nothing;
        subclasses put their configuration logic here.
        """
        return None

    def test(self, link):
        """
        Say whether this option should be applied to ``link``.  The base
        implementation always answers True; subclasses can override this
        to apply an option selectively, e.g. based on some link criteria.
        """
        return True
360
361 -class AtMostOnce(LinkOption):
362 - def apply(self, link):
364
365 -class AtLeastOnce(LinkOption):
366 - def apply(self, link):
369
class SenderOption(LinkOption):
    """
    Link option that only passes test() for links whose ``is_sender``
    attribute is true.
    """

    def apply(self, sender):
        """Does nothing in this base class."""
        pass

    def test(self, link):
        """Return the link's ``is_sender`` attribute."""
        return link.is_sender
373
class ReceiverOption(LinkOption):
    """
    Link option that only passes test() for links whose ``is_receiver``
    attribute is true.
    """

    def apply(self, receiver):
        """Does nothing in this base class."""
        pass

    def test(self, link):
        """Return the link's ``is_receiver`` attribute."""
        return link.is_receiver
377
378 -class DynamicNodeProperties(LinkOption):
def __init__(self, props={}):
    """
    Copy ``props`` into ``self.properties``, converting every key that
    is not already a ``symbol`` into one.  ``props`` itself is only
    read, never modified.
    """
    self.properties = dict(
        (key if isinstance(key, symbol) else symbol(key), props[key])
        for key in props
    )
386
387 - def apply(self, link):
392
class Filter(ReceiverOption):
    """
    Receiver option that installs a filter set on the receiver's source.
    """

    def __init__(self, filter_set=None):
        """
        :param filter_set: dict of filters to apply.  When omitted (or
            None) a fresh empty dict is used.
        """
        # The previous default was a literal `{}` stored directly on the
        # instance, so every default-constructed Filter shared -- and could
        # mutate -- the same dict object.
        if filter_set is None:
            filter_set = {}
        self.filter_set = filter_set

    def apply(self, receiver):
        """Put the filter set into ``receiver.source.filter``."""
        receiver.source.filter.put_dict(self.filter_set)
399
class Selector(Filter):
    """
    Configures a link with a message selector filter.
    """

    def __init__(self, value, name='selector'):
        """
        :param value: the selector expression.
        :param name: key (converted to a symbol) under which the filter
            is registered in the filter set.
        """
        described = Described(symbol('apache.org:selector-filter:string'), value)
        super(Selector, self).__init__({symbol(name): described})
406
407 -class Move(ReceiverOption):
408 - def apply(self, receiver):
410
411 -class Copy(ReceiverOption):
412 - def apply(self, receiver):
414 422
def _create_session(connection, handler=None):
    """
    Create a session on ``connection``, open it and return it.

    ``handler`` is accepted but not used.
    """
    new_session = connection.session()
    new_session.open()
    return new_session
427
def _get_attr(target, name):
    """
    Return attribute ``name`` of ``target`` when hasattr() reports it,
    otherwise None.
    """
    return getattr(target, name) if hasattr(target, name) else None
434
class SessionPerConnection(object):
    """
    Session policy that lazily creates one session and hands the same
    one back on every subsequent request.
    """

    def __init__(self):
        self._default_session = None

    def session(self, connection):
        """
        Return the cached session, first creating and opening one on
        ``connection`` when none is cached.  A newly created session gets
        this policy object as its ``context``.
        """
        if self._default_session:
            return self._default_session
        self._default_session = _create_session(connection)
        self._default_session.context = self
        return self._default_session

    def on_session_remote_close(self, event):
        """
        Close the event's connection and drop the cached session.
        """
        event.connection.close()
        self._default_session = None
448
class GlobalOverrides(object):
    """
    Internal handler that first offers each otherwise unhandled event to
    the ``_overrides`` handler attached to the event's connection (if
    any) and falls back to the wrapped ``base`` handler when that
    dispatch reports a false result.
    """

    def __init__(self, base):
        self.base = base

    def on_unhandled(self, name, event):
        """
        Dispatch ``event`` to the connection override, or to ``base`` when
        the override did not handle it.
        """
        if self._override(event):
            return
        event.dispatch(self.base)

    def _override(self, event):
        """
        Dispatch ``event`` to ``event.connection._overrides`` when the
        connection exists and carries that attribute; the result is true
        only if that dispatch returned a true value.
        """
        connection = event.connection
        return connection and hasattr(connection, '_overrides') and event.dispatch(connection._overrides)
464
class Connector(Handler):
    """
    Internal handler that triggers the necessary socket connect for an
    opened connection.
    """

    def __init__(self, connection):
        self.connection = connection
        # The following are filled in by whoever creates the connector.
        self.address = None
        self.heartbeat = None
        self.reconnect = None
        self.ssl_domain = None

    def _connect(self, connection):
        """
        Take the next URL from ``self.address``, point ``connection`` at
        it and bind a fresh Transport configured with the heartbeat, SSL
        and SASL settings that apply.
        """
        target = self.address.next()
        # IoHandler uses the hostname to determine where to try to connect to
        connection.hostname = "%s:%i" % (target.host, target.port)
        logging.info("connecting to %s..." % connection.hostname)

        transport = Transport()
        transport.bind(connection)
        if self.heartbeat:
            transport.idle_timeout = self.heartbeat
        if target.scheme == 'amqps' and self.ssl_domain:
            self.ssl = SSL(transport, self.ssl_domain)
            self.ssl.peer_hostname = target.host
        if not target.username:
            return
        sasl = transport.sasl()
        if target.username == 'anonymous':
            sasl.mechanisms('ANONYMOUS')
        else:
            sasl.plain(target.username, target.password)

    def on_connection_local_open(self, event):
        """Connect as soon as the connection is opened locally."""
        self._connect(event.connection)

    def on_connection_remote_open(self, event):
        """
        Log the successful connect and, when a reconnect strategy is set,
        reset it.
        """
        logging.info("connected to %s" % event.connection.hostname)
        if self.reconnect:
            self.reconnect.reset()
            self.transport = None

    def on_transport_tail_closed(self, event):
        """Handled exactly like on_transport_closed()."""
        self.on_transport_closed(event)

    def on_transport_closed(self, event):
        """
        When the connection is still locally active, either reconnect
        (immediately for a zero delay, otherwise through a scheduled
        timer task) or, without a reconnect strategy, give the
        connection up.
        """
        current = self.connection
        if not (current and current.state & Endpoint.LOCAL_ACTIVE):
            return
        if not self.reconnect:
            logging.info("Disconnected")
            self.connection = None
            return
        event.transport.unbind()
        delay = self.reconnect.next()
        if delay == 0:
            logging.info("Disconnected, reconnecting...")
            self._connect(self.connection)
        else:
            logging.info("Disconnected will try to reconnect after %s seconds" % delay)
            event.reactor.schedule(delay, self)

    def on_timer_task(self, event):
        """Retry the connect when the scheduled reconnect timer fires."""
        self._connect(self.connection)

    def on_connection_remote_close(self, event):
        """Forget the connection once the peer has closed it."""
        self.connection = None
529
class Backoff(object):
    """
    A reconnect strategy involving an increasing delay between
    retries, up to a maximum of 10 seconds.
    """

    def __init__(self):
        self.delay = 0

    def reset(self):
        """Go back to the initial state, where the next delay is 0."""
        self.delay = 0

    def next(self):
        """
        Return the delay to wait before the next retry and advance the
        sequence: 0, 0.1, then doubling each time and capped at 10.
        """
        due = self.delay
        self.delay = min(10, 2*due) if due != 0 else 0.1
        return due
548
class Urls(object):
    """
    Endless round-robin iterator over a list of URLs: when the end of
    the list is reached, iteration restarts from the first entry.
    """

    def __init__(self, values):
        """
        :param values: iterable of items each converted with ``Url()``.
        """
        self.values = [Url(v) for v in values]
        self.i = iter(self.values)

    def __iter__(self):
        return self

    def next(self):
        """
        Return the next URL, wrapping around to the first one after the
        last.  With no values at all, StopIteration propagates.
        """
        try:
            return next(self.i)
        except StopIteration:
            # Exhausted: start over from the beginning of the list.
            self.i = iter(self.values)
            return next(self.i)
563
564 -class SSLConfig(object):
565 - def __init__(self):
568
def set_credentials(self, cert_file, key_file, password):
    """
    Pass the same certificate file, key file and password to the client
    domain and then to the server domain.
    """
    for side in ('client', 'server'):
        getattr(self, side).set_credentials(cert_file, key_file, password)
572
def set_trusted_ca_db(self, certificate_db):
    """
    Pass the same trusted CA database to the client domain and then to
    the server domain.
    """
    for side in ('client', 'server'):
        getattr(self, side).set_trusted_ca_db(certificate_db)
576
class Container(Reactor):
    """A representation of the AMQP concept of a 'container', which
    loosely speaking is something that establishes links to or from
    another container, over which messages are transferred. This is
    an extension to the Reactor class that adds convenience methods
    for creating connections and sender- or receiver- links.
    """

    def __init__(self, *handlers, **kwargs):
        """
        :param handlers: handlers passed through to Reactor.
        :param kwargs: ``impl`` wraps an existing reactor, in which case
            none of the container state below is initialised;
            ``global_handler`` replaces the base handler wrapped by
            GlobalOverrides.
        """
        super(Container, self).__init__(*handlers, **kwargs)
        if "impl" not in kwargs:
            try:
                self.ssl = SSLConfig()
            except SSLUnavailable:
                self.ssl = None
            self.global_handler = GlobalOverrides(kwargs.get('global_handler', self.global_handler))
            self.trigger = None
            self.container_id = str(generate_uuid())
            # Recorded so that Reactor.wrap() re-creates this subclass.
            Wrapper.__setattr__(self, 'subclass', self.__class__)

    def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None):
        """
        Initiates the establishment of an AMQP connection.

        Exactly one source of addresses is used, in this order of
        precedence: ``url`` (a single URL), ``urls`` (a list of URLs tried
        round-robin) or ``address`` (an object providing ``next()``).

        :param handler: optional handler for the new connection.
        :param reconnect: reconnect strategy; None (the default) selects
            Backoff(), any other false value disables reconnecting.
        :param heartbeat: optional value for the transport idle timeout.
        :param ssl_domain: SSL domain to use; defaults to the container's
            client domain when SSL is available.
        :return: the opened connection.
        :raises ValueError: when none of url, urls or address is given.
        """
        conn = self.connection(handler)
        conn.container = self.container_id or str(generate_uuid())

        connector = Connector(conn)
        conn._overrides = connector
        if url:
            connector.address = Urls([url])
        elif urls:
            connector.address = Urls(urls)
        elif address:
            connector.address = address
        else:
            raise ValueError("One of url, urls or address required")
        if heartbeat:
            connector.heartbeat = heartbeat
        if reconnect:
            connector.reconnect = reconnect
        elif reconnect is None:
            connector.reconnect = Backoff()
        connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client)
        conn._session_policy = SessionPerConnection() #todo: make configurable
        conn.open()
        return conn

    def _get_id(self, container, remote, local):
        """
        Build a link name from the container id and whichever of the
        remote/local addresses are set; a generated UUID stands in when
        neither is.
        """
        if local and remote:
            # The `return` was missing here, so links with both addresses
            # set got None instead of a name.
            return "%s-%s-%s" % (container, remote, local)
        elif local:
            return "%s-%s" % (container, local)
        elif remote:
            return "%s-%s" % (container, remote)
        else:
            return "%s-%s" % (container, str(generate_uuid()))

    def _get_session(self, context):
        """
        Resolve ``context`` to a session: a Url is connected to first, a
        Session is returned as is, a Connection goes through its session
        policy (or gets a new session when it has none), and anything
        else is asked for ``context.session()``.
        """
        if isinstance(context, Url):
            return self._get_session(self.connect(url=context))
        elif isinstance(context, Session):
            return context
        elif isinstance(context, Connection):
            if hasattr(context, '_session_policy'):
                return context._session_policy.session(context)
            else:
                return _create_session(context)
        else:
            return context.session()

    def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
        """
        Initiates the establishment of a link over which messages can be sent.

        :param context: URL string, Url, Connection, Session or other
            object accepted by _get_session(); for a URL the path is used
            as the target address unless ``target`` is given.
        :param target: optional target address.
        :param source: optional source address.
        :param name: optional link name; generated when omitted.
        :param handler: optional handler for the link.
        :param tags: optional delivery tag generator.
        :param options: link option(s) applied before the link is opened.
        :return: the opened sender.
        """
        if isinstance(context, basestring):
            context = Url(context)
        if isinstance(context, Url) and not target:
            target = context.path
        session = self._get_session(context)
        snd = session.sender(name or self._get_id(session.connection.container, target, source))
        if source:
            snd.source.address = source
        if target:
            snd.target.address = target
        if handler:
            snd.handler = handler
        if tags:
            snd.tag_generator = tags
        _apply_link_options(options, snd)
        snd.open()
        return snd

    def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
        """
        Initiates the establishment of a link over which messages can be received (aka a subscription).

        :param context: URL string, Url, Connection, Session or other
            object accepted by _get_session(); for a URL the path is used
            as the source address unless ``source`` is given.
        :param source: optional source address.
        :param target: optional target address.
        :param name: optional link name; generated when omitted.
        :param dynamic: when true, the source is marked as dynamic.
        :param handler: optional handler for the link.
        :param options: link option(s) applied before the link is opened.
        :return: the opened receiver.
        """
        if isinstance(context, basestring):
            context = Url(context)
        if isinstance(context, Url) and not source:
            source = context.path
        session = self._get_session(context)
        rcv = session.receiver(name or self._get_id(session.connection.container, source, target))
        if source:
            rcv.source.address = source
        if dynamic:
            rcv.source.dynamic = True
        if target:
            rcv.target.address = target
        if handler:
            rcv.handler = handler
        _apply_link_options(options, rcv)
        rcv.open()
        return rcv

    def declare_transaction(self, context, handler=None, settle_before_discharge=False):
        """
        Declare a new transaction on ``context``.

        The first call for a given context creates a 'txn-ctrl' sender
        targeting the transaction coordinator and caches it on the
        context as ``_txn_ctrl``; later calls reuse it.

        :return: a Transaction bound to the control link and ``handler``.
        """
        if not _get_attr(context, '_txn_ctrl'):
            class InternalTransactionHandler(OutgoingMessageHandler):
                def __init__(self):
                    super(InternalTransactionHandler, self).__init__(auto_settle=True)

                def on_settled(self, event):
                    # Route the outcome to the Transaction that sent the
                    # control message.
                    if hasattr(event.delivery, "transaction"):
                        event.transaction = event.delivery.transaction
                        event.delivery.transaction.handle_outcome(event)
            context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler())
            context._txn_ctrl.target.type = Terminus.COORDINATOR
            context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
        return Transaction(context._txn_ctrl, handler, settle_before_discharge)

    def listen(self, url, ssl_domain=None):
        """
        Initiates a server socket, accepting incoming AMQP connections
        on the interface and port specified.

        :param url: URL (string or Url) giving host, port and scheme.
        :param ssl_domain: SSL domain for the listener; for an 'amqps'
            URL it defaults to the container's server domain.
        :return: the Acceptor for the listening socket.
        """
        url = Url(url)
        ssl_config = ssl_domain
        if not ssl_config and url.scheme == 'amqps':
            # Was `self.ssl_domain`, an attribute Container never defines;
            # the container's SSL settings live on `self.ssl` (compare
            # `self.ssl.client` in connect()).
            ssl_config = self.ssl and self.ssl.server
        # NOTE(review): ssl_config is resolved but not applied -- the
        # Acceptor class in this module offers no way to attach it.
        return self.acceptor(url.host, url.port)

    def do_work(self, timeout=None):
        """
        Run one processing step, first updating the reactor timeout when
        a true ``timeout`` (seconds) is given.

        :return: the result of process().
        """
        if timeout:
            self.timeout = timeout
        return self.process()
714