Package proton
[frames] | no frames]

Source Code for Package proton

   1  # 
   2  # Licensed to the Apache Software Foundation (ASF) under one 
   3  # or more contributor license agreements.  See the NOTICE file 
   4  # distributed with this work for additional information 
   5  # regarding copyright ownership.  The ASF licenses this file 
   6  # to you under the Apache License, Version 2.0 (the 
   7  # "License"); you may not use this file except in compliance 
   8  # with the License.  You may obtain a copy of the License at 
   9  # 
  10  #   http://www.apache.org/licenses/LICENSE-2.0 
  11  # 
  12  # Unless required by applicable law or agreed to in writing, 
  13  # software distributed under the License is distributed on an 
  14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
  15  # KIND, either express or implied.  See the License for the 
  16  # specific language governing permissions and limitations 
  17  # under the License. 
  18  # 
  19   
  20  """ 
  21  The proton module defines a suite of APIs that implement the AMQP 1.0 
  22  protocol. 
  23   
  24  The proton APIs consist of the following classes: 
  25   
  26   - L{Messenger} -- A messaging endpoint. 
  27   - L{Message}   -- A class for creating and/or accessing AMQP message content. 
  28   - L{Data}      -- A class for creating and/or accessing arbitrary AMQP encoded 
  29                    data. 
  30   
  31  """ 
  32   
  33  from cproton import * 
  34  from wrapper import Wrapper 
  35   
  36  import weakref, socket, sys, threading 
  37  try: 
  38    import uuid 
39 40 - def generate_uuid():
41 return uuid.uuid4()
42 43 except ImportError: 44 """ 45 No 'native' UUID support. Provide a very basic UUID type that is a compatible subset of the uuid type provided by more modern python releases. 46 """ 47 import struct
48 - class uuid:
49 - class UUID:
50 - def __init__(self, hex=None, bytes=None):
51 if [hex, bytes].count(None) != 1: 52 raise TypeError("need one of hex or bytes") 53 if bytes is not None: 54 self.bytes = bytes 55 elif hex is not None: 56 fields=hex.split("-") 57 fields[4:5] = [fields[4][:4], fields[4][4:]] 58 self.bytes = struct.pack("!LHHHHL", *[int(x,16) for x in fields])
59
60 - def __cmp__(self, other):
61 if isinstance(other, uuid.UUID): 62 return cmp(self.bytes, other.bytes) 63 else: 64 return -1
65
66 - def __str__(self):
67 return "%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack("!LHHHHL", self.bytes)
68
69 - def __repr__(self):
70 return "UUID(%r)" % str(self)
71
72 - def __hash__(self):
73 return self.bytes.__hash__()
74 75 import os, random, time 76 rand = random.Random() 77 rand.seed((os.getpid(), time.time(), socket.gethostname()))
78 - def random_uuid():
79 bytes = [rand.randint(0, 255) for i in xrange(16)] 80 81 # From RFC4122, the version bits are set to 0100 82 bytes[7] &= 0x0F 83 bytes[7] |= 0x40 84 85 # From RFC4122, the top two bits of byte 8 get set to 01 86 bytes[8] &= 0x3F 87 bytes[8] |= 0x80 88 return "".join(map(chr, bytes))
89
90 - def uuid4():
91 return uuid.UUID(bytes=random_uuid())
92
93 - def generate_uuid():
94 return uuid4()
# Interpreters that predate the ``bytes`` builtin raise NameError on the
# probe below; alias ``bytes`` to ``str`` for them.
try:
  bytes()
except NameError:
  bytes = str

# Version numbers taken from PN_VERSION_MAJOR / PN_VERSION_MINOR
# (presumably supplied by the ``cproton`` star import -- confirm).
VERSION_MAJOR = PN_VERSION_MAJOR
VERSION_MINOR = PN_VERSION_MINOR
API_LANGUAGE = "C"
IMPLEMENTATION_LANGUAGE = "C"
class Constant(object):
  """
  A named marker object.  Its C{repr} is exactly the name it was
  created with.
  """

  def __init__(self, name):
    """
    @param name: the text this constant reports as its C{repr}
    """
    self.name = name

  def __repr__(self):
    """Return the name given at construction time."""
    label = self.name
    return label
113
class ProtonException(Exception):
  """
  Base class of the proton exception hierarchy; every exception class
  defined by proton derives from it.
  """
120
class Timeout(ProtonException):
  """
  Raised to signal that a blocking operation timed out.
  """
127
class Interrupt(ProtonException):
  """
  Raised to signal that a blocking operation was interrupted.
  """
133
class MessengerException(ProtonException):
  """
  Base class of the messenger exception hierarchy; every exception
  generated by the messenger class derives from it.
  """
140
class MessageException(ProtonException):
  """
  Base class of the message exception hierarchy; every exception
  generated by the Message class derives from it.
  """
# Error codes that have a dedicated exception class.  The _check methods
# of Messenger and Message look codes up here and fall back to their own
# default exception class for anything not listed.
EXCEPTIONS = {
  PN_TIMEOUT: Timeout,
  PN_INTR: Interrupt
  }

# Symbolic delivery states handed back to callers.
PENDING = Constant("PENDING")
ACCEPTED = Constant("ACCEPTED")
REJECTED = Constant("REJECTED")
RELEASED = Constant("RELEASED")
MODIFIED = Constant("MODIFIED")
ABORTED = Constant("ABORTED")
SETTLED = Constant("SETTLED")

# Translates PN_STATUS_* codes into the constants above; Messenger.status
# performs the lookup.  An unknown status maps to None.
STATUSES = {
  PN_STATUS_ABORTED: ABORTED,
  PN_STATUS_ACCEPTED: ACCEPTED,
  PN_STATUS_REJECTED: REJECTED,
  PN_STATUS_RELEASED: RELEASED,
  PN_STATUS_MODIFIED: MODIFIED,
  PN_STATUS_PENDING: PENDING,
  PN_STATUS_SETTLED: SETTLED,
  PN_STATUS_UNKNOWN: None
  }

# NOTE(review): AUTOMATIC / MANUAL are not referenced in the visible part
# of this module -- confirm where they are consumed.
AUTOMATIC = Constant("AUTOMATIC")
MANUAL = Constant("MANUAL")
175 176 -class Messenger(object):
"""
The L{Messenger} class defines a high level interface for sending
and receiving L{Messages<Message>}. Every L{Messenger} contains a
single logical queue of incoming messages and a single logical queue
of outgoing messages. The messages in these queues may be destined
for, or originate from, a variety of addresses.

The messenger interface is single-threaded. All methods
except one (L{interrupt}) are intended to be used from within
the messenger thread.


Address Syntax
==============

An address has the following form::

  [ amqp[s]:// ] [user[:password]@] domain [/[name]]

Where domain can be one of::

  host | host:port | ip | ip:port | name

The following are valid examples of addresses:

 - example.org
 - example.org:1234
 - amqp://example.org
 - amqps://example.org
 - example.org/incoming
 - amqps://example.org/outgoing
 - amqps://fred:trustno1@example.org
 - 127.0.0.1:1234
 - amqps://127.0.0.1:1234

Sending & Receiving Messages
============================

The L{Messenger} class works in conjunction with the L{Message} class. The
L{Message} class is a mutable holder of message content.

The L{put} method copies its L{Message} to the outgoing queue, and may
send queued messages if it can do so without blocking. The L{send}
method blocks until it has sent the requested number of messages,
or until a timeout interrupts the attempt.


  >>> message = Message()
  >>> for i in range(3):
  ...   message.address = "amqp://host/queue"
  ...   message.subject = "Hello World %i" % i
  ...   messenger.put(message)
  >>> messenger.send()

Similarly, the L{recv} method receives messages into the incoming
queue, and may block as it attempts to receive the requested number
of messages, or until timeout is reached. It may receive fewer
than the requested number. The L{get} method pops the
oldest L{Message} off the incoming queue and copies it into the L{Message}
object that you supply. It will not block.


  >>> message = Message()
  >>> messenger.recv(10)
  >>> while messenger.incoming > 0:
  ...   messenger.get(message)
  ...   print message.subject
  Hello World 0
  Hello World 1
  Hello World 2

The blocking flag allows you to turn off blocking behavior entirely,
in which case L{send} and L{recv} will do whatever they can without
blocking, and then return. You can then look at the number
of incoming and outgoing messages to see how much outstanding work
still remains.
"""
def __init__(self, name=None):
  """
  Create a L{Messenger}.  The name has global scope; when no name is
  given a UUID based name will be chosen.

  @type name: string
  @param name: the messenger's name, or None
  """
  impl = pn_messenger(name)
  self._mng = impl
  self._selectables = {}
267
def __del__(self):
  """
  Release the underlying messenger, closing every connection the
  L{Messenger} manages.  Call L{stop} before letting the L{Messenger}
  be destroyed.
  """
  # Construction may have failed before _mng was assigned.
  if not hasattr(self, "_mng"):
    return
  pn_messenger_free(self._mng)
  del self._mng
277
278 - def _check(self, err):
279 if err < 0: 280 if (err == PN_INPROGRESS): 281 return 282 exc = EXCEPTIONS.get(err, MessengerException) 283 raise exc("[%s]: %s" % (err, pn_error_text(pn_messenger_error(self._mng)))) 284 else: 285 return err
@property
def name(self):
  """
  The name of the L{Messenger}.
  """
  impl = self._mng
  return pn_messenger_name(impl)
293
def _get_certificate(self):
  """Getter behind the L{certificate} property."""
  impl = self._mng
  return pn_messenger_get_certificate(impl)

def _set_certificate(self, value):
  """Setter behind the L{certificate} property; failures raise via L{_check}."""
  rc = pn_messenger_set_certificate(self._mng, value)
  self._check(rc)

certificate = property(_get_certificate, _set_certificate,
                       doc="""
Path to a certificate file for the L{Messenger}. This certificate is
used when the L{Messenger} accepts or establishes SSL/TLS connections.
This property must be specified for the L{Messenger} to accept
incoming SSL/TLS connections and to establish client authenticated
outgoing SSL/TLS connection. Non client authenticated outgoing SSL/TLS
connections do not require this property.
""")
def _get_private_key(self):
  """Getter behind the L{private_key} property."""
  impl = self._mng
  return pn_messenger_get_private_key(impl)

def _set_private_key(self, value):
  """Setter behind the L{private_key} property; failures raise via L{_check}."""
  rc = pn_messenger_set_private_key(self._mng, value)
  self._check(rc)

private_key = property(_get_private_key, _set_private_key,
                       doc="""
Path to a private key file for the L{Messenger's<Messenger>}
certificate. This property must be specified for the L{Messenger} to
accept incoming SSL/TLS connections and to establish client
authenticated outgoing SSL/TLS connection. Non client authenticated
SSL/TLS connections do not require this property.
""")
def _get_password(self):
  """Getter behind the L{password} property."""
  impl = self._mng
  return pn_messenger_get_password(impl)

def _set_password(self, value):
  """Setter behind the L{password} property; failures raise via L{_check}."""
  rc = pn_messenger_set_password(self._mng, value)
  self._check(rc)

password = property(_get_password, _set_password,
                    doc="""
This property contains the password for the L{Messenger.private_key}
file, or None if the file is not encrypted.
""")
def _get_trusted_certificates(self):
  """Getter behind the L{trusted_certificates} property."""
  impl = self._mng
  return pn_messenger_get_trusted_certificates(impl)

def _set_trusted_certificates(self, value):
  """Setter behind the L{trusted_certificates} property; failures raise via L{_check}."""
  rc = pn_messenger_set_trusted_certificates(self._mng, value)
  self._check(rc)

trusted_certificates = property(_get_trusted_certificates,
                                _set_trusted_certificates,
                                doc="""
A path to a database of trusted certificates for use in verifying the
peer on an SSL/TLS connection. If this property is None, then the peer
will not be verified.
""")
def _get_timeout(self):
  """
  Getter behind the L{timeout} property.  A stored value of -1 is
  reported as None; anything else is passed through millis2secs.
  """
  millis = pn_messenger_get_timeout(self._mng)
  if millis != -1:
    return millis2secs(millis)
  return None

def _set_timeout(self, value):
  """
  Setter behind the L{timeout} property.  None is stored as -1; any
  other value is passed through secs2millis first.
  """
  if value is not None:
    millis = secs2millis(value)
  else:
    millis = -1
  self._check(pn_messenger_set_timeout(self._mng, millis))

timeout = property(_get_timeout, _set_timeout,
                   doc="""
The timeout property contains the default timeout for blocking
operations performed by the L{Messenger}.
""")
def _is_blocking(self):
  """Getter behind the L{blocking} property."""
  impl = self._mng
  return pn_messenger_is_blocking(impl)

def _set_blocking(self, b):
  """Setter behind the L{blocking} property; failures raise via L{_check}."""
  rc = pn_messenger_set_blocking(self._mng, b)
  self._check(rc)

blocking = property(_is_blocking, _set_blocking,
                    doc="""
Enable or disable blocking behavior during L{Message} sending
and receiving. This affects every blocking call, with the
exception of L{work}. Currently, the affected calls are
L{send}, L{recv}, and L{stop}.
""")
def _is_passive(self):
  """Getter behind the L{passive} property."""
  impl = self._mng
  return pn_messenger_is_passive(impl)

def _set_passive(self, b):
  """Setter behind the L{passive} property; failures raise via L{_check}."""
  rc = pn_messenger_set_passive(self._mng, b)
  self._check(rc)

passive = property(_is_passive, _set_passive,
                   doc="""
When passive is set to true, Messenger will not attempt to perform I/O
internally. In this mode it is necessary to use the selectables API to
drive any I/O needed to perform requested actions. In this mode
Messenger will never block.
""")
def _get_incoming_window(self):
  """Getter behind the L{incoming_window} property."""
  impl = self._mng
  return pn_messenger_get_incoming_window(impl)

def _set_incoming_window(self, window):
  """Setter behind the L{incoming_window} property; failures raise via L{_check}."""
  rc = pn_messenger_set_incoming_window(self._mng, window)
  self._check(rc)

incoming_window = property(_get_incoming_window, _set_incoming_window,
                           doc="""
The incoming tracking window for the messenger. The messenger will
track the remote status of this many incoming deliveries after they
have been accepted or rejected. Defaults to zero.

L{Messages<Message>} enter this window only when you take them into your application
using L{get}. If your incoming window size is I{n}, and you get I{n}+1 L{messages<Message>}
without explicitly accepting or rejecting the oldest message, then the
message that passes beyond the edge of the incoming window will be assigned
the default disposition of its link.
""")
def _get_outgoing_window(self):
  """Getter behind the L{outgoing_window} property."""
  impl = self._mng
  return pn_messenger_get_outgoing_window(impl)

def _set_outgoing_window(self, window):
  """Setter behind the L{outgoing_window} property; failures raise via L{_check}."""
  rc = pn_messenger_set_outgoing_window(self._mng, window)
  self._check(rc)

outgoing_window = property(_get_outgoing_window, _set_outgoing_window,
                           doc="""
The outgoing tracking window for the messenger. The messenger will
track the remote status of this many outgoing deliveries after calling
send. Defaults to zero.

A L{Message} enters this window when you call the put() method with the
message. If your outgoing window size is I{n}, and you call L{put} I{n}+1
times, status information will no longer be available for the
first message.
""")
def start(self):
  """
  Start the L{Messenger}.  Described by its authors as currently a
  no-op placeholder; for future compatibility, do not L{send} or
  L{recv} messages before calling it.
  """
  rc = pn_messenger_start(self._mng)
  self._check(rc)
443
def stop(self):
  """
  Move the L{Messenger} to an inactive state, in which it neither
  sends nor receives messages from its internal queues.  Stop a
  L{Messenger} before discarding it so that a clean shutdown
  handshake occurs on any internally managed connections.
  """
  rc = pn_messenger_stop(self._mng)
  self._check(rc)
@property
def stopped(self):
  """
  True iff the L{Messenger} is in the stopped state.  Reading this
  property does not block.
  """
  impl = self._mng
  return pn_messenger_stopped(impl)
461
def subscribe(self, source):
  """
  Subscribe the L{Messenger} to messages originating from source.

  The source is an address as described in the L{Messenger}
  introduction, with one addition: when the domain portion begins
  with the '~' character the L{Messenger} interprets the domain as
  host/port, binds to it, and listens for incoming messages.  For
  example "~0.0.0.0", "amqp://~0.0.0.0", and "amqps://~0.0.0.0" all
  bind to any local interface and listen for incoming messages, the
  last variant only permitting incoming SSL connections.

  @type source: string
  @param source: the source of messages to subscribe to
  @return: a Subscription wrapping the new subscription
  @raise MessengerException: if the subscription could not be created
  """
  sub_impl = pn_messenger_subscribe(self._mng, source)
  if sub_impl:
    return Subscription(sub_impl)
  # Let _check raise for a recorded (negative) messenger error code;
  # otherwise fall through to the generic failure below.
  self._check(pn_error_code(pn_messenger_error(self._mng)))
  raise MessengerException("Cannot subscribe to %s"%source)
482
def put(self, message):
  """
  Copy the content of message onto the outgoing queue of the
  L{Messenger}.

  This method never blocks; it sends any unblocked
  L{Messages<Message>} in the outgoing queue immediately and leaves
  blocked ones queued.  Use L{send} to block until the outgoing queue
  is empty, and the L{outgoing} property to check its depth.

  Because the content is copied, the L{Message} object may be
  modified or discarded afterwards without affecting the queued copy.

  @type message: Message
  @param message: the message to place in the outgoing queue
  @return: an outgoing tracker that can be used to determine the
           delivery status of the L{Message}
  """
  message._pre_encode()
  rc = pn_messenger_put(self._mng, message._msg)
  self._check(rc)
  return pn_messenger_outgoing_tracker(self._mng)
507
def status(self, tracker):
  """
  Report the last known remote state of the delivery associated with
  the given tracker.

  @type tracker: tracker
  @param tracker: the tracker whose status is to be retrieved

  @return: the STATUSES entry for the reported status code -- one of
           None, PENDING, ACCEPTED, REJECTED, RELEASED, MODIFIED,
           ABORTED or SETTLED -- or the raw code itself when STATUSES
           has no entry for it
  """
  code = pn_messenger_status(self._mng, tracker)
  return STATUSES.get(code, code)
520
def buffered(self, tracker):
  """
  Tell whether the delivery associated with the given tracker is
  still waiting to be sent.

  @type tracker: tracker
  @param tracker: the tracker whose status is to be retrieved

  @return: true if delivery is still buffered
  """
  impl = self._mng
  return pn_messenger_buffered(impl, tracker)
532
def settle(self, tracker=None):
  """
  Stop the L{Messenger} from tracking the status associated with a
  tracker.  Without a tracker, all outgoing L{messages<Message>} up
  to the most recent are settled.

  @type tracker: tracker
  @param tracker: the tracker to settle, or None for the cumulative form
  """
  flags = 0
  if tracker is None:
    # Cumulative settle, anchored at the latest outgoing tracker.
    tracker = pn_messenger_outgoing_tracker(self._mng)
    flags = PN_CUMULATIVE
  self._check(pn_messenger_settle(self._mng, tracker, flags))
545
def send(self, n=-1):
  """
  Block until n L{messages<Message>} have been sent or the operation
  times out.  With n of -1 the call blocks until all outgoing
  L{messages<Message>} have been sent; with n of 0 it sends whatever
  it can without blocking.
  """
  rc = pn_messenger_send(self._mng, n)
  self._check(rc)
554
def recv(self, n=None):
  """
  Receive up to I{n} L{messages<Message>} into the incoming queue.
  When I{n} is omitted the call receives as many
  L{messages<Message>} as it can buffer internally.  In blocking mode
  the call blocks until at least one L{Message} is available in the
  incoming queue.
  """
  limit = n
  if limit is None:
    # -1 is what is passed down when the caller gives no count.
    limit = -1
  self._check(pn_messenger_recv(self._mng, limit))
566
def work(self, timeout=None):
  """
  Send or receive any outstanding L{messages<Message>} queued for the
  L{Messenger}, blocking for the indicated timeout.  Other I/O work
  may be done as well, for example closing connections after
  messenger.L{stop}() has been called.

  @param timeout: timeout in seconds; None is passed down as -1
  @return: False if the underlying call reported PN_TIMEOUT, True
           otherwise
  """
  if timeout is not None:
    millis = secs2millis(timeout)
  else:
    millis = -1
  rc = pn_messenger_work(self._mng, millis)
  if rc == PN_TIMEOUT:
    return False
  self._check(rc)
  return True
@property
def receiving(self):
  """
  The value reported by pn_messenger_receiving for this
  L{Messenger}.
  """
  impl = self._mng
  return pn_messenger_receiving(impl)
589
def interrupt(self):
  """
  Interrupt a L{Messenger} that is blocking.

  The L{Messenger} interface is single-threaded and this is the only
  L{Messenger} function intended to be called from outside of the
  messenger thread.  Any in-progress blocking call will throw the
  L{Interrupt} exception.  If no call is currently blocking, the next
  blocking call is affected instead, even if it is made from the same
  thread that called interrupt.
  """
  rc = pn_messenger_interrupt(self._mng)
  self._check(rc)
603
def get(self, message=None):
  """
  Move the message at the head of the incoming queue into the
  supplied message object, overwriting any content it holds.  When
  None is passed, the L{Message} popped from the head of the queue is
  discarded.

  @type message: Message
  @param message: the destination message object
  @return: a tracker for the incoming L{Message}, which can later be
           used to communicate acceptance or rejection of it
  """
  keep = message is not None
  if keep:
    impl = message._msg
  else:
    impl = None
  self._check(pn_messenger_get(self._mng, impl))
  if keep:
    message._post_decode()
  return pn_messenger_incoming_tracker(self._mng)
629
def accept(self, tracker=None):
  """
  Signal the sender that you have acted on the L{Message} pointed to
  by the tracker.  Without a tracker, every message returned by
  L{get} is accepted, except those already auto-settled by passing
  beyond your incoming window size.

  @type tracker: tracker
  @param tracker: a tracker as returned by get
  """
  flags = 0
  if tracker is None:
    # Cumulative accept, anchored at the latest incoming tracker.
    tracker = pn_messenger_incoming_tracker(self._mng)
    flags = PN_CUMULATIVE
  self._check(pn_messenger_accept(self._mng, tracker, flags))
647
def reject(self, tracker=None):
  """
  Rejects the L{Message} indicated by the tracker.  If no tracker
  is supplied, all messages that have been returned by the L{get}
  method are rejected, except those that have already been auto-settled
  by passing beyond your incoming window size.

  @type tracker: tracker
  @param tracker: a tracker as returned by get
  """
  if tracker is None:
    # Cumulative reject, anchored at the latest incoming tracker.
    tracker = pn_messenger_incoming_tracker(self._mng)
    flags = PN_CUMULATIVE
  else:
    flags = 0
  self._check(pn_messenger_reject(self._mng, tracker, flags))
@property
def outgoing(self):
  """
  The outgoing queue depth.
  """
  impl = self._mng
  return pn_messenger_outgoing(impl)
@property
def incoming(self):
  """
  The incoming queue depth.
  """
  impl = self._mng
  return pn_messenger_incoming(impl)
678
def route(self, pattern, address):
  """
  Append a routing rule to the L{Messenger's<Messenger>} internal
  routing table.

  Routing rules influence how a L{Messenger} internally treats a
  given address or class of addresses.  Whenever a L{Message} is
  presented for delivery, its address is matched against the rules in
  order.  The first rule to match is triggered, and the L{Messenger}
  routes based on the address supplied in that rule instead of the
  address presented in the message.

  The pattern matching syntax supports two types of matches, a '%'
  will match any character except a '/', and a '*' will match any
  character including a '/'.

  A routing address is specified as a normal AMQP address, however it
  may additionally use substitution variables from the pattern match
  that triggered the rule.

  Any message sent to "foo" will be routed to "amqp://foo.com":

    >>> messenger.route("foo", "amqp://foo.com")

  Any message sent to "foobar" will be routed to
  "amqp://foo.com/bar":

    >>> messenger.route("foobar", "amqp://foo.com/bar")

  Any message sent to bar/<path> will be routed to the corresponding
  path within the amqp://bar.com domain:

    >>> messenger.route("bar/*", "amqp://bar.com/$1")

  Route all L{messages<Message>} over TLS:

    >>> messenger.route("amqp:*", "amqps:$1")

  Supply credentials for foo.com:

    >>> messenger.route("amqp://foo.com/*", "amqp://user:password@foo.com/$1")

  Supply credentials for all domains:

    >>> messenger.route("amqp://*", "amqp://user:password@$1")

  Route all addresses through a single proxy while preserving the
  original destination:

    >>> messenger.route("amqp://%/*", "amqp://user:password@proxy/$1/$2")

  Route any address through a single broker:

    >>> messenger.route("*", "amqp://user:password@broker/$1")
  """
  encoded_pattern = unicode2utf8(pattern)
  encoded_address = unicode2utf8(address)
  self._check(pn_messenger_route(self._mng, encoded_pattern, encoded_address))
738
def rewrite(self, pattern, address):
  """
  Like route(), except that the destination of the L{Message} is
  determined before the message address is rewritten.

  The outgoing address is only rewritten after routing has been
  finalized.  If a message has an outgoing address of
  "amqp://0.0.0.0:5678", and a rewriting rule changes its outgoing
  address to "foo", it will still arrive at the peer that is
  listening on "amqp://0.0.0.0:5678", but the receiver will see its
  outgoing address as "foo".

  The default rewrite rule removes username and password from
  addresses before they are transmitted.
  """
  encoded_pattern = unicode2utf8(pattern)
  encoded_address = unicode2utf8(address)
  self._check(pn_messenger_rewrite(self._mng, encoded_pattern, encoded_address))
755
def selectable(self):
  """
  Return the result of pn_messenger_selectable for this L{Messenger},
  wrapped with Selectable.wrap.
  """
  impl = pn_messenger_selectable(self._mng)
  return Selectable.wrap(impl)
@property
def deadline(self):
  """
  The value of pn_messenger_deadline passed through millis2secs, or
  None when that value is false (for example zero).
  """
  tstamp = pn_messenger_deadline(self._mng)
  if not tstamp:
    return None
  return millis2secs(tstamp)
766
767 -class Message(object):
768 """The L{Message} class is a mutable holder of message content. 769 770 @ivar instructions: delivery instructions for the message 771 @type instructions: dict 772 @ivar annotations: infrastructure defined message annotations 773 @type annotations: dict 774 @ivar properties: application defined message properties 775 @type properties: dict 776 @ivar body: message body 777 @type body: bytes | unicode | dict | list | int | long | float | UUID 778 """ 779 780 DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY 781
def __init__(self, body=None, **kwargs):
  """
  @param body: initial value of the message body
  @param kwargs: Message property name/value pairs to initialise the Message
  """
  impl = pn_message()
  self._msg = impl
  self._id = Data(pn_message_id(self._msg))
  self._correlation_id = Data(pn_message_correlation_id(self._msg))
  self.instructions = self.annotations = self.properties = None
  self.body = body
  for key, value in kwargs.iteritems():
    # Reading first makes an unknown attribute name raise instead of
    # silently creating a new attribute.
    getattr(self, key)
    setattr(self, key, value)
796
def __del__(self):
  """Free the underlying message, if one was ever created."""
  # Construction may have failed before _msg was assigned.
  if not hasattr(self, "_msg"):
    return
  pn_message_free(self._msg)
  del self._msg
801
802 - def _check(self, err):
803 if err < 0: 804 exc = EXCEPTIONS.get(err, MessageException) 805 raise exc("[%s]: %s" % (err, pn_error_text(pn_message_error(self._msg)))) 806 else: 807 return err
808
def _pre_encode(self):
  """
  Copy the instructions, annotations, properties and body attributes
  into the corresponding sections of the underlying message.  Each
  section is cleared first and is only written when the attribute is
  not None.
  """
  msg = self._msg
  # Wrap all four sections up front, in the original order, before any
  # of them is modified.
  sections = [
    Data(pn_message_instructions(msg)),
    Data(pn_message_annotations(msg)),
    Data(pn_message_properties(msg)),
    Data(pn_message_body(msg)),
  ]
  attrs = ("instructions", "annotations", "properties", "body")
  for section, attr in zip(sections, attrs):
    section.clear()
    value = getattr(self, attr)
    if value is not None:
      section.put_object(value)
827
def _post_decode(self):
  """
  Refresh the instructions, annotations, properties and body
  attributes from the corresponding sections of the underlying
  message.  An attribute becomes None when its section's next()
  reports nothing to read.
  """
  msg = self._msg
  # Wrap all four sections up front, in the original order, before any
  # of them is read.
  sections = [
    Data(pn_message_instructions(msg)),
    Data(pn_message_annotations(msg)),
    Data(pn_message_properties(msg)),
    Data(pn_message_body(msg)),
  ]
  attrs = ("instructions", "annotations", "properties", "body")
  for section, attr in zip(sections, attrs):
    if section.next():
      setattr(self, attr, section.get_object())
    else:
      setattr(self, attr, None)
850
def clear(self):
  """
  Clear the contents of the L{Message}, resetting all fields to their
  default values.
  """
  pn_message_clear(self._msg)
  self.instructions = self.annotations = self.properties = self.body = None
861
def _is_inferred(self):
  """Getter behind the L{inferred} property."""
  impl = self._msg
  return pn_message_is_inferred(impl)

def _set_inferred(self, value):
  """Setter behind the L{inferred} property; value is coerced with bool()."""
  rc = pn_message_set_inferred(self._msg, bool(value))
  self._check(rc)

inferred = property(_is_inferred, _set_inferred, doc="""
The inferred flag for a message indicates how the message content
is encoded into AMQP sections. If inferred is true then binary and
list values in the body of the message will be encoded as AMQP DATA
and AMQP SEQUENCE sections, respectively. If inferred is false,
then all values in the body of the message will be encoded as AMQP
VALUE sections regardless of their type.
""")
def _is_durable(self):
  """Getter behind the L{durable} property."""
  impl = self._msg
  return pn_message_is_durable(impl)

def _set_durable(self, value):
  """Setter behind the L{durable} property; value is coerced with bool()."""
  rc = pn_message_set_durable(self._msg, bool(value))
  self._check(rc)

durable = property(_is_durable, _set_durable,
                   doc="""
The durable property indicates that the message should be held durably
by any intermediaries taking responsibility for the message.
""")
def _get_priority(self):
  """Getter behind the L{priority} property."""
  impl = self._msg
  return pn_message_get_priority(impl)

def _set_priority(self, value):
  """Setter behind the L{priority} property; failures raise via L{_check}."""
  rc = pn_message_set_priority(self._msg, value)
  self._check(rc)

priority = property(_get_priority, _set_priority,
                    doc="""
The priority of the message.
""")
def _get_ttl(self):
  """Getter behind the L{ttl} property; the stored value goes through millis2secs."""
  millis = pn_message_get_ttl(self._msg)
  return millis2secs(millis)

def _set_ttl(self, value):
  """Setter behind the L{ttl} property; value goes through secs2millis before storing."""
  millis = secs2millis(value)
  self._check(pn_message_set_ttl(self._msg, millis))

ttl = property(_get_ttl, _set_ttl,
               doc="""
The time to live of the message measured in seconds. Expired messages
may be dropped.
""")
def _is_first_acquirer(self):
  """Getter behind the L{first_acquirer} property."""
  impl = self._msg
  return pn_message_is_first_acquirer(impl)

def _set_first_acquirer(self, value):
  """Setter behind the L{first_acquirer} property; value is coerced with bool()."""
  rc = pn_message_set_first_acquirer(self._msg, bool(value))
  self._check(rc)

first_acquirer = property(_is_first_acquirer, _set_first_acquirer,
                          doc="""
True iff the recipient is the first to acquire the message.
""")
def _get_delivery_count(self):
  """Getter behind the L{delivery_count} property."""
  impl = self._msg
  return pn_message_get_delivery_count(impl)

def _set_delivery_count(self, value):
  """Setter behind the L{delivery_count} property; failures raise via L{_check}."""
  rc = pn_message_set_delivery_count(self._msg, value)
  self._check(rc)

delivery_count = property(_get_delivery_count, _set_delivery_count,
                          doc="""
The number of delivery attempts made for this message.
""")
def _get_id(self):
  """Getter behind the L{id} property; reads the object held by the id Data."""
  holder = self._id
  return holder.get_object()

def _set_id(self, value):
  """
  Setter behind the L{id} property.  A value whose type is exactly
  int or long is wrapped in ulong before being stored.
  """
  if type(value) in (int, long):
    value = ulong(value)
  holder = self._id
  holder.rewind()
  holder.put_object(value)

id = property(_get_id, _set_id,
              doc="""
The id of the message.
""")
def _get_user_id(self):
  """Getter behind the L{user_id} property."""
  impl = self._msg
  return pn_message_get_user_id(impl)

def _set_user_id(self, value):
  """Setter behind the L{user_id} property; failures raise via L{_check}."""
  rc = pn_message_set_user_id(self._msg, value)
  self._check(rc)

user_id = property(_get_user_id, _set_user_id,
                   doc="""
The user id of the message creator.
""")
def _get_address(self):
  """Getter behind the L{address} property; the stored value goes through utf82unicode."""
  raw = pn_message_get_address(self._msg)
  return utf82unicode(raw)

def _set_address(self, value):
  """Setter behind the L{address} property; value goes through unicode2utf8 before storing."""
  encoded = unicode2utf8(value)
  self._check(pn_message_set_address(self._msg, encoded))

address = property(_get_address, _set_address,
                   doc="""
The address of the message.
""")
def _get_subject(self):
  """Return the subject reported by the underlying message."""
  subj = pn_message_get_subject(self._msg)
  return subj

def _set_subject(self, value):
  """Store C{value} as the subject; the status code goes through _check."""
  status = pn_message_set_subject(self._msg, value)
  self._check(status)

subject = property(_get_subject, _set_subject,
                   doc="""
The subject of the message.
""")
def _get_reply_to(self):
  """Return the reply-to address, passed through utf82unicode."""
  raw = pn_message_get_reply_to(self._msg)
  return utf82unicode(raw)

def _set_reply_to(self, value):
  """Pass C{value} through unicode2utf8 and store it as the reply-to address."""
  raw = unicode2utf8(value)
  status = pn_message_set_reply_to(self._msg, raw)
  self._check(status)

reply_to = property(_get_reply_to, _set_reply_to,
                    doc="""
The reply-to address for the message.
""")
def _get_correlation_id(self):
  """Return the object currently held by the correlation-id data holder."""
  holder = self._correlation_id
  return holder.get_object()

def _set_correlation_id(self, value):
  """
  Store C{value} in the correlation-id data holder.

  Values whose exact type is int or long are wrapped as ulong first;
  every other value is stored unchanged.
  """
  if type(value) in (int, long):
    value = ulong(value)
  holder = self._correlation_id
  # rewind so the value is written at the start of the holder
  holder.rewind()
  holder.put_object(value)

correlation_id = property(_get_correlation_id, _set_correlation_id,
                          doc="""
The correlation-id for the message.
""")
def _get_content_type(self):
  """Return the content-type reported by the underlying message."""
  ctype = pn_message_get_content_type(self._msg)
  return ctype

def _set_content_type(self, value):
  """Store C{value} as the content-type; the status code goes through _check."""
  status = pn_message_set_content_type(self._msg, value)
  self._check(status)

content_type = property(_get_content_type, _set_content_type,
                        doc="""
The content-type of the message.
""")
def _get_content_encoding(self):
  """Return the content-encoding reported by the underlying message."""
  cenc = pn_message_get_content_encoding(self._msg)
  return cenc

def _set_content_encoding(self, value):
  """Store C{value} as the content-encoding; the status code goes through _check."""
  status = pn_message_set_content_encoding(self._msg, value)
  self._check(status)

content_encoding = property(_get_content_encoding, _set_content_encoding,
                            doc="""
The content-encoding of the message.
""")
def _get_expiry_time(self):
  """Return the message expiry time, passed through millis2secs."""
  raw_expiry = pn_message_get_expiry_time(self._msg)
  return millis2secs(raw_expiry)

def _set_expiry_time(self, value):
  """Pass C{value} through secs2millis and store it as the expiry time."""
  raw_expiry = secs2millis(value)
  status = pn_message_set_expiry_time(self._msg, raw_expiry)
  self._check(status)

expiry_time = property(_get_expiry_time, _set_expiry_time,
                       doc="""
The expiry time of the message.
""")
def _get_creation_time(self):
  """Return the message creation time, passed through millis2secs."""
  raw_created = pn_message_get_creation_time(self._msg)
  return millis2secs(raw_created)

def _set_creation_time(self, value):
  """Pass C{value} through secs2millis and store it as the creation time."""
  raw_created = secs2millis(value)
  status = pn_message_set_creation_time(self._msg, raw_created)
  self._check(status)

creation_time = property(_get_creation_time, _set_creation_time,
                         doc="""
The creation time of the message.
""")
def _get_group_id(self):
  """Return the group id reported by the underlying message."""
  gid = pn_message_get_group_id(self._msg)
  return gid

def _set_group_id(self, value):
  """Store C{value} as the group id; the status code goes through _check."""
  status = pn_message_set_group_id(self._msg, value)
  self._check(status)

group_id = property(_get_group_id, _set_group_id,
                    doc="""
The group id of the message.
""")
def _get_group_sequence(self):
  """Return the group sequence reported by the underlying message."""
  seq = pn_message_get_group_sequence(self._msg)
  return seq

def _set_group_sequence(self, value):
  """Store C{value} as the group sequence; the status code goes through _check."""
  status = pn_message_set_group_sequence(self._msg, value)
  self._check(status)

group_sequence = property(_get_group_sequence, _set_group_sequence,
                          doc="""
The sequence of the message within its group.
""")
def _get_reply_to_group_id(self):
  """Return the reply-to group id reported by the underlying message."""
  gid = pn_message_get_reply_to_group_id(self._msg)
  return gid

def _set_reply_to_group_id(self, value):
  """Store C{value} as the reply-to group id; the status code goes through _check."""
  status = pn_message_set_reply_to_group_id(self._msg, value)
  self._check(status)

reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id,
                             doc="""
The group-id for any replies.
""")
def encode(self):
  """
  Encode the message and return the encoded data.

  Runs the _pre_encode hook, then calls pn_message_encode with a
  buffer size starting at 16 and doubling for as long as the call
  reports PN_OVERFLOW. Any other status code is passed to _check
  before the data is returned.
  """
  self._pre_encode()
  capacity = 16
  err, data = pn_message_encode(self._msg, capacity)
  while err == PN_OVERFLOW:
    # buffer too small: retry with twice the room
    capacity *= 2
    err, data = pn_message_encode(self._msg, capacity)
  self._check(err)
  return data
1092
def decode(self, data):
  """
  Decode C{data} into this message.

  The status code of pn_message_decode goes through _check, after
  which the _post_decode hook runs.
  """
  status = pn_message_decode(self._msg, data, len(data))
  self._check(status)
  self._post_decode()
1096
def send(self, sender, tag=None):
  """
  Encode this message and send it on C{sender} as a new delivery.

  @param sender: the sending link
  @param tag: delivery tag to use; when falsy one is obtained from
              sender.delivery_tag()
  @return: the delivery created for the message
  """
  if not tag:
    tag = sender.delivery_tag()
  delivery = sender.delivery(tag)
  payload = self.encode()
  sender.stream(payload)
  sender.advance()
  # settle immediately when the link is configured to send pre-settled
  if sender.snd_settle_mode == Link.SND_SETTLED:
    delivery.settle()
  return delivery
1105
def recv(self, link):
  """
  Receives and decodes the message content for the current delivery
  from the link. Upon success it will return the current delivery
  for the link. If there is no current delivery, or if the current
  delivery is incomplete, or if the link is not a receiver, it will
  return None.

  @type link: Link
  @param link: the link to receive a message from
  @return the delivery associated with the decoded message (or None)

  """
  if link.is_sender:
    return None
  delivery = link.current
  if not delivery:
    return None
  if delivery.partial:
    return None
  payload = link.recv(delivery.pending)
  link.advance()
  # the sender has already forgotten about the delivery, so we might
  # as well too
  if link.remote_snd_settle_mode == Link.SND_SETTLED:
    delivery.settle()
  self.decode(payload)
  return delivery
1130
def __repr2__(self):
  """
  Build a "Message(attr=value, ...)" string from this object's
  attributes, listing only those whose value is truthy.
  """
  attr_names = ("inferred", "address", "reply_to", "durable", "ttl",
                "priority", "first_acquirer", "delivery_count", "id",
                "correlation_id", "user_id", "group_id", "group_sequence",
                "reply_to_group_id", "instructions", "annotations",
                "properties", "body")
  shown = []
  for name in attr_names:
    current = getattr(self, name)
    if not current:
      continue
    shown.append("%s=%r" % (name, current))
  return "Message(%s)" % ", ".join(shown)
1141
def __repr__(self):
  """
  Return the text pn_inspect produces for the underlying message.

  The temporary string is freed before the status code is checked, so
  it is released even when _check raises.
  """
  scratch = pn_string(None)
  status = pn_inspect(self._msg, scratch)
  text = pn_string_get(scratch)
  pn_free(scratch)
  self._check(status)
  return text
1149
class Subscription(object):
  """Wrapper holding a subscription handle and exposing its address."""

  def __init__(self, impl):
    """
    @param impl: the underlying subscription handle
    """
    self._impl = impl

  @property
  def address(self):
    """The address reported by pn_subscription_address for this handle."""
    handle = self._impl
    return pn_subscription_address(handle)

# Unique sentinel used as a default argument value so that an explicit
# None can be told apart from "argument not supplied".
_DEFAULT = object()
class Selectable(Wrapper):
  """Wrapper exposing the pn_selectable_* functions of a selectable handle."""

  @staticmethod
  def wrap(impl):
    """Return a Selectable for C{impl}, or None when C{impl} is None."""
    if impl is not None:
      return Selectable(impl)
    return None

  def __init__(self, impl):
    Wrapper.__init__(self, impl, pn_selectable_attachments)

  def _init(self):
    # nothing to initialise for a selectable
    pass

  def fileno(self, fd=_DEFAULT):
    """
    Get or set the file descriptor.

    Called without an argument, returns the current descriptor. Called
    with None, sets the descriptor to PN_INVALID_SOCKET. Called with
    any other value, sets the descriptor to that value. The setting
    forms return None.
    """
    if fd is _DEFAULT:
      return pn_selectable_get_fd(self._impl)
    if fd is None:
      pn_selectable_set_fd(self._impl, PN_INVALID_SOCKET)
    else:
      pn_selectable_set_fd(self._impl, fd)

  def _is_reading(self):
    state = pn_selectable_is_reading(self._impl)
    return state

  def _set_reading(self, val):
    flag = bool(val)
    pn_selectable_set_reading(self._impl, flag)

  reading = property(_is_reading, _set_reading)

  def _is_writing(self):
    state = pn_selectable_is_writing(self._impl)
    return state

  def _set_writing(self, val):
    flag = bool(val)
    pn_selectable_set_writing(self._impl, flag)

  writing = property(_is_writing, _set_writing)

  def _get_deadline(self):
    """Return the deadline passed through millis2secs, or None when it is falsy."""
    tstamp = pn_selectable_get_deadline(self._impl)
    if not tstamp:
      return None
    return millis2secs(tstamp)

  def _set_deadline(self, deadline):
    raw_deadline = secs2millis(deadline)
    pn_selectable_set_deadline(self._impl, raw_deadline)

  deadline = property(_get_deadline, _set_deadline)

  def readable(self):
    """Invoke pn_selectable_readable on the underlying handle."""
    pn_selectable_readable(self._impl)

  def writable(self):
    """Invoke pn_selectable_writable on the underlying handle."""
    pn_selectable_writable(self._impl)

  def expired(self):
    """Invoke pn_selectable_expired on the underlying handle."""
    pn_selectable_expired(self._impl)

  def _is_registered(self):
    state = pn_selectable_is_registered(self._impl)
    return state

  def _set_registered(self, registered):
    pn_selectable_set_registered(self._impl, registered)

  registered = property(_is_registered, _set_registered,
                        doc="""
The registered property may be get/set by an I/O polling system to
indicate whether the fd has been registered or not.
""")

  @property
  def is_terminal(self):
    state = pn_selectable_is_terminal(self._impl)
    return state

  def terminate(self):
    """Invoke pn_selectable_terminate on the underlying handle."""
    pn_selectable_terminate(self._impl)

  def release(self):
    """Invoke pn_selectable_release on the underlying handle."""
    pn_selectable_release(self._impl)
1242
class DataException(ProtonException):
  """
  The DataException class is the root of the Data exception hierarchy.
  All exceptions raised by the Data class extend this exception.
  """
1249
class UnmappedType:
  """Placeholder carrying a message about a type that has no mapping."""

  def __init__(self, msg):
    """
    @param msg: text describing the unmapped type
    """
    self.msg = msg

  def __repr__(self):
    text = self.msg
    return "UnmappedType(%s)" % text
1257
class ulong(long):
  """A long subclass whose repr is wrapped as ulong(...)."""

  def __repr__(self):
    inner = long.__repr__(self)
    return "ulong(%s)" % inner
1262
class timestamp(long):
  """A long subclass whose repr is wrapped as timestamp(...)."""

  def __repr__(self):
    inner = long.__repr__(self)
    return "timestamp(%s)" % inner
1267
class symbol(unicode):
  """A unicode subclass whose repr is wrapped as symbol(...)."""

  def __repr__(self):
    inner = unicode.__repr__(self)
    return "symbol(%s)" % inner
1272
class char(unicode):
  """A unicode subclass whose repr is wrapped as char(...)."""

  def __repr__(self):
    inner = unicode.__repr__(self)
    return "char(%s)" % inner
1277
class Described(object):
  """
  A value paired with a descriptor.

  Two Described objects compare equal when both their descriptors and
  their values compare equal; a Described never equals an object of
  another type.
  """

  def __init__(self, descriptor, value):
    """
    @param descriptor: the descriptor of the value
    @param value: the described value
    """
    self.descriptor = descriptor
    self.value = value

  def __repr__(self):
    return "Described(%r, %r)" % (self.descriptor, self.value)

  def __eq__(self, o):
    if isinstance(o, Described):
      return self.descriptor == o.descriptor and self.value == o.value
    else:
      return False

  def __ne__(self, o):
    # Python 2 does not derive != from __eq__; without this two equal
    # instances would also compare as "not equal".
    return not self.__eq__(o)
# Sentinel descriptor used by Array: Data.get_py_array assigns it when an
# array has no descriptor, and Data.put_py_array treats any other
# descriptor value as "described".
UNDESCRIBED = Constant("UNDESCRIBED")
class Array(object):
  """
  An array value: a descriptor, an element type and the elements.

  Two Array objects compare equal when descriptor, type and elements
  all compare equal; an Array never equals an object of another type.
  """

  def __init__(self, descriptor, type, *elements):
    """
    @param descriptor: the array descriptor
    @param type: the type of the array elements
    @param elements: the array elements, kept as a tuple
    """
    self.descriptor = descriptor
    self.type = type
    self.elements = elements

  def __repr__(self):
    if self.elements:
      els = ", %s" % (", ".join(map(repr, self.elements)))
    else:
      els = ""
    return "Array(%r, %r%s)" % (self.descriptor, self.type, els)

  def __eq__(self, o):
    if isinstance(o, Array):
      return self.descriptor == o.descriptor and \
          self.type == o.type and self.elements == o.elements
    else:
      return False

  def __ne__(self, o):
    # Python 2 does not derive != from __eq__; without this two equal
    # instances would also compare as "not equal".
    return not self.__eq__(o)
1315
class Data:
  """
  The L{Data} class provides an interface for decoding, extracting,
  creating, and encoding arbitrary AMQP data. A L{Data} object
  contains a tree of AMQP values. Leaf nodes in this tree correspond
  to scalars in the AMQP type system such as L{ints<INT>} or
  L{strings<STRING>}. Non-leaf nodes in this tree correspond to
  compound values in the AMQP type system such as L{lists<LIST>},
  L{maps<MAP>}, L{arrays<ARRAY>}, or L{described values<DESCRIBED>}.
  The root node of the tree is the L{Data} object itself and can have
  an arbitrary number of children.

  A L{Data} object maintains the notion of the current sibling node
  and a current parent node. Siblings are ordered within their parent.
  Values are accessed and/or added by using the L{next}, L{prev},
  L{enter}, and L{exit} methods to navigate to the desired location in
  the tree and using the supplied variety of put_*/get_* methods to
  access or add a value of the desired type.

  The put_* methods will always add a value I{after} the current node
  in the tree. If the current node has a next sibling the put_* method
  will overwrite the value on this node. If there is no current node
  or the current node has no next sibling then one will be added. The
  put_* methods always set the added/modified node to the current
  node. The get_* methods read the value of the current node and do
  not change which node is current.

  The following types of scalar values are supported:

   - L{NULL}
   - L{BOOL}
   - L{UBYTE}
   - L{BYTE}
   - L{USHORT}
   - L{SHORT}
   - L{UINT}
   - L{INT}
   - L{CHAR}
   - L{ULONG}
   - L{LONG}
   - L{TIMESTAMP}
   - L{FLOAT}
   - L{DOUBLE}
   - L{DECIMAL32}
   - L{DECIMAL64}
   - L{DECIMAL128}
   - L{UUID}
   - L{BINARY}
   - L{STRING}
   - L{SYMBOL}

  The following types of compound values are supported:

   - L{DESCRIBED}
   - L{ARRAY}
   - L{LIST}
   - L{MAP}
  """

  NULL = PN_NULL; "A null value."
  BOOL = PN_BOOL; "A boolean value."
  UBYTE = PN_UBYTE; "An unsigned byte value."
  BYTE = PN_BYTE; "A signed byte value."
  USHORT = PN_USHORT; "An unsigned short value."
  SHORT = PN_SHORT; "A short value."
  UINT = PN_UINT; "An unsigned int value."
  INT = PN_INT; "A signed int value."
  CHAR = PN_CHAR; "A character value."
  ULONG = PN_ULONG; "An unsigned long value."
  LONG = PN_LONG; "A signed long value."
  TIMESTAMP = PN_TIMESTAMP; "A timestamp value."
  FLOAT = PN_FLOAT; "A float value."
  DOUBLE = PN_DOUBLE; "A double value."
  DECIMAL32 = PN_DECIMAL32; "A DECIMAL32 value."
  DECIMAL64 = PN_DECIMAL64; "A DECIMAL64 value."
  DECIMAL128 = PN_DECIMAL128; "A DECIMAL128 value."
  UUID = PN_UUID; "A UUID value."
  BINARY = PN_BINARY; "A binary string."
  STRING = PN_STRING; "A unicode string."
  SYMBOL = PN_SYMBOL; "A symbolic string."
  DESCRIBED = PN_DESCRIBED; "A described value."
  ARRAY = PN_ARRAY; "An array value."
  LIST = PN_LIST; "A list value."
  MAP = PN_MAP; "A map value."

  type_names = {
    NULL: "null",
    BOOL: "bool",
    BYTE: "byte",
    UBYTE: "ubyte",
    SHORT: "short",
    USHORT: "ushort",
    INT: "int",
    UINT: "uint",
    CHAR: "char",
    LONG: "long",
    ULONG: "ulong",
    TIMESTAMP: "timestamp",
    FLOAT: "float",
    DOUBLE: "double",
    DECIMAL32: "decimal32",
    DECIMAL64: "decimal64",
    DECIMAL128: "decimal128",
    UUID: "uuid",
    BINARY: "binary",
    STRING: "string",
    SYMBOL: "symbol",
    DESCRIBED: "described",
    ARRAY: "array",
    LIST: "list",
    MAP: "map"
    }

  @classmethod
  def type_name(cls, type):
    """
    Returns the name of the given type code.

    @param type: one of the type constants defined on this class
    @return: the matching entry of L{type_names}
    @raise KeyError: if the type code has no entry in L{type_names}
    """
    # A classmethod receives the class as its first argument; it needs
    # its own parameter so the type code can be passed as the second.
    return cls.type_names[type]

  def __init__(self, capacity=16):
    """
    @param capacity: when an int or long, the capacity passed to
                     pn_data to create a new data object that this
                     instance frees on deletion; any other value is
                     adopted as an existing data object and is not
                     freed by this instance
    """
    if type(capacity) in (int, long):
      self._data = pn_data(capacity)
      self._free = True
    else:
      self._data = capacity
      self._free = False

  def __del__(self):
    # _free and _data may both be missing if __init__ raised before
    # assigning them, so neither attribute is assumed to exist.
    if getattr(self, "_free", False) and hasattr(self, "_data"):
      pn_data_free(self._data)
      del self._data

  def _check(self, err):
    """
    Returns err unchanged when it is non-negative. Otherwise raises the
    exception class registered for err in EXCEPTIONS (L{DataException}
    when there is none) with the error text of the data object.
    """
    if err < 0:
      exc = EXCEPTIONS.get(err, DataException)
      raise exc("[%s]: %s" % (err, pn_error_text(pn_data_error(self._data))))
    else:
      return err

  def clear(self):
    """
    Clears the data object.
    """
    pn_data_clear(self._data)

  def rewind(self):
    """
    Clears current node and sets the parent to the root node.  Clearing the
    current node sets it _before_ the first node, calling next() will advance to
    the first node.
    """
    assert self._data is not None
    pn_data_rewind(self._data)

  def next(self):
    """
    Advances the current node to its next sibling and returns its
    type. If there is no next sibling the current node remains
    unchanged and None is returned.
    """
    found = pn_data_next(self._data)
    if found:
      return self.type()
    else:
      return None

  def prev(self):
    """
    Moves the current node back to its previous sibling and returns its
    type. If there is no previous sibling the current node remains
    unchanged and None is returned.
    """
    found = pn_data_prev(self._data)
    if found:
      return self.type()
    else:
      return None

  def enter(self):
    """
    Sets the parent node to the current node and clears the current node.
    Clearing the current node sets it _before_ the first child,
    calling next() advances to the first child.
    """
    return pn_data_enter(self._data)

  def exit(self):
    """
    Sets the current node to the parent node and the parent node to
    its own parent.
    """
    return pn_data_exit(self._data)

  def lookup(self, name):
    """
    Returns the result of pn_data_lookup for the given name.
    """
    return pn_data_lookup(self._data, name)

  def narrow(self):
    """
    Calls pn_data_narrow on the data object.
    """
    pn_data_narrow(self._data)

  def widen(self):
    """
    Calls pn_data_widen on the data object.
    """
    pn_data_widen(self._data)

  def type(self):
    """
    Returns the type of the current node, or None when the underlying
    call reports -1.
    """
    dtype = pn_data_type(self._data)
    if dtype == -1:
      return None
    else:
      return dtype

  def encode(self):
    """
    Returns a representation of the data encoded in AMQP format.

    The encode buffer starts at 1024 and is doubled for as long as the
    underlying call reports PN_OVERFLOW; any other negative status is
    raised through L{_check}.
    """
    size = 1024
    while True:
      cd, enc = pn_data_encode(self._data, size)
      if cd == PN_OVERFLOW:
        size *= 2
      elif cd >= 0:
        return enc
      else:
        self._check(cd)

  def decode(self, encoded):
    """
    Decodes the first value from supplied AMQP data and returns the
    number of bytes consumed.

    @type encoded: binary
    @param encoded: AMQP encoded binary data
    """
    return self._check(pn_data_decode(self._data, encoded))

  def put_list(self):
    """
    Puts a list value. Elements may be filled by entering the list
    node and putting element values.

      >>> data = Data()
      >>> data.put_list()
      >>> data.enter()
      >>> data.put_int(1)
      >>> data.put_int(2)
      >>> data.put_int(3)
      >>> data.exit()
    """
    self._check(pn_data_put_list(self._data))

  def put_map(self):
    """
    Puts a map value. Elements may be filled by entering the map node
    and putting alternating key value pairs.

      >>> data = Data()
      >>> data.put_map()
      >>> data.enter()
      >>> data.put_string("key")
      >>> data.put_string("value")
      >>> data.exit()
    """
    self._check(pn_data_put_map(self._data))

  def put_array(self, described, element_type):
    """
    Puts an array value. Elements may be filled by entering the array
    node and putting the element values. The values must all be of the
    specified array element type. If an array is described then the
    first child value of the array is the descriptor and may be of any
    type.

      >>> data = Data()
      >>>
      >>> data.put_array(False, Data.INT)
      >>> data.enter()
      >>> data.put_int(1)
      >>> data.put_int(2)
      >>> data.put_int(3)
      >>> data.exit()
      >>>
      >>> data.put_array(True, Data.DOUBLE)
      >>> data.enter()
      >>> data.put_symbol("array-descriptor")
      >>> data.put_double(1.1)
      >>> data.put_double(1.2)
      >>> data.put_double(1.3)
      >>> data.exit()

    @type described: bool
    @param described: specifies whether the array is described
    @type element_type: int
    @param element_type: the type of the array elements
    """
    self._check(pn_data_put_array(self._data, described, element_type))

  def put_described(self):
    """
    Puts a described value. A described node has two children, the
    descriptor and the value. These are specified by entering the node
    and putting the desired values.

      >>> data = Data()
      >>> data.put_described()
      >>> data.enter()
      >>> data.put_symbol("value-descriptor")
      >>> data.put_string("the value")
      >>> data.exit()
    """
    self._check(pn_data_put_described(self._data))

  def put_null(self):
    """
    Puts a null value.
    """
    self._check(pn_data_put_null(self._data))

  def put_bool(self, b):
    """
    Puts a boolean value.

    @param b: a boolean value
    """
    self._check(pn_data_put_bool(self._data, b))

  def put_ubyte(self, ub):
    """
    Puts an unsigned byte value.

    @param ub: an integral value
    """
    self._check(pn_data_put_ubyte(self._data, ub))

  def put_byte(self, b):
    """
    Puts a signed byte value.

    @param b: an integral value
    """
    self._check(pn_data_put_byte(self._data, b))

  def put_ushort(self, us):
    """
    Puts an unsigned short value.

    @param us: an integral value.
    """
    self._check(pn_data_put_ushort(self._data, us))

  def put_short(self, s):
    """
    Puts a signed short value.

    @param s: an integral value
    """
    self._check(pn_data_put_short(self._data, s))

  def put_uint(self, ui):
    """
    Puts an unsigned int value.

    @param ui: an integral value
    """
    self._check(pn_data_put_uint(self._data, ui))

  def put_int(self, i):
    """
    Puts a signed int value.

    @param i: an integral value
    """
    self._check(pn_data_put_int(self._data, i))

  def put_char(self, c):
    """
    Puts a char value.

    @param c: a single character
    """
    self._check(pn_data_put_char(self._data, ord(c)))

  def put_ulong(self, ul):
    """
    Puts an unsigned long value.

    @param ul: an integral value
    """
    self._check(pn_data_put_ulong(self._data, ul))

  def put_long(self, l):
    """
    Puts a signed long value.

    @param l: an integral value
    """
    self._check(pn_data_put_long(self._data, l))

  def put_timestamp(self, t):
    """
    Puts a timestamp value.

    @param t: an integral value
    """
    self._check(pn_data_put_timestamp(self._data, t))

  def put_float(self, f):
    """
    Puts a float value.

    @param f: a floating point value
    """
    self._check(pn_data_put_float(self._data, f))

  def put_double(self, d):
    """
    Puts a double value.

    @param d: a floating point value.
    """
    self._check(pn_data_put_double(self._data, d))

  def put_decimal32(self, d):
    """
    Puts a decimal32 value.

    @param d: a decimal32 value
    """
    self._check(pn_data_put_decimal32(self._data, d))

  def put_decimal64(self, d):
    """
    Puts a decimal64 value.

    @param d: a decimal64 value
    """
    self._check(pn_data_put_decimal64(self._data, d))

  def put_decimal128(self, d):
    """
    Puts a decimal128 value.

    @param d: a decimal128 value
    """
    self._check(pn_data_put_decimal128(self._data, d))

  def put_uuid(self, u):
    """
    Puts a UUID value.

    @param u: a uuid value
    """
    self._check(pn_data_put_uuid(self._data, u.bytes))

  def put_binary(self, b):
    """
    Puts a binary value.

    @type b: binary
    @param b: a binary value
    """
    self._check(pn_data_put_binary(self._data, b))

  def put_string(self, s):
    """
    Puts a unicode value.

    @type s: unicode
    @param s: a unicode value
    """
    self._check(pn_data_put_string(self._data, s.encode("utf8")))

  def put_symbol(self, s):
    """
    Puts a symbolic value.

    @type s: string
    @param s: the symbol name
    """
    self._check(pn_data_put_symbol(self._data, s))

  def get_list(self):
    """
    If the current node is a list, return the number of elements,
    otherwise return zero. List elements can be accessed by entering
    the list.

      >>> count = data.get_list()
      >>> data.enter()
      >>> for i in range(count):
      ...   type = data.next()
      ...   if type == Data.STRING:
      ...     print data.get_string()
      ...   elif type == ...:
      ...     ...
      >>> data.exit()
    """
    return pn_data_get_list(self._data)

  def get_map(self):
    """
    If the current node is a map, return the number of child elements,
    otherwise return zero. Key value pairs can be accessed by entering
    the map.

      >>> count = data.get_map()
      >>> data.enter()
      >>> for i in range(count/2):
      ...   type = data.next()
      ...   if type == Data.STRING:
      ...     print data.get_string()
      ...   elif type == ...:
      ...     ...
      >>> data.exit()
    """
    return pn_data_get_map(self._data)

  def get_array(self):
    """
    If the current node is an array, return a tuple of the element
    count, a boolean indicating whether the array is described, and
    the type of each element, otherwise return (0, False, None). Array
    data can be accessed by entering the array.

      >>> # read an array of strings with a symbolic descriptor
      >>> count, described, type = data.get_array()
      >>> data.enter()
      >>> data.next()
      >>> print "Descriptor:", data.get_symbol()
      >>> for i in range(count):
      ...    data.next()
      ...    print "Element:", data.get_string()
      >>> data.exit()
    """
    count = pn_data_get_array(self._data)
    described = pn_data_is_array_described(self._data)
    type = pn_data_get_array_type(self._data)
    if type == -1:
      type = None
    return count, described, type

  def is_described(self):
    """
    Checks if the current node is a described value. The descriptor
    and value may be accessed by entering the described value.

      >>> # read a symbolically described string
      >>> assert data.is_described() # will error if the current node is not described
      >>> data.enter()
      >>> data.next()
      >>> print data.get_symbol()
      >>> data.next()
      >>> print data.get_string()
      >>> data.exit()
    """
    return pn_data_is_described(self._data)

  def is_null(self):
    """
    Checks if the current node is a null.
    """
    return pn_data_is_null(self._data)

  def get_bool(self):
    """
    If the current node is a boolean, returns its value, returns False
    otherwise.
    """
    return pn_data_get_bool(self._data)

  def get_ubyte(self):
    """
    If the current node is an unsigned byte, returns its value,
    returns 0 otherwise.
    """
    return pn_data_get_ubyte(self._data)

  def get_byte(self):
    """
    If the current node is a signed byte, returns its value, returns 0
    otherwise.
    """
    return pn_data_get_byte(self._data)

  def get_ushort(self):
    """
    If the current node is an unsigned short, returns its value,
    returns 0 otherwise.
    """
    return pn_data_get_ushort(self._data)

  def get_short(self):
    """
    If the current node is a signed short, returns its value, returns
    0 otherwise.
    """
    return pn_data_get_short(self._data)

  def get_uint(self):
    """
    If the current node is an unsigned int, returns its value, returns
    0 otherwise.
    """
    return pn_data_get_uint(self._data)

  def get_int(self):
    """
    If the current node is a signed int, returns its value, returns 0
    otherwise.
    """
    return pn_data_get_int(self._data)

  def get_char(self):
    """
    If the current node is a char, returns its value as a L{char}.

    NOTE(review): the code point reported by the underlying call is
    always converted with unichr and wrapped as a L{char}, so for a
    non-char node the result is presumably the NUL character rather
    than the integer 0 -- confirm.
    """
    return char(unichr(pn_data_get_char(self._data)))

  def get_ulong(self):
    """
    If the current node is an unsigned long, returns its value,
    returns 0 otherwise. The result is wrapped as a L{ulong}.
    """
    return ulong(pn_data_get_ulong(self._data))

  def get_long(self):
    """
    If the current node is a signed long, returns its value, returns
    0 otherwise.
    """
    return pn_data_get_long(self._data)

  def get_timestamp(self):
    """
    If the current node is a timestamp, returns its value, returns 0
    otherwise. The result is wrapped as a L{timestamp}.
    """
    return timestamp(pn_data_get_timestamp(self._data))

  def get_float(self):
    """
    If the current node is a float, returns its value, returns 0
    otherwise.
    """
    return pn_data_get_float(self._data)

  def get_double(self):
    """
    If the current node is a double, returns its value, returns 0
    otherwise.
    """
    return pn_data_get_double(self._data)

  # XXX: need to convert
  def get_decimal32(self):
    """
    If the current node is a decimal32, returns its value, returns 0
    otherwise.
    """
    return pn_data_get_decimal32(self._data)

  # XXX: need to convert
  def get_decimal64(self):
    """
    If the current node is a decimal64, returns its value, returns 0
    otherwise.
    """
    return pn_data_get_decimal64(self._data)

  # XXX: need to convert
  def get_decimal128(self):
    """
    If the current node is a decimal128, returns its value, returns 0
    otherwise.
    """
    return pn_data_get_decimal128(self._data)

  def get_uuid(self):
    """
    If the current node is a UUID, returns its value, returns None
    otherwise.
    """
    if pn_data_type(self._data) == Data.UUID:
      return uuid.UUID(bytes=pn_data_get_uuid(self._data))
    else:
      return None

  def get_binary(self):
    """
    If the current node is binary, returns its value, returns ""
    otherwise.
    """
    return pn_data_get_binary(self._data)

  def get_string(self):
    """
    If the current node is a string, returns its value, returns ""
    otherwise. The value is decoded from utf8.
    """
    return pn_data_get_string(self._data).decode("utf8")

  def get_symbol(self):
    """
    If the current node is a symbol, returns its value, returns ""
    otherwise. The result is wrapped as a L{symbol}.
    """
    return symbol(pn_data_get_symbol(self._data))

  def copy(self, src):
    """
    Copies the contents of another L{Data} object into this one.

    @param src: the L{Data} object to copy from
    """
    self._check(pn_data_copy(self._data, src._data))

  def format(self):
    """
    Returns the result of pn_data_format for this data object.

    The format buffer starts at 16 and is doubled for as long as the
    underlying call reports PN_OVERFLOW; any other status is passed
    through L{_check} before the result is returned.
    """
    sz = 16
    while True:
      err, result = pn_data_format(self._data, sz)
      if err == PN_OVERFLOW:
        sz *= 2
        continue
      else:
        self._check(err)
        return result

  def dump(self):
    """
    Calls pn_data_dump on the data object.
    """
    pn_data_dump(self._data)

  def put_dict(self, d):
    """
    Puts a map value and fills it with the keys and values of the
    given dict, each written with L{put_object}.

    @param d: the dict to put
    """
    self.put_map()
    self.enter()
    try:
      for k, v in d.items():
        self.put_object(k)
        self.put_object(v)
    finally:
      # always leave the map node, even if a key or value cannot be put
      self.exit()

  def get_dict(self):
    """
    Enters the current node and reads its children as alternating keys
    and values into a dict, each read with L{get_object}. A trailing
    key with no value is mapped to None. Returns None if the current
    node cannot be entered.
    """
    if self.enter():
      try:
        result = {}
        while self.next():
          k = self.get_object()
          if self.next():
            v = self.get_object()
          else:
            v = None
          result[k] = v
      finally:
        self.exit()
      return result

  def put_sequence(self, s):
    """
    Puts a list value and fills it with the items of the given
    iterable, each written with L{put_object}.

    @param s: the iterable to put
    """
    self.put_list()
    self.enter()
    try:
      for o in s:
        self.put_object(o)
    finally:
      # always leave the list node, even if an item cannot be put
      self.exit()

  def get_sequence(self):
    """
    Enters the current node and reads its children into a list, each
    read with L{get_object}. Returns None if the current node cannot
    be entered.
    """
    if self.enter():
      try:
        result = []
        while self.next():
          result.append(self.get_object())
      finally:
        self.exit()
      return result

  def get_py_described(self):
    """
    Enters the current node and reads its first two children as the
    descriptor and value of a L{Described} object. Returns None if the
    current node cannot be entered.
    """
    if self.enter():
      try:
        self.next()
        descriptor = self.get_object()
        self.next()
        value = self.get_object()
      finally:
        self.exit()
      return Described(descriptor, value)

  def put_py_described(self, d):
    """
    Puts a described value built from the descriptor and value of the
    given object, each written with L{put_object}.

    @param d: an object with descriptor and value attributes, such as
              a L{Described}
    """
    self.put_described()
    self.enter()
    try:
      self.put_object(d.descriptor)
      self.put_object(d.value)
    finally:
      self.exit()

  def get_py_array(self):
    """
    If the current node is an array, return an Array object
    representing the array and its contents. Otherwise return None.
    This is a convenience wrapper around get_array, enter, etc.
    """

    count, described, type = self.get_array()
    if type is None: return None
    if self.enter():
      try:
        if described:
          self.next()
          descriptor = self.get_object()
        else:
          descriptor = UNDESCRIBED
        elements = []
        while self.next():
          elements.append(self.get_object())
      finally:
        self.exit()
      return Array(descriptor, type, *elements)

  def put_py_array(self, a):
    """
    Puts an array value built from the given object. The array is
    described unless the object's descriptor equals UNDESCRIBED; the
    descriptor (when described) and the elements are each written
    with L{put_object}.

    @param a: an object with descriptor, type and elements attributes,
              such as an L{Array}
    """
    described = a.descriptor != UNDESCRIBED
    self.put_array(described, a.type)
    self.enter()
    try:
      if described:
        self.put_object(a.descriptor)
      for e in a.elements:
        self.put_object(e)
    finally:
      self.exit()

  # Maps the exact class of a python object to the method that puts it.
  put_mappings = {
    None.__class__: lambda s, _: s.put_null(),
    bool: put_bool,
    dict: put_dict,
    list: put_sequence,
    tuple: put_sequence,
    unicode: put_string,
    bytes: put_binary,
    symbol: put_symbol,
    int: put_long,
    char: put_char,
    long: put_long,
    ulong: put_ulong,
    timestamp: put_timestamp,
    float: put_double,
    uuid.UUID: put_uuid,
    Described: put_py_described,
    Array: put_py_array
    }
  # Maps the type code of a node to the method that reads it.
  get_mappings = {
    NULL: lambda s: None,
    BOOL: get_bool,
    BYTE: get_byte,
    UBYTE: get_ubyte,
    SHORT: get_short,
    USHORT: get_ushort,
    INT: get_int,
    UINT: get_uint,
    CHAR: get_char,
    LONG: get_long,
    ULONG: get_ulong,
    TIMESTAMP: get_timestamp,
    FLOAT: get_float,
    DOUBLE: get_double,
    DECIMAL32: get_decimal32,
    DECIMAL64: get_decimal64,
    DECIMAL128: get_decimal128,
    UUID: get_uuid,
    BINARY: get_binary,
    STRING: get_string,
    SYMBOL: get_symbol,
    DESCRIBED: get_py_described,
    ARRAY: get_py_array,
    LIST: get_sequence,
    MAP: get_dict
    }


  def put_object(self, obj):
    """
    Puts a python object using the method registered for its exact
    class in L{put_mappings}.

    @param obj: the object to put
    @raise KeyError: if the class of obj has no entry in L{put_mappings}
    """
    putter = self.put_mappings[obj.__class__]
    putter(self, obj)

  def get_object(self):
    """
    Returns the current node as a python object using the method
    registered for its type in L{get_mappings}. Returns None when
    there is no current type, and an L{UnmappedType} when the type has
    no registered method.
    """
    type = self.type()
    if type is None: return None
    getter = self.get_mappings.get(type)
    if getter:
      return getter(self)
    else:
      return UnmappedType(str(type))
2191
class ConnectionException(ProtonException):
  """Fallback exception type used by Connection._check for negative error codes."""
2194
class Endpoint(object):
  """
  Behaviour shared by the endpoint wrappers in this module (Connection
  and Session both derive from it).

  Subclasses must provide _get_cond_impl and _get_remote_cond_impl, and
  are expected to expose C{_impl}, C{_get_attachments()} and a
  C{connection} attribute, all of which are used below.
  """

  # Endpoint state flags, re-exported from the C binding.
  LOCAL_UNINIT = PN_LOCAL_UNINIT
  REMOTE_UNINIT = PN_REMOTE_UNINIT
  LOCAL_ACTIVE = PN_LOCAL_ACTIVE
  REMOTE_ACTIVE = PN_REMOTE_ACTIVE
  LOCAL_CLOSED = PN_LOCAL_CLOSED
  REMOTE_CLOSED = PN_REMOTE_CLOSED

  def _init(self):
    """Start without a local error condition."""
    self.condition = None

  def _update_cond(self):
    """Copy the python-side C{condition} into the underlying condition object."""
    obj2cond(self.condition, self._get_cond_impl())

  @property
  def remote_condition(self):
    """The remote condition as a Condition object, or None if none is set."""
    return cond2obj(self._get_remote_cond_impl())

  # the following must be provided by subclasses
  def _get_cond_impl(self):
    # An explicit raise instead of "assert False": asserts are stripped
    # under "python -O", which would silently return None here.
    raise NotImplementedError("Subclass must override this!")

  def _get_remote_cond_impl(self):
    raise NotImplementedError("Subclass must override this!")

  def _reactor_on_error(self):
    """Return the on_error callback of the owning reactor, or None without one."""
    import reactor
    ractor = reactor.Reactor.wrap(pn_object_reactor(self._impl))
    if ractor:
      return ractor.on_error
    return None

  def _get_handler(self):
    on_error = self._reactor_on_error()
    record = self._get_attachments()
    return WrappedHandler.wrap(pn_record_get_handler(record), on_error)

  def _set_handler(self, handler):
    on_error = self._reactor_on_error()
    impl = _chandler(handler, on_error)
    record = self._get_attachments()
    pn_record_set_handler(record, impl)
    # NOTE(review): presumably the record keeps its own reference, so ours
    # is dropped here -- confirm against the C API.
    pn_decref(impl)

  handler = property(_get_handler, _set_handler)

  @property
  def transport(self):
    """The transport of the connection this endpoint belongs to."""
    return self.connection.transport
2248
class Condition:
  """
  An error condition: a name plus an optional description and an
  optional info object.
  """

  def __init__(self, name, description=None, info=None):
    self.name = name
    self.description = description
    self.info = info

  def __repr__(self):
    # Falsy fields (None, empty string, empty dict, ...) are left out.
    shown = [repr(x) for x in (self.name, self.description, self.info) if x]
    return "Condition(%s)" % ", ".join(shown)

  def __eq__(self, o):
    """Two conditions are equal when name, description and info all match."""
    if not isinstance(o, Condition): return False
    return self.name == o.name and \
        self.description == o.description and \
        self.info == o.info

  def __ne__(self, o):
    # Python 2 does not derive != from __eq__; without this, two equal
    # conditions would still compare as "not equal" by identity.
    return not self.__eq__(o)
2266
def obj2cond(obj, cond):
  """
  Overwrite the condition object C{cond} with the contents of C{obj}.

  C{cond} is always cleared first. A falsy C{obj} leaves it cleared;
  otherwise the name (stringified), the description and, if truthy, the
  info object are copied across.
  """
  pn_condition_clear(cond)
  if not obj:
    return
  pn_condition_set_name(cond, str(obj.name))
  pn_condition_set_description(cond, obj.description)
  info = Data(pn_condition_info(cond))
  if obj.info:
    info.put_object(obj.info)
2275
def cond2obj(cond):
  """Return a Condition built from C{cond}, or None if C{cond} is not set."""
  if not pn_condition_is_set(cond):
    return None
  name = pn_condition_get_name(cond)
  description = pn_condition_get_description(cond)
  info = dat2obj(pn_condition_info(cond))
  return Condition(name, description, info)
2283
def dat2obj(dimpl):
  """
  Decode the first node held by the data object C{dimpl}.

  Returns None for a falsy C{dimpl}. The Data wrapper is rewound both
  before and after reading.
  """
  if not dimpl:
    return None
  data = Data(dimpl)
  data.rewind()
  data.next()
  value = data.get_object()
  data.rewind()
  return value
2292
def obj2dat(obj, dimpl):
  """Encode C{obj} into the data object C{dimpl}; a C{obj} of None is a no-op."""
  if obj is None:
    return
  Data(dimpl).put_object(obj)
2297
def secs2millis(secs):
  """Convert seconds to whole milliseconds (the product is truncated by long())."""
  millis = secs * 1000
  return long(millis)
2300
def millis2secs(millis):
  """Convert milliseconds to (float) seconds."""
  as_float = float(millis)
  return as_float / 1000.0
2303
def timeout2millis(secs):
  """Convert a timeout in seconds to milliseconds; None maps to PN_MILLIS_MAX."""
  if secs is not None:
    return secs2millis(secs)
  return PN_MILLIS_MAX
2307
def millis2timeout(millis):
  """Convert milliseconds to a timeout in seconds; PN_MILLIS_MAX maps to None."""
  if millis != PN_MILLIS_MAX:
    return millis2secs(millis)
  return None
2311
def unicode2utf8(string):
  """
  Return C{string} as a UTF-8 encoded byte string.

  None is passed through, unicode objects are encoded and str objects
  are returned untouched. Anything else raises TypeError.
  """
  if string is None:
    return None
  if isinstance(string, unicode):
    return string.encode('utf8')
  if isinstance(string, str):
    return string
  raise TypeError("Unrecognized string type: %r" % string)
2321
def utf82unicode(string):
  """
  Return C{string} as a unicode object.

  None is passed through, unicode objects are returned untouched and
  str objects are decoded as UTF-8. Anything else raises TypeError.
  """
  if string is None:
    return None
  if isinstance(string, unicode):
    return string
  if isinstance(string, str):
    return string.decode('utf8')
  raise TypeError("Unrecognized string type")
2331
class Connection(Wrapper, Endpoint):
  """
  A representation of an AMQP connection
  """

  @staticmethod
  def wrap(impl):
    """Wrap an existing implementation object; None is passed through."""
    if impl is None:
      return None
    return Connection(impl)

  def __init__(self, impl = pn_connection):
    Wrapper.__init__(self, impl, pn_connection_attachments)

  def _init(self):
    Endpoint._init(self)
    # Local values; they are encoded onto the wire objects by open().
    self.offered_capabilities = None
    self.desired_capabilities = None
    self.properties = None

  def _get_attachments(self):
    return pn_connection_attachments(self._impl)

  @property
  def connection(self):
    """A connection is its own connection (lets Endpoint.transport work)."""
    return self

  @property
  def transport(self):
    return Transport.wrap(pn_connection_transport(self._impl))

  def _check(self, err):
    """Return C{err} unchanged if non-negative, otherwise raise the mapped exception."""
    if err >= 0:
      return err
    exc = EXCEPTIONS.get(err, ConnectionException)
    raise exc("[%s]: %s" % (err, pn_connection_error(self._impl)))

  def _get_cond_impl(self):
    return pn_connection_condition(self._impl)

  def _get_remote_cond_impl(self):
    return pn_connection_remote_condition(self._impl)

  def collect(self, collector):
    """
    Attach C{collector} to this connection, or detach with None.
    """
    if collector is None:
      # weakref.ref(None) raises TypeError, so no weak reference is
      # taken when detaching.
      pn_connection_collect(self._impl, None)
    else:
      pn_connection_collect(self._impl, collector._impl)
      self._collector = weakref.ref(collector)

  def _get_container(self):
    return utf82unicode(pn_connection_get_container(self._impl))
  def _set_container(self, name):
    return pn_connection_set_container(self._impl, unicode2utf8(name))

  container = property(_get_container, _set_container)

  def _get_hostname(self):
    return utf82unicode(pn_connection_get_hostname(self._impl))
  def _set_hostname(self, name):
    return pn_connection_set_hostname(self._impl, unicode2utf8(name))

  hostname = property(_get_hostname, _set_hostname)

  @property
  def remote_container(self):
    """The container identifier specified by the remote peer for this connection."""
    return pn_connection_remote_container(self._impl)

  @property
  def remote_hostname(self):
    """The hostname specified by the remote peer for this connection."""
    return pn_connection_remote_hostname(self._impl)

  # NOTE(review): the "def" line of the next two properties is missing
  # from the listing this was recovered from; the names are inferred from
  # the wrapped C calls -- confirm against the original file.
  @property
  def remote_offered_capabilities(self):
    """The capabilities offered by the remote peer for this connection."""
    return dat2obj(pn_connection_remote_offered_capabilities(self._impl))

  @property
  def remote_desired_capabilities(self):
    """The capabilities desired by the remote peer for this connection."""
    return dat2obj(pn_connection_remote_desired_capabilities(self._impl))

  @property
  def remote_properties(self):
    """The properties specified by the remote peer for this connection."""
    return dat2obj(pn_connection_remote_properties(self._impl))

  def open(self):
    """
    Opens the connection.

    In more detail, this moves the local state of the connection to
    the ACTIVE state and triggers an open frame to be sent to the
    peer. A connection is fully active once both peers have opened it.
    """
    obj2dat(self.offered_capabilities,
            pn_connection_offered_capabilities(self._impl))
    obj2dat(self.desired_capabilities,
            pn_connection_desired_capabilities(self._impl))
    obj2dat(self.properties, pn_connection_properties(self._impl))
    pn_connection_open(self._impl)

  def close(self):
    """
    Closes the connection.

    In more detail, this moves the local state of the connection to
    the CLOSED state and triggers a close frame to be sent to the
    peer. A connection is fully closed once both peers have closed it.
    """
    self._update_cond()
    pn_connection_close(self._impl)

  @property
  def state(self):
    """
    The state of the connection as a bit field. The state has a local
    and a remote component. Each of these can be in one of three
    states: UNINIT, ACTIVE or CLOSED. These can be tested by masking
    against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT,
    REMOTE_ACTIVE and REMOTE_CLOSED.
    """
    return pn_connection_state(self._impl)

  def session(self):
    """
    Returns a new session on this connection.
    """
    return Session(pn_session(self._impl))

  def session_head(self, mask):
    return Session.wrap(pn_session_head(self._impl, mask))

  @property
  def work_head(self):
    return Delivery.wrap(pn_work_head(self._impl))

  @property
  def error(self):
    return pn_error_code(pn_connection_error(self._impl))

  def free(self):
    pn_connection_release(self._impl)
2482
class SessionException(ProtonException):
  """Exception type for session level failures."""
2485
class Session(Wrapper, Endpoint):
  """
  Wrapper around a session implementation object.
  """

  @staticmethod
  def wrap(impl):
    """Wrap an existing implementation object; None is passed through."""
    if impl is not None:
      return Session(impl)
    return None

  def __init__(self, impl):
    Wrapper.__init__(self, impl, pn_session_attachments)

  def _get_attachments(self):
    return pn_session_attachments(self._impl)

  def _get_cond_impl(self):
    return pn_session_condition(self._impl)

  def _get_remote_cond_impl(self):
    return pn_session_remote_condition(self._impl)

  def _get_incoming_capacity(self):
    return pn_session_get_incoming_capacity(self._impl)

  def _set_incoming_capacity(self, capacity):
    pn_session_set_incoming_capacity(self._impl, capacity)

  incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity)

  @property
  def outgoing_bytes(self):
    return pn_session_outgoing_bytes(self._impl)

  @property
  def incoming_bytes(self):
    return pn_session_incoming_bytes(self._impl)

  def open(self):
    pn_session_open(self._impl)

  def close(self):
    # Push the python-side condition down before closing.
    self._update_cond()
    pn_session_close(self._impl)

  def next(self, mask):
    """Return the wrapped result of pn_session_next for C{mask} (None if there is none)."""
    following = pn_session_next(self._impl, mask)
    return Session.wrap(following)

  @property
  def state(self):
    return pn_session_state(self._impl)

  @property
  def connection(self):
    """The Connection this session belongs to."""
    return Connection.wrap(pn_session_connection(self._impl))

  def sender(self, name):
    """Create a Sender called C{name} (UTF-8 encoded) on this session."""
    return Sender(pn_sender(self._impl, unicode2utf8(name)))

  def receiver(self, name):
    """Create a Receiver called C{name} (UTF-8 encoded) on this session."""
    return Receiver(pn_receiver(self._impl, unicode2utf8(name)))

  def free(self):
    pn_session_free(self._impl)
2549
class LinkException(ProtonException):
  """Fallback exception type used by Terminus._check for negative error codes."""
2552 2734
class Terminus(object):
  """
  Wrapper around a terminus implementation object.

  Every setter goes through _check, so a negative return code from the
  underlying call is raised as an exception.
  """

  # terminus types
  UNSPECIFIED = PN_UNSPECIFIED
  SOURCE = PN_SOURCE
  TARGET = PN_TARGET
  COORDINATOR = PN_COORDINATOR

  # durability values
  NONDURABLE = PN_NONDURABLE
  CONFIGURATION = PN_CONFIGURATION
  DELIVERIES = PN_DELIVERIES

  # distribution modes
  DIST_MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED
  DIST_MODE_COPY = PN_DIST_MODE_COPY
  DIST_MODE_MOVE = PN_DIST_MODE_MOVE

  def __init__(self, impl):
    self._impl = impl

  def _check(self, err):
    """Return C{err} unchanged if non-negative, otherwise raise the mapped exception."""
    if err >= 0:
      return err
    exc = EXCEPTIONS.get(err, LinkException)
    raise exc("[%s]" % err)

  def _get_type(self):
    return pn_terminus_get_type(self._impl)
  def _set_type(self, type):
    rc = pn_terminus_set_type(self._impl, type)
    self._check(rc)
  type = property(_get_type, _set_type)

  def _get_address(self):
    return utf82unicode(pn_terminus_get_address(self._impl))
  def _set_address(self, address):
    rc = pn_terminus_set_address(self._impl, unicode2utf8(address))
    self._check(rc)
  address = property(_get_address, _set_address)

  def _get_durability(self):
    return pn_terminus_get_durability(self._impl)
  def _set_durability(self, seconds):
    rc = pn_terminus_set_durability(self._impl, seconds)
    self._check(rc)
  durability = property(_get_durability, _set_durability)

  def _get_expiry_policy(self):
    return pn_terminus_get_expiry_policy(self._impl)
  def _set_expiry_policy(self, seconds):
    rc = pn_terminus_set_expiry_policy(self._impl, seconds)
    self._check(rc)
  expiry_policy = property(_get_expiry_policy, _set_expiry_policy)

  def _get_timeout(self):
    return pn_terminus_get_timeout(self._impl)
  def _set_timeout(self, seconds):
    rc = pn_terminus_set_timeout(self._impl, seconds)
    self._check(rc)
  timeout = property(_get_timeout, _set_timeout)

  def _is_dynamic(self):
    return pn_terminus_is_dynamic(self._impl)
  def _set_dynamic(self, dynamic):
    rc = pn_terminus_set_dynamic(self._impl, dynamic)
    self._check(rc)
  dynamic = property(_is_dynamic, _set_dynamic)

  def _get_distribution_mode(self):
    return pn_terminus_get_distribution_mode(self._impl)
  def _set_distribution_mode(self, mode):
    rc = pn_terminus_set_distribution_mode(self._impl, mode)
    self._check(rc)
  distribution_mode = property(_get_distribution_mode, _set_distribution_mode)

  # The four accessors below build a new Data wrapper on every access.

  @property
  def properties(self):
    return Data(pn_terminus_properties(self._impl))

  @property
  def capabilities(self):
    return Data(pn_terminus_capabilities(self._impl))

  @property
  def outcomes(self):
    return Data(pn_terminus_outcomes(self._impl))

  @property
  def filter(self):
    return Data(pn_terminus_filter(self._impl))

  def copy(self, src):
    """Copy the terminus C{src} into this one via pn_terminus_copy."""
    rc = pn_terminus_copy(self._impl, src._impl)
    self._check(rc)
2820
class Sender(Link):
  """
  A link over which messages are sent.
  """

  def offered(self, n):
    pn_link_offered(self._impl, n)

  def stream(self, bytes):
    """
    Send specified bytes as part of the current delivery
    """
    sent = pn_link_send(self._impl, bytes)
    return self._check(sent)

  def send(self, obj, tag=None):
    """
    Send specified object over this sender; the object is expected to
    have a send() method on it that takes the sender and an optional
    tag as arguments.

    Where the object is a Message, this will send the message over
    this link, creating a new delivery for the purpose.
    """
    if not hasattr(obj, 'send'):
      # treat object as bytes
      return self.stream(obj)
    return obj.send(self, tag=tag)

  def delivery_tag(self):
    """
    Return the next tag from a per-sender generator that yields
    "1", "2", "3", ...; the generator is created on first use.
    """
    if not hasattr(self, 'tag_generator'):
      def simple_tags():
        n = 1
        while True:
          yield str(n)
          n += 1
      self.tag_generator = simple_tags()
    return self.tag_generator.next()
2859
class Receiver(Link):
  """
  A link over which messages are received.
  """

  def flow(self, n):
    """Increases the credit issued to the remote sender by the specified number of messages."""
    pn_link_flow(self._impl, n)

  def recv(self, limit):
    """
    Read up to C{limit} bytes via pn_link_recv.

    Returns None when the call reports PN_EOS; any other return code is
    passed through _check before the bytes read are returned.
    """
    n, bytes = pn_link_recv(self._impl, limit)
    if n == PN_EOS:
      return None
    self._check(n)
    return bytes

  def drain(self, n):
    pn_link_drain(self._impl, n)

  def draining(self):
    return pn_link_draining(self._impl)
2882
class NamedInt(int):
  """
  An int that prints as a symbolic name.

  Every instance is recorded in the class level C{values} registry
  under its numeric value so that get() can map a plain number back to
  the named instance.
  """

  values = {}

  def __new__(cls, i, name):
    # "name" is stored by __init__; only the numeric value matters here.
    instance = super(NamedInt, cls).__new__(cls, i)
    cls.values[i] = instance
    return instance

  def __init__(self, i, name):
    self.name = name

  def __repr__(self):
    return self.name

  def __str__(self):
    return self.name

  @classmethod
  def get(cls, i):
    """Return the registered instance for C{i}, or C{i} itself if unknown."""
    try:
      return cls.values[i]
    except KeyError:
      return i
2904
class DispositionType(NamedInt):
  """NamedInt subclass for disposition types, with its own registry."""
  # Rebinding "values" gives this class a registry separate from NamedInt's.
  values = {}
2907
class Disposition(object):
  """
  Wrapper around a disposition implementation object.

  For a local disposition, data, annotations and condition are plain
  python attributes. For a remote one they are decoded from the
  implementation on every read, and assigning to them raises
  AttributeError.
  """

  RECEIVED = DispositionType(PN_RECEIVED, "RECEIVED")
  ACCEPTED = DispositionType(PN_ACCEPTED, "ACCEPTED")
  REJECTED = DispositionType(PN_REJECTED, "REJECTED")
  RELEASED = DispositionType(PN_RELEASED, "RELEASED")
  MODIFIED = DispositionType(PN_MODIFIED, "MODIFIED")

  def __init__(self, impl, local):
    self._impl = impl
    self.local = local
    self._data = None
    self._condition = None
    self._annotations = None

  @property
  def type(self):
    """The disposition type, as a DispositionType where one is registered."""
    return DispositionType.get(pn_disposition_type(self._impl))

  def _get_section_number(self):
    return pn_disposition_get_section_number(self._impl)
  def _set_section_number(self, n):
    pn_disposition_set_section_number(self._impl, n)
  section_number = property(_get_section_number, _set_section_number)

  def _get_section_offset(self):
    return pn_disposition_get_section_offset(self._impl)
  def _set_section_offset(self, n):
    pn_disposition_set_section_offset(self._impl, n)
  section_offset = property(_get_section_offset, _set_section_offset)

  def _get_failed(self):
    return pn_disposition_is_failed(self._impl)
  def _set_failed(self, b):
    pn_disposition_set_failed(self._impl, b)
  failed = property(_get_failed, _set_failed)

  def _get_undeliverable(self):
    return pn_disposition_is_undeliverable(self._impl)
  def _set_undeliverable(self, b):
    pn_disposition_set_undeliverable(self._impl, b)
  undeliverable = property(_get_undeliverable, _set_undeliverable)

  def _get_data(self):
    if not self.local:
      return dat2obj(pn_disposition_data(self._impl))
    return self._data
  def _set_data(self, obj):
    if not self.local:
      raise AttributeError("data attribute is read-only")
    self._data = obj
  data = property(_get_data, _set_data)

  def _get_annotations(self):
    if not self.local:
      return dat2obj(pn_disposition_annotations(self._impl))
    return self._annotations
  def _set_annotations(self, obj):
    if not self.local:
      raise AttributeError("annotations attribute is read-only")
    self._annotations = obj
  annotations = property(_get_annotations, _set_annotations)

  def _get_condition(self):
    if not self.local:
      return cond2obj(pn_disposition_condition(self._impl))
    return self._condition
  def _set_condition(self, obj):
    if not self.local:
      raise AttributeError("condition attribute is read-only")
    self._condition = obj
  condition = property(_get_condition, _set_condition)
2986
class Delivery(Wrapper):
  """
  Tracks and/or records the delivery of a message over a link.
  """

  RECEIVED = Disposition.RECEIVED
  ACCEPTED = Disposition.ACCEPTED
  REJECTED = Disposition.REJECTED
  RELEASED = Disposition.RELEASED
  MODIFIED = Disposition.MODIFIED

  @staticmethod
  def wrap(impl):
    """Wrap an existing implementation object; None is passed through."""
    if impl is not None:
      return Delivery(impl)
    return None

  def __init__(self, impl):
    Wrapper.__init__(self, impl, pn_delivery_attachments)

  def _init(self):
    # Only the local disposition accepts python-side assignments.
    self.local = Disposition(pn_delivery_local(self._impl), True)
    self.remote = Disposition(pn_delivery_remote(self._impl), False)

  @property
  def tag(self):
    """The identifier for the delivery."""
    return pn_delivery_tag(self._impl)

  @property
  def writable(self):
    """Returns true for an outgoing delivery to which data can now be written."""
    return pn_delivery_writable(self._impl)

  @property
  def readable(self):
    """Returns true for an incoming delivery that has data to read."""
    return pn_delivery_readable(self._impl)

  @property
  def updated(self):
    """Returns true if the state of the delivery has been updated
    (e.g. it has been settled and/or accepted, rejected etc)."""
    return pn_delivery_updated(self._impl)

  def update(self, state):
    """
    Set the local state of the delivery e.g. ACCEPTED, REJECTED, RELEASED.
    """
    local = self.local
    # Encode the python-side values of the local disposition first.
    obj2dat(local._data, pn_disposition_data(local._impl))
    obj2dat(local._annotations, pn_disposition_annotations(local._impl))
    obj2cond(local._condition, pn_disposition_condition(local._impl))
    pn_delivery_update(self._impl, state)

  @property
  def pending(self):
    return pn_delivery_pending(self._impl)

  @property
  def partial(self):
    """
    Returns true for an incoming delivery if not all the data is
    yet available.
    """
    return pn_delivery_partial(self._impl)

  @property
  def local_state(self):
    """Returns the local state of the delivery."""
    return DispositionType.get(pn_delivery_local_state(self._impl))

  @property
  def remote_state(self):
    """
    Returns the state of the delivery as indicated by the remote
    peer.
    """
    return DispositionType.get(pn_delivery_remote_state(self._impl))

  @property
  def settled(self):
    """
    Returns true if the delivery has been settled by the remote peer.
    """
    return pn_delivery_settled(self._impl)

  def settle(self):
    """
    Settles the delivery locally. This indicates the application
    considers the delivery complete and does not wish to receive any
    further events about it. Every delivery should be settled locally.
    """
    pn_delivery_settle(self._impl)

  @property
  def work_next(self):
    return Delivery.wrap(pn_work_next(self._impl))

  # NOTE(review): the listing this was recovered from has a gap here (a
  # decorated definition is missing). "session" below reads self.link,
  # which is presumably the missing property -- restore it from the
  # original file.

  @property
  def session(self):
    """
    Returns the session over which the delivery was sent or received.
    """
    return self.link.session

  @property
  def connection(self):
    """
    Returns the connection over which the delivery was sent or received.
    """
    return self.session.connection

  @property
  def transport(self):
    return self.connection.transport
3110
class TransportException(ProtonException):
  """Fallback exception type used by Transport._check for negative error codes."""
3113
class Transport(Wrapper):
  """
  Wrapper around a transport implementation object.
  """

  # trace flags accepted by trace()
  TRACE_OFF = PN_TRACE_OFF
  TRACE_DRV = PN_TRACE_DRV
  TRACE_FRM = PN_TRACE_FRM
  TRACE_RAW = PN_TRACE_RAW

  # modes accepted by the constructor
  CLIENT = 1
  SERVER = 2

  @staticmethod
  def wrap(impl):
    """Wrap an existing implementation object; None is passed through."""
    if impl is not None:
      return Transport(_impl=impl)
    return None

  def __init__(self, mode=None, _impl = pn_transport):
    """
    C{mode} may be None, Transport.CLIENT or Transport.SERVER; only
    SERVER changes the underlying transport. Any other value raises
    TransportException.
    """
    Wrapper.__init__(self, _impl, pn_transport_attachments)
    if mode == Transport.SERVER:
      pn_transport_set_server(self._impl)
    elif not (mode is None or mode==Transport.CLIENT):
      raise TransportException("Cannot initialise Transport from mode: %s" % str(mode))

  def _init(self):
    self._sasl = None
    self._ssl = None

  def _check(self, err):
    """Return C{err} unchanged if non-negative, otherwise raise the mapped exception."""
    if err >= 0:
      return err
    exc = EXCEPTIONS.get(err, TransportException)
    raise exc("[%s]: %s" % (err, pn_error_text(pn_transport_error(self._impl))))

  def bind(self, connection):
    """Assign a connection to the transport"""
    self._check(pn_transport_bind(self._impl, connection._impl))

  def unbind(self):
    """Release the connection"""
    self._check(pn_transport_unbind(self._impl))

  def trace(self, n):
    pn_transport_trace(self._impl, n)

  def tick(self, now):
    """Process any timed events (like heartbeat generation).
    now = seconds since epoch (float).
    """
    deadline = pn_transport_tick(self._impl, secs2millis(now))
    return millis2secs(deadline)

  def capacity(self):
    """Value of pn_transport_capacity; values below PN_EOS are raised via _check."""
    c = pn_transport_capacity(self._impl)
    if c < PN_EOS:
      return self._check(c)
    return c

  def push(self, bytes):
    """Push C{bytes} into the transport; OverflowError unless all of it is taken."""
    n = self._check(pn_transport_push(self._impl, bytes))
    if n != len(bytes):
      raise OverflowError("unable to process all bytes")

  def close_tail(self):
    self._check(pn_transport_close_tail(self._impl))

  def pending(self):
    """Value of pn_transport_pending; values below PN_EOS are raised via _check."""
    p = pn_transport_pending(self._impl)
    if p < PN_EOS:
      return self._check(p)
    return p

  def peek(self, size):
    """
    Return up to C{size} bytes from pn_transport_peek, or None when the
    call reports PN_EOS; other return codes go through _check.
    """
    cd, out = pn_transport_peek(self._impl, size)
    if cd == PN_EOS:
      return None
    self._check(cd)
    return out

  def pop(self, size):
    pn_transport_pop(self._impl, size)

  def close_head(self):
    self._check(pn_transport_close_head(self._impl))

  @property
  def closed(self):
    return pn_transport_closed(self._impl)

  # AMQP 1.0 max-frame-size
  def _get_max_frame_size(self):
    return pn_transport_get_max_frame(self._impl)

  def _set_max_frame_size(self, value):
    pn_transport_set_max_frame(self._impl, value)

  max_frame_size = property(_get_max_frame_size, _set_max_frame_size,
                            doc="""
Sets the maximum size for received frames (in bytes).
""")

  @property
  def remote_max_frame_size(self):
    return pn_transport_get_remote_max_frame(self._impl)

  def _get_channel_max(self):
    return pn_transport_get_channel_max(self._impl)

  def _set_channel_max(self, value):
    pn_transport_set_channel_max(self._impl, value)

  channel_max = property(_get_channel_max, _set_channel_max,
                         doc="""
Sets the maximum channel that may be used on the transport.
""")

  @property
  def remote_channel_max(self):
    return pn_transport_remote_channel_max(self._impl)

  # AMQP 1.0 idle-time-out
  def _get_idle_timeout(self):
    return millis2secs(pn_transport_get_idle_timeout(self._impl))

  def _set_idle_timeout(self, sec):
    pn_transport_set_idle_timeout(self._impl, secs2millis(sec))

  idle_timeout = property(_get_idle_timeout, _set_idle_timeout,
                          doc="""
The idle timeout of the connection (float, in seconds).
""")

  @property
  def remote_idle_timeout(self):
    return millis2secs(pn_transport_get_remote_idle_timeout(self._impl))

  @property
  def frames_output(self):
    return pn_transport_get_frames_output(self._impl)

  @property
  def frames_input(self):
    return pn_transport_get_frames_input(self._impl)

  def sasl(self):
    """Return a new SASL wrapper for this transport."""
    return SASL(self)

  def ssl(self, domain=None, session_details=None):
    """Return this transport's SSL object, creating it on first use."""
    # SSL factory (singleton for this transport)
    if not self._ssl:
      self._ssl = SSL(self, domain, session_details)
    return self._ssl

  @property
  def condition(self):
    return cond2obj(pn_transport_condition(self._impl))

  @property
  def connection(self):
    return Connection.wrap(pn_transport_connection(self._impl))
3279
class SASLException(TransportException):
  """Fallback exception type used by SASL._check for negative error codes."""
3282
class SASL(Wrapper):
  """
  Wrapper around the SASL layer obtained from a transport via pn_sasl.
  """

  # outcomes
  OK = PN_SASL_OK
  AUTH = PN_SASL_AUTH
  SKIPPED = PN_SASL_SKIPPED

  # states reported by the "state" property
  STATE_IDLE = PN_SASL_IDLE
  STATE_STEP = PN_SASL_STEP
  STATE_PASS = PN_SASL_PASS
  STATE_FAIL = PN_SASL_FAIL

  def __init__(self, transport):
    Wrapper.__init__(self, transport._impl, pn_transport_attachments)
    self._sasl = pn_sasl(transport._impl)

  def _check(self, err):
    """Return C{err} unchanged if non-negative, otherwise raise the mapped exception."""
    if err >= 0:
      return err
    exc = EXCEPTIONS.get(err, SASLException)
    raise exc("[%s]" % (err))

  def mechanisms(self, mechs):
    pn_sasl_mechanisms(self._sasl, mechs)

  # @deprecated
  def client(self):
    pn_sasl_client(self._sasl)

  # @deprecated
  def server(self):
    pn_sasl_server(self._sasl)

  def allow_skip(self, allow):
    pn_sasl_allow_skip(self._sasl, allow)

  def plain(self, user, password):
    pn_sasl_plain(self._sasl, user, password)

  def send(self, data):
    self._check(pn_sasl_send(self._sasl, data, len(data)))

  def recv(self):
    """
    Read pending SASL data, or return None when pn_sasl_recv reports
    PN_EOS.

    The read starts with a 16 byte buffer and doubles it for as long as
    the call reports PN_OVERFLOW.
    """
    size = 16
    while True:
      n, data = pn_sasl_recv(self._sasl, size)
      if n == PN_OVERFLOW:
        size *= 2
        continue
      if n == PN_EOS:
        return None
      self._check(n)
      return data

  @property
  def outcome(self):
    """The SASL outcome, with PN_SASL_NONE reported as None."""
    result = pn_sasl_outcome(self._sasl)
    if result == PN_SASL_NONE:
      return None
    return result

  def done(self, outcome):
    pn_sasl_done(self._sasl, outcome)

  @property
  def state(self):
    return pn_sasl_state(self._sasl)
3352
class SSLException(TransportException):
  """Fallback exception type used by the SSL and SSLDomain _check helpers."""
3356
class SSLUnavailable(SSLException):
  """Raised when pn_ssl_domain or pn_ssl returns None instead of an object."""
3359
class SSLDomain(object):
  """
  Wrapper around an SSL domain object created with pn_ssl_domain.
  """

  MODE_CLIENT = PN_SSL_MODE_CLIENT
  MODE_SERVER = PN_SSL_MODE_SERVER
  VERIFY_PEER = PN_SSL_VERIFY_PEER
  VERIFY_PEER_NAME = PN_SSL_VERIFY_PEER_NAME
  ANONYMOUS_PEER = PN_SSL_ANONYMOUS_PEER

  def __init__(self, mode):
    """Create a domain for C{mode}; raises SSLUnavailable if none is returned."""
    self._domain = pn_ssl_domain(mode)
    if self._domain is None:
      raise SSLUnavailable()

  def _check(self, err):
    """Return C{err} unchanged if non-negative, otherwise raise the mapped exception."""
    if err >= 0:
      return err
    exc = EXCEPTIONS.get(err, SSLException)
    raise exc("SSL failure.")

  def set_credentials(self, cert_file, key_file, password):
    rc = pn_ssl_domain_set_credentials(self._domain,
                                       cert_file, key_file,
                                       password)
    return self._check(rc)

  def set_trusted_ca_db(self, certificate_db):
    rc = pn_ssl_domain_set_trusted_ca_db(self._domain,
                                         certificate_db)
    return self._check(rc)

  def set_peer_authentication(self, verify_mode, trusted_CAs=None):
    rc = pn_ssl_domain_set_peer_authentication(self._domain,
                                               verify_mode,
                                               trusted_CAs)
    return self._check(rc)

  def allow_unsecured_client(self):
    rc = pn_ssl_domain_allow_unsecured_client(self._domain)
    return self._check(rc)

  def __del__(self):
    pn_ssl_domain_free(self._domain)
3397
class SSL(object):
  """
  Wrapper around the SSL layer of a transport. At most one SSL object
  exists per Transport (see __new__).
  """

  RESUME_UNKNOWN = PN_SSL_RESUME_UNKNOWN
  RESUME_NEW = PN_SSL_RESUME_NEW
  RESUME_REUSED = PN_SSL_RESUME_REUSED

  @staticmethod
  def present():
    return pn_ssl_present()

  def _check(self, err):
    """Return C{err} unchanged if non-negative, otherwise raise the mapped exception."""
    if err >= 0:
      return err
    exc = EXCEPTIONS.get(err, SSLException)
    raise exc("SSL failure.")

  def __new__(cls, transport, domain, session_details=None):
    """Enforce a singleton SSL object per Transport"""
    if transport._ssl:
      # unfortunately, we've combined the allocation and the configuration in a
      # single step.  So catch any attempt by the application to provide what
      # may be a different configuration than the original (hack)
      ssl = transport._ssl
      if (domain and (ssl._domain is not domain) or
          session_details and (ssl._session_details is not session_details)):
        raise SSLException("Cannot re-configure existing SSL object!")
      return transport._ssl

    obj = super(SSL, cls).__new__(cls)
    obj._domain = domain
    obj._session_details = session_details
    session_id = None
    if session_details:
      session_id = session_details.get_session_id()
    obj._ssl = pn_ssl( transport._impl )
    if obj._ssl is None:
      raise SSLUnavailable()
    # NOTE(review): a domain of None (the default of Transport.ssl) fails
    # on the attribute access below -- confirm whether that is intended.
    pn_ssl_init( obj._ssl, domain._domain, session_id )
    transport._ssl = obj
    return transport._ssl

  def cipher_name(self):
    """Name reported by pn_ssl_get_cipher_name, or None if the call's status is falsy."""
    rc, name = pn_ssl_get_cipher_name( self._ssl, 128 )
    if not rc:
      return None
    return name

  def protocol_name(self):
    """Name reported by pn_ssl_get_protocol_name, or None if the call's status is falsy."""
    rc, name = pn_ssl_get_protocol_name( self._ssl, 128 )
    if not rc:
      return None
    return name

  def resume_status(self):
    return pn_ssl_resume_status( self._ssl )

  def _set_peer_hostname(self, hostname):
    self._check(pn_ssl_set_peer_hostname( self._ssl, unicode2utf8(hostname) ))
  def _get_peer_hostname(self):
    err, name = pn_ssl_get_peer_hostname( self._ssl, 1024 )
    self._check(err)
    return utf82unicode(name)
  peer_hostname = property(_get_peer_hostname, _set_peer_hostname,
                           doc="""
Manage the expected name of the remote peer. Used to authenticate the remote.
""")
3464


class SSLSessionDetails(object):
  """Holder for the unique identifier of an SSL session.

  The identifier is what is used to resume a previous session on a new SSL
  connection.
  """

  def __init__(self, session_id):
    """@param session_id: unique identifier of the SSL session."""
    self._session_id = session_id

  def get_session_id(self):
    """@return: the identifier supplied at construction time."""
    stored_id = self._session_id
    return stored_id


# Maps the class name of an event's context (see Event.clazz) to a callable
# that converts the raw C context into a Python object; Event.context looks
# the converter up here.  Event.reactor also consults this table for a
# "pn_reactor" entry and falls back to _none when there is none.
# The lambdas defer resolving Connection, Session, etc. until they are called.
wrappers = {
  "pn_void": lambda x: pn_void2py(x),
  "pn_pyref": lambda x: pn_void2py(x),
  "pn_connection": lambda x: Connection.wrap(pn_cast_pn_connection(x)),
  "pn_session": lambda x: Session.wrap(pn_cast_pn_session(x)),
  "pn_link": lambda x: Link.wrap(pn_cast_pn_link(x)),
  "pn_delivery": lambda x: Delivery.wrap(pn_cast_pn_delivery(x)),
  "pn_transport": lambda x: Transport.wrap(pn_cast_pn_transport(x)),
  "pn_selectable": lambda x: Selectable.wrap(pn_cast_pn_selectable(x))
}

class Collector:
  """Thin Python wrapper around a C event collector (C{pn_collector})."""

  def __init__(self):
    """Allocate the underlying collector and keep its handle in C{_impl}."""
    self._impl = pn_collector()

  def put(self, obj, etype):
    """Put an event into the collector with a Python object as its context.

    @param obj: Python object; converted with C{pn_py2void} and stored
      under the C{PN_PYREF} class.
    @param etype: event type; its C{number} identifies the event.
    """
    context = pn_py2void(obj)
    pn_collector_put(self._impl, PN_PYREF, context, etype.number)

  def peek(self):
    """Return C{Event.wrap} applied to the collector's current head."""
    head = pn_collector_peek(self._impl)
    return Event.wrap(head)

  def pop(self):
    """Pop the head of the collector; nothing is returned."""
    # The head is wrapped first and the wrapper is held in a local until
    # this method returns, exactly as before.
    # NOTE(review): the wrapper is never used -- confirm whether holding it
    # across the pop is required.
    head_event = self.peek()
    pn_collector_pop(self._impl)

  def __del__(self):
    """Free the C collector and drop the handle."""
    handle = self._impl
    pn_collector_free(handle)
    del self._impl
3507
class EventType(object):
  """Describes one kind of event: its name, number and handler method name.

  Every instance registers itself in the class-wide L{TYPES} dictionary,
  keyed by its number.
  """

  # Guards the extended-number counter and the TYPES registry.
  _lock = threading.Lock()
  # Next number handed out to an event type created without a number.
  _extended = 10000
  # Registry of all event types, keyed by number.
  TYPES = {}

  def __init__(self, name=None, number=None, method=None):
    """Create and register an event type.

    @param name: event name; when None it is obtained from
      C{pn_event_type_name(number)}.
    @param number: event number; when None the next extended number
      (starting at 10000) is allocated.
    @param method: name of the handler method for this event; defaults to
      C{"on_<name>"}.
    @raise TypeError: if neither name nor number is given.
    """
    if name is None and number is None:
      raise TypeError("extended events require a name")
    # Acquire *before* entering the try block: if acquire() itself is
    # interrupted, the finally clause must not release a lock that was
    # never taken (which would raise and mask the original error).
    self._lock.acquire()
    try:
      if name is None:
        name = pn_event_type_name(number)

      if number is None:
        number = EventType._extended
        EventType._extended += 1

      if method is None:
        method = "on_%s" % name

      self.name = name
      self.number = number
      self.method = method

      self.TYPES[number] = self
    finally:
      self._lock.release()

  def __repr__(self):
    """Return the event name."""
    return self.name
3539
def dispatch(handler, method, *args):
  """Call ``handler.<method>(*args)``, falling back to ``on_unhandled``.

  @param handler: object that may define the named method.
  @param method: name of the method to look up on the handler.
  @param args: positional arguments forwarded to the method.
  @return: the result of the method if the handler has a true-valued
    attribute of that name; otherwise the result of
    C{handler.on_unhandled(method, *args)} if the handler has
    C{on_unhandled}; otherwise None.
  """
  target = getattr(handler, method, None)
  if not target:
    if hasattr(handler, "on_unhandled"):
      return handler.on_unhandled(method, *args)
    return None
  return target(*args)
3546
class EventBase(object):
  """Minimal event: a class name, a context object and an event type."""

  def __init__(self, clazz, context, type):
    """Store the three values as attributes of the same names.

    @param clazz: stored as C{self.clazz}.
    @param context: stored as C{self.context}.
    @param type: stored as C{self.type}; its C{method} attribute names the
      handler method used by L{dispatch}.
    """
    self.clazz, self.context, self.type = clazz, context, type

  def dispatch(self, handler):
    """Deliver this event to ``handler`` through the module-level dispatch.

    @return: whatever the module-level C{dispatch} returns.
    """
    method_name = self.type.method
    return dispatch(handler, method_name, self)
3556
def _none(x):
  """Ignore the argument and return None (used as a fallback converter)."""
  return None

# Sentinel a handler method can return: Event.dispatch skips forwarding the
# event to handler.handlers when the handler's result equals this value.
DELEGATED = Constant("DELEGATED")

def _core(number, method):
  """Create the EventType for a built-in event.

  @param number: event type number (one of the PN_* event constants).
  @param method: name of the handler method for the event.
  @return: the new EventType, constructed with only number and method.
  """
  core_type = EventType(number=number, method=method)
  return core_type
3563
class Event(Wrapper, EventBase):
  """Python wrapper around a C event.

  The class attributes below are the EventType objects for the built-in
  events. Each is created by _core from the corresponding PN_* event
  number and the name of the handler method the event is dispatched to.
  """

  # Reactor events.
  REACTOR_INIT = _core(PN_REACTOR_INIT, "on_reactor_init")
  REACTOR_QUIESCED = _core(PN_REACTOR_QUIESCED, "on_reactor_quiesced")
  REACTOR_FINAL = _core(PN_REACTOR_FINAL, "on_reactor_final")

  # Timer events.
  TIMER_TASK = _core(PN_TIMER_TASK, "on_timer_task")

  # Connection events.
  CONNECTION_INIT = _core(PN_CONNECTION_INIT, "on_connection_init")
  CONNECTION_BOUND = _core(PN_CONNECTION_BOUND, "on_connection_bound")
  CONNECTION_UNBOUND = _core(PN_CONNECTION_UNBOUND, "on_connection_unbound")
  CONNECTION_LOCAL_OPEN = _core(PN_CONNECTION_LOCAL_OPEN, "on_connection_local_open")
  CONNECTION_LOCAL_CLOSE = _core(PN_CONNECTION_LOCAL_CLOSE, "on_connection_local_close")
  CONNECTION_REMOTE_OPEN = _core(PN_CONNECTION_REMOTE_OPEN, "on_connection_remote_open")
  CONNECTION_REMOTE_CLOSE = _core(PN_CONNECTION_REMOTE_CLOSE, "on_connection_remote_close")
  CONNECTION_FINAL = _core(PN_CONNECTION_FINAL, "on_connection_final")

  # Session events.
  SESSION_INIT = _core(PN_SESSION_INIT, "on_session_init")
  SESSION_LOCAL_OPEN = _core(PN_SESSION_LOCAL_OPEN, "on_session_local_open")
  SESSION_LOCAL_CLOSE = _core(PN_SESSION_LOCAL_CLOSE, "on_session_local_close")
  SESSION_REMOTE_OPEN = _core(PN_SESSION_REMOTE_OPEN, "on_session_remote_open")
  SESSION_REMOTE_CLOSE = _core(PN_SESSION_REMOTE_CLOSE, "on_session_remote_close")
  SESSION_FINAL = _core(PN_SESSION_FINAL, "on_session_final")

  # Link events.
  LINK_INIT = _core(PN_LINK_INIT, "on_link_init")
  LINK_LOCAL_OPEN = _core(PN_LINK_LOCAL_OPEN, "on_link_local_open")
  LINK_LOCAL_CLOSE = _core(PN_LINK_LOCAL_CLOSE, "on_link_local_close")
  LINK_LOCAL_DETACH = _core(PN_LINK_LOCAL_DETACH, "on_link_local_detach")
  LINK_REMOTE_OPEN = _core(PN_LINK_REMOTE_OPEN, "on_link_remote_open")
  LINK_REMOTE_CLOSE = _core(PN_LINK_REMOTE_CLOSE, "on_link_remote_close")
  LINK_REMOTE_DETACH = _core(PN_LINK_REMOTE_DETACH, "on_link_remote_detach")
  LINK_FLOW = _core(PN_LINK_FLOW, "on_link_flow")
  LINK_FINAL = _core(PN_LINK_FINAL, "on_link_final")

  # Delivery events.
  DELIVERY = _core(PN_DELIVERY, "on_delivery")

  # Transport events.
  TRANSPORT = _core(PN_TRANSPORT, "on_transport")
  TRANSPORT_ERROR = _core(PN_TRANSPORT_ERROR, "on_transport_error")
  TRANSPORT_HEAD_CLOSED = _core(PN_TRANSPORT_HEAD_CLOSED, "on_transport_head_closed")
  TRANSPORT_TAIL_CLOSED = _core(PN_TRANSPORT_TAIL_CLOSED, "on_transport_tail_closed")
  TRANSPORT_CLOSED = _core(PN_TRANSPORT_CLOSED, "on_transport_closed")

  # Selectable events.
  SELECTABLE_INIT = _core(PN_SELECTABLE_INIT, "on_selectable_init")
  SELECTABLE_UPDATED = _core(PN_SELECTABLE_UPDATED, "on_selectable_updated")
  SELECTABLE_READABLE = _core(PN_SELECTABLE_READABLE, "on_selectable_readable")
  SELECTABLE_WRITABLE = _core(PN_SELECTABLE_WRITABLE, "on_selectable_writable")
  SELECTABLE_EXPIRED = _core(PN_SELECTABLE_EXPIRED, "on_selectable_expired")
  SELECTABLE_ERROR = _core(PN_SELECTABLE_ERROR, "on_selectable_error")
  SELECTABLE_FINAL = _core(PN_SELECTABLE_FINAL, "on_selectable_final")

  @staticmethod
3615 - def wrap(impl, number=None):
3616 if impl is None: 3617 return None 3618 3619 if number is None: 3620 number = pn_event_type(impl) 3621 3622 event = Event(impl, number) 3623 3624 if isinstance(event.context, EventBase): 3625 return event.context 3626 else: 3627 return event
3628
3629 - def __init__(self, impl, number):
3630 Wrapper.__init__(self, impl, pn_event_attachments) 3631 self.__dict__["type"] = EventType.TYPES[number]
3632
3633 - def _init(self):
3634 pass
3635 3636 @property
3637 - def clazz(self):
3638 cls = pn_event_class(self._impl) 3639 if cls: 3640 return pn_class_name(cls) 3641 else: 3642 return None
3643 3644 @property
3645 - def context(self):
3646 """Returns the context object associated with the event. The type of this depend on the type of event.""" 3647 return wrappers[self.clazz](pn_event_context(self._impl))
3648
3649 - def dispatch(self, handler, type=None):
3650 type = type or self.type 3651 if isinstance(handler, WrappedHandler): 3652 pn_handler_dispatch(handler._impl, self._impl, type.number) 3653 else: 3654 result = dispatch(handler, type.method, self) 3655 if result != DELEGATED and hasattr(handler, "handlers"): 3656 for h in handler.handlers: 3657 self.dispatch(h, type)
3658 3659 3660 @property
3661 - def reactor(self):
3662 """Returns the reactor associated with the event.""" 3663 return wrappers.get("pn_reactor", _none)(pn_event_reactor(self._impl))
3664 3665 @property
3666 - def transport(self):
3667 """Returns the transport associated with the event, or null if none is associated with it.""" 3668 return Transport.wrap(pn_event_transport(self._impl))
3669 3670 @property
3671 - def connection(self):
3672 """Returns the connection associated with the event, or null if none is associated with it.""" 3673 return Connection.wrap(pn_event_connection(self._impl))
3674 3675 @property
3676 - def session(self):
3677 """Returns the session associated with the event, or null if none is associated with it.""" 3678 return Session.wrap(pn_event_session(self._impl))
3679 3680 @property 3684 3685 @property
3686 - def sender(self):
3687 """Returns the sender link associated with the event, or null if 3688 none is associated with it. This is essentially an alias for 3689 link(), that does an additional checkon the type of the 3690 link.""" 3691 l = self.link 3692 if l and l.is_sender: 3693 return l 3694 else: 3695 return None
3696 3697 @property
3698 - def receiver(self):
3699 """Returns the receiver link associated with the event, or null if 3700 none is associated with it. This is essentially an alias for 3701 link(), that does an additional checkon the type of the link.""" 3702 l = self.link 3703 if l and l.is_receiver: 3704 return l 3705 else: 3706 return None
3707 3708 @property
3709 - def delivery(self):
3710 """Returns the delivery associated with the event, or null if none is associated with it.""" 3711 return Delivery.wrap(pn_event_delivery(self._impl))
3712
3713 - def __repr__(self):
3714 return "%s(%s)" % (self.type, self.context)
3715
class Handler(object):
  """Handler whose fallback for unhandled events silently ignores them."""

  def on_unhandled(self, method, *args):
    """Ignore an event for which no handler method was found.

    @param method: name of the handler method that was looked for.
    @param args: arguments that would have been passed to it.
    @return: None.
    """
    return None
3720
3721 -class _cadapter:
3722
3723 - def __init__(self, handler, on_error=None):
3724 self.handler = handler 3725 self.on_error = on_error
3726
3727 - def dispatch(self, cevent, ctype):
3728 ev = Event.wrap(cevent, ctype) 3729 ev.dispatch(self.handler)
3730
3731 - def exception(self, exc, val, tb):
3732 if self.on_error is None: 3733 raise exc, val, tb 3734 else: 3735 self.on_error((exc, val, tb))
3736
3737 -class WrappedHandler(Wrapper):
3738 3739 @staticmethod
3740 - def wrap(impl, on_error=None):
3741 if impl is None: 3742 return None 3743 else: 3744 handler = WrappedHandler(impl) 3745 handler.__dict__["on_error"] = on_error 3746 return handler
3747
3748 - def __init__(self, impl_or_constructor):
3749 Wrapper.__init__(self, impl_or_constructor)
3750
3751 - def _on_error(self, info):
3752 on_error = getattr(self, "on_error", None) 3753 if on_error is None: 3754 raise info[0], info[1], info[2] 3755 else: 3756 on_error(info)
3757
3758 - def add(self, handler):
3759 if handler is None: return 3760 impl = _chandler(handler, self._on_error) 3761 pn_handler_add(self._impl, impl) 3762 pn_decref(impl)
3763
3764 - def clear(self):
3765 pn_handler_clear(self._impl)
3766
def _chandler(obj, on_error=None):
  """Return a C handler for ``obj``.

  @param obj: None, a WrappedHandler, or a plain Python handler object.
  @param on_error: error callback given to the adapter that is created for
    a plain Python handler.
  @return: None for None; for a WrappedHandler its C{_impl} after
    C{pn_incref}; otherwise a C{pn_pyhandler} built around a C{_cadapter}.
  """
  if obj is None:
    return None
  if not isinstance(obj, WrappedHandler):
    return pn_pyhandler(_cadapter(obj, on_error))
  impl = obj._impl
  pn_incref(impl)
  return impl
3776
class Url(object):
  """
  Simple URL parser/constructor, handles URLs of the form:

  <scheme>://<user>:<password>@<host>:<port>/<path>

  All components can be None if not specified in the URL string.

  The port can be specified as a service name, e.g. 'amqp' in the
  URL string but Url.port always gives the integer value.

  @ivar scheme: Url scheme e.g. 'amqp' or 'amqps'
  @ivar username: Username
  @ivar password: Password
  @ivar host: Host name, ipv6 literal or ipv4 dotted quad.
  @ivar port: Integer port.
  @ivar path: Path part of the URL.
  """

  AMQPS = "amqps"
  AMQP = "amqp"

  class Port(int):
    """An integer port number that can be constructed from a service name string"""

    def __new__(cls, value):
      """@param value: integer port number or string service name."""
      number = cls._port_int(value)
      port = super(Url.Port, cls).__new__(cls, number)
      # Keep the original spelling (number or service name) for __str__.
      port.name = str(value)
      return port

    def __eq__(self, x):
      """Equal to either the original spelling or the integer value."""
      return str(self) == x or int(self) == x

    def __ne__(self, x):
      return not self == x

    def __str__(self):
      return str(self.name)

    @staticmethod
    def _port_int(value):
      """Convert service, an integer or a service name, into an integer port number."""
      try:
        return int(value)
      except ValueError:
        pass
      try:
        return socket.getservbyname(value)
      except socket.error:
        pass
      # Not every system has amqp/amqps defined as a service
      if value == Url.AMQPS:
        return 5671
      elif value == Url.AMQP:
        return 5672
      raise ValueError("Not a valid port number or service name: '%s'" % value)

  def __init__(self, url=None, defaults=True, **kwargs):
    """
    @param url: URL string to parse.
    @param defaults: If true, fill in missing default values in the URL.
        If false, you can fill them in later by calling self.defaults()
    @param kwargs: scheme, username, password, host, port, path.
      If specified, replaces corresponding part in url string.
    @raise ValueError: if the URL string cannot be parsed.
    """
    if not url:
      self._url = pn_url()
    else:
      self._url = pn_url_parse(str(url))
      if not self._url:
        raise ValueError("Invalid URL '%s'" % url)
    # Let kwargs override values parsed from url
    for key, value in kwargs.items():
      getattr(self, key)          # Check for invalid kwargs
      setattr(self, key, value)
    if defaults:
      self.defaults()

  class PartDescriptor(object):
    """Descriptor exposing one URL part through the module-level
    C{pn_url_get_<part>} / C{pn_url_set_<part>} functions."""

    def __init__(self, part):
      module_names = globals()
      self.getter = module_names["pn_url_get_%s" % part]
      self.setter = module_names["pn_url_set_%s" % part]

    def __get__(self, obj, type=None):
      return self.getter(obj._url)

    def __set__(self, obj, value):
      return self.setter(obj._url, str(value))

  scheme = PartDescriptor('scheme')
  username = PartDescriptor('username')
  password = PartDescriptor('password')
  host = PartDescriptor('host')
  path = PartDescriptor('path')

  def _get_port(self):
    """Return the port as a Url.Port, or the false value stored when unset."""
    portstr = pn_url_get_port(self._url)
    if not portstr:
      return portstr
    return Url.Port(portstr)

  def _set_port(self, value):
    """Store the port; None clears it, anything else is validated by Url.Port."""
    port_text = None
    if value is not None:
      port_text = str(Url.Port(value))
    pn_url_set_port(self._url, port_text)

  port = property(_get_port, _set_port)

  def __str__(self):
    return pn_url_str(self._url)

  def __repr__(self):
    return "Url(%r)" % str(self)

  def __eq__(self, x):
    """Equal to anything whose string form matches this URL's."""
    return str(self) == str(x)

  def __ne__(self, x):
    return not self == x

  def __del__(self):
    """Free the underlying C url and drop the handle."""
    pn_url_free(self._url)
    del self._url

  def defaults(self):
    """
    Fill in missing values (scheme, host or port) with defaults
    @return: self
    """
    self.scheme = self.scheme or self.AMQP
    self.host = self.host or '0.0.0.0'
    self.port = self.port or self.Port(self.scheme)
    return self

# Names exported by ``from proton import *``.
__all__ = [
           "API_LANGUAGE",
           "IMPLEMENTATION_LANGUAGE",
           "ABORTED",
           "ACCEPTED",
           "AUTOMATIC",
           "PENDING",
           "MANUAL",
           "REJECTED",
           "RELEASED",
           "MODIFIED",
           "SETTLED",
           "UNDESCRIBED",
           "Array",
           "Collector",
           "Condition",
           "Connection",
           "Data",
           "Delivery",
           "Disposition",
           "Described",
           "Endpoint",
           "Event",
           "Handler",
           "Link",
           "Message",
           "MessageException",
           "Messenger",
           "MessengerException",
           "ProtonException",
           "VERSION_MAJOR",
           "VERSION_MINOR",
           "Receiver",
           "SASL",
           "Sender",
           "Session",
           "SSL",
           "SSLDomain",
           "SSLSessionDetails",
           "SSLUnavailable",
           "SSLException",
           "Terminus",
           "Timeout",
           "Interrupt",
           "Transport",
           "TransportException",
           "Url",
           "char",
           "dispatch",
           "symbol",
           "timestamp",
           "ulong"
          ]