Package proton :: Module handlers
[frames | no frames]

Source Code for Module proton.handlers

  1  # 
  2  # Licensed to the Apache Software Foundation (ASF) under one 
  3  # or more contributor license agreements.  See the NOTICE file 
  4  # distributed with this work for additional information 
  5  # regarding copyright ownership.  The ASF licenses this file 
  6  # to you under the Apache License, Version 2.0 (the 
  7  # "License"); you may not use this file except in compliance 
  8  # with the License.  You may obtain a copy of the License at 
  9  # 
 10  #   http://www.apache.org/licenses/LICENSE-2.0 
 11  # 
 12  # Unless required by applicable law or agreed to in writing, 
 13  # software distributed under the License is distributed on an 
 14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 15  # KIND, either express or implied.  See the License for the 
 16  # specific language governing permissions and limitations 
 17  # under the License. 
 18  # 
 19  import heapq, logging, os, Queue, re, socket, time, types 
 20  from proton import dispatch, generate_uuid, PN_ACCEPTED, SASL, symbol, ulong, Url 
 21  from proton import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Terminus, Timeout 
 22  from proton import Message, Handler, ProtonException, Transport, TransportException, ConnectionException 
 23  from select import select 
class OutgoingMessageHandler(Handler):
    """
    A utility for simpler and more intuitive handling of delivery
    events related to outgoing, i.e. sent, messages.

    Every ``on_*`` callback other than ``on_delivery`` hands the event to
    ``delegate`` through ``dispatch`` when a delegate is set, and does
    nothing otherwise.
    """

    def __init__(self, auto_settle=True, delegate=None):
        self.delegate = delegate
        self.auto_settle = auto_settle

    # NOTE(review): lines 34-38 of the listing are collapsed in this view;
    # whatever was defined there is not visible and is not reproduced here.

    def _forward(self, name, event):
        # Relay ``event`` to the delegate callback called ``name``, if any.
        if self.delegate:
            dispatch(self.delegate, name, event)

    def on_delivery(self, event):
        """
        Turn an updated delivery on a sender link into the matching
        accepted/rejected/released callback, then report settlement and,
        if ``auto_settle`` is set, settle the delivery locally.
        """
        delivery = event.delivery
        if not (delivery.link.is_sender and delivery.updated):
            return

        outcome = delivery.remote_state
        if outcome == Delivery.ACCEPTED:
            self.on_accepted(event)
        elif outcome == Delivery.REJECTED:
            self.on_rejected(event)
        elif outcome == Delivery.RELEASED or outcome == Delivery.MODIFIED:
            self.on_released(event)

        if delivery.settled:
            self.on_settled(event)
        if self.auto_settle:
            delivery.settle()

    def on_sendable(self, event):
        """
        Called when the sender link has credit and messages can
        therefore be transferred.
        """
        self._forward('on_sendable', event)

    def on_accepted(self, event):
        """
        Called when the remote peer accepts an outgoing message.
        """
        self._forward('on_accepted', event)

    def on_rejected(self, event):
        """
        Called when the remote peer rejects an outgoing message.
        """
        self._forward('on_rejected', event)

    def on_released(self, event):
        """
        Called when the remote peer releases an outgoing message. Note
        that this may be in response to either the RELEASED or MODIFIED
        state as defined by the AMQP specification.
        """
        self._forward('on_released', event)

    def on_settled(self, event):
        """
        Called when the remote peer has settled the outgoing
        message. This is the point at which it should never be
        retransmitted.
        """
        self._forward('on_settled', event)
92
def recv_msg(delivery):
    """
    Decode the bytes pending on ``delivery`` into a new ``Message``,
    advance the delivery's link, and return the message.
    """
    message = Message()
    link = delivery.link
    encoded = link.recv(delivery.pending)
    message.decode(encoded)
    link.advance()
    return message
98
class Reject(ProtonException):
    """
    An exception that indicates a message should be rejected.
    """
104
class Release(ProtonException):
    """
    An exception that indicates a message should be released.
    """
110
111 -class Acking(object):
112 - def accept(self, delivery):
113 """ 114 Accepts a received message. 115 """ 116 self.settle(delivery, Delivery.ACCEPTED)
117
118 - def reject(self, delivery):
119 """ 120 Rejects a received message that is considered invalid or 121 unprocessable. 122 """ 123 self.settle(delivery, Delivery.REJECTED)
124
125 - def release(self, delivery, delivered=True):
126 """ 127 Releases a received message, making it available at the source 128 for any (other) interested receiver. The ``delivered`` 129 parameter indicates whether this should be considered a 130 delivery attempt (and the delivery count updated) or not. 131 """ 132 if delivered: 133 self.settle(delivery, Delivery.MODIFIED) 134 else: 135 self.settle(delivery, Delivery.RELEASED)
136
137 - def settle(self, delivery, state=None):
141
class IncomingMessageHandler(Handler, Acking):
    """
    A utility for simpler and more intuitive handling of delivery
    events related to incoming, i.e. received, messages.
    """

    def __init__(self, auto_accept=True, delegate=None):
        self.auto_accept = auto_accept
        self.delegate = delegate

    def on_delivery(self, event):
        """
        Handle a delivery event on a receiver link.

        A fully received delivery is decoded onto ``event.message`` and
        passed to ``on_message``; ``Reject`` and ``Release`` raised from
        there settle it as REJECTED and MODIFIED respectively. If the link
        is already locally closed the message is not passed on, and is
        released when ``auto_accept`` is set. A delivery that is instead
        updated and settled is reported through ``on_settled``.
        """
        delivery = event.delivery
        if not delivery.link.is_receiver:
            return

        def finish(state):
            # Record the local outcome and settle in one step.
            delivery.update(state)
            delivery.settle()

        if delivery.readable and not delivery.partial:
            event.message = recv_msg(delivery)
            if event.link.state & Endpoint.LOCAL_CLOSED:
                if self.auto_accept:
                    finish(Delivery.RELEASED)
                return
            try:
                self.on_message(event)
                if self.auto_accept:
                    finish(Delivery.ACCEPTED)
            except Reject:
                finish(Delivery.REJECTED)
            except Release:
                finish(Delivery.MODIFIED)
        elif delivery.updated and delivery.settled:
            self.on_settled(event)

    def on_message(self, event):
        """
        Called when a message is received. The message itself can be
        obtained as a property on the event. For the purpose of
        referring to this message in further actions (e.g. if
        explicitly accepting it), the ``delivery`` should be used, also
        obtainable via a property on the event.
        """
        target = self.delegate
        if target:
            dispatch(target, 'on_message', event)

    def on_settled(self, event):
        """
        Forward the settled notification to the delegate, if one is set.
        """
        target = self.delegate
        if target:
            dispatch(target, 'on_settled', event)
190
class EndpointStateHandler(Handler):
    """
    A utility that exposes 'endpoint' events, i.e. the open/close for
    links, sessions and connections, in a more intuitive manner. An
    XXX_opened method will be called when both local and remote peers
    have opened the link, session or connection. This can be used to
    confirm a locally initiated action for example. An XXX_opening
    method will be called when the remote peer has requested an open
    that was not initiated locally. By default this will simply open
    locally, which then triggers the XXX_opened call. The same applies
    to close.

    Each XXX_* callback forwards to ``delegate`` through ``dispatch``
    when a delegate is set. Without a delegate, the error callbacks log
    through ``print_error`` and the closing callbacks escalate to the
    error callbacks when ``peer_close_is_error`` is true.
    """

    def __init__(self, peer_close_is_error=False, delegate=None):
        self.delegate = delegate
        self.peer_close_is_error = peer_close_is_error

    @classmethod
    def is_local_open(cls, endpoint):
        """Return the LOCAL_ACTIVE bits of ``endpoint.state`` (truthy when set)."""
        return endpoint.state & Endpoint.LOCAL_ACTIVE

    @classmethod
    def is_local_uninitialised(cls, endpoint):
        """Return the LOCAL_UNINIT bits of ``endpoint.state`` (truthy when set)."""
        return endpoint.state & Endpoint.LOCAL_UNINIT

    @classmethod
    def is_local_closed(cls, endpoint):
        """Return the LOCAL_CLOSED bits of ``endpoint.state`` (truthy when set)."""
        return endpoint.state & Endpoint.LOCAL_CLOSED

    @classmethod
    def is_remote_open(cls, endpoint):
        """Return the REMOTE_ACTIVE bits of ``endpoint.state`` (truthy when set)."""
        return endpoint.state & Endpoint.REMOTE_ACTIVE

    @classmethod
    def is_remote_closed(cls, endpoint):
        """Return the REMOTE_CLOSED bits of ``endpoint.state`` (truthy when set)."""
        return endpoint.state & Endpoint.REMOTE_CLOSED

    @classmethod
    def print_error(cls, endpoint, endpoint_type):
        """
        Log an error for ``endpoint``: the remote condition's description
        if there is one, otherwise "<endpoint_type> closed by peer" when
        the endpoint is locally open but remotely closed.
        """
        if endpoint.remote_condition:
            logging.error(endpoint.remote_condition.description)
        elif cls.is_local_open(endpoint) and cls.is_remote_closed(endpoint):
            logging.error("%s closed by peer" % endpoint_type)

    # NOTE(review): listing lines 235-242 are collapsed in this view and
    # are not reproduced here.

    def on_session_remote_close(self, event):
        """
        Classify a remote session close as error, closed or closing, then
        close the session locally.
        """
        if event.session.remote_condition:
            self.on_session_error(event)
        elif self.is_local_closed(event.session):
            self.on_session_closed(event)
        else:
            self.on_session_closing(event)
        event.session.close()

    def on_connection_remote_close(self, event):
        """
        Classify a remote connection close as error, closed or closing,
        then close the connection locally.
        """
        if event.connection.remote_condition:
            self.on_connection_error(event)
        elif self.is_local_closed(event.connection):
            self.on_connection_closed(event)
        else:
            self.on_connection_closing(event)
        event.connection.close()

    def on_connection_local_open(self, event):
        """Report the connection as opened once the remote side is open too."""
        if self.is_remote_open(event.connection):
            self.on_connection_opened(event)

    def on_connection_remote_open(self, event):
        """
        Report the connection as opened if it is already open locally;
        if it is locally uninitialised, report it as opening and open it.
        """
        if self.is_local_open(event.connection):
            self.on_connection_opened(event)
        elif self.is_local_uninitialised(event.connection):
            self.on_connection_opening(event)
            event.connection.open()

    def on_session_local_open(self, event):
        """Report the session as opened once the remote side is open too."""
        if self.is_remote_open(event.session):
            self.on_session_opened(event)

    def on_session_remote_open(self, event):
        """
        Report the session as opened if it is already open locally;
        if it is locally uninitialised, report it as opening and open it.
        """
        if self.is_local_open(event.session):
            self.on_session_opened(event)
        elif self.is_local_uninitialised(event.session):
            self.on_session_opening(event)
            event.session.open()

    # NOTE(review): listing lines 284-286 and 288-293 are collapsed in this
    # view and are not reproduced here.

    def on_connection_opened(self, event):
        if self.delegate:
            dispatch(self.delegate, 'on_connection_opened', event)

    def on_session_opened(self, event):
        if self.delegate:
            dispatch(self.delegate, 'on_session_opened', event)

    # NOTE(review): listing lines 303-305 are collapsed in this view and
    # are not reproduced here.

    def on_connection_opening(self, event):
        if self.delegate:
            dispatch(self.delegate, 'on_connection_opening', event)

    def on_session_opening(self, event):
        if self.delegate:
            dispatch(self.delegate, 'on_session_opening', event)

    # NOTE(review): listing lines 315-317 are collapsed in this view and
    # are not reproduced here.

    def on_connection_error(self, event):
        """
        Forward the connection error to the delegate, or log it when
        there is no delegate.
        """
        if self.delegate:
            dispatch(self.delegate, 'on_connection_error', event)
        else:
            # The original called self.log_error, which this class does not
            # define; print_error is the logging helper with this signature.
            self.print_error(event.connection, "connection")

    def on_session_error(self, event):
        """
        Forward the session error to the delegate, or, when there is no
        delegate, log it and close the connection.
        """
        if self.delegate:
            dispatch(self.delegate, 'on_session_error', event)
        else:
            # See on_connection_error: print_error replaces the undefined
            # log_error.
            self.print_error(event.session, "session")
            event.connection.close()

    # NOTE(review): listing lines 332-337 are collapsed in this view and
    # are not reproduced here.

    def on_connection_closed(self, event):
        if self.delegate:
            dispatch(self.delegate, 'on_connection_closed', event)

    def on_session_closed(self, event):
        if self.delegate:
            dispatch(self.delegate, 'on_session_closed', event)

    # NOTE(review): listing lines 347-349 are collapsed in this view and
    # are not reproduced here.

    def on_connection_closing(self, event):
        """
        Forward to the delegate; without one, treat the peer's close as an
        error when ``peer_close_is_error`` is set.
        """
        if self.delegate:
            dispatch(self.delegate, 'on_connection_closing', event)
        elif self.peer_close_is_error:
            self.on_connection_error(event)

    def on_session_closing(self, event):
        """
        Forward to the delegate; without one, treat the peer's close as an
        error when ``peer_close_is_error`` is set.
        """
        if self.delegate:
            dispatch(self.delegate, 'on_session_closing', event)
        elif self.peer_close_is_error:
            self.on_session_error(event)

    # NOTE(review): listing lines 363-367 are collapsed in this view and
    # are not reproduced here.

    def on_transport_tail_closed(self, event):
        """Treat a closed transport tail the same as a closed transport."""
        self.on_transport_closed(event)

    def on_transport_closed(self, event):
        """Notify the delegate's ``on_disconnected`` callback, if a delegate is set."""
        if self.delegate:
            dispatch(self.delegate, 'on_disconnected', event)
375
class MessagingHandler(Handler, Acking):
    """
    A general purpose handler that makes the proton-c events somewhat
    simpler to deal with and/or avoids repetitive tasks for common use
    cases.

    The constructor installs child handlers that call back into this
    object; the ``on_*`` methods below are the hooks they invoke.
    """

    def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False):
        # A falsy prefetch leaves the flow controller out of the chain.
        flow_control = [CFlowController(prefetch)] if prefetch else []
        self.handlers = flow_control + [
            EndpointStateHandler(peer_close_is_error, self),
            IncomingMessageHandler(auto_accept, self),
            OutgoingMessageHandler(auto_settle, self),
        ]

    def on_connection_error(self, event):
        """
        Called when the peer closes the connection with an error condition.
        """
        connection = event.connection
        EndpointStateHandler.print_error(connection, "connection")

    def on_session_error(self, event):
        """
        Called when the peer closes the session with an error condition.
        """
        session = event.session
        EndpointStateHandler.print_error(session, "session")
        event.connection.close()

    # NOTE(review): listing lines 403-408 are collapsed in this view and
    # are not reproduced here.

    def on_reactor_init(self, event):
        """
        Called when the event loop - the reactor - starts.
        """
        reactor = event.reactor
        if hasattr(reactor, 'subclass'):
            # Also expose the reactor on the event under the lower-cased
            # name of its subclass.
            alias = event.reactor.subclass.__name__.lower()
            setattr(event, alias, event.reactor)
        self.on_start(event)

    def on_start(self, event):
        """
        Called when the event loop starts. (Just an alias for on_reactor_init)
        """

    def on_connection_closed(self, event):
        """
        Called when the connection is closed.
        """

    def on_session_closed(self, event):
        """
        Called when the session is closed.
        """

    # NOTE(review): listing lines 433-437 are collapsed in this view and
    # are not reproduced here.

    def on_connection_closing(self, event):
        """
        Called when the peer initiates the closing of the connection.
        """

    def on_session_closing(self, event):
        """
        Called when the peer initiates the closing of the session.
        """

    # NOTE(review): listing lines 448-452 are collapsed in this view and
    # are not reproduced here.

    def on_disconnected(self, event):
        """
        Called when the socket is disconnected.
        """

    def on_sendable(self, event):
        """
        Called when the sender link has credit and messages can
        therefore be transferred.
        """

    def on_accepted(self, event):
        """
        Called when the remote peer accepts an outgoing message.
        """

    def on_rejected(self, event):
        """
        Called when the remote peer rejects an outgoing message.
        """

    def on_released(self, event):
        """
        Called when the remote peer releases an outgoing message. Note
        that this may be in response to either the RELEASED or MODIFIED
        state as defined by the AMQP specification.
        """

    def on_settled(self, event):
        """
        Called when the remote peer has settled the outgoing
        message. This is the point at which it should never be
        retransmitted.
        """

    def on_message(self, event):
        """
        Called when a message is received. The message itself can be
        obtained as a property on the event. For the purpose of
        referring to this message in further actions (e.g. if
        explicitly accepting it), the ``delivery`` should be used, also
        obtainable via a property on the event.
        """
502
class TransactionHandler(object):
    """
    The interface for transaction handlers, i.e. objects that want to
    be notified of state changes related to a transaction.

    Every callback here is a no-op meant to be overridden.
    """

    def on_transaction_declared(self, event):
        """Hook for a declared transaction; does nothing by default."""

    def on_transaction_committed(self, event):
        """Hook for a committed transaction; does nothing by default."""

    def on_transaction_aborted(self, event):
        """Hook for an aborted transaction; does nothing by default."""

    def on_transaction_declare_failed(self, event):
        """Hook for a failed declare; does nothing by default."""

    def on_transaction_commit_failed(self, event):
        """Hook for a failed commit; does nothing by default."""
522
class TransactionalClientHandler(MessagingHandler, TransactionHandler):
    """
    An extension to the MessagingHandler for applications using
    transactions.
    """

    def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False):
        # Same arguments as the parent, except that auto_accept defaults
        # to False here.
        parent = super(TransactionalClientHandler, self)
        parent.__init__(prefetch, auto_accept, auto_settle, peer_close_is_error)

    def accept(self, delivery, transaction=None):
        """
        Accept ``delivery`` through ``transaction`` when one is given,
        otherwise through the inherited ``accept``.
        """
        if not transaction:
            super(TransactionalClientHandler, self).accept(delivery)
            return
        transaction.accept(delivery)
537 538 from proton import WrappedHandler 539 from cproton import pn_flowcontroller, pn_handshaker, pn_iohandler
class CFlowController(WrappedHandler):
    """
    Wrapped handler whose underlying object is built by
    ``pn_flowcontroller`` with the given window.
    """

    def __init__(self, window=1024):
        def build():
            # Deferred so that WrappedHandler decides when to construct it.
            return pn_flowcontroller(window)

        WrappedHandler.__init__(self, build)
545
class CHandshaker(WrappedHandler):
    """
    Wrapped handler constructed from ``pn_handshaker``.
    """

    def __init__(self):
        constructor = pn_handshaker
        WrappedHandler.__init__(self, constructor)
550
class IOHandler(WrappedHandler):
    """
    Wrapped handler constructed from ``pn_iohandler``.
    """

    def __init__(self):
        constructor = pn_iohandler
        WrappedHandler.__init__(self, constructor)
555
class PythonIO:
    """
    Handler that keeps track of selectables and waits on them with
    ``select`` whenever the reactor is quiesced. Events it has no method
    for are dispatched to an ``IOHandler`` delegate.
    """

    def __init__(self):
        self.delegate = IOHandler()
        self.selectables = []

    def on_unhandled(self, method, event):
        """Pass any event this class does not handle to the delegate."""
        event.dispatch(self.delegate)

    def on_selectable_init(self, event):
        """Start tracking the selectable carried as the event context."""
        self.selectables.append(event.context)

    def on_selectable_updated(self, event):
        """Nothing to do on update."""

    def on_selectable_final(self, event):
        """Stop tracking a terminal selectable and release it."""
        selectable = event.context
        if not selectable.is_terminal:
            return
        self.selectables.remove(selectable)
        selectable.release()

    def on_reactor_quiesced(self, event):
        """
        Wait with ``select`` on the tracked selectables, bounded by the
        earliest deadline and the reactor timeout, then notify the ones
        that are readable, writable or past their deadline.
        """
        reactor = event.reactor
        # check if we are still quiesced, other handlers of
        # on_reactor_quiesced could have produced events to process
        if not reactor.quiesced:
            return

        readers, writers = [], []
        earliest = None
        for sel in self.selectables:
            if sel.reading:
                readers.append(sel)
            if sel.writing:
                writers.append(sel)
            if sel.deadline:
                earliest = sel.deadline if earliest is None else min(sel.deadline, earliest)

        wait = reactor.timeout if earliest is None else earliest - time.time()
        if wait < 0:
            wait = 0
        wait = min(wait, reactor.timeout)
        can_read, can_write, _ = select(readers, writers, [], wait)

        reactor.mark()

        now = time.time()

        for sel in can_read:
            sel.readable()
        for sel in can_write:
            sel.writable()
        for sel in self.selectables:
            if sel.deadline and now > sel.deadline:
                sel.expired()

        reactor.yield_()
618