1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 import logging, os, Queue, socket, time, types
20 from heapq import heappush, heappop, nsmallest
21 from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch
22 from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message
23 from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, SSL, SSLDomain, SSLUnavailable, symbol
24 from proton import Terminus, Timeout, Transport, TransportException, ulong, Url
25 from select import select
26 from proton.handlers import OutgoingMessageHandler
27 from proton import unicode2utf8, utf82unicode
28
29 import traceback
30 from proton import WrappedHandler, _chandler, secs2millis, millis2secs, timeout2millis, millis2timeout, Selectable
31 from wrapper import Wrapper, PYCTX
32 from cproton import *
33
34 -class Task(Wrapper):
35
36 @staticmethod
38 if impl is None:
39 return None
40 else:
41 return Task(impl)
42
45
48
50
53
55 pn_acceptor_close(self._impl)
56
58
59 @staticmethod
61 if impl is None:
62 return None
63 else:
64 record = pn_reactor_attachments(impl)
65 attrs = pn_void2py(pn_record_get(record, PYCTX))
66 if attrs and 'subclass' in attrs:
67 return attrs['subclass'](impl=impl)
68 else:
69 return Reactor(impl=impl)
70
71 - def __init__(self, *handlers, **kwargs):
75
78
80 self.errors.append(info)
81 self.yield_()
82
84 return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error)
85
87 impl = _chandler(handler, self.on_error)
88 pn_reactor_set_global_handler(self._impl, impl)
89 pn_decref(impl)
90
91 global_handler = property(_get_global, _set_global)
92
94 return millis2timeout(pn_reactor_get_timeout(self._impl))
95
97 return pn_reactor_set_timeout(self._impl, timeout2millis(secs))
98
99 timeout = property(_get_timeout, _set_timeout)
100
102 pn_reactor_yield(self._impl)
103
105 pn_reactor_mark(self._impl)
106
108 return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error)
109
111 impl = _chandler(handler, self.on_error)
112 pn_reactor_set_handler(self._impl, impl)
113 pn_decref(impl)
114
115 handler = property(_get_handler, _set_handler)
116
122
124 n = pn_reactor_wakeup(self._impl)
125 if n: raise IOError(pn_error_text(pn_io_error(pn_reactor_io(self._impl))))
126
128 pn_reactor_start(self._impl)
129
130 @property
132 return pn_reactor_quiesced(self._impl)
133
135 result = pn_reactor_process(self._impl)
136 if self.errors:
137 for exc, value, tb in self.errors[:-1]:
138 traceback.print_exception(exc, value, tb)
139 exc, value, tb = self.errors[-1]
140 raise exc, value, tb
141 return result
142
144 pn_reactor_stop(self._impl)
145
147 impl = _chandler(task, self.on_error)
148 task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl))
149 pn_decref(impl)
150 return task
151
def acceptor(self, host, port, handler=None):
    """
    Ask the reactor to create an acceptor for the given host and port.

    The optional handler is wrapped with _chandler (using self.on_error)
    and handed to pn_reactor_acceptor together with the host (converted
    with unicode2utf8) and the port (converted with str).

    Returns an Acceptor wrapping the result. Raises IOError, carrying the
    reactor's I/O error text plus the host and port, when no acceptor
    is returned.
    """
    wrapped = _chandler(handler, self.on_error)
    listener = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), wrapped)
    # The wrapped handler is decref'd regardless of whether the call succeeded.
    pn_decref(wrapped)
    if not listener:
        reason = pn_error_text(pn_io_error(pn_reactor_io(self._impl)))
        raise IOError("%s (%s:%s)" % (reason, host, port))
    return Acceptor(listener)
160
162 impl = _chandler(handler, self.on_error)
163 result = Connection.wrap(pn_reactor_connection(self._impl, impl))
164 pn_decref(impl)
165 return result
166
168 impl = _chandler(handler, self.on_error)
169 result = Selectable.wrap(pn_reactor_selectable(self._impl))
170 if impl:
171 record = pn_selectable_attachments(result._impl)
172 pn_record_set_handler(record, impl)
173 pn_decref(impl)
174 return result
175
177 pn_reactor_update(self._impl, sel._impl)
178
180 pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
181
182 from proton import wrappers as _wrappers
183 _wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x))
184 _wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
188 """
189 Can be added to a reactor to allow events to be triggered by an
190 external thread but handled on the event thread associated with
191 the reactor. An instance of this class can be passed to the
192 Reactor.selectable() method of the reactor in order to activate
193 it. The close() method should be called when it is no longer
194 needed, to allow the event loop to end if needed.
195 """
197 self.queue = Queue.Queue()
198 self.pipe = os.pipe()
199 self._closed = False
200
202 """
203 Request that the given event be dispatched on the event thread
204 of the reactor to which this EventInjector was added.
205 """
206 self.queue.put(event)
207 os.write(self.pipe[1], "!")
208
210 """
211 Request that this EventInjector be closed. Existing events
212 will be dispatched on the reactor's event dispatch thread,
213 then this will be removed from the set of interest.
214 """
215 self._closed = True
216 os.write(self.pipe[1], "!")
217
220
226
236
239 """
240 Application defined event, which can optionally be associated with
241 an engine object and/or an arbitrary subject
242 """
243 - def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
256
260
262 """
263 Class to track state of an AMQP 1.0 transaction.
264 """
def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
    """
    Record the control link and handler for a new transaction, reset
    all tracking state, and immediately call declare().

    txn_ctrl, handler and settle_before_discharge are stored unchanged
    on the instance.
    """
    # Collaborators and configuration supplied by the caller.
    self.handler = handler
    self.txn_ctrl = txn_ctrl
    self.settle_before_discharge = settle_before_discharge

    # Tracking state; everything starts out empty/unset.
    self.id = None
    self.failed = False
    self._pending = []
    self._declare = None
    self._discharge = None

    # Must come last: declare() runs against the fully initialised state.
    self.declare()
275
278
281
283 self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])
284
288
293
294 - def send(self, sender, msg, tag=None):
299
306
307 - def update(self, delivery, state=None):
311
317
320
343
345 """
346 Abstract interface for link configuration options
347 """
349 """
350 Subclasses will implement any configuration logic in this
351 method
352 """
353 pass
def test(self, link):
    """
    Subclasses can override this to selectively apply an option,
    e.g. based on some link criteria.

    This default implementation ignores the link and always returns True.
    """
    return True
360
364
369
# No-op: this implementation does nothing with the given sender.
def apply(self, sender): pass
373
# No-op: this implementation does nothing with the given receiver.
def apply(self, receiver): pass
377
392
395 self.filter_set = filter_set
396
397 - def apply(self, receiver):
399
401 """
402 Configures a link with a message selector filter
403 """
404 - def __init__(self, value, name='selector'):
406
407 -class Move(ReceiverOption):
408 - def apply(self, receiver):
410
411 -class Copy(ReceiverOption):
412 - def apply(self, receiver):
414
422
427
434
437 self._default_session = None
438
440 if not self._default_session:
441 self._default_session = _create_session(connection)
442 self._default_session.context = self
443 return self._default_session
444
448
450 """
451 Internal handler that triggers the necessary socket connect for an
452 opened connection.
453 """
456
458 if not self._override(event):
459 event.dispatch(self.base)
460
462 conn = event.connection
463 return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides)
464
466 """
467 Internal handler that triggers the necessary socket connect for an
468 opened connection.
469 """
471 self.connection = connection
472 self.address = None
473 self.heartbeat = None
474 self.reconnect = None
475 self.ssl_domain = None
476
496
499
505
508
523
526
529
531 """
532 A reconnect strategy involving an increasing delay between
533 retries, up to a maximum of 10 seconds.
534 """
537
540
548
551 self.values = [Url(v) for v in values]
552 self.i = iter(self.values)
553
556
558 try:
559 return self.i.next()
560 except StopIteration:
561 self.i = iter(self.values)
562 return self.i.next()
563
576
579 """A representation of the AMQP concept of a 'container', which
580 loosely speaking is something that establishes links to or from
581 another container, over which messages are transferred. This is
582 an extension to the Reactor class that adds convenience methods
583 for creating connections and sender- or receiver- links.
584 """
585 - def __init__(self, *handlers, **kwargs):
596
def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None):
    """
    Initiates the establishment of an AMQP connection.

    Exactly one of url, urls or address is consulted, in that order:
    url is wrapped as Urls([url]), urls as Urls(urls), and address is
    used as given. ValueError is raised if none of them is supplied.

    handler is passed to self.connection(). heartbeat is stored on the
    connector only when truthy. reconnect is stored when truthy; when
    it is None a Backoff() is used; any other falsy value leaves the
    connector's reconnect setting untouched. ssl_domain, when falsy,
    falls back to self.ssl.client if self.ssl is set.

    Returns the connection after open() has been called on it.
    """
    conn = self.connection(handler)
    conn.container = self.container_id or str(generate_uuid())

    connector = Connector(conn)
    conn._overrides = connector

    # Work out where to connect to; the first supplied option wins.
    if url:
        endpoints = Urls([url])
    elif urls:
        endpoints = Urls(urls)
    elif address:
        endpoints = address
    else:
        raise ValueError("One of url, urls or address required")
    connector.address = endpoints

    if heartbeat:
        connector.heartbeat = heartbeat

    # None selects the default strategy; other falsy values leave it unset.
    if reconnect is None:
        connector.reconnect = Backoff()
    elif reconnect:
        connector.reconnect = reconnect

    if ssl_domain:
        connector.ssl_domain = ssl_domain
    else:
        connector.ssl_domain = self.ssl and self.ssl.client

    conn._session_policy = SessionPerConnection()
    conn.open()
    return conn
620
def _get_id(self, container, remote, local):
    """
    Build an identifier string from the container name and whichever
    of the remote and local names are supplied.

    Returns "<container>-<remote>-<local>" when both are truthy,
    "<container>-<local>" or "<container>-<remote>" when only one is,
    and "<container>-<uuid>" (using generate_uuid()) when neither is.
    """
    if local and remote:
        # This branch previously built the string without returning it,
        # so callers received None when both names were given.
        return "%s-%s-%s" % (container, remote, local)
    elif local:
        return "%s-%s" % (container, local)
    elif remote:
        return "%s-%s" % (container, remote)
    else:
        return "%s-%s" % (container, str(generate_uuid()))
626
639
640 - def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
661
662 - def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
683
685 if not _get_attr(context, '_txn_ctrl'):
686 class InternalTransactionHandler(OutgoingMessageHandler):
687 def __init__(self):
688 super(InternalTransactionHandler, self).__init__(auto_settle=True)
689
690 def on_settled(self, event):
691 if hasattr(event.delivery, "transaction"):
692 event.transaction = event.delivery.transaction
693 event.delivery.transaction.handle_outcome(event)
694 context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler())
695 context._txn_ctrl.target.type = Terminus.COORDINATOR
696 context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
697 return Transaction(context._txn_ctrl, handler, settle_before_discharge)
698
def listen(self, url, ssl_domain=None):
    """
    Initiates a server socket, accepting incoming AMQP connections
    on the interface and port specified.

    The url is parsed with Url(); its host and port are passed to
    self.acceptor() and the resulting acceptor is returned.
    """
    url = Url(url)
    ssl_config = ssl_domain
    # NOTE(review): ssl_config is computed below but never passed to the
    # acceptor, so neither the ssl_domain argument nor the 'amqps' scheme
    # currently affects the listener. Confirm the intended behaviour.
    if not ssl_config and url.scheme == 'amqps':
        # NOTE(review): connect() reads self.ssl rather than self.ssl_domain;
        # verify that this attribute exists on the container.
        ssl_config = self.ssl_domain
    return self.acceptor(url.host, url.port)
709
714