1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 import heapq, logging, os, Queue, re, socket, time, types
20 from proton import dispatch, generate_uuid, PN_ACCEPTED, SASL, symbol, ulong, Url
21 from proton import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Terminus, Timeout
22 from proton import Message, Handler, ProtonException, Transport, TransportException, ConnectionException
23 from select import select
27 """
28 A utility for simpler and more intuitive handling of delivery
29 events related to outgoing i.e. sent messages.
30 """
def __init__(self, auto_settle=True, delegate=None):
    """
    Record the configuration of this handler on the instance.

    ``auto_settle`` is stored unchanged as ``self.auto_settle``.
    ``delegate`` is stored unchanged as ``self.delegate``.
    """
    # NOTE(review): presumably ``delegate`` is the object the on_* callbacks
    # are forwarded to via dispatch() -- confirm against the full class.
    self.delegate = delegate
    self.auto_settle = auto_settle
34
38
52
54 """
55 Called when the sender link has credit and messages can
56 therefore be transferred.
57 """
58 if self.delegate:
59 dispatch(self.delegate, 'on_sendable', event)
60
62 """
63 Called when the remote peer accepts an outgoing message.
64 """
65 if self.delegate:
66 dispatch(self.delegate, 'on_accepted', event)
67
69 """
70 Called when the remote peer rejects an outgoing message.
71 """
72 if self.delegate:
73 dispatch(self.delegate, 'on_rejected', event)
74
76 """
77 Called when the remote peer releases an outgoing message. Note
78 that this may be in response to either the RELEASED or MODIFIED
79 state as defined by the AMQP specification.
80 """
81 if self.delegate:
82 dispatch(self.delegate, 'on_released', event)
83
85 """
86 Called when the remote peer has settled the outgoing
87 message. This is the point at which it should never be
88 retransmitted.
89 """
90 if self.delegate:
91 dispatch(self.delegate, 'on_settled', event)
92
98
100 """
101 An exception that indicates a message should be rejected
102 """
103 pass
104
106 """
107 An exception that indicates a message should be rejected
108 """
109 pass
110
117
124
def release(self, delivery, delivered=True):
    """
    Release a received message, making it available at the source
    for any (other) interested receiver.

    The ``delivered`` flag says whether this should be considered a
    delivery attempt (and the delivery count updated) or not: a true
    value settles the delivery with ``Delivery.MODIFIED``, a false
    value settles it with ``Delivery.RELEASED``.
    """
    outcome = Delivery.MODIFIED if delivered else Delivery.RELEASED
    self.settle(delivery, outcome)
136
137 - def settle(self, delivery, state=None):
141
143 """
144 A utility for simpler and more intuitive handling of delivery
145 events related to incoming i.e. received messages.
146 """
147
def __init__(self, auto_accept=True, delegate=None):
    """
    Record the configuration of this handler on the instance.

    ``auto_accept`` is stored unchanged as ``self.auto_accept``.
    ``delegate`` is stored unchanged as ``self.delegate``.
    """
    # NOTE(review): presumably ``delegate`` is the object the on_* callbacks
    # are forwarded to via dispatch() -- confirm against the full class.
    self.auto_accept = auto_accept
    self.delegate = delegate
151
175
177 """
178 Called when a message is received. The message itself can be
179 obtained as a property on the event. For the purpose of
180 referring to this message in further actions (e.g. if
181 explicitly accepting it), the ``delivery`` should be used, also
182 obtainable via a property on the event.
183 """
184 if self.delegate:
185 dispatch(self.delegate, 'on_message', event)
186
188 if self.delegate:
189 dispatch(self.delegate, 'on_settled', event)
190
192 """
193 A utility that exposes 'endpoint' events i.e. the open/close for
194 links, sessions and connections in a more intuitive manner. A
195 XXX_opened method will be called when both local and remote peers
196 have opened the link, session or connection. This can be used to
197 confirm a locally initiated action for example. A XXX_opening
198 method will be called when the remote peer has requested an open
199 that was not initiated locally. By default this will simply open
200 locally, which then triggers the XXX_opened call. The same applies
201 to close.
202 """
203
def __init__(self, peer_close_is_error=False, delegate=None):
    """
    Record the configuration of this handler on the instance.

    ``peer_close_is_error`` is stored unchanged as
    ``self.peer_close_is_error``.
    ``delegate`` is stored unchanged as ``self.delegate``.
    """
    # NOTE(review): presumably, when no delegate is set, a true
    # ``peer_close_is_error`` makes peer-initiated closes get routed to the
    # matching on_*_error method -- confirm against the full class.
    self.peer_close_is_error = peer_close_is_error
    self.delegate = delegate
207
208 @classmethod
211
212 @classmethod
215
216 @classmethod
219
220 @classmethod
223
224 @classmethod
227
228 @classmethod
234
243
252
261
265
272
276
283
287
294
296 if self.delegate:
297 dispatch(self.delegate, 'on_connection_opened', event)
298
300 if self.delegate:
301 dispatch(self.delegate, 'on_session_opened', event)
302
304 if self.delegate:
305 dispatch(self.delegate, 'on_link_opened', event)
306
308 if self.delegate:
309 dispatch(self.delegate, 'on_connection_opening', event)
310
312 if self.delegate:
313 dispatch(self.delegate, 'on_session_opening', event)
314
316 if self.delegate:
317 dispatch(self.delegate, 'on_link_opening', event)
318
320 if self.delegate:
321 dispatch(self.delegate, 'on_connection_error', event)
322 else:
323 self.log_error(event.connection, "connection")
324
326 if self.delegate:
327 dispatch(self.delegate, 'on_session_error', event)
328 else:
329 self.log_error(event.session, "session")
330 event.connection.close()
331
333 if self.delegate:
334 dispatch(self.delegate, 'on_link_error', event)
335 else:
336 self.log_error(event.link, "link")
337 event.connection.close()
338
340 if self.delegate:
341 dispatch(self.delegate, 'on_connection_closed', event)
342
344 if self.delegate:
345 dispatch(self.delegate, 'on_session_closed', event)
346
348 if self.delegate:
349 dispatch(self.delegate, 'on_link_closed', event)
350
352 if self.delegate:
353 dispatch(self.delegate, 'on_connection_closing', event)
354 elif self.peer_close_is_error:
355 self.on_connection_error(event)
356
358 if self.delegate:
359 dispatch(self.delegate, 'on_session_closing', event)
360 elif self.peer_close_is_error:
361 self.on_session_error(event)
362
364 if self.delegate:
365 dispatch(self.delegate, 'on_link_closing', event)
366 elif self.peer_close_is_error:
367 self.on_link_error(event)
368
371
373 if self.delegate:
374 dispatch(self.delegate, 'on_disconnected', event)
375
377 """
378 A general purpose handler that makes the proton-c events somewhat
379 simpler to deal with and/or avoids repetitive tasks for common use
380 cases.
381 """
382 - def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False):
389
395
402
409
411 """
412 Called when the event loop - the reactor - starts.
413 """
414 if hasattr(event.reactor, 'subclass'):
415 setattr(event, event.reactor.subclass.__name__.lower(), event.reactor)
416 self.on_start(event)
417
419 """
420 Called when the event loop starts. (Just an alias for on_reactor_init)
421 """
422 pass
424 """
425 Called when the connection is closed.
426 """
427 pass
429 """
430 Called when the session is closed.
431 """
432 pass
434 """
435 Called when the link is closed.
436 """
437 pass
439 """
440 Called when the peer initiates the closing of the connection.
441 """
442 pass
444 """
445 Called when the peer initiates the closing of the session.
446 """
447 pass
449 """
450 Called when the peer initiates the closing of the link.
451 """
452 pass
454 """
455 Called when the socket is disconnected.
456 """
457 pass
458
460 """
461 Called when the sender link has credit and messages can
462 therefore be transferred.
463 """
464 pass
465
467 """
468 Called when the remote peer accepts an outgoing message.
469 """
470 pass
471
473 """
474 Called when the remote peer rejects an outgoing message.
475 """
476 pass
477
479 """
480 Called when the remote peer releases an outgoing message. Note
481 that this may be in response to either the RELEASED or MODIFIED
482 state as defined by the AMQP specification.
483 """
484 pass
485
487 """
488 Called when the remote peer has settled the outgoing
489 message. This is the point at which it should never be
490 retransmitted.
491 """
492 pass
494 """
495 Called when a message is received. The message itself can be
496 obtained as a property on the event. For the purpose of
497 referring to this message in further actions (e.g. if
498 explicitly accepting it), the ``delivery`` should be used, also
499 obtainable via a property on the event.
500 """
501 pass
502
504 """
505 The interface for transaction handlers, i.e. objects that want to
506 be notified of state changes related to a transaction.
507 """
510
513
516
519
522
524 """
525 An extension to the MessagingHandler for applications using
526 transactions.
527 """
528
529 - def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False):
531
532 - def accept(self, delivery, transaction=None):
537
538 from proton import WrappedHandler
539 from cproton import pn_flowcontroller, pn_handshaker, pn_iohandler
542
544 WrappedHandler.__init__(self, lambda: pn_flowcontroller(window))
545
547
549 WrappedHandler.__init__(self, pn_handshaker)
550
552
554 WrappedHandler.__init__(self, pn_iohandler)
555
557
559 self.selectables = []
560 self.delegate = IOHandler()
561
564
566 self.selectables.append(event.context)
567
570
572 sel = event.context
573 if sel.is_terminal:
574 self.selectables.remove(sel)
575 sel.release()
576
618